1use std::any::Any;
7use std::fmt;
8use std::fmt::{Display, Formatter};
9use std::sync::{Arc, LazyLock};
10
11use async_trait::async_trait;
12use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
13use datafusion::catalog::Session;
14use datafusion::datasource::TableProvider;
15use datafusion::error::Result;
16use datafusion::execution::TaskContext;
17use datafusion::logical_expr::logical_plan::builder::LogicalTableSource;
18use datafusion::logical_expr::{
19 Expr, LogicalPlan, LogicalPlanBuilder, TableProviderFilterPushDown, TableType,
20};
21use datafusion::physical_expr::EquivalenceProperties;
22use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
23use datafusion::physical_plan::{
24 DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
25 SendableRecordBatchStream,
26};
27use datafusion::sql::TableReference;
28use datafusion::sql::unparser::Unparser;
29use datafusion::sql::unparser::dialect::{DefaultDialect, Dialect};
30
31use crate::connection::ClickHouseConnectionPool;
32use crate::dialect::ClickHouseDialect;
33use crate::stream::RecordBatchStream;
34
35pub fn execute_sql_query(
37 sql: impl Into<String>,
38 pool: Arc<ClickHouseConnectionPool>,
39 schema: SchemaRef,
40 coerce_schema: bool,
41) -> SendableRecordBatchStream {
42 let sql = sql.into();
43 tracing::debug!("Running sql: {sql}");
44 Box::pin(RecordBatchStream::new_from_query(sql, pool, schema, coerce_schema))
45}
46
/// Controls whether joins involving a table may be pushed down to the remote engine.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum JoinPushDown {
    /// Join push-down is not allowed.
    Disallow,
    /// Join push-down is allowed among sources that report the same context string.
    AllowedFor(String),
}
60
61#[derive(Clone)]
63pub struct SqlTable {
64 pub table_reference: TableReference,
65 pub(crate) coerce_schema: bool,
66 pool: Arc<ClickHouseConnectionPool>,
67 name: String,
68 schema: SchemaRef,
69 dialect: Option<Arc<ClickHouseDialect>>,
70 exprs: Option<Vec<Expr>>,
71}
72
73impl fmt::Debug for SqlTable {
74 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
75 f.debug_struct("SqlTable")
76 .field("name", &self.name)
77 .field("schema", &self.schema)
78 .field("table_reference", &self.table_reference)
79 .field("pool", &"ClickHouseConnectionPool")
80 .field("dialect", &self.dialect)
81 .field("exprs", &self.exprs)
82 .field("coerce_schema", &self.coerce_schema)
83 .finish()
84 }
85}
86
87impl SqlTable {
88 pub async fn try_new(
93 name: &str,
94 pool: Arc<ClickHouseConnectionPool>,
95 table_reference: impl Into<TableReference>,
96 ) -> Result<Self> {
97 let table_reference = table_reference.into();
98 let schema = pool.connect().await?.get_schema(&table_reference).await?;
99 Ok(Self::new_with_schema_unchecked(name, pool, schema, table_reference))
100 }
101
102 pub fn new_with_schema_unchecked(
104 name: &str,
105 pool: Arc<ClickHouseConnectionPool>,
106 schema: impl Into<SchemaRef>,
107 table_reference: impl Into<TableReference>,
108 ) -> Self {
109 Self {
110 name: name.to_owned(),
111 pool,
112 schema: schema.into(),
113 table_reference: table_reference.into(),
114 dialect: Some(Arc::new(ClickHouseDialect)),
115 exprs: None,
116 coerce_schema: false,
117 }
118 }
119
120 #[must_use]
121 pub fn with_exprs(mut self, exprs: Vec<Expr>) -> Self {
122 self.exprs = Some(exprs);
123 self
124 }
125
126 #[must_use]
129 pub fn with_coercion(mut self, coerce: bool) -> Self {
130 self.coerce_schema = coerce;
131 self
132 }
133
134 pub fn pool(&self) -> &Arc<ClickHouseConnectionPool> { &self.pool }
136
137 pub(crate) fn unique_id(&self) -> usize { std::ptr::from_ref(self) as usize }
139
140 fn dialect(&self) -> &(dyn Dialect + Send + Sync) {
141 match &self.dialect {
142 Some(dialect) => dialect.as_ref(),
143 None => &DefaultDialect {},
144 }
145 }
146
147 #[cfg(feature = "federation")]
148 fn arc_dialect(&self) -> Arc<dyn Dialect + Send + Sync> {
149 match &self.dialect {
150 Some(dialect) => Arc::clone(dialect) as Arc<dyn Dialect>,
151 None => Arc::new(DefaultDialect {}),
152 }
153 }
154}
155
156impl SqlTable {
158 fn scan_to_sql(
161 &self,
162 projection: Option<&Vec<usize>>,
163 filters: &[Expr],
164 limit: Option<usize>,
165 ) -> Result<String> {
166 let logical_plan = self.create_logical_plan(projection, filters, limit)?;
167 let sql = Unparser::new(self.dialect()).plan_to_sql(&logical_plan)?.to_string();
168 Ok(sql)
169 }
170
171 fn create_logical_plan(
172 &self,
173 projection: Option<&Vec<usize>>,
174 filters: &[Expr],
175 limit: Option<usize>,
176 ) -> Result<LogicalPlan> {
177 let table_ref = match self.table_reference.clone() {
179 TableReference::Full { schema, table, .. } => TableReference::partial(schema, table),
180 table_ref => table_ref,
181 };
182 let table_source = LogicalTableSource::new(self.schema());
183 let mut builder = LogicalPlanBuilder::scan_with_filters(
184 table_ref,
185 Arc::new(table_source),
186 projection.cloned(),
187 filters.to_vec(),
188 )?;
189 if let Some(exprs) = &self.exprs {
190 builder = builder.project(exprs.clone())?;
191 }
192 builder.limit(0, limit)?.build()
193 }
194
195 fn create_physical_plan(
196 &self,
197 projection: Option<&Vec<usize>>,
198 sql: String,
199 ) -> Result<Arc<dyn ExecutionPlan>> {
200 Ok(Arc::new(
201 ClickHouseSqlExec::try_new(projection, &self.schema(), Arc::clone(&self.pool), sql)?
202 .with_coercion(self.coerce_schema),
203 ))
204 }
205}
206
207#[async_trait]
208impl TableProvider for SqlTable {
209 fn as_any(&self) -> &dyn Any { self }
210
211 fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) }
212
213 fn table_type(&self) -> TableType { TableType::Base }
214
215 fn supports_filters_pushdown(
216 &self,
217 filters: &[&Expr],
218 ) -> Result<Vec<TableProviderFilterPushDown>> {
219 let filter_push_down: Vec<TableProviderFilterPushDown> = filters
220 .iter()
221 .map(|f| match Unparser::new(self.dialect()).expr_to_sql(f) {
222 Ok(_) => TableProviderFilterPushDown::Exact,
223 Err(_) => TableProviderFilterPushDown::Unsupported,
224 })
225 .collect();
226
227 Ok(filter_push_down)
228 }
229
230 async fn scan(
231 &self,
232 _state: &dyn Session,
233 projection: Option<&Vec<usize>>,
234 filters: &[Expr],
235 limit: Option<usize>,
236 ) -> Result<Arc<dyn ExecutionPlan>> {
237 let sql = self.scan_to_sql(projection, filters, limit)?;
238 return self.create_physical_plan(projection, sql);
239 }
240}
241
242impl Display for SqlTable {
243 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
244 write!(f, "ClickHouseSqlTable {}", self.name)
245 }
246}
247
248static ONE_COLUMN_SCHEMA: LazyLock<SchemaRef> =
249 LazyLock::new(|| Arc::new(Schema::new(vec![Field::new("1", DataType::Int64, true)])));
250
251pub fn project_schema_safe(
256 schema: &SchemaRef,
257 projection: Option<&Vec<usize>>,
258) -> Result<SchemaRef> {
259 let schema = match projection {
260 Some(columns) => {
261 if columns.is_empty() {
262 ONE_COLUMN_SCHEMA.clone()
267 } else {
268 Arc::new(schema.project(columns)?)
269 }
270 }
271 None => Arc::clone(schema),
272 };
273 Ok(schema)
274}
275
276#[derive(Clone)]
278pub struct ClickHouseSqlExec {
279 projected_schema: SchemaRef,
280 pool: Arc<ClickHouseConnectionPool>,
281 sql: String,
282 properties: PlanProperties,
283 coerce_schema: bool,
284}
285
286impl ClickHouseSqlExec {
287 pub fn try_new(
292 projection: Option<&Vec<usize>>,
293 schema: &SchemaRef,
294 pool: Arc<ClickHouseConnectionPool>,
295 sql: String,
296 ) -> Result<Self> {
297 let projected_schema = project_schema_safe(schema, projection)?;
298 Ok(Self {
299 projected_schema: Arc::clone(&projected_schema),
300 pool,
301 sql,
302 properties: PlanProperties::new(
303 EquivalenceProperties::new(projected_schema),
304 Partitioning::UnknownPartitioning(1),
305 EmissionType::Incremental,
306 Boundedness::Bounded,
307 ),
308 coerce_schema: false,
309 })
310 }
311
312 #[must_use]
315 pub fn with_coercion(mut self, coerce: bool) -> Self {
316 self.coerce_schema = coerce;
317 self
318 }
319
320 pub fn sql(&self) -> Result<String> { Ok(self.sql.clone()) }
325}
326
327impl fmt::Debug for ClickHouseSqlExec {
328 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
329 let sql = self.sql().unwrap_or_default();
330 write!(f, "ClickHouseSqlExec sql={sql}")
331 }
332}
333
334impl DisplayAs for ClickHouseSqlExec {
335 fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter<'_>) -> fmt::Result {
336 let sql = self.sql().unwrap_or_default();
337 write!(f, "ClickHouseSqlExec sql={sql}")
338 }
339}
340
341impl ExecutionPlan for ClickHouseSqlExec {
342 fn name(&self) -> &'static str { "SqlExec" }
343
344 fn as_any(&self) -> &dyn Any { self }
345
346 fn schema(&self) -> SchemaRef { Arc::clone(&self.projected_schema) }
347
348 fn properties(&self) -> &PlanProperties { &self.properties }
349
350 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { vec![] }
351
352 fn with_new_children(
353 self: Arc<Self>,
354 _children: Vec<Arc<dyn ExecutionPlan>>,
355 ) -> Result<Arc<dyn ExecutionPlan>> {
356 Ok(self)
357 }
358
359 fn execute(
360 &self,
361 _partition: usize,
362 _context: Arc<TaskContext>,
363 ) -> Result<SendableRecordBatchStream> {
364 let sql = self.sql()?;
365 Ok(execute_sql_query(sql, Arc::clone(&self.pool), self.schema(), self.coerce_schema))
366 }
367}
368
/// Integration with `datafusion_federation`: exposes [`SqlTable`] as a federated SQL source.
#[cfg(feature = "federation")]
pub mod federation {

