1use core::ops::{Deref, DerefMut};
2
3use clickhouse_format::{format_name::FormatName, input::Input, output::Output};
4use isahc::{
5 http::{response::Parts as ResponseParts, Method, Request, Response, StatusCode},
6 AsyncBody, AsyncReadResponseExt as _, HttpClient, HttpClientBuilder,
7};
8
9use crate::{
10 client_config::{
11 ClientConfig, FORMAT_KEY_HEADER, FORMAT_KEY_URL_PARAMETER, QUERY_KEY_URL_PARAMETER,
12 },
13 error::{ClientExecuteError, ClientInsertWithFormatError, ClientSelectWithFormatError, Error},
14};
15
16pub type Settings<'a> = Vec<(&'a str, &'a str)>;
17
18#[derive(Debug)]
19pub struct ClientBuilder {
20 http_client_builder: HttpClientBuilder,
21 client_config: ClientConfig,
22}
23impl Default for ClientBuilder {
24 fn default() -> Self {
25 Self::new()
26 }
27}
28impl Deref for ClientBuilder {
29 type Target = ClientConfig;
30
31 fn deref(&self) -> &Self::Target {
32 &self.client_config
33 }
34}
35impl DerefMut for ClientBuilder {
36 fn deref_mut(&mut self) -> &mut Self::Target {
37 &mut self.client_config
38 }
39}
40impl ClientBuilder {
41 pub fn new() -> Self {
42 Self {
43 http_client_builder: HttpClientBuilder::new(),
44 client_config: Default::default(),
45 }
46 }
47 pub fn configurable<F>(mut self, func: F) -> Self
48 where
49 F: FnOnce(HttpClientBuilder) -> HttpClientBuilder,
50 {
51 self.http_client_builder = func(self.http_client_builder);
52 self
53 }
54 pub fn build(self) -> Result<Client, Error> {
55 Ok(Client {
56 http_client: self.http_client_builder.build()?,
57 client_config: self.client_config,
58 })
59 }
60}
61
62#[derive(Debug, Clone)]
63pub struct Client {
64 http_client: HttpClient,
65 client_config: ClientConfig,
66}
67impl Deref for Client {
68 type Target = ClientConfig;
69
70 fn deref(&self) -> &Self::Target {
71 &self.client_config
72 }
73}
74impl DerefMut for Client {
75 fn deref_mut(&mut self) -> &mut Self::Target {
76 &mut self.client_config
77 }
78}
79impl Client {
80 pub fn new() -> Result<Self, Error> {
81 ClientBuilder::default().build()
82 }
83
84 pub async fn ping(&self) -> Result<bool, Error> {
85 let url = self.get_url();
86 let mut req = self.get_request();
87
88 let url = url.join("ping")?;
89
90 *req.method_mut() = Method::GET;
91 *req.uri_mut() = url.as_str().parse()?;
92
93 let mut resp = self.http_client.send_async(req).await?;
94
95 if resp.status() != StatusCode::OK {
96 return Ok(false);
97 }
98
99 let resp_body_text = resp.text().await?;
100 Ok(resp_body_text == self.get_http_server_default_response())
101 }
102
103 pub async fn execute(
107 &self,
108 sql: impl AsRef<str>,
109 settings: impl Into<Option<Settings<'_>>>,
110 ) -> Result<(), Error> {
111 let resp = self.respond_execute(sql, settings, |req| req).await?;
112
113 if !resp.status().is_success() {
114 return Err(ClientExecuteError::StatusCodeMismatch(resp.status()).into());
115 }
116
117 Ok(())
118 }
119
120 pub async fn respond_execute<PreRF>(
121 &self,
122 sql: impl AsRef<str>,
123 settings: impl Into<Option<Settings<'_>>>,
124 mut pre_respond_fn: PreRF,
125 ) -> Result<Response<AsyncBody>, Error>
126 where
127 PreRF: FnMut(Request<Vec<u8>>) -> Request<Vec<u8>> + Send,
128 {
129 let mut url = self.get_url().to_owned();
130 let mut req = self.get_request();
131
132 if let Some(settings) = settings.into() {
133 settings.iter().for_each(|(k, v)| {
134 url.query_pairs_mut().append_pair(k, v);
135 });
136 }
137
138 *req.method_mut() = Method::POST;
139 *req.uri_mut() = url.as_str().parse()?;
140
141 let (parts, _) = req.into_parts();
142 let req = Request::from_parts(parts, sql.as_ref().as_bytes().to_owned());
143
144 let req = pre_respond_fn(req);
145
146 let resp = self.http_client.send_async(req).await?;
147
148 Ok(resp)
149 }
150
151 pub async fn insert_with_format<I: Input>(
155 &self,
156 sql_prefix: impl AsRef<str>,
157 input: I,
158 settings: impl Into<Option<Settings<'_>>>,
159 ) -> Result<(), Error> {
160 let resp = self
161 .respond_insert_with_format(sql_prefix, input, settings, |req| req)
162 .await?;
163
164 if !resp.status().is_success() {
165 return Err(ClientInsertWithFormatError::StatusCodeMismatch(resp.status()).into());
166 }
167
168 Ok(())
169 }
170
171 pub async fn respond_insert_with_format<I: Input, PreRF>(
172 &self,
173 sql_prefix: impl AsRef<str>,
174 input: I,
175 settings: impl Into<Option<Settings<'_>>>,
176 pre_respond_fn: PreRF,
177 ) -> Result<Response<AsyncBody>, Error>
178 where
179 PreRF: FnMut(Request<Vec<u8>>) -> Request<Vec<u8>> + Send,
180 {
181 let format_name = I::format_name();
182 let format_bytes = input
183 .serialize()
184 .map_err(|err| ClientInsertWithFormatError::FormatSerError(err.to_string()))?;
185
186 self.respond_insert_with_format_bytes(
187 sql_prefix,
188 format_name,
189 format_bytes,
190 settings,
191 pre_respond_fn,
192 )
193 .await
194 }
195
196 pub async fn insert_with_format_bytes(
197 &self,
198 sql_prefix: impl AsRef<str>,
199 format_name: FormatName,
200 format_bytes: Vec<u8>,
201 settings: impl Into<Option<Settings<'_>>>,
202 ) -> Result<(), Error> {
203 let resp = self
204 .respond_insert_with_format_bytes(
205 sql_prefix,
206 format_name,
207 format_bytes,
208 settings,
209 |req| req,
210 )
211 .await?;
212
213 if !resp.status().is_success() {
214 return Err(ClientInsertWithFormatError::StatusCodeMismatch(resp.status()).into());
215 }
216
217 Ok(())
218 }
219
220 pub async fn respond_insert_with_format_bytes<PreRF>(
221 &self,
222 sql_prefix: impl AsRef<str>,
223 format_name: FormatName,
224 format_bytes: Vec<u8>,
225 settings: impl Into<Option<Settings<'_>>>,
226 mut pre_respond_fn: PreRF,
227 ) -> Result<Response<AsyncBody>, Error>
228 where
229 PreRF: FnMut(Request<Vec<u8>>) -> Request<Vec<u8>> + Send,
230 {
231 let mut url = self.get_url().to_owned();
232 let mut req = self.get_request();
233
234 let mut sql = sql_prefix.as_ref().to_owned();
235 let sql_suffix = format!(" FORMAT {format_name}");
236 if !sql.ends_with(sql_suffix.as_str()) {
237 sql.push_str(sql_suffix.as_str());
238 }
239
240 url.query_pairs_mut()
241 .append_pair(QUERY_KEY_URL_PARAMETER, sql.as_str());
242
243 if let Some(settings) = settings.into() {
244 settings.iter().for_each(|(k, v)| {
245 url.query_pairs_mut().append_pair(k, v);
246 });
247 }
248
249 *req.method_mut() = Method::POST;
250 *req.uri_mut() = url.as_str().parse()?;
251
252 let (parts, _) = req.into_parts();
253 let req = Request::from_parts(parts, format_bytes);
254
255 let req = pre_respond_fn(req);
256
257 let resp = self.http_client.send_async(req).await?;
258
259 Ok(resp)
260 }
261
262 pub async fn select_with_format<O: Output>(
266 &self,
267 sql: impl AsRef<str>,
268 output: O,
269 settings: impl Into<Option<Settings<'_>>>,
270 ) -> Result<(Vec<O::Row>, O::Info), Error> {
271 self.internal_select_with_format(sql, output, settings, |req| req)
272 .await
273 .map(|(_, x)| x)
274 }
275
276 pub async fn internal_select_with_format<O: Output, PreRF>(
277 &self,
278 sql: impl AsRef<str>,
279 output: O,
280 settings: impl Into<Option<Settings<'_>>>,
281 mut pre_respond_fn: PreRF,
282 ) -> Result<(ResponseParts, (Vec<O::Row>, O::Info)), Error>
283 where
284 PreRF: FnMut(Request<Vec<u8>>) -> Request<Vec<u8>> + Send,
285 {
286 let mut url = self.get_url().to_owned();
287 let mut req = self.get_request();
288
289 url.query_pairs_mut().append_pair(
290 FORMAT_KEY_URL_PARAMETER,
291 O::format_name().to_string().as_str(),
292 );
293
294 if let Some(settings) = settings.into() {
295 settings.iter().for_each(|(k, v)| {
296 url.query_pairs_mut().append_pair(k, v);
297 });
298 }
299
300 *req.method_mut() = Method::POST;
301 *req.uri_mut() = url.as_str().parse()?;
302
303 let (parts, _) = req.into_parts();
304 let req = Request::from_parts(parts, sql.as_ref().as_bytes().to_owned());
305
306 let req = pre_respond_fn(req);
307
308 let resp = self.http_client.send_async(req).await?;
309
310 if !resp.status().is_success() {
311 return Err(ClientSelectWithFormatError::StatusCodeMismatch(resp.status()).into());
312 }
313
314 let resp_format = resp.headers().get(FORMAT_KEY_HEADER);
315 if resp_format.is_some() && resp_format.unwrap() != O::format_name().to_string().as_str() {
316 return Err(ClientSelectWithFormatError::FormatMismatch(
317 resp_format
318 .unwrap()
319 .to_str()
320 .unwrap_or("Unknown")
321 .to_string(),
322 )
323 .into());
324 }
325
326 let (parts, body) = resp.into_parts();
327 let (mut resp_parts, _) = Response::new(()).into_parts();
328 resp_parts.status = parts.status;
329 resp_parts.version = parts.version;
330 resp_parts.headers = parts.headers.to_owned();
331 let mut resp = Response::from_parts(parts, body);
332
333 let mut resp_body_buf = Vec::with_capacity(4096);
334 resp.copy_to(&mut resp_body_buf).await?;
335
336 let rows_and_info = output
337 .deserialize(&resp_body_buf[..])
338 .map_err(|err| ClientSelectWithFormatError::FormatDeError(err.to_string()))?;
339
340 Ok((resp_parts, rows_and_info))
341 }
342}