clickhouse_client/intf/http/
mod.rs1#[cfg(test)]
6mod tests;
7
8use async_trait::async_trait;
9use hyper::{Body, Request, Uri};
10use tracing::{error, trace};
11
12use crate::{
13 error::Error,
14 query::{Format, Query, QueryResponse},
15};
16
17use super::Interface;
18
19type HyperHttpsClient = hyper::Client<hyper_rustls::HttpsConnector<hyper::client::HttpConnector>>;
20
21#[derive(Debug)]
23pub struct Http {
24 http_client: HyperHttpsClient,
26 uri: Uri,
28}
29
30impl Http {
31 pub fn new(url: &str) -> Self {
33 let url: Uri = url.parse().unwrap();
34 let https_conn = hyper_rustls::HttpsConnectorBuilder::new()
35 .with_native_roots()
36 .https_or_http()
37 .enable_http1()
38 .build();
39 let client = hyper::Client::<_, hyper::Body>::builder().build(https_conn);
40 Self {
41 http_client: client,
42 uri: url,
43 }
44 }
45}
46
47const HTTP_DEFAULT_FORMAT: Format = Format::TabSep;
49
50#[async_trait]
51impl Interface for Http {
52 #[tracing::instrument(skip(self))]
53 async fn ping(&self) -> bool {
54 let req = Request::builder()
55 .uri(&self.uri)
56 .method("GET")
57 .body(Body::empty())
58 .unwrap();
59 match self.http_client.request(req).await {
60 Ok(res) => res.status().is_success(),
61 Err(_) => false,
62 }
63 }
64
65 #[tracing::instrument(skip(self))]
66 async fn send(&self, query: Query) -> Result<QueryResponse, Error> {
67 let mut req_builder = hyper::Request::builder();
68
69 if let Some(db) = &query.db {
70 const HEADER_DEFAULT_DB: &str = "X-ClickHouse-Database";
71 req_builder = req_builder.header(HEADER_DEFAULT_DB, db);
72 }
73
74 if let Some((username, password)) = &query.credentials {
75 const HEADER_USER: &str = "X-ClickHouse-User";
76 const HEADER_PASSWORD: &str = "X-ClickHouse-Key";
77 req_builder = req_builder.header(HEADER_USER, username);
78 req_builder = req_builder.header(HEADER_PASSWORD, password);
79 }
80
81 if let Some(format) = &query.format {
82 const HEADER_FORMAT: &str = "X-ClickHouse-Format";
83 req_builder = req_builder.header(HEADER_FORMAT, format.to_string());
84 }
85
86 if let Some(compression) = &query.compress_request {
87 const HEADER_CONTENT_ENC: &str = "Content-Encoding";
88 req_builder = req_builder.header(HEADER_CONTENT_ENC, compression.to_string());
89 }
90
91 if let Some(compression) = &query.compress_response {
92 const HEADER_ACCEPT_ENC: &str = "Accept-Encoding";
93 req_builder = req_builder.header(HEADER_ACCEPT_ENC, compression.to_string());
94 }
95
96 let uri = {
97 let uri = &self.uri;
98 let scheme = uri.scheme().ok_or(Error::new("misisng scheme"))?.clone();
99 let auth = uri
100 .authority()
101 .ok_or(Error::new("missing authority"))?
102 .clone();
103 let pq = format!("/?query={}", urlencoding::encode(&query.statement));
104 Uri::builder()
105 .scheme(scheme)
106 .authority(auth)
107 .path_and_query(pq)
108 .build()?
109 };
110 let body = if let Some(data) = query.data {
111 let format = query.format.unwrap_or(HTTP_DEFAULT_FORMAT);
112 let bytes: Vec<u8> = data.to_bytes(format)?;
113 req_builder = req_builder.header("Content-Length", bytes.len());
114 Body::from(bytes)
115 } else {
116 req_builder = req_builder.header("Content-Length", 0);
117 Body::empty()
118 };
119 let req = req_builder.method("POST").uri(uri).body(body)?;
120
121 trace!(request = ?req, "sending HTTP request");
122 let res = self.http_client.request(req).await?;
123 let res_status = res.status();
124 let res_body = hyper::body::to_bytes(res.into_body()).await?;
125
126 if res_status.is_success() {
127 let res = QueryResponse::new(
128 query.format.unwrap_or(HTTP_DEFAULT_FORMAT),
129 res_body.to_vec(),
130 );
131 Ok(res)
132 } else {
133 let res_body_str = String::from_utf8(res_body.to_vec())?;
134 error!(error = res_body_str, "query failed");
135 Err(Error::new(res_body_str.as_str()))
136 }
137 }
138}