    use std::sync::Arc;

    use async_trait::async_trait;
    use datafusion::arrow::datatypes::SchemaRef;
    use datafusion::catalog::TableProvider;
    use datafusion::common::exec_err;
    use datafusion::error::Result;
    use datafusion::physical_plan::SendableRecordBatchStream;
    use datafusion::sql::TableReference;
    use datafusion::sql::unparser::dialect::Dialect;
    use datafusion_federation::sql::{
        RemoteTableRef, SQLExecutor, SQLFederationProvider, SQLTableSource,
    };
    use datafusion_federation::{FederatedTableProviderAdaptor, FederatedTableSource};

    use super::{JoinPushDown, SqlTable, execute_sql_query};

    impl SqlTable {
        /// Wraps this table as a federated table source. The table acts as both the SQL
        /// executor (via `SQLFederationProvider`) and the schema provider.
        fn create_federated_table_source(self: Arc<Self>) -> Arc<dyn FederatedTableSource> {
            let table_ref = RemoteTableRef::from(self.table_reference.clone());
            let schema = self.schema();
            let fed_provider = Arc::new(SQLFederationProvider::new(self));
            Arc::new(SQLTableSource::new_with_schema(fed_provider, table_ref, schema))
        }

        /// Builds a `FederatedTableProviderAdaptor` that federates queries through this table
        /// and keeps the table itself as the fallback provider.
        ///
        /// # Errors
        /// Currently never fails; the `Result` is kept for API compatibility.
        pub fn create_federated_table_provider(
            self: Arc<Self>,
        ) -> Result<FederatedTableProviderAdaptor> {
            let table_source = Self::create_federated_table_source(Arc::clone(&self));
            Ok(FederatedTableProviderAdaptor::new_with_provider(table_source, self))
        }
    }

