clickhouse_orm/
client.rs

1use std::fmt::Debug;
2use std::marker::PhantomData;
3use std::str::FromStr;
4use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
5use reqwest::StatusCode;
6use serde::de::DeserializeOwned;
7use serde::Deserialize;
8use crate::error::Error;
9
10pub struct ClickhouseClient {
11    url: String,
12    client: reqwest::Client
13}
14
15impl ClickhouseClient {
16    pub fn from_env() -> Result<Self, Error> {
17        let username = std::env::var("CLICKHOUSE_USERNAME")?;
18        let password = std::env::var("CLICKHOUSE_PASSWORD")?;
19        let database = std::env::var("CLICKHOUSE_DATABASE")?;
20        let url = std::env::var("CLICKHOUSE_URL")?;
21        let url = format!("{url}/?database={database}&enable_http_compression=1");
22        let client = reqwest::Client::builder()
23            .default_headers(HeaderMap::from_iter([
24                (HeaderName::from_str("X-ClickHouse-User")?, HeaderValue::from_str(&username)?),
25                (HeaderName::from_str("X-ClickHouse-Key")?, HeaderValue::from_str(&password)?)
26        ]))
27            .gzip(true)
28            .build()?;
29        Ok(Self {
30            client,
31            url
32        })
33    }
34    pub async fn fetch_one<R: DeserializeOwned + Debug>(&self, query: &str) -> Result<Option<R>, Error> {
35        let response = self.fetch(query).await?;
36        Ok(response.into_iter().next())
37    }
38    pub async fn fetch_many<R: DeserializeOwned + Debug>(&self, query: &str) -> Result<Vec<R>, Error> {
39        let response = self.fetch(query).await?;
40        Ok(response)
41    }
42    async fn fetch<R: DeserializeOwned + Debug>(&self, query: &str) -> Result<Vec<R>, Error> {
43        let Self {client, url, .. } = self;
44        let query = format!("{query} format JSON");
45        let response = client.post(url).body(query.clone()).send().await?;
46        let status = response.status();
47        if !status.is_success() {
48            let message = response.text().await?;
49            return Err(Error::Database(DatabaseError {status, message, failed_query: query}))
50        }
51        let response = response.bytes().await?;
52        let Response {data}  = match serde_json::from_slice(&response) {
53            Ok(response) => response,
54            Err(error) => {
55                let body = String::from_utf8_lossy(&response).to_string();
56                let error: DeserializeError<R> = DeserializeError {
57                    failed_kind: PhantomData,
58                    failed_query: query,
59                    error,
60                    body
61                };
62                let error = format!("{:?}", error);
63                return Err(Error::DeserializeError(error))
64            }
65        };
66        Ok(data)
67    }
68}
69
70#[derive(Debug)]
71pub struct DeserializeError<T> {
72    pub failed_kind: PhantomData<T>,
73    pub failed_query: String,
74    pub error: serde_json::Error,
75    pub body: String
76}
77
78#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
79pub struct Response<T> {
80    pub data: Vec<T>,
81}
82
83#[derive(thiserror::Error, Debug)]
84#[error("Database Error")]
85pub struct DatabaseError {
86    message: String,
87    failed_query: String,
88    status: StatusCode,
89}