clickhouse_client/intf/http/
mod.rs

1//! HTTP interface
2//!
3//! The HTTP interface is documented at: [https://clickhouse.com/docs/en/interfaces/http](https://clickhouse.com/docs/en/interfaces/http).
4
5#[cfg(test)]
6mod tests;
7
8use async_trait::async_trait;
9use hyper::{Body, Request, Uri};
10use tracing::{error, trace};
11
12use crate::{
13    error::Error,
14    query::{Format, Query, QueryResponse},
15};
16
17use super::Interface;
18
19type HyperHttpsClient = hyper::Client<hyper_rustls::HttpsConnector<hyper::client::HttpConnector>>;
20
21/// HTTP interface
22#[derive(Debug)]
23pub struct Http {
24    /// HTTP client
25    http_client: HyperHttpsClient,
26    /// URI
27    uri: Uri,
28}
29
30impl Http {
31    /// Creates a new HTTP interface
32    pub fn new(url: &str) -> Self {
33        let url: Uri = url.parse().unwrap();
34        let https_conn = hyper_rustls::HttpsConnectorBuilder::new()
35            .with_native_roots()
36            .https_or_http()
37            .enable_http1()
38            .build();
39        let client = hyper::Client::<_, hyper::Body>::builder().build(https_conn);
40        Self {
41            http_client: client,
42            uri: url,
43        }
44    }
45}
46
47/// Default query format for HTTP
48const HTTP_DEFAULT_FORMAT: Format = Format::TabSep;
49
50#[async_trait]
51impl Interface for Http {
52    #[tracing::instrument(skip(self))]
53    async fn ping(&self) -> bool {
54        let req = Request::builder()
55            .uri(&self.uri)
56            .method("GET")
57            .body(Body::empty())
58            .unwrap();
59        match self.http_client.request(req).await {
60            Ok(res) => res.status().is_success(),
61            Err(_) => false,
62        }
63    }
64
65    #[tracing::instrument(skip(self))]
66    async fn send(&self, query: Query) -> Result<QueryResponse, Error> {
67        let mut req_builder = hyper::Request::builder();
68
69        if let Some(db) = &query.db {
70            const HEADER_DEFAULT_DB: &str = "X-ClickHouse-Database";
71            req_builder = req_builder.header(HEADER_DEFAULT_DB, db);
72        }
73
74        if let Some((username, password)) = &query.credentials {
75            const HEADER_USER: &str = "X-ClickHouse-User";
76            const HEADER_PASSWORD: &str = "X-ClickHouse-Key";
77            req_builder = req_builder.header(HEADER_USER, username);
78            req_builder = req_builder.header(HEADER_PASSWORD, password);
79        }
80
81        if let Some(format) = &query.format {
82            const HEADER_FORMAT: &str = "X-ClickHouse-Format";
83            req_builder = req_builder.header(HEADER_FORMAT, format.to_string());
84        }
85
86        if let Some(compression) = &query.compress_request {
87            const HEADER_CONTENT_ENC: &str = "Content-Encoding";
88            req_builder = req_builder.header(HEADER_CONTENT_ENC, compression.to_string());
89        }
90
91        if let Some(compression) = &query.compress_response {
92            const HEADER_ACCEPT_ENC: &str = "Accept-Encoding";
93            req_builder = req_builder.header(HEADER_ACCEPT_ENC, compression.to_string());
94        }
95
96        let uri = {
97            let uri = &self.uri;
98            let scheme = uri.scheme().ok_or(Error::new("misisng scheme"))?.clone();
99            let auth = uri
100                .authority()
101                .ok_or(Error::new("missing authority"))?
102                .clone();
103            let pq = format!("/?query={}", urlencoding::encode(&query.statement));
104            Uri::builder()
105                .scheme(scheme)
106                .authority(auth)
107                .path_and_query(pq)
108                .build()?
109        };
110        let body = if let Some(data) = query.data {
111            let format = query.format.unwrap_or(HTTP_DEFAULT_FORMAT);
112            let bytes: Vec<u8> = data.to_bytes(format)?;
113            req_builder = req_builder.header("Content-Length", bytes.len());
114            Body::from(bytes)
115        } else {
116            req_builder = req_builder.header("Content-Length", 0);
117            Body::empty()
118        };
119        let req = req_builder.method("POST").uri(uri).body(body)?;
120
121        trace!(request = ?req, "sending HTTP request");
122        let res = self.http_client.request(req).await?;
123        let res_status = res.status();
124        let res_body = hyper::body::to_bytes(res.into_body()).await?;
125
126        if res_status.is_success() {
127            let res = QueryResponse::new(
128                query.format.unwrap_or(HTTP_DEFAULT_FORMAT),
129                res_body.to_vec(),
130            );
131            Ok(res)
132        } else {
133            let res_body_str = String::from_utf8(res_body.to_vec())?;
134            error!(error = res_body_str, "query failed");
135            Err(Error::new(res_body_str.as_str()))
136        }
137    }
138}