1use std::any::Any;
7use std::fmt;
8use std::fmt::{Display, Formatter};
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use datafusion::arrow::datatypes::{Schema, SchemaRef};
13use datafusion::catalog::Session;
14use datafusion::datasource::TableProvider;
15use datafusion::error::Result;
16use datafusion::execution::TaskContext;
17use datafusion::logical_expr::logical_plan::builder::LogicalTableSource;
18use datafusion::logical_expr::{
19 Expr, LogicalPlan, LogicalPlanBuilder, TableProviderFilterPushDown, TableType,
20};
21use datafusion::physical_expr::EquivalenceProperties;
22use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
23use datafusion::physical_plan::{
24 DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
25 SendableRecordBatchStream,
26};
27use datafusion::sql::TableReference;
28use datafusion::sql::unparser::Unparser;
29use datafusion::sql::unparser::dialect::{DefaultDialect, Dialect};
30
31use crate::connection::ClickHouseConnectionPool;
32use crate::dialect::ClickHouseDialect;
33use crate::stream::RecordBatchStream;
34
35pub fn execute_sql_query(
37 sql: impl Into<String>,
38 pool: Arc<ClickHouseConnectionPool>,
39 schema: SchemaRef,
40 coerce_schema: bool,
41) -> SendableRecordBatchStream {
42 let sql = sql.into();
43 tracing::debug!("Running sql: {sql}");
44 Box::pin(RecordBatchStream::new_from_query(sql, pool, schema, coerce_schema))
45}
46
/// Join push-down policy reported by the connection pool.
///
/// The federation integration turns this into a compute-context string: `AllowedFor`
/// supplies its string directly, `Disallow` yields a per-table identifier.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub enum JoinPushDown {
    /// Joins are not pushed down across tables.
    Disallow,
    /// Joins may be pushed down; carries the compute-context string to report.
    AllowedFor(String),
}
60
61#[derive(Clone)]
63pub struct SqlTable {
64 pub table_reference: TableReference,
65 pub(crate) coerce_schema: bool,
66 pool: Arc<ClickHouseConnectionPool>,
67 name: String,
68 schema: SchemaRef,
69 dialect: Option<Arc<ClickHouseDialect>>,
70 exprs: Option<Vec<Expr>>,
71}
72
73impl fmt::Debug for SqlTable {
74 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
75 f.debug_struct("SqlTable")
76 .field("name", &self.name)
77 .field("schema", &self.schema)
78 .field("table_reference", &self.table_reference)
79 .field("pool", &"ClickHouseConnectionPool")
80 .field("dialect", &self.dialect)
81 .field("exprs", &self.exprs)
82 .field("coerce_schema", &self.coerce_schema)
83 .finish()
84 }
85}
86
87impl SqlTable {
88 pub async fn try_new(
93 name: &str,
94 pool: Arc<ClickHouseConnectionPool>,
95 table_reference: impl Into<TableReference>,
96 ) -> Result<Self> {
97 let table_reference = table_reference.into();
98 let schema = pool.connect().await?.get_schema(&table_reference).await?;
99 Ok(Self::new_with_schema_unchecked(name, pool, schema, table_reference))
100 }
101
102 pub fn new_with_schema_unchecked(
104 name: &str,
105 pool: Arc<ClickHouseConnectionPool>,
106 schema: impl Into<SchemaRef>,
107 table_reference: impl Into<TableReference>,
108 ) -> Self {
109 Self {
110 name: name.to_owned(),
111 pool,
112 schema: schema.into(),
113 table_reference: table_reference.into(),
114 dialect: Some(Arc::new(ClickHouseDialect)),
115 exprs: None,
116 coerce_schema: false,
117 }
118 }
119
120 #[must_use]
121 pub fn with_exprs(mut self, exprs: Vec<Expr>) -> Self {
122 self.exprs = Some(exprs);
123 self
124 }
125
126 #[must_use]
129 pub fn with_coercion(mut self, coerce: bool) -> Self {
130 self.coerce_schema = coerce;
131 self
132 }
133
134 pub fn pool(&self) -> &Arc<ClickHouseConnectionPool> { &self.pool }
136
137 pub(crate) fn unique_id(&self) -> usize { std::ptr::from_ref(self) as usize }
139
140 fn dialect(&self) -> &(dyn Dialect + Send + Sync) {
141 match &self.dialect {
142 Some(dialect) => dialect.as_ref(),
143 None => &DefaultDialect {},
144 }
145 }
146
147 #[cfg(feature = "federation")]
148 fn arc_dialect(&self) -> Arc<dyn Dialect + Send + Sync> {
149 match &self.dialect {
150 Some(dialect) => Arc::clone(dialect) as Arc<dyn Dialect>,
151 None => Arc::new(DefaultDialect {}),
152 }
153 }
154}
155
156impl SqlTable {
158 fn scan_to_sql(
161 &self,
162 projection: Option<&Vec<usize>>,
163 filters: &[Expr],
164 limit: Option<usize>,
165 ) -> Result<String> {
166 let logical_plan = self.create_logical_plan(projection, filters, limit)?;
167 let sql = Unparser::new(self.dialect()).plan_to_sql(&logical_plan)?.to_string();
168 Ok(sql)
169 }
170
171 fn create_logical_plan(
172 &self,
173 projection: Option<&Vec<usize>>,
174 filters: &[Expr],
175 limit: Option<usize>,
176 ) -> Result<LogicalPlan> {
177 let table_ref = match self.table_reference.clone() {
179 TableReference::Full { schema, table, .. } => TableReference::partial(schema, table),
180 table_ref => table_ref,
181 };
182 let table_source = LogicalTableSource::new(self.schema());
183 let mut builder = LogicalPlanBuilder::scan_with_filters(
184 table_ref,
185 Arc::new(table_source),
186 projection.cloned(),
187 filters.to_vec(),
188 )?;
189 if let Some(exprs) = &self.exprs {
190 builder = builder.project(exprs.clone())?;
191 }
192 builder.limit(0, limit)?.build()
193 }
194
195 fn create_physical_plan(
196 &self,
197 projection: Option<&Vec<usize>>,
198 sql: String,
199 ) -> Result<Arc<dyn ExecutionPlan>> {
200 Ok(Arc::new(
201 ClickHouseSqlExec::try_new(projection, &self.schema(), Arc::clone(&self.pool), sql)?
202 .with_coercion(self.coerce_schema),
203 ))
204 }
205}
206
207#[async_trait]
208impl TableProvider for SqlTable {
209 fn as_any(&self) -> &dyn Any { self }
210
211 fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) }
212
213 fn table_type(&self) -> TableType { TableType::Base }
214
215 fn supports_filters_pushdown(
216 &self,
217 filters: &[&Expr],
218 ) -> Result<Vec<TableProviderFilterPushDown>> {
219 let filter_push_down: Vec<TableProviderFilterPushDown> = filters
220 .iter()
221 .map(|f| match Unparser::new(self.dialect()).expr_to_sql(f) {
222 Ok(_) => TableProviderFilterPushDown::Exact,
223 Err(_) => TableProviderFilterPushDown::Unsupported,
224 })
225 .collect();
226
227 Ok(filter_push_down)
228 }
229
230 async fn scan(
231 &self,
232 _state: &dyn Session,
233 projection: Option<&Vec<usize>>,
234 filters: &[Expr],
235 limit: Option<usize>,
236 ) -> Result<Arc<dyn ExecutionPlan>> {
237 let sql = self.scan_to_sql(projection, filters, limit)?;
238 return self.create_physical_plan(projection, sql);
239 }
240}
241
242impl Display for SqlTable {
243 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
244 write!(f, "ClickHouseSqlTable {}", self.name)
245 }
246}
247
248pub fn project_schema_safe(
260 schema: &SchemaRef,
261 projection: Option<&Vec<usize>>,
262) -> Result<SchemaRef> {
263 let schema = match projection {
264 Some(columns) => {
265 if columns.is_empty() {
266 Arc::new(Schema::empty())
269 } else {
270 Arc::new(schema.project(columns)?)
271 }
272 }
273 None => Arc::clone(schema),
274 };
275 Ok(schema)
276}
277
278#[derive(Clone)]
280pub struct ClickHouseSqlExec {
281 projected_schema: SchemaRef,
282 pool: Arc<ClickHouseConnectionPool>,
283 sql: String,
284 properties: PlanProperties,
285 coerce_schema: bool,
286}
287
288impl ClickHouseSqlExec {
289 pub fn try_new(
294 projection: Option<&Vec<usize>>,
295 schema: &SchemaRef,
296 pool: Arc<ClickHouseConnectionPool>,
297 sql: String,
298 ) -> Result<Self> {
299 let projected_schema = project_schema_safe(schema, projection)?;
300 Ok(Self {
301 projected_schema: Arc::clone(&projected_schema),
302 pool,
303 sql,
304 properties: PlanProperties::new(
305 EquivalenceProperties::new(projected_schema),
306 Partitioning::UnknownPartitioning(1),
307 EmissionType::Incremental,
308 Boundedness::Bounded,
309 ),
310 coerce_schema: false,
311 })
312 }
313
314 #[must_use]
317 pub fn with_coercion(mut self, coerce: bool) -> Self {
318 self.coerce_schema = coerce;
319 self
320 }
321
322 pub fn sql(&self) -> Result<String> { Ok(self.sql.clone()) }
327}
328
329impl fmt::Debug for ClickHouseSqlExec {
330 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
331 let sql = self.sql().unwrap_or_default();
332 write!(f, "ClickHouseSqlExec sql={sql}")
333 }
334}
335
336impl DisplayAs for ClickHouseSqlExec {
337 fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter<'_>) -> fmt::Result {
338 let sql = self.sql().unwrap_or_default();
339 write!(f, "ClickHouseSqlExec sql={sql}")
340 }
341}
342
343impl ExecutionPlan for ClickHouseSqlExec {
344 fn name(&self) -> &'static str { "SqlExec" }
345
346 fn as_any(&self) -> &dyn Any { self }
347
348 fn schema(&self) -> SchemaRef { Arc::clone(&self.projected_schema) }
349
350 fn properties(&self) -> &PlanProperties { &self.properties }
351
352 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { vec![] }
353
354 fn with_new_children(
355 self: Arc<Self>,
356 _children: Vec<Arc<dyn ExecutionPlan>>,
357 ) -> Result<Arc<dyn ExecutionPlan>> {
358 Ok(self)
359 }
360
361 fn execute(
362 &self,
363 _partition: usize,
364 _context: Arc<TaskContext>,
365 ) -> Result<SendableRecordBatchStream> {
366 let sql = self.sql()?;
367 Ok(execute_sql_query(sql, Arc::clone(&self.pool), self.schema(), self.coerce_schema))
368 }
369}
370
#[cfg(feature = "federation")]
pub mod federation {
    //! `datafusion_federation` integration: [`SqlTable`] acts as a `SQLExecutor`.

