Skip to main content

clickhouse/
query.rs

1use hyper::{Method, Request, header::CONTENT_LENGTH};
2use serde::Serialize;
3use std::fmt::Display;
4use tracing::Instrument;
5use url::Url;
6
7use crate::{
8    Client,
9    error::{Error, Result},
10    formats,
11    headers::with_request_headers,
12    request_body::RequestBody,
13    response::Response,
14    row::{Row, RowOwned, RowRead},
15    sql::{Bind, SqlBuilder, ser},
16};
17
18#[cfg(feature = "sea-ql")]
19pub use crate::cursors::DataRowCursor;
20pub use crate::cursors::{BytesCursor, RowCursor};
21use crate::headers::with_authentication;
22use crate::settings;
23
24#[must_use]
25#[derive(Clone)]
26pub struct Query {
27    client: Client,
28    sql: SqlBuilder,
29}
30
31impl Query {
32    pub(crate) fn new(client: &Client, template: &str) -> Self {
33        Self {
34            client: client.clone(),
35            sql: SqlBuilder::new(template),
36        }
37    }
38
39    /// Display SQL query as string.
40    pub fn sql_display(&self) -> &impl Display {
41        &self.sql
42    }
43
44    /// Binds `value` to the next `?` in the query.
45    ///
46    /// The `value`, which must either implement [`Serialize`] or be an
47    /// [`Identifier`], will be appropriately escaped.
48    ///
49    /// All possible errors will be returned as [`Error::InvalidParams`]
50    /// during query execution (`execute()`, `fetch()`, etc.).
51    ///
52    /// WARNING: This means that the query must not have any extra `?`, even if
53    /// they are in a string literal! Use `??` to have plain `?` in query.
54    ///
55    /// [`Serialize`]: serde::Serialize
56    /// [`Identifier`]: crate::sql::Identifier
57    #[track_caller]
58    pub fn bind(mut self, value: impl Bind) -> Self {
59        self.sql.bind_arg(value);
60        self
61    }
62
63    /// Executes the query.
64    pub async fn execute(self) -> Result<()> {
65        // Enter the span for the `self.do_execute()` call
66        let span = self.make_span(None);
67
68        async {
69            let mut response = self
70                .do_execute(None)
71                .inspect_err(|e| e.record_in_current_span("error executing query"))?;
72
73            response
74                .finish()
75                .await
76                .inspect_err(|e| e.record_in_current_span("response error"))
77        }
78        .instrument(span)
79        .await
80    }
81
82    /// Executes the query, returning a [`RowCursor`] to obtain results.
83    ///
84    /// # Example
85    ///
86    /// ```
87    /// # async fn example() -> clickhouse::error::Result<()> {
88    /// #[derive(clickhouse::Row, serde::Deserialize)]
89    /// struct MyRow<'a> {
90    ///     no: u32,
91    ///     name: &'a str,
92    /// }
93    ///
94    /// let mut cursor = clickhouse::Client::default()
95    ///     .query("SELECT ?fields FROM some WHERE no BETWEEN 0 AND 1")
96    ///     .fetch::<MyRow<'_>>()?;
97    ///
98    /// while let Some(MyRow { name, no }) = cursor.next().await? {
99    ///     println!("{name}: {no}");
100    /// }
101    /// # Ok(()) }
102    /// ```
103    pub fn fetch<T: Row>(mut self) -> Result<RowCursor<T>> {
104        let validation = self.client.get_validation();
105        let format = if validation {
106            formats::ROW_BINARY_WITH_NAMES_AND_TYPES
107        } else {
108            formats::ROW_BINARY
109        };
110
111        let span = self.make_span(Some(format)).entered();
112
113        self.sql.bind_fields::<T>();
114
115        let response = self
116            .do_execute(Some(format))
117            .inspect_err(|e| e.record_in_current_span("error executing fetch"))?;
118
119        Ok(RowCursor::new(response, validation, span.exit()))
120    }
121
122    /// Executes the query and returns just a single row.
123    ///
124    /// Note that `T` must be owned.
125    pub async fn fetch_one<T>(self) -> Result<T>
126    where
127        T: RowOwned + RowRead,
128    {
129        match self.fetch::<T>()?.next().await {
130            Ok(Some(row)) => Ok(row),
131            Ok(None) => Err(Error::RowNotFound),
132            Err(err) => Err(err),
133        }
134    }
135
136    /// Executes the query and returns at most one row.
137    ///
138    /// Note that `T` must be owned.
139    pub async fn fetch_optional<T>(self) -> Result<Option<T>>
140    where
141        T: RowOwned + RowRead,
142    {
143        self.fetch::<T>()?.next().await
144    }
145
146    /// Executes the query and returns all the generated results,
147    /// collected into a Vec.
148    ///
149    /// Note that `T` must be owned.
150    pub async fn fetch_all<T>(self) -> Result<Vec<T>>
151    where
152        T: RowOwned + RowRead,
153    {
154        let mut result = Vec::new();
155        let mut cursor = self.fetch::<T>()?;
156
157        while let Some(row) = cursor.next().await? {
158            result.push(row);
159        }
160
161        Ok(result)
162    }
163
164    /// Executes the query, returning a [`DataRowCursor`] to obtain dynamically-typed results.
165    ///
166    /// Each row is decoded into a [`crate::DataRow`] containing a [`sea_query::Value`]
167    /// for every column, using the `RowBinaryWithNamesAndTypes` format so that type
168    /// information is always available.
169    ///
170    /// # Example
171    ///
172    /// ```ignore
173    /// let mut cursor = client
174    ///     .query("SELECT number, toString(number) AS s FROM system.numbers LIMIT 3")
175    ///     .fetch_rows()?;
176    ///
177    /// while let Some(row) = cursor.next().await? {
178    ///     for (col, val) in row.column_names.iter().zip(&row.values) {
179    ///         println!("{col}: {val:?}");
180    ///     }
181    /// }
182    /// ```
183    #[cfg(feature = "sea-ql")]
184    #[cfg_attr(docsrs, doc(cfg(feature = "sea-ql")))]
185    pub fn fetch_rows(self) -> Result<DataRowCursor> {
186        let response = self.do_execute(Some(formats::ROW_BINARY_WITH_NAMES_AND_TYPES))?;
187        Ok(DataRowCursor::new(response))
188    }
189
190    /// Executes the query, returning a [`BytesCursor`] to obtain results as raw
191    /// bytes containing data in the [provided format].
192    ///
193    /// [provided format]: https://clickhouse.com/docs/en/interfaces/formats
194    pub fn fetch_bytes(self, format: impl AsRef<str>) -> Result<BytesCursor> {
195        let format = format.as_ref();
196
197        let span = self.make_span(Some(format)).entered();
198
199        let response = self.do_execute(Some(format))?;
200        Ok(BytesCursor::new(response, span.exit()))
201    }
202
203    pub(crate) fn make_span(&self, response_format: Option<&str>) -> tracing::Span {
204        // https://opentelemetry.io/docs/specs/semconv/db/sql/
205        // TODO: write our own Semantic Conventions for ClickHouse
206        tracing::info_span!(
207            "clickhouse.query",
208            // OTel conventional fields
209            // Note that `Empty` or `Option::None` fields are not reported,
210            // so we can avoid adding noise to logs when the `opentelemetry` feature is disabled.
211            otel.status_code = tracing::field::Empty,
212            otel.kind = cfg!(feature = "opentelemetry").then_some("client"),
213            error.type = tracing::field::Empty,
214            db.system.name = cfg!(feature = "opentelemetry").then_some("clickhouse"),
215            // Only log full query text at TRACE level
216            // Important that this is taken before client-side parameters are populated
217            // FIXME: we can't use `enabled!` due to https://github.com/tokio-rs/tracing/issues/2448
218            // but we don't want to log the full query at all verbosity levels.
219            // db.query.text = tracing::enabled!(tracing::Level::TRACE).then(|| self.sql.to_string()),
220            // TODO: generate summary
221            db.query.summary = tracing::field::Empty,
222            db.response.status_code = tracing::field::Empty,
223            db.response.returned_rows = tracing::field::Empty,
224            // ClickHouse-specific extension fields
225            clickhouse.request.session_id = self.client.get_setting(settings::SESSION_ID),
226            clickhouse.request.query_id = self.client.get_setting(settings::QUERY_ID),
227            clickhouse.response.received_bytes = tracing::field::Empty,
228            clickhouse.response.decoded_bytes = tracing::field::Empty,
229            clickhouse.response.format = response_format,
230        )
231    }
232
233    pub(crate) fn do_execute(self, default_format: Option<&str>) -> Result<Response> {
234        let query = self.sql.finish()?;
235
236        let mut url =
237            Url::parse(&self.client.url).map_err(|err| Error::InvalidParams(Box::new(err)))?;
238        let mut pairs = url.query_pairs_mut();
239        pairs.clear();
240
241        if let Some(format) = default_format {
242            pairs.append_pair(settings::DEFAULT_FORMAT, format);
243        }
244
245        if let Some(database) = &self.client.database {
246            pairs.append_pair(settings::DATABASE, database);
247        }
248
249        if self.client.compression.is_enabled() {
250            #[cfg(feature = "zstd")]
251            if matches!(self.client.compression, crate::Compression::Zstd(_)) {
252                pairs.append_pair(settings::ENABLE_HTTP_COMPRESSION, "1");
253            } else {
254                pairs.append_pair(settings::COMPRESS, "1");
255            }
256
257            #[cfg(not(feature = "zstd"))]
258            pairs.append_pair(settings::COMPRESS, "1");
259        }
260
261        for (name, value) in &self.client.settings {
262            pairs.append_pair(name, value);
263        }
264
265        pairs.extend_pairs(self.client.roles.iter().map(|role| (settings::ROLE, role)));
266
267        drop(pairs);
268
269        let mut builder = Request::builder().method(Method::POST).uri(url.as_str());
270        builder = with_request_headers(builder, &self.client.headers, &self.client.products_info);
271        builder = with_authentication(builder, &self.client.authentication);
272
273        #[cfg(feature = "zstd")]
274        if matches!(self.client.compression, crate::Compression::Zstd(_)) {
275            builder = builder.header("Accept-Encoding", "zstd");
276        }
277
278        let content_length = query.len();
279        builder = builder.header(CONTENT_LENGTH, content_length.to_string());
280
281        let request = builder.body(RequestBody::full(query)).map_err(|err| {
282            let err = Error::InvalidParams(Box::new(err));
283            err.record_in_current_span("invalid params in query");
284            err
285        })?;
286
287        let future = self.client.http.request(request);
288        Ok(Response::new(future, self.client.compression))
289    }
290
291    /// Configure the [roles] to use when executing this query.
292    ///
293    /// Overrides any roles previously set by this method, [`Query::with_setting`],
294    /// [`Client::with_roles`] or [`Client::with_setting`].
295    ///
296    /// An empty iterator may be passed to clear the set roles.
297    ///
298    /// [roles]: https://clickhouse.com/docs/operations/access-rights#role-management
299    pub fn with_roles(self, roles: impl IntoIterator<Item = impl Into<String>>) -> Self {
300        Self {
301            client: self.client.with_roles(roles),
302            ..self
303        }
304    }
305
306    /// Clear any explicit [roles] previously set on this `Query` or inherited from [`Client`].
307    ///
308    /// Overrides any roles previously set by [`Query::with_roles`], [`Query::with_setting`],
309    /// [`Client::with_roles`] or [`Client::with_setting`].
310    ///
311    /// [roles]: https://clickhouse.com/docs/operations/access-rights#role-management
312    pub fn with_default_roles(self) -> Self {
313        Self {
314            client: self.client.with_default_roles(),
315            ..self
316        }
317    }
318
319    /// Similar to [`Client::with_option`], but for this particular query only.
320    #[deprecated(since = "0.14.3", note = "please use `with_setting` instead")]
321    pub fn with_option(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
322        self.client.set_setting(name, value);
323        self
324    }
325
326    /// Similar to [`Client::with_setting`], but for this particular query only.
327    pub fn with_setting(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
328        self.client.set_setting(name, value);
329        self
330    }
331
332    /// Specify server side parameter for query.
333    ///
334    /// In queries, you can reference params as {name: type} e.g. {val: Int32}.
335    pub fn param(mut self, name: &str, value: impl Serialize) -> Self {
336        let mut param = String::from("");
337        if let Err(err) = ser::write_param(&mut param, &value) {
338            self.sql = SqlBuilder::Failed(format!("invalid param: {err}"));
339            self
340        } else {
341            self.with_setting(format!("param_{name}"), param)
342        }
343    }
344}