clickhouse_datafusion/context/
plan_node.rs1use std::sync::Arc;
4
5use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
6use datafusion::common::{DFSchemaRef, plan_err};
7use datafusion::datasource::source_as_provider;
8use datafusion::error::Result;
9use datafusion::logical_expr::{InvariantLevel, LogicalPlan, UserDefinedLogicalNodeCore};
10use datafusion::physical_plan::ExecutionPlan;
11use datafusion::prelude::Expr;
12use datafusion::sql::unparser::Unparser;
13
14use crate::ClickHouseConnectionPool;
15use crate::dialect::ClickHouseDialect;
16use crate::providers::table::ClickHouseTableProvider;
17use crate::sql::ClickHouseSqlExec;
18
19pub const CLICKHOUSE_FUNCTION_NODE_NAME: &str = "ClickHouseFunctionNode";
20
21#[derive(Clone, Debug)]
26pub struct ClickHouseFunctionNode {
27 pub(super) input: LogicalPlan,
29 schema: DFSchemaRef,
30 pool: Arc<ClickHouseConnectionPool>,
31 coerce_schema: bool,
32}
33
34impl ClickHouseFunctionNode {
35 pub fn try_new(input: LogicalPlan) -> Result<Self> {
43 let schema = Arc::clone(input.schema());
44 let mut coerce_schema = false;
46 let mut pool = None;
47 let _ = input
48 .apply(|plan| {
49 if let LogicalPlan::TableScan(scan) = plan {
50 let provider = source_as_provider(&scan.source)?;
52 if let Some(provider) =
53 provider.as_any().downcast_ref::<ClickHouseTableProvider>()
54 {
55 coerce_schema = provider.coerce_schema();
56 pool = Some(Arc::clone(provider.pool()));
57 return Ok(TreeNodeRecursion::Stop);
58 }
59 }
60 Ok(TreeNodeRecursion::Continue)
61 })
62 .unwrap();
64
65 let Some(pool) = pool else {
66 return plan_err!(
67 "ClickHouseFunctionNode: cannot execute without a connection pool, most likely a \
68 ClickHouseTableProvider was never found in the plan."
69 );
70 };
71
72 Ok(Self { input, schema, pool, coerce_schema })
73 }
74
75 pub(crate) fn execute(&self) -> Result<Arc<dyn ExecutionPlan>> {
76 let sql = Unparser::new(&ClickHouseDialect).plan_to_sql(&self.input)?.to_string();
77 ClickHouseSqlExec::try_new(None, self.input.schema().inner(), Arc::clone(&self.pool), sql)
78 .map(|ex| ex.with_coercion(self.coerce_schema))
79 .map(|ex| Arc::new(ex) as Arc<dyn ExecutionPlan>)
80 }
81}
82
83impl UserDefinedLogicalNodeCore for ClickHouseFunctionNode {
84 fn name(&self) -> &str { CLICKHOUSE_FUNCTION_NODE_NAME }
85
86 fn inputs(&self) -> Vec<&LogicalPlan> {
87 vec![]
92 }
93
94 fn schema(&self) -> &DFSchemaRef { &self.schema }
95
96 fn expressions(&self) -> Vec<Expr> { vec![] }
98
99 fn fmt_for_explain(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 write!(f, "ClickHouseFunctionNode")
101 }
102
103 fn with_exprs_and_inputs(&self, exprs: Vec<Expr>, inputs: Vec<LogicalPlan>) -> Result<Self> {
105 if !exprs.is_empty() {
106 return plan_err!("ClickHouseFunctionNode expects no expressions");
107 }
108 if !inputs.is_empty() {
109 return plan_err!("ClickHouseFunctionNode expects no inputs");
110 }
111
112 Ok(self.clone())
113 }
140
141 fn check_invariants(&self, _check: InvariantLevel, _plan: &LogicalPlan) -> Result<()> {
142 Ok(())
144 }
145
146 fn necessary_children_exprs(&self, _output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
147 None
149 }
150
151 fn supports_limit_pushdown(&self) -> bool { false }
156}
157
158impl PartialEq for ClickHouseFunctionNode {
160 fn eq(&self, other: &Self) -> bool {
161 self.input == other.input
162 && self.name() == other.name()
163 && self.pool.join_push_down() == other.pool.join_push_down()
164 }
165}
166
167impl Eq for ClickHouseFunctionNode {}
168
169impl std::hash::Hash for ClickHouseFunctionNode {
170 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
171 self.name().hash(state);
172 self.input.hash(state);
173 self.pool.join_push_down().hash(state);
174 }
175}
176
177impl PartialOrd for ClickHouseFunctionNode {
178 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
179 self.input.partial_cmp(&other.input)
180 }
181}
182
#[cfg(all(test, feature = "test-utils"))]
mod tests {
    use datafusion::arrow::datatypes::Schema;
    use datafusion::datasource::empty::EmptyTable;
    use datafusion::datasource::provider_as_source;
    use datafusion::logical_expr::LogicalPlanBuilder;
    use datafusion::sql::TableReference;

    use super::*;

    /// A plan whose only scan is backed by a non-ClickHouse provider must be rejected.
    #[test]
    fn test_plan_node_requires_provider() -> Result<()> {
        let empty = Arc::new(EmptyTable::new(Arc::new(Schema::empty())));
        let plan = LogicalPlanBuilder::scan(
            TableReference::bare("test"),
            provider_as_source(empty),
            None,
        )?
        .build()?;

        let outcome = ClickHouseFunctionNode::try_new(plan);
        assert!(outcome.is_err(), "ClickHouseTableProvider required for a ClickHouseFunctionNode");
        Ok(())
    }

    /// Covers three properties of a node built over a ClickHouse scan: it rejects any
    /// expressions or inputs on rebuild, refuses limit pushdown, and compares equal to an
    /// identically built node.
    #[cfg(feature = "mocks")]
    #[test]
    fn test_verify_no_exprs_input() -> Result<()> {
        use datafusion::common::Column;

        let pool = Arc::new(ClickHouseConnectionPool::new("pool".to_string(), ()));
        let provider = Arc::new(ClickHouseTableProvider::new_with_schema_unchecked(
            Arc::clone(&pool),
            "table1".into(),
            Arc::new(Schema::empty()),
        ));
        let plan = LogicalPlanBuilder::scan(
            TableReference::bare("test"),
            provider_as_source(provider),
            None,
        )?
        .build()?;

        let node = ClickHouseFunctionNode::try_new(plan.clone())?;

        let with_expr =
            node.with_exprs_and_inputs(vec![Expr::Column(Column::from_name("test_col"))], vec![]);
        assert!(with_expr.is_err(), "ClickHouseFunctionNode must take no exprs");

        let with_input = node.with_exprs_and_inputs(vec![], vec![plan.clone()]);
        assert!(with_input.is_err(), "ClickHouseFunctionNode must take no inputs");

        assert!(
            !node.supports_limit_pushdown(),
            "ClickHouseFunctionNode does not support limit pushdown"
        );

        let twin = ClickHouseFunctionNode::try_new(plan)?;
        assert_eq!(node, twin, "PartialEq must be consistent");
        assert_eq!(node.partial_cmp(&twin), Some(std::cmp::Ordering::Equal), "Same node");
        Ok(())
    }
}