1use hyper::{Method, Request, header::CONTENT_LENGTH};
2use serde::Serialize;
3use std::fmt::Display;
4use tracing::Instrument;
5use url::Url;
6
7use crate::{
8 Client,
9 error::{Error, Result},
10 formats,
11 headers::with_request_headers,
12 request_body::RequestBody,
13 response::Response,
14 row::{Row, RowOwned, RowRead},
15 sql::{Bind, SqlBuilder, ser},
16};
17
18#[cfg(feature = "sea-ql")]
19pub use crate::cursors::DataRowCursor;
20pub use crate::cursors::{BytesCursor, RowCursor};
21use crate::headers::with_authentication;
22use crate::settings;
23
24#[must_use]
25#[derive(Clone)]
26pub struct Query {
27 client: Client,
28 sql: SqlBuilder,
29}
30
31impl Query {
32 pub(crate) fn new(client: &Client, template: &str) -> Self {
33 Self {
34 client: client.clone(),
35 sql: SqlBuilder::new(template),
36 }
37 }
38
39 pub fn sql_display(&self) -> &impl Display {
41 &self.sql
42 }
43
44 #[track_caller]
58 pub fn bind(mut self, value: impl Bind) -> Self {
59 self.sql.bind_arg(value);
60 self
61 }
62
63 pub async fn execute(self) -> Result<()> {
65 let span = self.make_span(None);
67
68 async {
69 let mut response = self
70 .do_execute(None)
71 .inspect_err(|e| e.record_in_current_span("error executing query"))?;
72
73 response
74 .finish()
75 .await
76 .inspect_err(|e| e.record_in_current_span("response error"))
77 }
78 .instrument(span)
79 .await
80 }
81
82 pub fn fetch<T: Row>(mut self) -> Result<RowCursor<T>> {
104 let validation = self.client.get_validation();
105 let format = if validation {
106 formats::ROW_BINARY_WITH_NAMES_AND_TYPES
107 } else {
108 formats::ROW_BINARY
109 };
110
111 let span = self.make_span(Some(format)).entered();
112
113 self.sql.bind_fields::<T>();
114
115 let response = self
116 .do_execute(Some(format))
117 .inspect_err(|e| e.record_in_current_span("error executing fetch"))?;
118
119 Ok(RowCursor::new(response, validation, span.exit()))
120 }
121
122 pub async fn fetch_one<T>(self) -> Result<T>
126 where
127 T: RowOwned + RowRead,
128 {
129 match self.fetch::<T>()?.next().await {
130 Ok(Some(row)) => Ok(row),
131 Ok(None) => Err(Error::RowNotFound),
132 Err(err) => Err(err),
133 }
134 }
135
136 pub async fn fetch_optional<T>(self) -> Result<Option<T>>
140 where
141 T: RowOwned + RowRead,
142 {
143 self.fetch::<T>()?.next().await
144 }
145
146 pub async fn fetch_all<T>(self) -> Result<Vec<T>>
151 where
152 T: RowOwned + RowRead,
153 {
154 let mut result = Vec::new();
155 let mut cursor = self.fetch::<T>()?;
156
157 while let Some(row) = cursor.next().await? {
158 result.push(row);
159 }
160
161 Ok(result)
162 }
163
164 #[cfg(feature = "sea-ql")]
184 #[cfg_attr(docsrs, doc(cfg(feature = "sea-ql")))]
185 pub fn fetch_rows(self) -> Result<DataRowCursor> {
186 let response = self.do_execute(Some(formats::ROW_BINARY_WITH_NAMES_AND_TYPES))?;
187 Ok(DataRowCursor::new(response))
188 }
189
190 pub fn fetch_bytes(self, format: impl AsRef<str>) -> Result<BytesCursor> {
195 let format = format.as_ref();
196
197 let span = self.make_span(Some(format)).entered();
198
199 let response = self.do_execute(Some(format))?;
200 Ok(BytesCursor::new(response, span.exit()))
201 }
202
203 pub(crate) fn make_span(&self, response_format: Option<&str>) -> tracing::Span {
204 tracing::info_span!(
207 "clickhouse.query",
208 otel.status_code = tracing::field::Empty,
212 otel.kind = cfg!(feature = "opentelemetry").then_some("client"),
213 error.type = tracing::field::Empty,
214 db.system.name = cfg!(feature = "opentelemetry").then_some("clickhouse"),
215 db.query.summary = tracing::field::Empty,
222 db.response.status_code = tracing::field::Empty,
223 db.response.returned_rows = tracing::field::Empty,
224 clickhouse.request.session_id = self.client.get_setting(settings::SESSION_ID),
226 clickhouse.request.query_id = self.client.get_setting(settings::QUERY_ID),
227 clickhouse.response.received_bytes = tracing::field::Empty,
228 clickhouse.response.decoded_bytes = tracing::field::Empty,
229 clickhouse.response.format = response_format,
230 )
231 }
232
233 pub(crate) fn do_execute(self, default_format: Option<&str>) -> Result<Response> {
234 let query = self.sql.finish()?;
235
236 let mut url =
237 Url::parse(&self.client.url).map_err(|err| Error::InvalidParams(Box::new(err)))?;
238 let mut pairs = url.query_pairs_mut();
239 pairs.clear();
240
241 if let Some(format) = default_format {
242 pairs.append_pair(settings::DEFAULT_FORMAT, format);
243 }
244
245 if let Some(database) = &self.client.database {
246 pairs.append_pair(settings::DATABASE, database);
247 }
248
249 if self.client.compression.is_enabled() {
250 #[cfg(feature = "zstd")]
251 if matches!(self.client.compression, crate::Compression::Zstd(_)) {
252 pairs.append_pair(settings::ENABLE_HTTP_COMPRESSION, "1");
253 } else {
254 pairs.append_pair(settings::COMPRESS, "1");
255 }
256
257 #[cfg(not(feature = "zstd"))]
258 pairs.append_pair(settings::COMPRESS, "1");
259 }
260
261 for (name, value) in &self.client.settings {
262 pairs.append_pair(name, value);
263 }
264
265 pairs.extend_pairs(self.client.roles.iter().map(|role| (settings::ROLE, role)));
266
267 drop(pairs);
268
269 let mut builder = Request::builder().method(Method::POST).uri(url.as_str());
270 builder = with_request_headers(builder, &self.client.headers, &self.client.products_info);
271 builder = with_authentication(builder, &self.client.authentication);
272
273 #[cfg(feature = "zstd")]
274 if matches!(self.client.compression, crate::Compression::Zstd(_)) {
275 builder = builder.header("Accept-Encoding", "zstd");
276 }
277
278 let content_length = query.len();
279 builder = builder.header(CONTENT_LENGTH, content_length.to_string());
280
281 let request = builder.body(RequestBody::full(query)).map_err(|err| {
282 let err = Error::InvalidParams(Box::new(err));
283 err.record_in_current_span("invalid params in query");
284 err
285 })?;
286
287 let future = self.client.http.request(request);
288 Ok(Response::new(future, self.client.compression))
289 }
290
291 pub fn with_roles(self, roles: impl IntoIterator<Item = impl Into<String>>) -> Self {
300 Self {
301 client: self.client.with_roles(roles),
302 ..self
303 }
304 }
305
306 pub fn with_default_roles(self) -> Self {
313 Self {
314 client: self.client.with_default_roles(),
315 ..self
316 }
317 }
318
319 #[deprecated(since = "0.14.3", note = "please use `with_setting` instead")]
321 pub fn with_option(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
322 self.client.set_setting(name, value);
323 self
324 }
325
326 pub fn with_setting(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
328 self.client.set_setting(name, value);
329 self
330 }
331
332 pub fn param(mut self, name: &str, value: impl Serialize) -> Self {
336 let mut param = String::from("");
337 if let Err(err) = ser::write_param(&mut param, &value) {
338 self.sql = SqlBuilder::Failed(format!("invalid param: {err}"));
339 self
340 } else {
341 self.with_setting(format!("param_{name}"), param)
342 }
343 }
344}