clickhouse-http-client 0.3.1

ClickHouse HTTP Client
Documentation
use core::ops::{Deref, DerefMut};

use clickhouse_format::{format_name::FormatName, input::Input, output::Output};
use isahc::{
    http::{response::Parts as ResponseParts, Method, Request, Response, StatusCode},
    AsyncBody, AsyncReadResponseExt as _, HttpClient, HttpClientBuilder,
};

use crate::{
    client_config::{
        ClientConfig, FORMAT_KEY_HEADER, FORMAT_KEY_URL_PARAMETER, QUERY_KEY_URL_PARAMETER,
    },
    error::{ClientExecuteError, ClientInsertWithFormatError, ClientSelectWithFormatError, Error},
};

pub type Settings<'a> = Vec<(&'a str, &'a str)>;

#[derive(Debug)]
pub struct ClientBuilder {
    http_client_builder: HttpClientBuilder,
    client_config: ClientConfig,
}
impl Default for ClientBuilder {
    fn default() -> Self {
        Self::new()
    }
}
impl Deref for ClientBuilder {
    type Target = ClientConfig;

    fn deref(&self) -> &Self::Target {
        &self.client_config
    }
}
impl DerefMut for ClientBuilder {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.client_config
    }
}
impl ClientBuilder {
    pub fn new() -> Self {
        Self {
            http_client_builder: HttpClientBuilder::new(),
            client_config: Default::default(),
        }
    }
    pub fn configurable<F>(mut self, func: F) -> Self
    where
        F: FnOnce(HttpClientBuilder) -> HttpClientBuilder,
    {
        self.http_client_builder = func(self.http_client_builder);
        self
    }
    pub fn build(self) -> Result<Client, Error> {
        Ok(Client {
            http_client: self.http_client_builder.build()?,
            client_config: self.client_config,
        })
    }
}

#[derive(Debug, Clone)]
pub struct Client {
    http_client: HttpClient,
    client_config: ClientConfig,
}
impl Deref for Client {
    type Target = ClientConfig;

    fn deref(&self) -> &Self::Target {
        &self.client_config
    }
}
impl DerefMut for Client {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.client_config
    }
}
impl Client {
    pub fn new() -> Result<Self, Error> {
        ClientBuilder::default().build()
    }

    pub async fn ping(&self) -> Result<bool, Error> {
        let url = self.get_url();
        let mut req = self.get_request();

        let url = url.join("ping")?;

        *req.method_mut() = Method::GET;
        *req.uri_mut() = url.as_str().parse()?;

        let mut resp = self.http_client.send_async(req).await?;

        if resp.status() != StatusCode::OK {
            return Ok(false);
        }

        let resp_body_text = resp.text().await?;
        Ok(resp_body_text == self.get_http_server_default_response())
    }

    //
    //
    //
    pub async fn execute(
        &self,
        sql: impl AsRef<str>,
        settings: impl Into<Option<Settings<'_>>>,
    ) -> Result<(), Error> {
        let resp = self.respond_execute(sql, settings, |req| req).await?;

        if !resp.status().is_success() {
            return Err(ClientExecuteError::StatusCodeMismatch(resp.status()).into());
        }

        Ok(())
    }

    pub async fn respond_execute<PreRF>(
        &self,
        sql: impl AsRef<str>,
        settings: impl Into<Option<Settings<'_>>>,
        mut pre_respond_fn: PreRF,
    ) -> Result<Response<AsyncBody>, Error>
    where
        PreRF: FnMut(Request<Vec<u8>>) -> Request<Vec<u8>> + Send,
    {
        let mut url = self.get_url().to_owned();
        let mut req = self.get_request();

        if let Some(settings) = settings.into() {
            settings.iter().for_each(|(k, v)| {
                url.query_pairs_mut().append_pair(k, v);
            });
        }

        *req.method_mut() = Method::POST;
        *req.uri_mut() = url.as_str().parse()?;

        let (parts, _) = req.into_parts();
        let req = Request::from_parts(parts, sql.as_ref().as_bytes().to_owned());

        let req = pre_respond_fn(req);

        let resp = self.http_client.send_async(req).await?;

        Ok(resp)
    }

    //
    //
    //
    pub async fn insert_with_format<I: Input>(
        &self,
        sql_prefix: impl AsRef<str>,
        input: I,
        settings: impl Into<Option<Settings<'_>>>,
    ) -> Result<(), Error> {
        let resp = self
            .respond_insert_with_format(sql_prefix, input, settings, |req| req)
            .await?;

        if !resp.status().is_success() {
            return Err(ClientInsertWithFormatError::StatusCodeMismatch(resp.status()).into());
        }

        Ok(())
    }

    pub async fn respond_insert_with_format<I: Input, PreRF>(
        &self,
        sql_prefix: impl AsRef<str>,
        input: I,
        settings: impl Into<Option<Settings<'_>>>,
        pre_respond_fn: PreRF,
    ) -> Result<Response<AsyncBody>, Error>
    where
        PreRF: FnMut(Request<Vec<u8>>) -> Request<Vec<u8>> + Send,
    {
        let format_name = I::format_name();
        let format_bytes = input
            .serialize()
            .map_err(|err| ClientInsertWithFormatError::FormatSerError(err.to_string()))?;

        self.respond_insert_with_format_bytes(
            sql_prefix,
            format_name,
            format_bytes,
            settings,
            pre_respond_fn,
        )
        .await
    }