    #[async_trait]
    impl SQLExecutor for SqlTable {
        fn name(&self) -> &str {
            &self.name
        }

        /// Returns the join push-down context for this table.
        ///
        /// `AllowedFor(ctx)` yields `ctx` unchanged. `Disallow` yields this instance's address,
        /// so the context is not shared with any other table. NOTE(review): federation
        /// presumably only co-plans joins across executors with equal contexts — confirm
        /// against the `SQLExecutor::compute_context` contract.
        fn compute_context(&self) -> Option<String> {
            match self.pool.join_push_down() {
                JoinPushDown::AllowedFor(context) => Some(context),
                JoinPushDown::Disallow => Some(self.unique_id().to_string()),
            }
        }

        fn dialect(&self) -> Arc<dyn Dialect> {
            self.arc_dialect()
        }

        /// Streams the results of federated `sql`, using the table's pool and coercion flag.
        fn execute(&self, sql: &str, schema: SchemaRef) -> Result<SendableRecordBatchStream> {
            Ok(execute_sql_query(sql, Arc::clone(&self.pool), schema, self.coerce_schema))
        }

        /// Lists the tables in this table's schema.
        ///
        /// # Errors
        /// Fails if the table reference has no schema component, or if connecting or the
        /// listing query fails.
        async fn table_names(&self) -> Result<Vec<String>> {
            let Some(schema) = self.table_reference.schema() else {
                return exec_err!("Cannot fetch tables without a schema");
            };
            self.pool.connect().await?.tables(schema).await
        }

        /// Fetches the schema of `table_name`, resolving it inside this table's schema when
        /// the reference has one.
        async fn get_table_schema(&self, table_name: &str) -> Result<SchemaRef> {
            // The fallback is built only when needed; it goes through `From<&str>`, which
            // parses `table_name` as a possibly qualified reference.
            let table_ref = match self.table_reference.schema() {
                Some(schema) => TableReference::partial(schema, table_name),
                None => TableReference::from(table_name),
            };
            self.pool.connect().await?.get_schema(&table_ref).await
        }
    }
}