Skip to main content

datafusion_expr/
execution_props.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::var_provider::{VarProvider, VarType};
19use chrono::{DateTime, Utc};
20use datafusion_common::HashMap;
21use datafusion_common::ScalarValue;
22use datafusion_common::TableReference;
23use datafusion_common::alias::AliasGenerator;
24use datafusion_common::config::ConfigOptions;
25use datafusion_common::{Result, internal_err};
26use std::fmt;
27use std::hash::{Hash, Hasher};
28use std::sync::{Arc, Mutex};
29
30/// Holds properties and scratch state used while optimizing a [`LogicalPlan`]
31/// and translating it into an executable physical plan, such as the statement
32/// start time used during simplification.
33///
34/// An [`ExecutionProps`] is created each time a `LogicalPlan` is
35/// prepared for execution (optimized). If the same plan is optimized
36/// multiple times, a new `ExecutionProps` is created each time.
37///
38/// It is important that this structure be cheap to create as it is
39/// done so during predicate pruning and expression simplification
40///
41/// # Relationship with [`TaskContext`]
42///
43/// [`ExecutionProps`] is intentionally distinct from [`TaskContext`].
44/// It is used while optimizing a logical plan and constructing physical
45/// expressions and physical plans, before physical operators are run.
46///
47/// [`TaskContext`] is the runtime context passed to physical operators during
48/// physical-plan execution.
49///
50/// Keeping these structures separate avoids threading execution/runtime state
51/// through planning APIs, and avoids making execution depend on planner-only
52/// scratch state.
53///
54/// [`TaskContext`]: https://docs.rs/datafusion/latest/datafusion/execution/struct.TaskContext.html
55/// [`LogicalPlan`]: crate::LogicalPlan
56#[derive(Clone, Debug)]
57pub struct ExecutionProps {
58    /// The time at which the query execution started. If `None`,
59    /// functions like `now()` will not be simplified during optimization.
60    pub query_execution_start_time: Option<DateTime<Utc>>,
61    /// Alias generator used by subquery optimizer rules
62    pub alias_generator: Arc<AliasGenerator>,
63    /// Snapshot of config options when the query started
64    pub config_options: Option<Arc<ConfigOptions>>,
65    /// Providers for scalar variables
66    pub var_providers: Option<HashMap<VarType, Arc<dyn VarProvider + Send + Sync>>>,
67    /// Maps each logical `Subquery` to its index in `subquery_results`.
68    /// Populated by the physical planner before calling `create_physical_expr`.
69    pub subquery_indexes: HashMap<crate::logical_plan::Subquery, SubqueryIndex>,
70    /// Shared results container for uncorrelated scalar subquery values.
71    /// Populated at execution time by `ScalarSubqueryExec`.
72    pub subquery_results: ScalarSubqueryResults,
73    /// Maps each lambda variable name to its lambda qualifier generated
74    /// during physical planning. Populated by the physical planner for
75    /// each lambda before calling `create_physical_expr`.
76    pub lambda_variable_qualifier: HashMap<String, TableReference>,
77}
78
79impl Default for ExecutionProps {
80    fn default() -> Self {
81        Self::new()
82    }
83}
84
85impl ExecutionProps {
86    /// Creates a new execution props
87    pub fn new() -> Self {
88        ExecutionProps {
89            query_execution_start_time: None,
90            alias_generator: Arc::new(AliasGenerator::new()),
91            config_options: None,
92            var_providers: None,
93            subquery_indexes: HashMap::new(),
94            subquery_results: ScalarSubqueryResults::default(),
95            lambda_variable_qualifier: HashMap::new(),
96        }
97    }
98
99    /// Set the query execution start time to use
100    pub fn with_query_execution_start_time(
101        mut self,
102        query_execution_start_time: DateTime<Utc>,
103    ) -> Self {
104        self.query_execution_start_time = Some(query_execution_start_time);
105        self
106    }
107
108    #[deprecated(since = "50.0.0", note = "Use mark_start_execution instead")]
109    pub fn start_execution(&mut self) -> &Self {
110        let default_config = Arc::new(ConfigOptions::default());
111        self.mark_start_execution(default_config)
112    }
113
114    /// Marks the execution of query started timestamp.
115    /// This also instantiates a new alias generator.
116    pub fn mark_start_execution(&mut self, config_options: Arc<ConfigOptions>) -> &Self {
117        self.query_execution_start_time = Some(Utc::now());
118        self.alias_generator = Arc::new(AliasGenerator::new());
119        self.config_options = Some(config_options);
120        &*self
121    }
122
123    /// Registers a variable provider, returning the existing provider, if any
124    pub fn add_var_provider(
125        &mut self,
126        var_type: VarType,
127        provider: Arc<dyn VarProvider + Send + Sync>,
128    ) -> Option<Arc<dyn VarProvider + Send + Sync>> {
129        let mut var_providers = self.var_providers.take().unwrap_or_default();
130
131        let old_provider = var_providers.insert(var_type, provider);
132
133        self.var_providers = Some(var_providers);
134
135        old_provider
136    }
137
138    /// Returns the provider for the `var_type`, if any
139    #[expect(clippy::needless_pass_by_value)]
140    pub fn get_var_provider(
141        &self,
142        var_type: VarType,
143    ) -> Option<Arc<dyn VarProvider + Send + Sync>> {
144        self.var_providers
145            .as_ref()
146            .and_then(|var_providers| var_providers.get(&var_type).cloned())
147    }
148
149    /// Returns the configuration properties for this execution
150    /// if the execution has started
151    pub fn config_options(&self) -> Option<&Arc<ConfigOptions>> {
152        self.config_options.as_ref()
153    }
154
155    /// Adds a mapping for each variable to the given qualifier. Existing
156    /// variables with conflicting names get's shadowed
157    pub fn with_qualified_lambda_variables(
158        mut self,
159        qualifier: &TableReference,
160        variables: &[String],
161    ) -> Self {
162        for var in variables {
163            self.lambda_variable_qualifier
164                .entry_ref(var)
165                .insert(qualifier.clone());
166        }
167
168        self
169    }
170}
171
/// Position of one scalar subquery inside a [`ScalarSubqueryResults`]
/// container.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubqueryIndex(usize);

