datafusion_expr/
execution_props.rs1use crate::var_provider::{VarProvider, VarType};
19use chrono::{DateTime, Utc};
20use datafusion_common::HashMap;
21use datafusion_common::ScalarValue;
22use datafusion_common::TableReference;
23use datafusion_common::alias::AliasGenerator;
24use datafusion_common::config::ConfigOptions;
25use datafusion_common::{Result, internal_err};
26use std::fmt;
27use std::hash::{Hash, Hasher};
28use std::sync::{Arc, Mutex};
29
30#[derive(Clone, Debug)]
57pub struct ExecutionProps {
58 pub query_execution_start_time: Option<DateTime<Utc>>,
61 pub alias_generator: Arc<AliasGenerator>,
63 pub config_options: Option<Arc<ConfigOptions>>,
65 pub var_providers: Option<HashMap<VarType, Arc<dyn VarProvider + Send + Sync>>>,
67 pub subquery_indexes: HashMap<crate::logical_plan::Subquery, SubqueryIndex>,
70 pub subquery_results: ScalarSubqueryResults,
73 pub lambda_variable_qualifier: HashMap<String, TableReference>,
77}
78
79impl Default for ExecutionProps {
80 fn default() -> Self {
81 Self::new()
82 }
83}
84
85impl ExecutionProps {
86 pub fn new() -> Self {
88 ExecutionProps {
89 query_execution_start_time: None,
90 alias_generator: Arc::new(AliasGenerator::new()),
91 config_options: None,
92 var_providers: None,
93 subquery_indexes: HashMap::new(),
94 subquery_results: ScalarSubqueryResults::default(),
95 lambda_variable_qualifier: HashMap::new(),
96 }
97 }
98
99 pub fn with_query_execution_start_time(
101 mut self,
102 query_execution_start_time: DateTime<Utc>,
103 ) -> Self {
104 self.query_execution_start_time = Some(query_execution_start_time);
105 self
106 }
107
108 #[deprecated(since = "50.0.0", note = "Use mark_start_execution instead")]
109 pub fn start_execution(&mut self) -> &Self {
110 let default_config = Arc::new(ConfigOptions::default());
111 self.mark_start_execution(default_config)
112 }
113
114 pub fn mark_start_execution(&mut self, config_options: Arc<ConfigOptions>) -> &Self {
117 self.query_execution_start_time = Some(Utc::now());
118 self.alias_generator = Arc::new(AliasGenerator::new());
119 self.config_options = Some(config_options);
120 &*self
121 }
122
123 pub fn add_var_provider(
125 &mut self,
126 var_type: VarType,
127 provider: Arc<dyn VarProvider + Send + Sync>,
128 ) -> Option<Arc<dyn VarProvider + Send + Sync>> {
129 let mut var_providers = self.var_providers.take().unwrap_or_default();
130
131 let old_provider = var_providers.insert(var_type, provider);
132
133 self.var_providers = Some(var_providers);
134
135 old_provider
136 }
137
138 #[expect(clippy::needless_pass_by_value)]
140 pub fn get_var_provider(
141 &self,
142 var_type: VarType,
143 ) -> Option<Arc<dyn VarProvider + Send + Sync>> {
144 self.var_providers
145 .as_ref()
146 .and_then(|var_providers| var_providers.get(&var_type).cloned())
147 }
148
149 pub fn config_options(&self) -> Option<&Arc<ConfigOptions>> {
152 self.config_options.as_ref()
153 }
154
155 pub fn with_qualified_lambda_variables(
158 mut self,
159 qualifier: &TableReference,
160 variables: &[String],
161 ) -> Self {
162 for var in variables {
163 self.lambda_variable_qualifier
164 .entry_ref(var)
165 .insert(qualifier.clone());
166 }
167
168 self
169 }
170}
171
/// Strongly typed wrapper around a `usize` position.
///
/// It is the value type of `ExecutionProps::subquery_indexes` and the address
/// accepted by `ScalarSubqueryResults::get` / `ScalarSubqueryResults::set`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubqueryIndex(usize);

impl SubqueryIndex {
    /// Wraps the raw position `index`.
    pub const fn new(index: usize) -> Self {
        SubqueryIndex(index)
    }

