datafusion_table_providers/
clickhouse.rs

/*
Copyright 2024 The Spice.ai OSS Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

17use clickhouse::Client;
18use datafusion::arrow::datatypes::SchemaRef;
19use datafusion::catalog::TableProvider;
20use datafusion::sql::sqlparser::ast::{
21    Expr, FunctionArg, FunctionArgExpr, FunctionArgOperator, Ident, Value,
22};
23use datafusion::sql::unparser;
24use datafusion::{common::Constraints, sql::TableReference};
25use std::sync::Arc;
26
27use crate::sql::db_connection_pool::clickhousepool::ClickHouseConnectionPool;
28use crate::sql::db_connection_pool::dbconnection::AsyncDbConnection;
29
30#[cfg(feature = "clickhouse-federation")]
31mod federation;
32mod sql_table;
33
34pub struct ClickHouseTableFactory {
35    pool: Arc<ClickHouseConnectionPool>,
36}
37
38impl ClickHouseTableFactory {
39    pub fn new(pool: impl Into<Arc<ClickHouseConnectionPool>>) -> Self {
40        Self { pool: pool.into() }
41    }
42
43    pub async fn table_provider(
44        &self,
45        table_reference: TableReference,
46        args: Option<Vec<(String, Arg)>>,
47    ) -> Result<Arc<dyn TableProvider + 'static>, Box<dyn std::error::Error + Send + Sync + 'static>>
48    {
49        let client: &dyn AsyncDbConnection<Client, ()> = &self.pool.client();
50        let schema = client.get_schema(&table_reference).await?;
51        let table_provider = Arc::new(ClickHouseTable::new(
52            table_reference,
53            args,
54            self.pool.clone(),
55            schema,
56            Constraints::default(),
57        ));
58
59        #[cfg(feature = "clickhouse-federation")]
60        let table_provider = Arc::new(
61            table_provider
62                .create_federated_table_provider()
63                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?,
64        );
65
66        Ok(table_provider)
67    }
68}
69
/// A literal value supplied as a named argument to a ClickHouse table.
///
/// `into_table_args` renders each `(name, Arg)` pair as a `name = value` function argument.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Arg {
    /// An unsigned integer, emitted as a numeric literal.
    Unsigned(u64),
    /// A signed integer, emitted as a numeric literal.
    Signed(i64),
    /// A string, emitted as a single-quoted SQL string literal.
    String(String),
}

/// Wraps an owned string as [`Arg::String`].
impl From<String> for Arg {
    fn from(value: String) -> Self {
        Self::String(value)
    }
}

/// Wraps an unsigned integer as [`Arg::Unsigned`].
impl From<u64> for Arg {
    fn from(value: u64) -> Self {
        Self::Unsigned(value)
    }
}

/// Wraps a signed integer as [`Arg::Signed`].
impl From<i64> for Arg {
    fn from(value: i64) -> Self {
        Self::Signed(value)
    }
}

95impl From<Arg> for Expr {
96    fn from(value: Arg) -> Self {
97        Expr::value(match value {
98            Arg::Unsigned(x) => Value::Number(x.to_string(), false),
99            Arg::Signed(x) => Value::Number(x.to_string(), false),
100            Arg::String(x) => Value::SingleQuotedString(x),
101        })
102    }
103}
104
105fn into_table_args(args: Vec<(String, Arg)>) -> Vec<FunctionArg> {
106    args.into_iter()
107        .map(|(name, value)| FunctionArg::Named {
108            name: Ident::new(name),
109            arg: FunctionArgExpr::Expr(value.into()),
110            operator: FunctionArgOperator::Equals,
111        })
112        .collect()
113}
114
115pub struct ClickHouseTable {
116    table_reference: TableReference,
117    args: Option<Vec<(String, Arg)>>,
118    pool: Arc<ClickHouseConnectionPool>,
119    schema: SchemaRef,
120    constraints: Constraints,
121    dialect: Arc<dyn unparser::dialect::Dialect>,
122}
123
124impl std::fmt::Debug for ClickHouseTable {
125    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126        f.debug_struct("ClickHouseTable")
127            .field("table_name", &self.table_reference)
128            .field("schema", &self.schema)
129            .field("constraints", &self.constraints)
130            .finish()
131    }
132}
133
134impl ClickHouseTable {
135    pub fn new(
136        table_reference: TableReference,
137        args: Option<Vec<(String, Arg)>>,
138        pool: Arc<ClickHouseConnectionPool>,
139        schema: SchemaRef,
140        constraints: Constraints,
141    ) -> Self {
142        Self {
143            table_reference,
144            args,
145            pool,
146            schema,
147            constraints,
148            dialect: Arc::new(unparser::dialect::DefaultDialect {}),
149        }
150    }
151}