typed_clickhouse/
query.rs1use std::marker::PhantomData;
2
3use bytes::Bytes;
4use hyper::{header::CONTENT_LENGTH, Body, Method, Request};
5use serde::Deserialize;
6use tokio::stream::StreamExt;
7use url::Url;
8
9use crate::{
10 buflist::BufList,
11 error::{Error, Result},
12 introspection::Reflection,
13 response::Response,
14 rowbinary,
15 sql_builder::{Bind, SqlBuilder},
16 Client,
17};
18
/// Length in bytes (8 KiB) of the zero-filled scratch buffer that `Query::rows`
/// allocates for every `RowCursor` it creates.
const BUFFER_SIZE: usize = 8 * 1024;
20
/// A SQL query under construction, paired with the client that will send it.
///
/// Created through `Query::new`, extended with `Query::bind`, and consumed by
/// `Query::execute` or `Query::rows`.
#[derive(Clone)]
pub struct Query {
    /// Client whose URL, database, options and credentials are used for the request.
    client: Client,
    /// Builder holding the SQL template and the values bound to it so far.
    sql: SqlBuilder,
}
26
27impl Query {
28 pub(crate) fn new(client: &Client, template: &str) -> Self {
29 Self {
30 client: client.clone(),
31 sql: SqlBuilder::new(template),
32 }
33 }
34
35 pub fn bind(mut self, value: impl Bind) -> Self {
36 self.sql.bind_arg(value);
37 self
38 }
39
40 pub async fn execute(self) -> Result<()> {
41 let _ = self.do_execute::<()>(Method::POST)?.resolve().await?;
43 Ok(())
44 }
45
46 pub fn rows<T: Reflection>(mut self) -> Result<RowCursor<T>> {
47 self.sql.append(" FORMAT RowBinary");
48
49 Ok(RowCursor {
50 response: self.do_execute::<T>(Method::GET)?,
51 buffer: vec![0; BUFFER_SIZE],
52 pending: BufList::default(),
53 _marker: PhantomData,
54 })
55 }
56
57 fn do_execute<T: Reflection>(mut self, method: Method) -> Result<Response> {
58 let mut url = Url::parse(&self.client.url).expect("TODO");
59 let mut pairs = url.query_pairs_mut();
60 pairs.clear();
61
62 if let Some(database) = &self.client.database {
63 pairs.append_pair("database", database);
64 }
65
66 self.sql.bind_fields::<T>();
67 let query = self.sql.finish()?;
68 pairs.append_pair("allow_experimental_live_view", "1"); pairs.append_pair("query", &query);
70 for (name, value) in &self.client.options {
71 pairs.append_pair(name, value);
72 }
73 drop(pairs);
74
75 let mut builder = Request::builder()
76 .method(method)
77 .uri(url.as_str())
78 .header(CONTENT_LENGTH, "0");
79
80 if let Some(user) = &self.client.user {
81 builder = builder.header("X-ClickHouse-User", user);
82 }
83
84 if let Some(password) = &self.client.password {
85 builder = builder.header("X-ClickHouse-Key", password);
86 }
87
88 let request = builder
89 .body(Body::empty())
90 .map_err(|err| Error::InvalidParams(Box::new(err)))?;
91
92 Ok(self.client.client.request(request).into())
93 }
94}
95
/// Cursor that decodes rows of type `T` from a query response, one per call
/// to `RowCursor::next`.
pub struct RowCursor<T> {
    /// Response whose body supplies the raw row data.
    response: Response,
    /// Scratch buffer handed to `rowbinary::deserialize_from` on every attempt.
    buffer: Vec<u8>,
    /// Body chunks received so far that have not been fully consumed yet.
    pending: BufList<Bytes>,
    /// Ties the cursor to the row type without storing a `T`.
    _marker: PhantomData<T>,
}
102
103impl<T> RowCursor<T> {
104 #[allow(clippy::should_implement_trait)]
105 pub async fn next<'a, 'b: 'a>(&'a mut self) -> Result<Option<T>>
106 where
107 T: Deserialize<'b>,
108 {
109 fn fuck_mut<'a, T>(ptr: &mut T) -> &'a mut T {
111 unsafe { &mut *(ptr as *mut T) }
112 }
113
114 let body = self.response.resolve().await?;
115
116 loop {
117 let buffer = fuck_mut(&mut self.buffer);
118
119 match rowbinary::deserialize_from(&mut self.pending, buffer) {
120 Ok(value) => {
121 self.pending.commit();
122 return Ok(Some(value));
123 }
124 Err(Error::NotEnoughData) => {
125 self.pending.rollback();
126 }
127 Err(err) => return Err(err),
128 }
129
130 match body.try_next().await? {
131 Some(chunk) => self.pending.push(chunk),
132 None if self.pending.bufs_cnt() > 0 => return Err(Error::NotEnoughData),
133 None => return Ok(None),
134 }
135 }
136 }
137}