Skip to main content

clickhouse/
query.rs

1use hyper::{Method, Request, header::CONTENT_LENGTH};
2use serde::Serialize;
3use std::fmt::Display;
4use url::Url;
5
6use crate::{
7    Client,
8    error::{Error, Result},
9    formats,
10    headers::with_request_headers,
11    request_body::RequestBody,
12    response::Response,
13    row::{Row, RowOwned, RowRead},
14    sql::{Bind, SqlBuilder, ser},
15};
16
17#[cfg(feature = "sea-ql")]
18pub use crate::cursors::DataRowCursor;
19pub use crate::cursors::{BytesCursor, RowCursor};
20use crate::headers::with_authentication;
21use crate::settings;
22
23#[must_use]
24#[derive(Clone)]
25pub struct Query {
26    client: Client,
27    sql: SqlBuilder,
28}
29
30impl Query {
31    pub(crate) fn new(client: &Client, template: &str) -> Self {
32        Self {
33            client: client.clone(),
34            sql: SqlBuilder::new(template),
35        }
36    }
37
38    /// Display SQL query as string.
39    pub fn sql_display(&self) -> &impl Display {
40        &self.sql
41    }
42
43    /// Binds `value` to the next `?` in the query.
44    ///
45    /// The `value`, which must either implement [`Serialize`] or be an
46    /// [`Identifier`], will be appropriately escaped.
47    ///
48    /// All possible errors will be returned as [`Error::InvalidParams`]
49    /// during query execution (`execute()`, `fetch()`, etc.).
50    ///
51    /// WARNING: This means that the query must not have any extra `?`, even if
52    /// they are in a string literal! Use `??` to have plain `?` in query.
53    ///
54    /// [`Serialize`]: serde::Serialize
55    /// [`Identifier`]: crate::sql::Identifier
56    #[track_caller]
57    pub fn bind(mut self, value: impl Bind) -> Self {
58        self.sql.bind_arg(value);
59        self
60    }
61
62    /// Executes the query.
63    pub async fn execute(self) -> Result<()> {
64        self.do_execute(None)?.finish().await
65    }
66
67    /// Executes the query, returning a [`RowCursor`] to obtain results.
68    ///
69    /// # Example
70    ///
71    /// ```
72    /// # async fn example() -> clickhouse::error::Result<()> {
73    /// #[derive(clickhouse::Row, serde::Deserialize)]
74    /// struct MyRow<'a> {
75    ///     no: u32,
76    ///     name: &'a str,
77    /// }
78    ///
79    /// let mut cursor = clickhouse::Client::default()
80    ///     .query("SELECT ?fields FROM some WHERE no BETWEEN 0 AND 1")
81    ///     .fetch::<MyRow<'_>>()?;
82    ///
83    /// while let Some(MyRow { name, no }) = cursor.next().await? {
84    ///     println!("{name}: {no}");
85    /// }
86    /// # Ok(()) }
87    /// ```
88    pub fn fetch<T: Row>(mut self) -> Result<RowCursor<T>> {
89        self.sql.bind_fields::<T>();
90
91        let validation = self.client.get_validation();
92        let format = if validation {
93            formats::ROW_BINARY_WITH_NAMES_AND_TYPES
94        } else {
95            formats::ROW_BINARY
96        };
97
98        let response = self.do_execute(Some(format))?;
99        Ok(RowCursor::new(response, validation))
100    }
101
102    /// Executes the query and returns just a single row.
103    ///
104    /// Note that `T` must be owned.
105    pub async fn fetch_one<T>(self) -> Result<T>
106    where
107        T: RowOwned + RowRead,
108    {
109        match self.fetch::<T>()?.next().await {
110            Ok(Some(row)) => Ok(row),
111            Ok(None) => Err(Error::RowNotFound),
112            Err(err) => Err(err),
113        }
114    }
115
116    /// Executes the query and returns at most one row.
117    ///
118    /// Note that `T` must be owned.
119    pub async fn fetch_optional<T>(self) -> Result<Option<T>>
120    where
121        T: RowOwned + RowRead,
122    {
123        self.fetch::<T>()?.next().await
124    }
125
126    /// Executes the query and returns all the generated results,
127    /// collected into a Vec.
128    ///
129    /// Note that `T` must be owned.
130    pub async fn fetch_all<T>(self) -> Result<Vec<T>>
131    where
132        T: RowOwned + RowRead,
133    {
134        let mut result = Vec::new();
135        let mut cursor = self.fetch::<T>()?;
136
137        while let Some(row) = cursor.next().await? {
138            result.push(row);
139        }
140
141        Ok(result)
142    }
143
144    /// Executes the query, returning a [`DataRowCursor`] to obtain dynamically-typed results.
145    ///
146    /// Each row is decoded into a [`crate::DataRow`] containing a [`sea_query::Value`]
147    /// for every column, using the `RowBinaryWithNamesAndTypes` format so that type
148    /// information is always available.
149    ///
150    /// # Example
151    ///
152    /// ```ignore
153    /// let mut cursor = client
154    ///     .query("SELECT number, toString(number) AS s FROM system.numbers LIMIT 3")
155    ///     .fetch_rows()?;
156    ///
157    /// while let Some(row) = cursor.next().await? {
158    ///     for (col, val) in row.column_names.iter().zip(&row.values) {
159    ///         println!("{col}: {val:?}");
160    ///     }
161    /// }
162    /// ```
163    #[cfg(feature = "sea-ql")]
164    #[cfg_attr(docsrs, doc(cfg(feature = "sea-ql")))]
165    pub fn fetch_rows(self) -> Result<DataRowCursor> {
166        let response = self.do_execute(Some(formats::ROW_BINARY_WITH_NAMES_AND_TYPES))?;
167        Ok(DataRowCursor::new(response))
168    }
169
170    /// Executes the query, returning a [`BytesCursor`] to obtain results as raw
171    /// bytes containing data in the [provided format].
172    ///
173    /// [provided format]: https://clickhouse.com/docs/en/interfaces/formats
174    pub fn fetch_bytes(self, format: impl AsRef<str>) -> Result<BytesCursor> {
175        let response = self.do_execute(Some(format.as_ref()))?;
176        Ok(BytesCursor::new(response))
177    }
178
179    pub(crate) fn do_execute(self, default_format: Option<&str>) -> Result<Response> {
180        let query = self.sql.finish()?;
181
182        let mut url =
183            Url::parse(&self.client.url).map_err(|err| Error::InvalidParams(Box::new(err)))?;
184        let mut pairs = url.query_pairs_mut();
185        pairs.clear();
186
187        if let Some(format) = default_format {
188            pairs.append_pair(settings::DEFAULT_FORMAT, format);
189        }
190
191        if let Some(database) = &self.client.database {
192            pairs.append_pair(settings::DATABASE, database);
193        }
194
195        if self.client.compression.is_lz4() {
196            pairs.append_pair(settings::COMPRESS, "1");
197        }
198
199        for (name, value) in &self.client.options {
200            pairs.append_pair(name, value);
201        }
202
203        pairs.extend_pairs(self.client.roles.iter().map(|role| (settings::ROLE, role)));
204
205        drop(pairs);
206
207        let mut builder = Request::builder().method(Method::POST).uri(url.as_str());
208        builder = with_request_headers(builder, &self.client.headers, &self.client.products_info);
209        builder = with_authentication(builder, &self.client.authentication);
210
211        let content_length = query.len();
212        builder = builder.header(CONTENT_LENGTH, content_length.to_string());
213
214        let request = builder
215            .body(RequestBody::full(query))
216            .map_err(|err| Error::InvalidParams(Box::new(err)))?;
217
218        let future = self.client.http.request(request);
219        Ok(Response::new(future, self.client.compression))
220    }
221
222    /// Configure the [roles] to use when executing this query.
223    ///
224    /// Overrides any roles previously set by this method, [`Query::with_option`],
225    /// [`Client::with_roles`] or [`Client::with_option`].
226    ///
227    /// An empty iterator may be passed to clear the set roles.
228    ///
229    /// [roles]: https://clickhouse.com/docs/operations/access-rights#role-management
230    pub fn with_roles(self, roles: impl IntoIterator<Item = impl Into<String>>) -> Self {
231        Self {
232            client: self.client.with_roles(roles),
233            ..self
234        }
235    }
236
237    /// Clear any explicit [roles] previously set on this `Query` or inherited from [`Client`].
238    ///
239    /// Overrides any roles previously set by [`Query::with_roles`], [`Query::with_option`],
240    /// [`Client::with_roles`] or [`Client::with_option`].
241    ///
242    /// [roles]: https://clickhouse.com/docs/operations/access-rights#role-management
243    pub fn with_default_roles(self) -> Self {
244        Self {
245            client: self.client.with_default_roles(),
246            ..self
247        }
248    }
249
250    /// Similar to [`Client::with_option`], but for this particular query only.
251    pub fn with_option(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
252        self.client.set_option(name, value);
253        self
254    }
255
256    /// Specify server side parameter for query.
257    ///
258    /// In queries, you can reference params as {name: type} e.g. {val: Int32}.
259    pub fn param(mut self, name: &str, value: impl Serialize) -> Self {
260        let mut param = String::from("");
261        if let Err(err) = ser::write_param(&mut param, &value) {
262            self.sql = SqlBuilder::Failed(format!("invalid param: {err}"));
263            self
264        } else {
265            self.with_option(format!("param_{name}"), param)
266        }
267    }
268}