    use std::sync::Arc;

    use async_trait::async_trait;
    use datafusion::arrow::datatypes::SchemaRef;
    use datafusion::catalog::TableProvider;
    use datafusion::common::exec_err;
    use datafusion::error::Result;
    use datafusion::physical_plan::SendableRecordBatchStream;
    use datafusion::sql::TableReference;
    use datafusion::sql::unparser::dialect::Dialect;
    use datafusion_federation::sql::{
        RemoteTableRef, SQLExecutor, SQLFederationProvider, SQLTableSource,
    };
    use datafusion_federation::{FederatedTableProviderAdaptor, FederatedTableSource};

    use super::{JoinPushDown, SqlTable, execute_sql_query};

    impl SqlTable {
        /// Builds a federation `SQLTableSource` that uses this table as its SQL executor.
        ///
        /// The source keeps this table's reference and schema.
        fn create_federated_table_source(self: Arc<Self>) -> Arc<dyn FederatedTableSource> {
            let table_ref = RemoteTableRef::from(self.table_reference.clone());
            let schema = self.schema();
            let fed_provider = Arc::new(SQLFederationProvider::new(self));
            Arc::new(SQLTableSource::new_with_schema(fed_provider, table_ref, schema))
        }

        /// Builds a `FederatedTableProviderAdaptor` from the federated source and this table.
        ///
        /// # Errors
        /// Currently always returns `Ok`.
        pub fn create_federated_table_provider(
            self: Arc<Self>,
        ) -> Result<FederatedTableProviderAdaptor> {
            let table_source = Self::create_federated_table_source(Arc::clone(&self));
            Ok(FederatedTableProviderAdaptor::new_with_provider(table_source, self))
        }
    }

