use crate::var_provider::{VarProvider, VarType};
use chrono::{DateTime, Utc};
use datafusion_common::HashMap;
use datafusion_common::ScalarValue;
use datafusion_common::TableReference;
use datafusion_common::alias::AliasGenerator;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{Result, internal_err};
use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Mutex};
/// State associated with the execution of a single query.
///
/// A value starts out "empty" (see `ExecutionProps::new`) and is filled in by
/// `mark_start_execution`, `add_var_provider` and the `with_*` builder methods.
#[derive(Clone, Debug)]
pub struct ExecutionProps {
/// Timestamp recorded by `mark_start_execution` or supplied through
/// `with_query_execution_start_time`; `None` until one of those has run.
pub query_execution_start_time: Option<DateTime<Utc>>,
/// Alias generator; replaced with a fresh instance on every call to
/// `mark_start_execution`.
pub alias_generator: Arc<AliasGenerator>,
/// Configuration captured by `mark_start_execution`; `None` before that.
pub config_options: Option<Arc<ConfigOptions>>,
/// Registered variable providers keyed by `VarType`; `None` until the first
/// `add_var_provider` call.
pub var_providers: Option<HashMap<VarType, Arc<dyn VarProvider + Send + Sync>>>,
/// Maps a logical-plan `Subquery` to the `SubqueryIndex` assigned to it.
/// NOTE(review): presumably the index addresses a slot in `subquery_results`
/// -- confirm against the code that populates this map.
pub subquery_indexes: HashMap<crate::logical_plan::Subquery, SubqueryIndex>,
/// Container of scalar subquery results. Its storage sits behind an `Arc`,
/// so clones of this struct share the same slots.
pub subquery_results: ScalarSubqueryResults,
/// Maps a lambda variable name to the `TableReference` that qualifies it;
/// filled by `with_qualified_lambda_variables`.
pub lambda_variable_qualifier: HashMap<String, TableReference>,
}
impl Default for ExecutionProps {
fn default() -> Self {
Self::new()
}
}
impl ExecutionProps {
    /// Creates an instance with no start time, no configuration, no variable
    /// providers, a fresh alias generator and empty subquery / lambda state.
    pub fn new() -> Self {
        Self {
            query_execution_start_time: None,
            alias_generator: Arc::new(AliasGenerator::new()),
            config_options: None,
            var_providers: None,
            subquery_indexes: HashMap::new(),
            subquery_results: ScalarSubqueryResults::default(),
            lambda_variable_qualifier: HashMap::new(),
        }
    }

    /// Builder-style setter: returns `self` with the query start time set to
    /// `query_execution_start_time`. All other fields are carried over.
    pub fn with_query_execution_start_time(
        self,
        query_execution_start_time: DateTime<Utc>,
    ) -> Self {
        Self {
            query_execution_start_time: Some(query_execution_start_time),
            ..self
        }
    }

    /// Marks the start of execution using a default [`ConfigOptions`].
    ///
    /// Kept for backwards compatibility; forwards to
    /// [`Self::mark_start_execution`].
    #[deprecated(since = "50.0.0", note = "Use mark_start_execution instead")]
    pub fn start_execution(&mut self) -> &Self {
        self.mark_start_execution(Arc::new(ConfigOptions::default()))
    }

    /// Marks the start of query execution.
    ///
    /// Records the current UTC time as the start time, installs a brand-new
    /// alias generator and stores `config_options`. Returns a shared
    /// reference to the updated value.
    pub fn mark_start_execution(&mut self, config_options: Arc<ConfigOptions>) -> &Self {
        // Capture the timestamp first so it reflects the moment of the call.
        let started_at = Utc::now();
        self.query_execution_start_time = Some(started_at);
        self.alias_generator = Arc::new(AliasGenerator::new());
        self.config_options = Some(config_options);
        self
    }

    /// Registers `provider` for `var_type`, creating the provider map on
    /// first use.
    ///
    /// Returns the provider previously registered for `var_type`, if any.
    pub fn add_var_provider(
        &mut self,
        var_type: VarType,
        provider: Arc<dyn VarProvider + Send + Sync>,
    ) -> Option<Arc<dyn VarProvider + Send + Sync>> {
        self.var_providers
            .get_or_insert_with(Default::default)
            .insert(var_type, provider)
    }

    /// Looks up the provider registered for `var_type`.
    ///
    /// Returns `None` when no provider map exists yet or when it has no
    /// entry for `var_type`. The returned `Arc` is a clone of the stored one.
    #[expect(clippy::needless_pass_by_value)]
    pub fn get_var_provider(
        &self,
        var_type: VarType,
    ) -> Option<Arc<dyn VarProvider + Send + Sync>> {
        let providers = self.var_providers.as_ref()?;
        providers.get(&var_type).cloned()
    }

    /// Returns the stored configuration, or `None` if none has been set.
    pub fn config_options(&self) -> Option<&Arc<ConfigOptions>> {
        self.config_options.as_ref()
    }

    /// Builder-style setter: associates every name in `variables` with a
    /// clone of `qualifier` in `lambda_variable_qualifier`.
    ///
    /// A name that is already present has its qualifier replaced.
    pub fn with_qualified_lambda_variables(
        mut self,
        qualifier: &TableReference,
        variables: &[String],
    ) -> Self {
        let qualifiers = &mut self.lambda_variable_qualifier;
        for name in variables {
            // `entry_ref` only allocates an owned key when the name is new.
            qualifiers.entry_ref(name).insert(qualifier.clone());
        }
        self
    }
}
/// Strongly typed wrapper around a `usize` position.
///
/// In this file it is used to address a slot in `ScalarSubqueryResults`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct SubqueryIndex(usize);

