typed_clickhouse/
query.rs

1use std::marker::PhantomData;
2
3use bytes::Bytes;
4use hyper::{header::CONTENT_LENGTH, Body, Method, Request};
5use serde::Deserialize;
6use tokio::stream::StreamExt;
7use url::Url;
8
9use crate::{
10    buflist::BufList,
11    error::{Error, Result},
12    introspection::Reflection,
13    response::Response,
14    rowbinary,
15    sql_builder::{Bind, SqlBuilder},
16    Client,
17};
18
/// Length, in bytes, of the zero-filled scratch buffer that every
/// `RowCursor` is created with.
const BUFFER_SIZE: usize = 8 * 1024;
20
21#[derive(Clone)]
22pub struct Query {
23    client: Client,
24    sql: SqlBuilder,
25}
26
27impl Query {
28    pub(crate) fn new(client: &Client, template: &str) -> Self {
29        Self {
30            client: client.clone(),
31            sql: SqlBuilder::new(template),
32        }
33    }
34
35    pub fn bind(mut self, value: impl Bind) -> Self {
36        self.sql.bind_arg(value);
37        self
38    }
39
40    pub async fn execute(self) -> Result<()> {
41        // TODO: should we read the body?
42        let _ = self.do_execute::<()>(Method::POST)?.resolve().await?;
43        Ok(())
44    }
45
46    pub fn rows<T: Reflection>(mut self) -> Result<RowCursor<T>> {
47        self.sql.append(" FORMAT RowBinary");
48
49        Ok(RowCursor {
50            response: self.do_execute::<T>(Method::GET)?,
51            buffer: vec![0; BUFFER_SIZE],
52            pending: BufList::default(),
53            _marker: PhantomData,
54        })
55    }
56
57    fn do_execute<T: Reflection>(mut self, method: Method) -> Result<Response> {
58        let mut url = Url::parse(&self.client.url).expect("TODO");
59        let mut pairs = url.query_pairs_mut();
60        pairs.clear();
61
62        if let Some(database) = &self.client.database {
63            pairs.append_pair("database", database);
64        }
65
66        self.sql.bind_fields::<T>();
67        let query = self.sql.finish()?;
68        pairs.append_pair("allow_experimental_live_view", "1"); // TODO: send only if it's required.
69        pairs.append_pair("query", &query);
70        for (name, value) in &self.client.options {
71            pairs.append_pair(name, value);
72        }
73        drop(pairs);
74
75        let mut builder = Request::builder()
76            .method(method)
77            .uri(url.as_str())
78            .header(CONTENT_LENGTH, "0");
79
80        if let Some(user) = &self.client.user {
81            builder = builder.header("X-ClickHouse-User", user);
82        }
83
84        if let Some(password) = &self.client.password {
85            builder = builder.header("X-ClickHouse-Key", password);
86        }
87
88        let request = builder
89            .body(Body::empty())
90            .map_err(|err| Error::InvalidParams(Box::new(err)))?;
91
92        Ok(self.client.client.request(request).into())
93    }
94}
95
96pub struct RowCursor<T> {
97    response: Response,
98    buffer: Vec<u8>,
99    pending: BufList<Bytes>,
100    _marker: PhantomData<T>,
101}
102
103impl<T> RowCursor<T> {
104    #[allow(clippy::should_implement_trait)]
105    pub async fn next<'a, 'b: 'a>(&'a mut self) -> Result<Option<T>>
106    where
107        T: Deserialize<'b>,
108    {
109        // XXX: a workaround for https://github.com/rust-lang/rust/issues/51132.
110        fn fuck_mut<'a, T>(ptr: &mut T) -> &'a mut T {
111            unsafe { &mut *(ptr as *mut T) }
112        }
113
114        let body = self.response.resolve().await?;
115
116        loop {
117            let buffer = fuck_mut(&mut self.buffer);
118
119            match rowbinary::deserialize_from(&mut self.pending, buffer) {
120                Ok(value) => {
121                    self.pending.commit();
122                    return Ok(Some(value));
123                }
124                Err(Error::NotEnoughData) => {
125                    self.pending.rollback();
126                }
127                Err(err) => return Err(err),
128            }
129
130            match body.try_next().await? {
131                Some(chunk) => self.pending.push(chunk),
132                None if self.pending.bufs_cnt() > 0 => return Err(Error::NotEnoughData),
133                None => return Ok(None),
134            }
135        }
136    }
137}