impl SubqueryIndex {
    /// Wraps `index` as a subquery index.
    pub const fn new(index: usize) -> Self {
        SubqueryIndex(index)
    }

    /// Unwraps the index into the plain slot position it stands for.
    pub const fn as_usize(self) -> usize {
        let Self(slot) = self;
        slot
    }
}
187
188/// Shared results container for uncorrelated scalar subqueries.
189///
190/// Each entry corresponds to one scalar subquery, identified by its index.
191/// Each slot is populated at execution time by `ScalarSubqueryExec`, read by
192/// `ScalarSubqueryExpr` instances that share this container, and cleared when
193/// the plan is reset for re-execution.
194#[derive(Clone, Default)]
195pub struct ScalarSubqueryResults {
196    slots: Arc<Vec<Mutex<Option<ScalarValue>>>>,
197}
198
199impl ScalarSubqueryResults {
200    /// Creates a new shared results container with `n` empty slots.
201    pub fn new(n: usize) -> Self {
202        Self {
203            slots: Arc::new((0..n).map(|_| Mutex::new(None)).collect()),
204        }
205    }
206
207    /// Returns the scalar value stored at `index`, if it has been populated.
208    pub fn get(&self, index: SubqueryIndex) -> Option<ScalarValue> {
209        let slot = self.slots.get(index.as_usize())?;
210        slot.lock().unwrap().clone()
211    }
212
213    /// Stores `value` in the slot at `index`.
214    pub fn set(&self, index: SubqueryIndex, value: ScalarValue) -> Result<()> {
215        let Some(slot) = self.slots.get(index.as_usize()) else {
216            return internal_err!(
217                "ScalarSubqueryResults: result index {} is out of bounds",
218                index.as_usize()
219            );
220        };
221
222        let mut slot = slot.lock().unwrap();
223        if slot.is_some() {
224            return internal_err!(
225                "ScalarSubqueryResults: result for index {} was already populated",
226                index.as_usize()
227            );
228        }
229        *slot = Some(value);
230
231        Ok(())
232    }
233
234    /// Clears all populated results so the container can be reused.
235    pub fn clear(&self) {
236        for slot in self.slots.iter() {
237            *slot.lock().unwrap() = None;
238        }
239    }
240
241    /// Returns true if `this` and `other` point to the same shared container.
242    pub fn ptr_eq(this: &Self, other: &Self) -> bool {
243        Arc::ptr_eq(&this.slots, &other.slots)
244    }
245}
246
247impl fmt::Debug for ScalarSubqueryResults {
248    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
249        f.debug_list()
250            .entries(self.slots.iter().map(|slot| slot.lock().unwrap().clone()))
251            .finish()
252    }
253}
254
255impl PartialEq for ScalarSubqueryResults {
256    fn eq(&self, other: &Self) -> bool {
257        Self::ptr_eq(self, other)
258    }
259}
260
261impl Eq for ScalarSubqueryResults {}
262
263impl Hash for ScalarSubqueryResults {
264    fn hash<H: Hasher>(&self, state: &mut H) {
265        Arc::as_ptr(&self.slots).hash(state);
266    }
267}
268
#[cfg(test)]
mod test {
    use super::*;

    /// A freshly created `ExecutionProps` has a stable `Debug` rendering.
    #[test]
    fn debug() {
        let rendered = format!("{:?}", ExecutionProps::new());
        let expected = "ExecutionProps { query_execution_start_time: None, alias_generator: AliasGenerator { next_id: 1 }, config_options: None, var_providers: None, subquery_indexes: {}, subquery_results: [], lambda_variable_qualifier: {} }";
        assert_eq!(expected, rendered);
    }

    /// A slot starts empty, can be populated once, and rejects a second write.
    #[test]
    fn scalar_subquery_results_set_and_get() -> Result<()> {
        let first = SubqueryIndex::new(0);
        let results = ScalarSubqueryResults::new(1);
        assert_eq!(results.get(first), None);

        results.set(first, ScalarValue::Int32(Some(42)))?;
        assert_eq!(results.get(first), Some(ScalarValue::Int32(Some(42))));

        let second_write = results.set(first, ScalarValue::Int32(Some(7)));
        assert!(second_write.is_err());

        Ok(())
    }

    /// Clearing empties a populated slot and allows it to be populated again.
    #[test]
    fn scalar_subquery_results_clear() -> Result<()> {
        let first = SubqueryIndex::new(0);
        let results = ScalarSubqueryResults::new(1);
        results.set(first, ScalarValue::Int32(Some(42)))?;

        results.clear();
        assert_eq!(results.get(first), None);

        results.set(first, ScalarValue::Int32(Some(7)))?;
        assert_eq!(results.get(first), Some(ScalarValue::Int32(Some(7))));

        Ok(())
    }
}