impl SubqueryIndex {
    /// Wraps the raw position `index`.
    pub const fn new(index: usize) -> Self {
        SubqueryIndex(index)
    }

    /// Returns the wrapped raw position.
    pub const fn as_usize(self) -> usize {
        let SubqueryIndex(raw) = self;
        raw
    }
}
/// Fixed-size collection of optional scalar values, one per slot, addressed
/// by [`SubqueryIndex`].
///
/// The slots live behind an `Arc`, so cloning this value yields another
/// handle to the *same* slots. Each slot is guarded by its own `Mutex`,
/// which is why all methods take `&self`.
///
/// # Panics
///
/// Every method that locks a slot panics if that slot's mutex is poisoned.
#[derive(Clone, Default)]
pub struct ScalarSubqueryResults {
    slots: Arc<Vec<Mutex<Option<ScalarValue>>>>,
}

impl ScalarSubqueryResults {
    /// Creates a container with `n` slots, all initially empty.
    pub fn new(n: usize) -> Self {
        let empty_slots = std::iter::repeat_with(|| Mutex::new(None))
            .take(n)
            .collect();
        Self {
            slots: Arc::new(empty_slots),
        }
    }

    /// Returns a clone of the value stored at `index`.
    ///
    /// Yields `None` both when the slot is empty and when `index` is out of
    /// bounds.
    pub fn get(&self, index: SubqueryIndex) -> Option<ScalarValue> {
        self.slots
            .get(index.as_usize())
            .and_then(|cell| cell.lock().unwrap().clone())
    }

    /// Stores `value` in the slot at `index`.
    ///
    /// # Errors
    ///
    /// Returns an internal error when `index` is out of bounds or when the
    /// slot already holds a value; in the latter case the slot is left
    /// untouched.
    pub fn set(&self, index: SubqueryIndex, value: ScalarValue) -> Result<()> {
        let position = index.as_usize();
        match self.slots.get(position) {
            None => internal_err!(
                "ScalarSubqueryResults: result index {} is out of bounds",
                position
            ),
            Some(cell) => {
                let mut guard = cell.lock().unwrap();
                if guard.is_some() {
                    return internal_err!(
                        "ScalarSubqueryResults: result for index {} was already populated",
                        position
                    );
                }
                *guard = Some(value);
                Ok(())
            }
        }
    }

    /// Empties every slot so each one can be populated again.
    ///
    /// Because the slots are shared, this affects all clones of `self`.
    pub fn clear(&self) {
        self.slots
            .iter()
            .for_each(|cell| *cell.lock().unwrap() = None);
    }

    /// Returns `true` when `this` and `other` are handles to the same
    /// underlying slot allocation.
    pub fn ptr_eq(this: &Self, other: &Self) -> bool {
        let (lhs, rhs) = (&this.slots, &other.slots);
        Arc::ptr_eq(lhs, rhs)
    }
}
/// Renders the container as a list with one `Option<ScalarValue>` entry per
/// slot, in slot order.
impl fmt::Debug for ScalarSubqueryResults {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut list = f.debug_list();
        for cell in self.slots.iter() {
            // Snapshot the value so the lock is released before formatting.
            let snapshot = cell.lock().unwrap().clone();
            list.entry(&snapshot);
        }
        list.finish()
    }
}
impl PartialEq for ScalarSubqueryResults {
fn eq(&self, other: &Self) -> bool {
Self::ptr_eq(self, other)
}
}
impl Eq for ScalarSubqueryResults {}
impl Hash for ScalarSubqueryResults {
fn hash<H: Hasher>(&self, state: &mut H) {
Arc::as_ptr(&self.slots).hash(state);
}
}
#[cfg(test)]
mod test {
    use super::*;

    /// Pins the `Debug` rendering of a freshly constructed `ExecutionProps`.
    #[test]
    fn debug() {
        let rendered = format!("{:?}", ExecutionProps::new());
        assert_eq!(
            "ExecutionProps { query_execution_start_time: None, alias_generator: AliasGenerator { next_id: 1 }, config_options: None, var_providers: None, subquery_indexes: {}, subquery_results: [], lambda_variable_qualifier: {} }",
            rendered
        );
    }

    /// A slot starts empty, can be written once, and rejects a second write.
    #[test]
    fn scalar_subquery_results_set_and_get() -> Result<()> {
        let first = SubqueryIndex::new(0);
        let results = ScalarSubqueryResults::new(1);
        assert_eq!(results.get(first), None);

        results.set(first, ScalarValue::Int32(Some(42)))?;
        assert_eq!(results.get(first), Some(ScalarValue::Int32(Some(42))));

        let second_write = results.set(first, ScalarValue::Int32(Some(7)));
        assert!(second_write.is_err());
        Ok(())
    }

    /// `clear` empties a populated slot so it can be written again.
    #[test]
    fn scalar_subquery_results_clear() -> Result<()> {
        let first = SubqueryIndex::new(0);
        let results = ScalarSubqueryResults::new(1);
        results.set(first, ScalarValue::Int32(Some(42)))?;

        results.clear();
        assert_eq!(results.get(first), None);

        results.set(first, ScalarValue::Int32(Some(7)))?;
        assert_eq!(results.get(first), Some(ScalarValue::Int32(Some(7))));
        Ok(())
    }
}