    pub async fn insert_with_format_bytes(
        &self,
        sql_prefix: impl AsRef<str>,
        format_name: FormatName,
        format_bytes: Vec<u8>,
        settings: impl Into<Option<Settings<'_>>>,
    ) -> Result<(), Error> {
        let resp = self
            .respond_insert_with_format_bytes(
                sql_prefix,
                format_name,
                format_bytes,
                settings,
                |req| req,
            )
            .await?;

        if !resp.status().is_success() {
            return Err(ClientInsertWithFormatError::StatusCodeMismatch(resp.status()).into());
        }

        Ok(())
    }

    pub async fn respond_insert_with_format_bytes<PreRF>(
        &self,
        sql_prefix: impl AsRef<str>,
        format_name: FormatName,
        format_bytes: Vec<u8>,
        settings: impl Into<Option<Settings<'_>>>,
        mut pre_respond_fn: PreRF,
    ) -> Result<Response<AsyncBody>, Error>
    where
        PreRF: FnMut(Request<Vec<u8>>) -> Request<Vec<u8>> + Send,
    {
        let mut url = self.get_url().to_owned();
        let mut req = self.get_request();

        let mut sql = sql_prefix.as_ref().to_owned();
        let sql_suffix = format!(" FORMAT {format_name}");
        if !sql.ends_with(sql_suffix.as_str()) {
            sql.push_str(sql_suffix.as_str());
        }

        url.query_pairs_mut()
            .append_pair(QUERY_KEY_URL_PARAMETER, sql.as_str());

        if let Some(settings) = settings.into() {
            settings.iter().for_each(|(k, v)| {
                url.query_pairs_mut().append_pair(k, v);
            });
        }

        *req.method_mut() = Method::POST;
        *req.uri_mut() = url.as_str().parse()?;

        let (parts, _) = req.into_parts();
        let req = Request::from_parts(parts, format_bytes);

        let req = pre_respond_fn(req);

        let resp = self.http_client.send_async(req).await?;

        Ok(resp)
    }

    //
    //
    //
    pub async fn select_with_format<O: Output>(
        &self,
        sql: impl AsRef<str>,
        output: O,
        settings: impl Into<Option<Settings<'_>>>,
    ) -> Result<(Vec<O::Row>, O::Info), Error> {
        self.internal_select_with_format(sql, output, settings, |req| req)
            .await
            .map(|(_, x)| x)
    }

    pub async fn internal_select_with_format<O: Output, PreRF>(
        &self,
        sql: impl AsRef<str>,
        output: O,
        settings: impl Into<Option<Settings<'_>>>,
        mut pre_respond_fn: PreRF,
    ) -> Result<(ResponseParts, (Vec<O::Row>, O::Info)), Error>
    where
        PreRF: FnMut(Request<Vec<u8>>) -> Request<Vec<u8>> + Send,
    {
        let mut url = self.get_url().to_owned();
        let mut req = self.get_request();

        url.query_pairs_mut().append_pair(
            FORMAT_KEY_URL_PARAMETER,
            O::format_name().to_string().as_str(),
        );

        if let Some(settings) = settings.into() {
            settings.iter().for_each(|(k, v)| {
                url.query_pairs_mut().append_pair(k, v);
            });
        }

        *req.method_mut() = Method::POST;
        *req.uri_mut() = url.as_str().parse()?;

        let (parts, _) = req.into_parts();
        let req = Request::from_parts(parts, sql.as_ref().as_bytes().to_owned());

        let req = pre_respond_fn(req);

        let resp = self.http_client.send_async(req).await?;

        if !resp.status().is_success() {
            return Err(ClientSelectWithFormatError::StatusCodeMismatch(resp.status()).into());
        }

        let resp_format = resp.headers().get(FORMAT_KEY_HEADER);
        if resp_format.is_some() && resp_format.unwrap() != O::format_name().to_string().as_str() {
            return Err(ClientSelectWithFormatError::FormatMismatch(
                resp_format
                    .unwrap()
                    .to_str()
                    .unwrap_or("Unknown")
                    .to_string(),
            )
            .into());
        }

        let (parts, body) = resp.into_parts();
        let (mut resp_parts, _) = Response::new(()).into_parts();
        resp_parts.status = parts.status;
        resp_parts.version = parts.version;
        resp_parts.headers = parts.headers.to_owned();
        let mut resp = Response::from_parts(parts, body);

        let mut resp_body_buf = Vec::with_capacity(4096);
        resp.copy_to(&mut resp_body_buf).await?;

        let rows_and_info = output
            .deserialize(&resp_body_buf[..])
            .map_err(|err| ClientSelectWithFormatError::FormatDeError(err.to_string()))?;

        Ok((resp_parts, rows_and_info))
    }
}