clickhouse_datafusion/
utils.rs

1//! Helpers for bridging between `ClickHouse`, `DataFusion`, and [`clickhouse_arrow`].
2mod create;
3mod errors;
4pub(crate) mod params;
5
6use datafusion::execution::SessionStateDefaults;
7use datafusion::prelude::SessionContext;
8
9pub use self::create::*;
10pub use self::errors::*;
11pub use self::params::*;
12
13/// Helper function to register built-in functions in a session context.
14///
15/// `DataFusion` is quite useless without these imo.
16pub fn register_builtins(ctx: &SessionContext) {
17    // Register all udf functions so that items like "make_array" are available
18    SessionStateDefaults::register_builtin_functions(&mut ctx.state_ref().write());
19    // Make sure all ch functions are available
20    super::udfs::register_clickhouse_functions(ctx);
21}
22
23pub(crate) mod analyze {
24    use std::collections::HashMap;
25
26    use datafusion::common::tree_node::{
27        Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
28    };
29    use datafusion::common::{Column, qualified_name};
30    use datafusion::logical_expr::SubqueryAlias;
31    use datafusion::prelude::Expr;
32
33    /// Helper function to rewrite exprs, replacing any subquery alias references with the table
34    /// references of its input.
35    ///
36    /// # Panics
37    /// - Does not panic, no error is returned during traversal transformation
38    pub(crate) fn push_exprs_below_subquery(
39        exprs: Vec<Expr>,
40        subquery_alias: &SubqueryAlias,
41    ) -> Vec<Expr> {
42        let mut replace_map = HashMap::new();
43        for (i, (qualifier, field)) in subquery_alias.input.schema().iter().enumerate() {
44            let (sub_qualifier, sub_field) = subquery_alias.schema.qualified_field(i);
45            drop(replace_map.insert(
46                qualified_name(sub_qualifier, sub_field.name()),
47                Expr::Column(Column::new(qualifier.cloned(), field.name())),
48            ));
49        }
50
51        exprs
52            .into_iter()
53            .map(|expr| {
54                expr.transform_up(|e| {
55                    Ok(if let Expr::Column(c) = &e {
56                        replace_map
57                            .get(&c.flat_name())
58                            .map(|new_c| {
59                                Transformed::new(new_c.clone(), true, TreeNodeRecursion::Jump)
60                            })
61                            .unwrap_or(Transformed::no(e))
62                    } else {
63                        Transformed::no(e)
64                    })
65                })
66                .data()
67                .unwrap()
68            })
69            .collect()
70    }
71}