datafusion_table_providers/sql/db_connection_pool/
clickhousepool.rs1use std::collections::HashMap;
2
3use clickhouse::{Client, Compression};
4use secrecy::{ExposeSecret, SecretString};
5use snafu::{ResultExt, Snafu};
6
7use super::{dbconnection::DbConnection, DbConnectionPool, JoinPushDown};
8
9#[derive(Debug, Snafu)]
10pub enum Error {
11 #[snafu(display("ClickHouse connection failed. {source}"))]
12 ConnectionError { source: clickhouse::error::Error },
13
14 #[snafu(display("Invalid connection string for ClickHouse. {source}"))]
15 InvalidConnectionString { source: url::ParseError },
16
17 #[snafu(display("Invalid value for parameter {parameter_name}. Ensure the value is valid for parameter {parameter_name}"))]
18 InvalidParameterError { parameter_name: String },
19}
20
21#[derive(Clone)]
22pub struct ClickHouseConnectionPool {
23 pub client: Client,
24 pub join_push_down: JoinPushDown,
25}
26
27impl std::fmt::Debug for ClickHouseConnectionPool {
28 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29 f.debug_struct("ClickHouseConnectionPool")
30 .field("join_push_down", &self.join_push_down)
31 .finish()
32 }
33}
34
35impl ClickHouseConnectionPool {
36 pub async fn new(params: HashMap<String, SecretString>) -> Result<Self, Error> {
37 let mut client = Client::default();
38 let mut url = None;
39 let mut database = None;
40
41 for (key, value) in ¶ms {
42 let value = value.expose_secret();
43 match key.as_str() {
44 "url" => {
45 client = client.with_url(value);
46 url = Some(value)
47 }
48 "database" => {
49 client = client.with_database(value);
50 database = Some(value)
51 }
52 "user" => {
53 client = client.with_user(value);
54 }
55 "password" => {
56 client = client.with_password(value);
57 }
58 "access_token" => {
59 client = client.with_access_token(value);
60 }
61 "compression" => {
62 client = match value.to_lowercase().as_str() {
63 "lz4" => client.with_compression(Compression::Lz4),
64 "none" => client.with_compression(Compression::None),
65 other => {
66 return Err(Error::InvalidParameterError {
67 parameter_name: format!("compression = {}", other),
68 });
69 }
70 };
71 }
72 key if key.starts_with("option_") => {
73 let opt = &key["option_".len()..];
74 client = client.with_option(opt, value);
75 }
76 key if key.starts_with("header_") => {
77 let header = &key["header_".len()..];
78 client = client.with_header(header, value);
79 }
80 key if key.starts_with("product_") => {
81 let name = &key["product_".len()..];
82 client = client.with_product_info(name, value);
83 }
84 _ => {
85 }
87 }
88 }
89
90 client
91 .query("SELECT 1")
92 .fetch_all::<u8>()
93 .await
94 .context(ConnectionSnafu)?;
95
96 let join_push_down = {
97 let mut ctx = format!("url={}", url.unwrap_or("default"));
98 if let Some(db) = database {
99 ctx.push_str(&format!(",db={}", db));
100 }
101 JoinPushDown::AllowedFor(ctx)
102 };
103
104 Ok(Self {
105 client,
106 join_push_down,
107 })
108 }
109
110 pub fn client(&self) -> Client {
111 self.client.clone()
112 }
113}
114
115#[async_trait::async_trait]
116impl DbConnectionPool<Client, ()> for ClickHouseConnectionPool {
117 async fn connect(&self) -> super::Result<Box<dyn DbConnection<Client, ()>>> {
118 Ok(Box::new(self.client()))
119 }
120
121 fn join_push_down(&self) -> JoinPushDown {
122 self.join_push_down.clone()
123 }
124}