    #[async_trait]
    impl SQLExecutor for SqlTable {
        fn name(&self) -> &str {
            &self.name
        }

        /// Returns the compute context derived from the pool's join push-down policy.
        ///
        /// `AllowedFor` supplies its context string. `Disallow` yields this table's
        /// address-based id, so every table instance gets its own context.
        fn compute_context(&self) -> Option<String> {
            match self.pool.join_push_down() {
                JoinPushDown::AllowedFor(context) => Some(context),
                JoinPushDown::Disallow => Some(self.unique_id().to_string()),
            }
        }

        fn dialect(&self) -> Arc<dyn Dialect> {
            self.arc_dialect()
        }

        /// Streams the results of `sql`, using `schema` for the batches and this table's
        /// coercion flag.
        fn execute(&self, sql: &str, schema: SchemaRef) -> Result<SendableRecordBatchStream> {
            let sql = sql.to_string();
            let pool = Arc::clone(&self.pool);
            let coerce_schema = self.coerce_schema;
            Ok(execute_sql_query(sql, pool, schema, coerce_schema))
        }

        /// Lists the tables in this table's schema.
        ///
        /// # Errors
        /// Errors when the table reference has no schema part, or when the lookup fails.
        async fn table_names(&self) -> Result<Vec<String>> {
            let Some(schema) = self.table_reference.schema() else {
                return exec_err!("Cannot fetch tables without a schema");
            };
            self.pool.connect().await?.tables(schema).await
        }

        /// Fetches the schema of `table_name`, qualified with this table's schema when present.
        ///
        /// `table_name` is used verbatim in both cases. Previously the unqualified case went
        /// through `TableReference::from(&str)`, which parses the string. That parse splits on
        /// dots and lowercases unquoted identifiers, which is inconsistent with the qualified
        /// branch and wrong for case-sensitive names.
        async fn get_table_schema(&self, table_name: &str) -> Result<SchemaRef> {
            let table_ref = match self.table_reference.schema() {
                Some(schema) => TableReference::partial(schema, table_name),
                None => TableReference::bare(table_name),
            };
            self.pool.connect().await?.get_schema(&table_ref).await
        }
    }
}