clickhouse_http_client/
client.rs

1use core::ops::{Deref, DerefMut};
2
3use clickhouse_format::{format_name::FormatName, input::Input, output::Output};
4use isahc::{
5    http::{response::Parts as ResponseParts, Method, Request, Response, StatusCode},
6    AsyncBody, AsyncReadResponseExt as _, HttpClient, HttpClientBuilder,
7};
8
9use crate::{
10    client_config::{
11        ClientConfig, FORMAT_KEY_HEADER, FORMAT_KEY_URL_PARAMETER, QUERY_KEY_URL_PARAMETER,
12    },
13    error::{ClientExecuteError, ClientInsertWithFormatError, ClientSelectWithFormatError, Error},
14};
15
/// Per-request settings, as a list of `(name, value)` string pairs.
///
/// The request methods in this file append each pair to the request URL as a
/// query parameter, in the order given.
pub type Settings<'s> = Vec<(&'s str, &'s str)>;
17
18#[derive(Debug)]
19pub struct ClientBuilder {
20    http_client_builder: HttpClientBuilder,
21    client_config: ClientConfig,
22}
23impl Default for ClientBuilder {
24    fn default() -> Self {
25        Self::new()
26    }
27}
28impl Deref for ClientBuilder {
29    type Target = ClientConfig;
30
31    fn deref(&self) -> &Self::Target {
32        &self.client_config
33    }
34}
35impl DerefMut for ClientBuilder {
36    fn deref_mut(&mut self) -> &mut Self::Target {
37        &mut self.client_config
38    }
39}
40impl ClientBuilder {
41    pub fn new() -> Self {
42        Self {
43            http_client_builder: HttpClientBuilder::new(),
44            client_config: Default::default(),
45        }
46    }
47    pub fn configurable<F>(mut self, func: F) -> Self
48    where
49        F: FnOnce(HttpClientBuilder) -> HttpClientBuilder,
50    {
51        self.http_client_builder = func(self.http_client_builder);
52        self
53    }
54    pub fn build(self) -> Result<Client, Error> {
55        Ok(Client {
56            http_client: self.http_client_builder.build()?,
57            client_config: self.client_config,
58        })
59    }
60}
61
62#[derive(Debug, Clone)]
63pub struct Client {
64    http_client: HttpClient,
65    client_config: ClientConfig,
66}
67impl Deref for Client {
68    type Target = ClientConfig;
69
70    fn deref(&self) -> &Self::Target {
71        &self.client_config
72    }
73}
74impl DerefMut for Client {
75    fn deref_mut(&mut self) -> &mut Self::Target {
76        &mut self.client_config
77    }
78}
79impl Client {
80    pub fn new() -> Result<Self, Error> {
81        ClientBuilder::default().build()
82    }
83
84    pub async fn ping(&self) -> Result<bool, Error> {
85        let url = self.get_url();
86        let mut req = self.get_request();
87
88        let url = url.join("ping")?;
89
90        *req.method_mut() = Method::GET;
91        *req.uri_mut() = url.as_str().parse()?;
92
93        let mut resp = self.http_client.send_async(req).await?;
94
95        if resp.status() != StatusCode::OK {
96            return Ok(false);
97        }
98
99        let resp_body_text = resp.text().await?;
100        Ok(resp_body_text == self.get_http_server_default_response())
101    }
102
103    //
104    //
105    //
106    pub async fn execute(
107        &self,
108        sql: impl AsRef<str>,
109        settings: impl Into<Option<Settings<'_>>>,
110    ) -> Result<(), Error> {
111        let resp = self.respond_execute(sql, settings, |req| req).await?;
112
113        if !resp.status().is_success() {
114            return Err(ClientExecuteError::StatusCodeMismatch(resp.status()).into());
115        }
116
117        Ok(())
118    }
119
120    pub async fn respond_execute<PreRF>(
121        &self,
122        sql: impl AsRef<str>,
123        settings: impl Into<Option<Settings<'_>>>,
124        mut pre_respond_fn: PreRF,
125    ) -> Result<Response<AsyncBody>, Error>
126    where
127        PreRF: FnMut(Request<Vec<u8>>) -> Request<Vec<u8>> + Send,
128    {
129        let mut url = self.get_url().to_owned();
130        let mut req = self.get_request();
131
132        if let Some(settings) = settings.into() {
133            settings.iter().for_each(|(k, v)| {
134                url.query_pairs_mut().append_pair(k, v);
135            });
136        }
137
138        *req.method_mut() = Method::POST;
139        *req.uri_mut() = url.as_str().parse()?;
140
141        let (parts, _) = req.into_parts();
142        let req = Request::from_parts(parts, sql.as_ref().as_bytes().to_owned());
143
144        let req = pre_respond_fn(req);
145
146        let resp = self.http_client.send_async(req).await?;
147
148        Ok(resp)
149    }
150
151    //
152    //
153    //
154    pub async fn insert_with_format<I: Input>(
155        &self,
156        sql_prefix: impl AsRef<str>,
157        input: I,
158        settings: impl Into<Option<Settings<'_>>>,
159    ) -> Result<(), Error> {
160        let resp = self
161            .respond_insert_with_format(sql_prefix, input, settings, |req| req)
162            .await?;
163
164        if !resp.status().is_success() {
165            return Err(ClientInsertWithFormatError::StatusCodeMismatch(resp.status()).into());
166        }
167
168        Ok(())
169    }
170
171    pub async fn respond_insert_with_format<I: Input, PreRF>(
172        &self,
173        sql_prefix: impl AsRef<str>,
174        input: I,
175        settings: impl Into<Option<Settings<'_>>>,
176        pre_respond_fn: PreRF,
177    ) -> Result<Response<AsyncBody>, Error>
178    where
179        PreRF: FnMut(Request<Vec<u8>>) -> Request<Vec<u8>> + Send,
180    {
181        let format_name = I::format_name();
182        let format_bytes = input
183            .serialize()
184            .map_err(|err| ClientInsertWithFormatError::FormatSerError(err.to_string()))?;
185
186        self.respond_insert_with_format_bytes(
187            sql_prefix,
188            format_name,
189            format_bytes,
190            settings,
191            pre_respond_fn,
192        )
193        .await
194    }
195
196    pub async fn insert_with_format_bytes(
197        &self,
198        sql_prefix: impl AsRef<str>,
199        format_name: FormatName,
200        format_bytes: Vec<u8>,
201        settings: impl Into<Option<Settings<'_>>>,
202    ) -> Result<(), Error> {
203        let resp = self
204            .respond_insert_with_format_bytes(
205                sql_prefix,
206                format_name,
207                format_bytes,
208                settings,
209                |req| req,
210            )
211            .await?;
212
213        if !resp.status().is_success() {
214            return Err(ClientInsertWithFormatError::StatusCodeMismatch(resp.status()).into());
215        }
216
217        Ok(())
218    }
219
220    pub async fn respond_insert_with_format_bytes<PreRF>(
221        &self,
222        sql_prefix: impl AsRef<str>,
223        format_name: FormatName,
224        format_bytes: Vec<u8>,
225        settings: impl Into<Option<Settings<'_>>>,
226        mut pre_respond_fn: PreRF,
227    ) -> Result<Response<AsyncBody>, Error>
228    where
229        PreRF: FnMut(Request<Vec<u8>>) -> Request<Vec<u8>> + Send,
230    {
231        let mut url = self.get_url().to_owned();
232        let mut req = self.get_request();
233
234        let mut sql = sql_prefix.as_ref().to_owned();
235        let sql_suffix = format!(" FORMAT {format_name}");
236        if !sql.ends_with(sql_suffix.as_str()) {
237            sql.push_str(sql_suffix.as_str());
238        }
239
240        url.query_pairs_mut()
241            .append_pair(QUERY_KEY_URL_PARAMETER, sql.as_str());
242
243        if let Some(settings) = settings.into() {
244            settings.iter().for_each(|(k, v)| {
245                url.query_pairs_mut().append_pair(k, v);
246            });
247        }
248
249        *req.method_mut() = Method::POST;
250        *req.uri_mut() = url.as_str().parse()?;
251
252        let (parts, _) = req.into_parts();
253        let req = Request::from_parts(parts, format_bytes);
254
255        let req = pre_respond_fn(req);
256
257        let resp = self.http_client.send_async(req).await?;
258
259        Ok(resp)
260    }
261
262    //
263    //
264    //
265    pub async fn select_with_format<O: Output>(
266        &self,
267        sql: impl AsRef<str>,
268        output: O,
269        settings: impl Into<Option<Settings<'_>>>,
270    ) -> Result<(Vec<O::Row>, O::Info), Error> {
271        self.internal_select_with_format(sql, output, settings, |req| req)
272            .await
273            .map(|(_, x)| x)
274    }
275
276    pub async fn internal_select_with_format<O: Output, PreRF>(
277        &self,
278        sql: impl AsRef<str>,
279        output: O,
280        settings: impl Into<Option<Settings<'_>>>,
281        mut pre_respond_fn: PreRF,
282    ) -> Result<(ResponseParts, (Vec<O::Row>, O::Info)), Error>
283    where
284        PreRF: FnMut(Request<Vec<u8>>) -> Request<Vec<u8>> + Send,
285    {
286        let mut url = self.get_url().to_owned();
287        let mut req = self.get_request();
288
289        url.query_pairs_mut().append_pair(
290            FORMAT_KEY_URL_PARAMETER,
291            O::format_name().to_string().as_str(),
292        );
293
294        if let Some(settings) = settings.into() {
295            settings.iter().for_each(|(k, v)| {
296                url.query_pairs_mut().append_pair(k, v);
297            });
298        }
299
300        *req.method_mut() = Method::POST;
301        *req.uri_mut() = url.as_str().parse()?;
302
303        let (parts, _) = req.into_parts();
304        let req = Request::from_parts(parts, sql.as_ref().as_bytes().to_owned());
305
306        let req = pre_respond_fn(req);
307
308        let resp = self.http_client.send_async(req).await?;
309
310        if !resp.status().is_success() {
311            return Err(ClientSelectWithFormatError::StatusCodeMismatch(resp.status()).into());
312        }
313
314        let resp_format = resp.headers().get(FORMAT_KEY_HEADER);
315        if resp_format.is_some() && resp_format.unwrap() != O::format_name().to_string().as_str() {
316            return Err(ClientSelectWithFormatError::FormatMismatch(
317                resp_format
318                    .unwrap()
319                    .to_str()
320                    .unwrap_or("Unknown")
321                    .to_string(),
322            )
323            .into());
324        }
325
326        let (parts, body) = resp.into_parts();
327        let (mut resp_parts, _) = Response::new(()).into_parts();
328        resp_parts.status = parts.status;
329        resp_parts.version = parts.version;
330        resp_parts.headers = parts.headers.to_owned();
331        let mut resp = Response::from_parts(parts, body);
332
333        let mut resp_body_buf = Vec::with_capacity(4096);
334        resp.copy_to(&mut resp_body_buf).await?;
335
336        let rows_and_info = output
337            .deserialize(&resp_body_buf[..])
338            .map_err(|err| ClientSelectWithFormatError::FormatDeError(err.to_string()))?;
339
340        Ok((resp_parts, rows_and_info))
341    }
342}