Skip to main content

datafusion_session/
session.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use async_trait::async_trait;
19use datafusion_common::config::{ConfigOptions, TableOptions};
20use datafusion_common::{DFSchema, Result};
21use datafusion_execution::TaskContext;
22use datafusion_execution::config::SessionConfig;
23use datafusion_execution::runtime_env::RuntimeEnv;
24use datafusion_expr::execution_props::ExecutionProps;
25use datafusion_expr::registry::ExtensionTypeRegistryRef;
26use datafusion_expr::{
27    AggregateUDF, Expr, HigherOrderUDF, LogicalPlan, ScalarUDF, WindowUDF,
28};
29use datafusion_physical_plan::{ExecutionPlan, PhysicalExpr};
30use parking_lot::{Mutex, RwLock};
31use std::any::Any;
32use std::collections::HashMap;
33use std::sync::{Arc, Weak};
34
35/// Interface for accessing [`SessionState`] from the catalog and data source.
36///
37/// This trait provides access to the information needed to plan and execute
38/// queries, such as configuration, functions, and runtime environment. See the
39/// documentation on [`SessionState`] for more information.
40///
41/// Historically, the `SessionState` struct was passed directly to catalog
42/// traits such as [`TableProvider`], which required a direct dependency on the
43/// DataFusion core. The interface required is now defined by this trait. See
44/// [#10782] for more details.
45///
46/// [#10782]: https://github.com/apache/datafusion/issues/10782
47///
48/// # Migration from `SessionState`
49///
50/// Using trait methods is preferred, as the implementation may change in future
51/// versions. However, you can downcast a `Session` to a `SessionState` as shown
52/// in the example below. If you find yourself needing to do this, please open
53/// an issue on the DataFusion repository so we can extend the trait to provide
54/// the required information.
55///
56/// ```
57/// # use datafusion_session::Session;
58/// # use datafusion_common::{Result, exec_datafusion_err};
59/// # struct SessionState {}
60/// // Given a `Session` reference, get the concrete `SessionState` reference
/// // Note: this may stop working in future versions.
62/// fn session_state_from_session(session: &dyn Session) -> Result<&SessionState> {
63///     session
64///         .as_any()
65///         .downcast_ref::<SessionState>()
66///         .ok_or_else(|| {
67///             exec_datafusion_err!("Failed to downcast Session to SessionState")
68///         })
69/// }
70/// ```
71///
72/// [`SessionState`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html
73/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
74#[async_trait]
75pub trait Session: Send + Sync {
76    /// Return the session ID
77    fn session_id(&self) -> &str;
78
79    /// Return the [`SessionConfig`]
80    fn config(&self) -> &SessionConfig;
81
82    /// return the [`ConfigOptions`]
83    fn config_options(&self) -> &ConfigOptions {
84        self.config().options()
85    }
86
87    /// Creates a physical [`ExecutionPlan`] plan from a [`LogicalPlan`].
88    ///
89    /// Note: this will optimize the provided plan first.
90    ///
91    /// This function will error for [`LogicalPlan`]s such as catalog DDL like
92    /// `CREATE TABLE`, which do not have corresponding physical plans and must
93    /// be handled by another layer, typically the `SessionContext`.
94    async fn create_physical_plan(
95        &self,
96        logical_plan: &LogicalPlan,
97    ) -> Result<Arc<dyn ExecutionPlan>>;
98
99    /// Create a [`PhysicalExpr`] from an [`Expr`] after applying type
100    /// coercion, and function rewrites.
101    ///
102    /// Note: The expression is not simplified or otherwise optimized:  `a = 1
103    /// + 2` will not be simplified to `a = 3` as this is a more involved process.
104    /// See the [expr_api] example for how to simplify expressions.
105    ///
106    /// [expr_api]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/query_planning/expr_api.rs
107    fn create_physical_expr(
108        &self,
109        expr: Expr,
110        df_schema: &DFSchema,
111    ) -> Result<Arc<dyn PhysicalExpr>>;
112
113    /// Return reference to scalar_functions
114    fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>>;
115
116    /// Return reference to higher_order_functions
117    fn higher_order_functions(&self) -> &HashMap<String, Arc<HigherOrderUDF>>;
118
119    /// Return reference to aggregate_functions
120    fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>>;
121
122    /// Return reference to window functions
123    fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>>;
124
125    /// Return a reference to the extension type registry
126    fn extension_type_registry(&self) -> &ExtensionTypeRegistryRef;
127
128    /// Return the runtime env
129    fn runtime_env(&self) -> &Arc<RuntimeEnv>;
130
131    /// Return the execution properties
132    fn execution_props(&self) -> &ExecutionProps;
133
134    fn as_any(&self) -> &dyn Any;
135
136    /// Return the table options
137    fn table_options(&self) -> &TableOptions;
138
139    /// return the TableOptions options with its extensions
140    fn default_table_options(&self) -> TableOptions {
141        self.table_options()
142            .combine_with_session_config(self.config_options())
143    }
144
145    /// Returns a mutable reference to [`TableOptions`]
146    fn table_options_mut(&mut self) -> &mut TableOptions;
147
148    /// Get a new TaskContext to run in this session
149    fn task_ctx(&self) -> Arc<TaskContext>;
150}
151
152/// Create a new task context instance from Session
153impl From<&dyn Session> for TaskContext {
154    fn from(state: &dyn Session) -> Self {
155        let task_id = None;
156        TaskContext::new(
157            task_id,
158            state.session_id().to_string(),
159            state.config().clone(),
160            state.scalar_functions().clone(),
161            state.higher_order_functions().clone(),
162            state.aggregate_functions().clone(),
163            state.window_functions().clone(),
164            Arc::clone(state.runtime_env()),
165        )
166    }
167}
168type SessionRefLock = Arc<Mutex<Option<Weak<RwLock<dyn Session>>>>>;
169/// The state store that stores the reference of the runtime session state.
170#[derive(Debug)]
171pub struct SessionStore {
172    session: SessionRefLock,
173}
174
175impl SessionStore {
176    /// Create a new [SessionStore]
177    pub fn new() -> Self {
178        Self {
179            session: Arc::new(Mutex::new(None)),
180        }
181    }
182
183    /// Set the session state of the store
184    pub fn with_state(&self, state: Weak<RwLock<dyn Session>>) {
185        let mut lock = self.session.lock();
186        *lock = Some(state);
187    }
188
189    /// Get the current session of the store
190    pub fn get_session(&self) -> Weak<RwLock<dyn Session>> {
191        self.session.lock().clone().unwrap()
192    }
193}
194
195impl Default for SessionStore {
196    fn default() -> Self {
197        Self::new()
198    }
199}