1use std::fmt::Debug;
2use std::marker::PhantomData;
3use std::str::FromStr;
4use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
5use reqwest::StatusCode;
6use serde::de::DeserializeOwned;
7use serde::Deserialize;
8use crate::error::Error;
9
10pub struct ClickhouseClient {
11 url: String,
12 client: reqwest::Client
13}
14
15impl ClickhouseClient {
16 pub fn from_env() -> Result<Self, Error> {
17 let username = std::env::var("CLICKHOUSE_USERNAME")?;
18 let password = std::env::var("CLICKHOUSE_PASSWORD")?;
19 let database = std::env::var("CLICKHOUSE_DATABASE")?;
20 let url = std::env::var("CLICKHOUSE_URL")?;
21 let url = format!("{url}/?database={database}&enable_http_compression=1");
22 let client = reqwest::Client::builder()
23 .default_headers(HeaderMap::from_iter([
24 (HeaderName::from_str("X-ClickHouse-User")?, HeaderValue::from_str(&username)?),
25 (HeaderName::from_str("X-ClickHouse-Key")?, HeaderValue::from_str(&password)?)
26 ]))
27 .gzip(true)
28 .build()?;
29 Ok(Self {
30 client,
31 url
32 })
33 }
34 pub async fn fetch_one<R: DeserializeOwned + Debug>(&self, query: &str) -> Result<Option<R>, Error> {
35 let response = self.fetch(query).await?;
36 Ok(response.into_iter().next())
37 }
38 pub async fn fetch_many<R: DeserializeOwned + Debug>(&self, query: &str) -> Result<Vec<R>, Error> {
39 let response = self.fetch(query).await?;
40 Ok(response)
41 }
42 async fn fetch<R: DeserializeOwned + Debug>(&self, query: &str) -> Result<Vec<R>, Error> {
43 let Self {client, url, .. } = self;
44 let query = format!("{query} format JSON");
45 let response = client.post(url).body(query.clone()).send().await?;
46 let status = response.status();
47 if !status.is_success() {
48 let message = response.text().await?;
49 return Err(Error::Database(DatabaseError {status, message, failed_query: query}))
50 }
51 let response = response.bytes().await?;
52 let Response {data} = match serde_json::from_slice(&response) {
53 Ok(response) => response,
54 Err(error) => {
55 let body = String::from_utf8_lossy(&response).to_string();
56 let error: DeserializeError<R> = DeserializeError {
57 failed_kind: PhantomData,
58 failed_query: query,
59 error,
60 body
61 };
62 let error = format!("{:?}", error);
63 return Err(Error::DeserializeError(error))
64 }
65 };
66 Ok(data)
67 }
68}
69
70#[derive(Debug)]
71pub struct DeserializeError<T> {
72 pub failed_kind: PhantomData<T>,
73 pub failed_query: String,
74 pub error: serde_json::Error,
75 pub body: String
76}
77
78#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
79pub struct Response<T> {
80 pub data: Vec<T>,
81}
82
83#[derive(thiserror::Error, Debug)]
84#[error("Database Error")]
85pub struct DatabaseError {
86 message: String,
87 failed_query: String,
88 status: StatusCode,
89}