datafusion_table_providers/sql/db_connection_pool/
clickhousepool.rs

1use std::collections::HashMap;
2
3use clickhouse::{Client, Compression};
4use secrecy::{ExposeSecret, SecretString};
5use snafu::{ResultExt, Snafu};
6
7use super::{dbconnection::DbConnection, DbConnectionPool, JoinPushDown};
8
9#[derive(Debug, Snafu)]
10pub enum Error {
11    #[snafu(display("ClickHouse connection failed. {source}"))]
12    ConnectionError { source: clickhouse::error::Error },
13
14    #[snafu(display("Invalid connection string for ClickHouse. {source}"))]
15    InvalidConnectionString { source: url::ParseError },
16
17    #[snafu(display("Invalid value for parameter {parameter_name}. Ensure the value is valid for parameter {parameter_name}"))]
18    InvalidParameterError { parameter_name: String },
19}
20
21#[derive(Clone)]
22pub struct ClickHouseConnectionPool {
23    pub client: Client,
24    pub join_push_down: JoinPushDown,
25}
26
27impl std::fmt::Debug for ClickHouseConnectionPool {
28    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29        f.debug_struct("ClickHouseConnectionPool")
30            .field("join_push_down", &self.join_push_down)
31            .finish()
32    }
33}
34
35impl ClickHouseConnectionPool {
36    pub async fn new(params: HashMap<String, SecretString>) -> Result<Self, Error> {
37        let mut client = Client::default();
38        let mut url = None;
39        let mut database = None;
40
41        for (key, value) in &params {
42            let value = value.expose_secret();
43            match key.as_str() {
44                "url" => {
45                    client = client.with_url(value);
46                    url = Some(value)
47                }
48                "database" => {
49                    client = client.with_database(value);
50                    database = Some(value)
51                }
52                "user" => {
53                    client = client.with_user(value);
54                }
55                "password" => {
56                    client = client.with_password(value);
57                }
58                "access_token" => {
59                    client = client.with_access_token(value);
60                }
61                "compression" => {
62                    client = match value.to_lowercase().as_str() {
63                        "lz4" => client.with_compression(Compression::Lz4),
64                        "none" => client.with_compression(Compression::None),
65                        other => {
66                            return Err(Error::InvalidParameterError {
67                                parameter_name: format!("compression = {}", other),
68                            });
69                        }
70                    };
71                }
72                key if key.starts_with("option_") => {
73                    let opt = &key["option_".len()..];
74                    client = client.with_option(opt, value);
75                }
76                key if key.starts_with("header_") => {
77                    let header = &key["header_".len()..];
78                    client = client.with_header(header, value);
79                }
80                key if key.starts_with("product_") => {
81                    let name = &key["product_".len()..];
82                    client = client.with_product_info(name, value);
83                }
84                _ => {
85                    // Unknown keys are ignored silently or optionally warn
86                }
87            }
88        }
89
90        client
91            .query("SELECT 1")
92            .fetch_all::<u8>()
93            .await
94            .context(ConnectionSnafu)?;
95
96        let join_push_down = {
97            let mut ctx = format!("url={}", url.unwrap_or("default"));
98            if let Some(db) = database {
99                ctx.push_str(&format!(",db={}", db));
100            }
101            JoinPushDown::AllowedFor(ctx)
102        };
103
104        Ok(Self {
105            client,
106            join_push_down,
107        })
108    }
109
110    pub fn client(&self) -> Client {
111        self.client.clone()
112    }
113}
114
115#[async_trait::async_trait]
116impl DbConnectionPool<Client, ()> for ClickHouseConnectionPool {
117    async fn connect(&self) -> super::Result<Box<dyn DbConnection<Client, ()>>> {
118        Ok(Box::new(self.client()))
119    }
120
121    fn join_push_down(&self) -> JoinPushDown {
122        self.join_push_down.clone()
123    }
124}