    /// Returns the raw position that was passed to [`SubqueryIndex::new`].
    pub const fn as_usize(self) -> usize {
        let SubqueryIndex(position) = self;
        position
    }
}
187
188#[derive(Clone, Default)]
195pub struct ScalarSubqueryResults {
196 slots: Arc<Vec<Mutex<Option<ScalarValue>>>>,
197}
198
199impl ScalarSubqueryResults {
200 pub fn new(n: usize) -> Self {
202 Self {
203 slots: Arc::new((0..n).map(|_| Mutex::new(None)).collect()),
204 }
205 }
206
207 pub fn get(&self, index: SubqueryIndex) -> Option<ScalarValue> {
209 let slot = self.slots.get(index.as_usize())?;
210 slot.lock().unwrap().clone()
211 }
212
213 pub fn set(&self, index: SubqueryIndex, value: ScalarValue) -> Result<()> {
215 let Some(slot) = self.slots.get(index.as_usize()) else {
216 return internal_err!(
217 "ScalarSubqueryResults: result index {} is out of bounds",
218 index.as_usize()
219 );
220 };
221
222 let mut slot = slot.lock().unwrap();
223 if slot.is_some() {
224 return internal_err!(
225 "ScalarSubqueryResults: result for index {} was already populated",
226 index.as_usize()
227 );
228 }
229 *slot = Some(value);
230
231 Ok(())
232 }
233
234 pub fn clear(&self) {
236 for slot in self.slots.iter() {
237 *slot.lock().unwrap() = None;
238 }
239 }
240
241 pub fn ptr_eq(this: &Self, other: &Self) -> bool {
243 Arc::ptr_eq(&this.slots, &other.slots)
244 }
245}
246
247impl fmt::Debug for ScalarSubqueryResults {
248 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
249 f.debug_list()
250 .entries(self.slots.iter().map(|slot| slot.lock().unwrap().clone()))
251 .finish()
252 }
253}
254
255impl PartialEq for ScalarSubqueryResults {
256 fn eq(&self, other: &Self) -> bool {
257 Self::ptr_eq(self, other)
258 }
259}
260
261impl Eq for ScalarSubqueryResults {}
262
263impl Hash for ScalarSubqueryResults {
264 fn hash<H: Hasher>(&self, state: &mut H) {
265 Arc::as_ptr(&self.slots).hash(state);
266 }
267}
268
#[cfg(test)]
mod test {
    use super::*;

    /// Pins the `Debug` rendering of freshly constructed properties.
    #[test]
    fn debug() {
        let props = ExecutionProps::new();
        assert_eq!(
            "ExecutionProps { query_execution_start_time: None, alias_generator: AliasGenerator { next_id: 1 }, config_options: None, var_providers: None, subquery_indexes: {}, subquery_results: [], lambda_variable_qualifier: {} }",
            format!("{props:?}")
        );
    }

    /// A slot starts empty, can be written once, and rejects a second write.
    #[test]
    fn scalar_subquery_results_set_and_get() -> Result<()> {
        let results = ScalarSubqueryResults::new(1);
        assert_eq!(results.get(SubqueryIndex::new(0)), None);

        results.set(SubqueryIndex::new(0), ScalarValue::Int32(Some(42)))?;
        assert_eq!(
            results.get(SubqueryIndex::new(0)),
            Some(ScalarValue::Int32(Some(42)))
        );
        assert!(
            results
                .set(SubqueryIndex::new(0), ScalarValue::Int32(Some(7)))
                .is_err()
        );

        Ok(())
    }

    /// `clear` empties a populated slot and makes it writable again.
    #[test]
    fn scalar_subquery_results_clear() -> Result<()> {
        let results = ScalarSubqueryResults::new(1);
        results.set(SubqueryIndex::new(0), ScalarValue::Int32(Some(42)))?;

        results.clear();

        assert_eq!(results.get(SubqueryIndex::new(0)), None);
        results.set(SubqueryIndex::new(0), ScalarValue::Int32(Some(7)))?;
        assert_eq!(
            results.get(SubqueryIndex::new(0)),
            Some(ScalarValue::Int32(Some(7)))
        );

        Ok(())
    }

    /// Out-of-bounds indexes read as `None`, are rejected on write, and leave
    /// the existing slots untouched.
    #[test]
    fn scalar_subquery_results_out_of_bounds() {
        let results = ScalarSubqueryResults::new(1);
        let past_the_end = SubqueryIndex::new(1);

        assert_eq!(results.get(past_the_end), None);
        assert!(
            results
                .set(past_the_end, ScalarValue::Int32(Some(1)))
                .is_err()
        );
        assert_eq!(results.get(SubqueryIndex::new(0)), None);

        // The default container has no slots at all.
        let empty = ScalarSubqueryResults::default();
        assert_eq!(empty.get(SubqueryIndex::new(0)), None);
        assert!(
            empty
                .set(SubqueryIndex::new(0), ScalarValue::Int32(Some(1)))
                .is_err()
        );
    }

    /// Clones share their slots and compare equal; independently created
    /// containers do not, even when they have the same shape.
    #[test]
    fn scalar_subquery_results_clone_shares_slots() -> Result<()> {
        let original = ScalarSubqueryResults::new(1);
        let shared = original.clone();
        assert!(ScalarSubqueryResults::ptr_eq(&original, &shared));
        assert_eq!(original, shared);

        shared.set(SubqueryIndex::new(0), ScalarValue::Int32(Some(42)))?;
        assert_eq!(
            original.get(SubqueryIndex::new(0)),
            Some(ScalarValue::Int32(Some(42)))
        );

        let separate = ScalarSubqueryResults::new(1);
        assert!(!ScalarSubqueryResults::ptr_eq(&original, &separate));
        assert_ne!(original, separate);

        Ok(())
    }
}