datafusion_catalog/session.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use async_trait::async_trait;
19use datafusion_common::config::ConfigOptions;
20use datafusion_common::{DFSchema, Result};
21use datafusion_execution::config::SessionConfig;
22use datafusion_execution::runtime_env::RuntimeEnv;
23use datafusion_execution::TaskContext;
24use datafusion_expr::execution_props::ExecutionProps;
25use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF};
26use datafusion_physical_plan::{ExecutionPlan, PhysicalExpr};
27use parking_lot::{Mutex, RwLock};
28use std::any::Any;
29use std::collections::HashMap;
30use std::sync::{Arc, Weak};
31
32/// Interface for accessing [`SessionState`] from the catalog.
33///
34/// This trait provides access to the information needed to plan and execute
35/// queries, such as configuration, functions, and runtime environment. See the
36/// documentation on [`SessionState`] for more information.
37///
38/// Historically, the `SessionState` struct was passed directly to catalog
39/// traits such as [`TableProvider`], which required a direct dependency on the
40/// DataFusion core. The interface required is now defined by this trait. See
41/// [#10782] for more details.
42///
43/// [#10782]: https://github.com/apache/datafusion/issues/10782
44///
45/// # Migration from `SessionState`
46///
47/// Using trait methods is preferred, as the implementation may change in future
48/// versions. However, you can downcast a `Session` to a `SessionState` as shown
49/// in the example below. If you find yourself needing to do this, please open
50/// an issue on the DataFusion repository so we can extend the trait to provide
51/// the required information.
52///
53/// ```
54/// # use datafusion_catalog::Session;
55/// # use datafusion_common::{Result, exec_datafusion_err};
56/// # struct SessionState {}
57/// // Given a `Session` reference, get the concrete `SessionState` reference
58/// // Note: this may stop working in future versions,
59/// fn session_state_from_session(session: &dyn Session) -> Result<&SessionState> {
60/// session.as_any()
61/// .downcast_ref::<SessionState>()
62/// .ok_or_else(|| exec_datafusion_err!("Failed to downcast Session to SessionState"))
63/// }
64/// ```
65///
66/// [`SessionState`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html
67/// [`TableProvider`]: crate::TableProvider
68#[async_trait]
69pub trait Session: Send + Sync {
70 /// Return the session ID
71 fn session_id(&self) -> &str;
72
73 /// Return the [`SessionConfig`]
74 fn config(&self) -> &SessionConfig;
75
76 /// return the [`ConfigOptions`]
77 fn config_options(&self) -> &ConfigOptions {
78 self.config().options()
79 }
80
81 /// Creates a physical [`ExecutionPlan`] plan from a [`LogicalPlan`].
82 ///
83 /// Note: this will optimize the provided plan first.
84 ///
85 /// This function will error for [`LogicalPlan`]s such as catalog DDL like
86 /// `CREATE TABLE`, which do not have corresponding physical plans and must
87 /// be handled by another layer, typically the `SessionContext`.
88 async fn create_physical_plan(
89 &self,
90 logical_plan: &LogicalPlan,
91 ) -> Result<Arc<dyn ExecutionPlan>>;
92
93 /// Create a [`PhysicalExpr`] from an [`Expr`] after applying type
94 /// coercion, and function rewrites.
95 ///
96 /// Note: The expression is not simplified or otherwise optimized: `a = 1
97 /// + 2` will not be simplified to `a = 3` as this is a more involved process.
98 /// See the [expr_api] example for how to simplify expressions.
99 ///
100 /// [expr_api]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/expr_api.rs
101 fn create_physical_expr(
102 &self,
103 expr: Expr,
104 df_schema: &DFSchema,
105 ) -> Result<Arc<dyn PhysicalExpr>>;
106
107 /// Return reference to scalar_functions
108 fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>>;
109
110 /// Return reference to aggregate_functions
111 fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>>;
112
113 /// Return reference to window functions
114 fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>>;
115
116 /// Return the runtime env
117 fn runtime_env(&self) -> &Arc<RuntimeEnv>;
118
119 /// Return the execution properties
120 fn execution_props(&self) -> &ExecutionProps;
121
122 fn as_any(&self) -> &dyn Any;
123}
124
125/// Create a new task context instance from Session
126impl From<&dyn Session> for TaskContext {
127 fn from(state: &dyn Session) -> Self {
128 let task_id = None;
129 TaskContext::new(
130 task_id,
131 state.session_id().to_string(),
132 state.config().clone(),
133 state.scalar_functions().clone(),
134 state.aggregate_functions().clone(),
135 state.window_functions().clone(),
136 state.runtime_env().clone(),
137 )
138 }
139}
140type SessionRefLock = Arc<Mutex<Option<Weak<RwLock<dyn Session>>>>>;
141/// The state store that stores the reference of the runtime session state.
142#[derive(Debug)]
143pub struct SessionStore {
144 session: SessionRefLock,
145}
146
147impl SessionStore {
148 /// Create a new [SessionStore]
149 pub fn new() -> Self {
150 Self {
151 session: Arc::new(Mutex::new(None)),
152 }
153 }
154
155 /// Set the session state of the store
156 pub fn with_state(&self, state: Weak<RwLock<dyn Session>>) {
157 let mut lock = self.session.lock();
158 *lock = Some(state);
159 }
160
161 /// Get the current session of the store
162 pub fn get_session(&self) -> Weak<RwLock<dyn Session>> {
163 self.session.lock().clone().unwrap()
164 }
165}
166
167impl Default for SessionStore {
168 fn default() -> Self {
169 Self::new()
170 }
171}