datafusion_table_providers/
clickhouse.rs1use clickhouse::Client;
18use datafusion::arrow::datatypes::SchemaRef;
19use datafusion::catalog::TableProvider;
20use datafusion::sql::sqlparser::ast::{
21 Expr, FunctionArg, FunctionArgExpr, FunctionArgOperator, Ident, Value,
22};
23use datafusion::sql::unparser;
24use datafusion::{common::Constraints, sql::TableReference};
25use std::sync::Arc;
26
27use crate::sql::db_connection_pool::clickhousepool::ClickHouseConnectionPool;
28use crate::sql::db_connection_pool::dbconnection::AsyncDbConnection;
29
30#[cfg(feature = "clickhouse-federation")]
31mod federation;
32mod sql_table;
33
34pub struct ClickHouseTableFactory {
35 pool: Arc<ClickHouseConnectionPool>,
36}
37
38impl ClickHouseTableFactory {
39 pub fn new(pool: impl Into<Arc<ClickHouseConnectionPool>>) -> Self {
40 Self { pool: pool.into() }
41 }
42
43 pub async fn table_provider(
44 &self,
45 table_reference: TableReference,
46 args: Option<Vec<(String, Arg)>>,
47 ) -> Result<Arc<dyn TableProvider + 'static>, Box<dyn std::error::Error + Send + Sync + 'static>>
48 {
49 let client: &dyn AsyncDbConnection<Client, ()> = &self.pool.client();
50 let schema = client.get_schema(&table_reference).await?;
51 let table_provider = Arc::new(ClickHouseTable::new(
52 table_reference,
53 args,
54 self.pool.clone(),
55 schema,
56 Constraints::default(),
57 ));
58
59 #[cfg(feature = "clickhouse-federation")]
60 let table_provider = Arc::new(
61 table_provider
62 .create_federated_table_provider()
63 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?,
64 );
65
66 Ok(table_provider)
67 }
68}
69
/// A literal argument value that can be attached, by name, to a ClickHouse
/// table.
///
/// Besides `Debug` and `Clone`, the type derives `PartialEq`, `Eq`, and `Hash`
/// so argument values can be compared in tests and used in hashed collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Arg {
    /// An unsigned 64-bit integer value.
    Unsigned(u64),
    /// A signed 64-bit integer value.
    Signed(i64),
    /// An owned string value.
    String(String),
}
76
77impl From<String> for Arg {
78 fn from(value: String) -> Self {
79 Self::String(value)
80 }
81}
82
83impl From<u64> for Arg {
84 fn from(value: u64) -> Self {
85 Self::Unsigned(value)
86 }
87}
88
89impl From<i64> for Arg {
90 fn from(value: i64) -> Self {
91 Self::Signed(value)
92 }
93}
94
95impl From<Arg> for Expr {
96 fn from(value: Arg) -> Self {
97 Expr::value(match value {
98 Arg::Unsigned(x) => Value::Number(x.to_string(), false),
99 Arg::Signed(x) => Value::Number(x.to_string(), false),
100 Arg::String(x) => Value::SingleQuotedString(x),
101 })
102 }
103}
104
105fn into_table_args(args: Vec<(String, Arg)>) -> Vec<FunctionArg> {
106 args.into_iter()
107 .map(|(name, value)| FunctionArg::Named {
108 name: Ident::new(name),
109 arg: FunctionArgExpr::Expr(value.into()),
110 operator: FunctionArgOperator::Equals,
111 })
112 .collect()
113}
114
115pub struct ClickHouseTable {
116 table_reference: TableReference,
117 args: Option<Vec<(String, Arg)>>,
118 pool: Arc<ClickHouseConnectionPool>,
119 schema: SchemaRef,
120 constraints: Constraints,
121 dialect: Arc<dyn unparser::dialect::Dialect>,
122}
123
124impl std::fmt::Debug for ClickHouseTable {
125 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126 f.debug_struct("ClickHouseTable")
127 .field("table_name", &self.table_reference)
128 .field("schema", &self.schema)
129 .field("constraints", &self.constraints)
130 .finish()
131 }
132}
133
134impl ClickHouseTable {
135 pub fn new(
136 table_reference: TableReference,
137 args: Option<Vec<(String, Arg)>>,
138 pool: Arc<ClickHouseConnectionPool>,
139 schema: SchemaRef,
140 constraints: Constraints,
141 ) -> Self {
142 Self {
143 table_reference,
144 args,
145 pool,
146 schema,
147 constraints,
148 dialect: Arc::new(unparser::dialect::DefaultDialect {}),
149 }
150 }
151}