clickhouse_arrow/client.rs
//! The `client` module provides the primary interface for interacting with `ClickHouse`
//! over its native protocol, with full support for Apache Arrow interoperability.
//! The main entry point is the [`Client`] struct, which supports both native `ClickHouse`
//! data formats ([`NativeClient`]) and Arrow-compatible formats ([`ArrowClient`]).
//!
//! This module is designed to be thread-safe, with [`Client`] instances that can be
//! cloned and shared across threads. It supports querying, inserting data, managing
//! database schemas, and handling `ClickHouse` events like progress and profiling.
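//!
//! A minimal end-to-end sketch (the endpoint and query below are illustrative):
//!
//! ```rust,ignore
//! use clickhouse_arrow::prelude::*;
//! use futures_util::StreamExt;
//!
//! let client = Client::builder()
//!     .destination("localhost:9000")
//!     .build::<ArrowFormat>()
//!     .await?;
//!
//! let mut batches = client.query("SELECT number FROM system.numbers LIMIT 10", None).await?;
//! while let Some(batch) = batches.next().await {
//!     println!("rows in batch: {}", batch?.num_rows());
//! }
//! ```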
9mod builder;
10mod chunk;
11#[cfg(feature = "cloud")]
12mod cloud;
13pub(crate) mod connection;
14mod internal;
15mod options;
16mod reader;
17mod response;
18mod tcp;
19mod writer;
20
21use std::collections::HashMap;
22use std::sync::Arc;
23use std::sync::atomic::AtomicU16;
24
25use arrow::array::{ArrayRef, RecordBatch};
26use arrow::compute::take_record_batch;
27use arrow::datatypes::SchemaRef;
28use futures_util::{Stream, StreamExt, TryStreamExt, stream};
29use strum::AsRefStr;
30use tokio::sync::{broadcast, mpsc, oneshot};
31
32pub use self::builder::*;
33pub use self::connection::ConnectionStatus;
34pub(crate) use self::internal::{Message, Operation};
35pub use self::options::*;
36pub use self::response::*;
37pub use self::tcp::Destination;
38use crate::arrow::utils::batch_to_rows;
39use crate::constants::*;
40use crate::formats::{ClientFormat, NativeFormat};
41use crate::native::block::Block;
42use crate::native::protocol::{CompressionMethod, ProfileEvent};
43use crate::prelude::*;
44use crate::query::{ParsedQuery, QueryParams};
45use crate::schema::CreateOptions;
46use crate::{Error, Progress, Result, Row};
47
48static CLIENT_ID: AtomicU16 = AtomicU16::new(0);
49
50/// A `ClickHouse` client configured for the native format.
51///
52/// This type alias provides a client that works with `ClickHouse`'s internal
53/// data representation, useful when you need direct control over types
54/// and don't require Arrow integration.
55pub type NativeClient = Client<NativeFormat>;
56
57/// A `ClickHouse` client configured for Apache Arrow format.
58///
59/// This type alias provides a client that works with Arrow `RecordBatch`es,
60/// enabling seamless integration with the Arrow ecosystem for data processing
61/// and analytics workflows.
62pub type ArrowClient = Client<ArrowFormat>;
63
64/// Configuration for a `ClickHouse` connection, including tracing and cloud-specific settings.
65///
66/// This struct is used to pass optional context to [`Client::connect`], enabling features
67/// like distributed tracing or cloud instance tracking.
68///
69/// # Fields
70/// - `trace`: Optional tracing context for logging and monitoring.
71/// - `cloud`: Optional cloud-specific configuration (requires the `cloud` feature).
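///
/// # Examples
/// A minimal sketch of passing a trace context to [`Client::connect`]; the endpoint and
/// default options below are illustrative:
/// ```rust,ignore
/// use clickhouse_arrow::prelude::*;
///
/// let context = ConnectionContext { trace: Some(TraceContext::default()), ..Default::default() };
/// let client = NativeClient::connect("localhost:9000", ClientOptions::default(), None, Some(context))
///     .await?;
/// ```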
72#[derive(Debug, Clone, Default)]
73#[cfg_attr(not(feature = "cloud"), derive(Copy))]
74pub struct ConnectionContext {
75 pub trace: Option<TraceContext>,
76 #[cfg(feature = "cloud")]
77 pub cloud: Option<Arc<std::sync::atomic::AtomicBool>>,
78}
79
/// `ClickHouse` events emitted by the underlying connection, tagged with the query and client IDs.
81#[derive(Debug, Clone)]
82pub struct Event {
83 pub event: ClickHouseEvent,
84 pub qid: Qid,
85 pub client_id: u16,
86}
87
/// Profile and progress events received from `ClickHouse`.
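///
/// A sketch of dispatching on the payload:
/// ```rust,ignore
/// use clickhouse_arrow::prelude::*;
///
/// fn handle(event: ClickHouseEvent) {
///     match event {
///         ClickHouseEvent::Progress(progress) => println!("progress: {progress:?}"),
///         ClickHouseEvent::Profile(events) => println!("{} profile events", events.len()),
///     }
/// }
/// ```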
89#[derive(Debug, Clone, AsRefStr)]
90pub enum ClickHouseEvent {
91 Progress(Progress),
92 Profile(Vec<ProfileEvent>),
93}
94
95/// A thread-safe handle for interacting with a `ClickHouse` database over its native protocol.
96///
97/// The `Client` struct is the primary interface for executing queries, inserting data, and
98/// managing database schemas. It supports two data formats:
99/// - [`NativeClient`]: Uses `ClickHouse`'s native [`Block`] format for data exchange.
100/// - [`ArrowClient`]: Uses Apache Arrow's [`RecordBatch`] for seamless interoperability with Arrow
101/// ecosystems.
102///
103/// `Client` instances are lightweight and can be cloned and shared across threads. Each instance
104/// maintains a reference to an underlying connection, which is managed automatically. The client
105/// also supports event subscription for receiving progress and profiling information from
106/// `ClickHouse`.
107///
108/// # Usage
109/// Create a `Client` using the [`ClientBuilder`] for a fluent configuration experience, or use
110/// [`Client::connect`] for direct connection setup.
111///
112/// # Examples
113/// ```rust,ignore
114/// use clickhouse_arrow::prelude::*;
115/// use clickhouse_arrow::arrow;
116/// use futures_util::StreamExt;
117///
118/// let client = Client::builder()
119/// .destination("localhost:9000")
120/// .username("default")
121/// .build::<ArrowFormat>()
122/// .await?;
123///
124/// // Execute a query
/// let batches = client
///     .query("SELECT 1", None)
///     .await?
///     .collect::<Vec<_>>()
///     .await
///     .into_iter()
///     .collect::<Result<Vec<_>>>()?;
/// arrow::util::pretty::print_batches(&batches)?;
133/// ```
134#[derive(Clone, Debug)]
135pub struct Client<T: ClientFormat> {
136 pub client_id: u16,
137 connection: Arc<connection::Connection<T>>,
138 events: Arc<broadcast::Sender<Event>>,
139 settings: Option<Arc<Settings>>,
140}
141
142impl<T: ClientFormat> Client<T> {
    /// Creates a new [`ClientBuilder`] for configuring and building a `ClickHouse` client.
145 ///
146 /// This method provides a fluent interface to set up a `Client` with custom connection
147 /// parameters, such as the server address, credentials, TLS, and compression. The
148 /// builder can create either a single [`Client`] or a connection pool (with the `pool`
149 /// feature enabled).
150 ///
151 /// Use this method when you need fine-grained control over the client configuration.
152 /// For simple connections, you can also use [`Client::connect`] directly.
153 ///
154 /// # Returns
155 /// A [`ClientBuilder`] instance ready for configuration.
156 ///
157 /// # Examples
158 /// ```rust,ignore
159 /// use clickhouse_arrow::prelude::*;
160 ///
161 /// let builder = Client::builder()
162 /// .with_endpoint("localhost:9000")
163 /// .with_username("default")
164 /// .with_password("");
165 /// ```
166 pub fn builder() -> ClientBuilder { ClientBuilder::new() }
167
168 /// Establishes a connection to a `ClickHouse` server over TCP, with optional TLS support.
169 ///
170 /// This method creates a new [`Client`] instance connected to the specified `destination`.
171 /// The connection can be configured using [`ClientOptions`], which allows setting parameters
172 /// like username, password, TLS, and compression. Optional `settings` can be provided to
173 /// customize `ClickHouse` session behavior, and a `context` can be used for tracing or
174 /// cloud-specific configurations.
175 ///
176 /// # Parameters
177 /// - `destination`: The `ClickHouse` server address (e.g., `"localhost:9000"` or a
178 /// [`Destination`]).
179 /// - `options`: Configuration for the connection, including credentials, TLS, and cloud
180 /// settings.
181 /// - `settings`: Optional `ClickHouse` session settings (e.g., query timeouts, max rows).
182 /// - `context`: Optional connection context for tracing or cloud-specific behavior.
183 ///
184 /// # Returns
185 /// A [`Result`] containing the connected [`Client`] instance, or an error if the connection
186 /// fails.
187 ///
188 /// # Errors
189 /// - Fails if the destination cannot be resolved or the connection cannot be established.
190 /// - Fails if authentication or TLS setup encounters an issue.
191 ///
192 /// # Examples
193 /// ```rust,ignore
194 /// use clickhouse_arrow::client::{Client, ClientOptions};
195 ///
196 /// let options = ClientOptions::default()
197 /// .username("default")
198 /// .password("")
199 /// .use_tls(false);
200 ///
201 /// let client = Client::connect("localhost:9000", options, None, None).await?;
202 /// ```
203 #[instrument(
204 level = "debug",
205 name = "clickhouse.connect",
206 fields(
207 db.system = "clickhouse",
208 db.format = T::FORMAT,
209 network.transport = ?if options.use_tls { "tls" } else { "tcp" }
210 ),
211 skip_all
212 )]
213 pub async fn connect<A: Into<Destination>>(
214 destination: A,
215 options: ClientOptions,
216 settings: Option<Arc<Settings>>,
217 context: Option<ConnectionContext>,
218 ) -> Result<Self> {
219 let context = context.unwrap_or_default();
220 let trace_ctx = context.trace.unwrap_or_default();
221 let _ = trace_ctx.link(&Span::current());
222
223 let client_id = CLIENT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
224
225 // Resolve the destination
226 let destination: Destination = destination.into();
227 let addrs = destination.resolve(options.ipv4_only).await?;
228
229 #[cfg(feature = "cloud")]
230 {
231 // Ping the cloud instance if requested
232 if let Some(domain) = options.domain.as_ref().filter(|_| options.ext.cloud.wakeup) {
233 let cloud_track = context.cloud.as_deref();
234 Self::ping_cloud(domain, options.ext.cloud.timeout, cloud_track).await;
235 }
236 }
237
238 if let Some(addr) = addrs.first() {
239 let _ = Span::current()
240 .record("server.address", tracing::field::debug(&addr.ip()))
241 .record("server.port", addr.port());
242 debug!(server.address = %addr.ip(), server.port = addr.port(), "Initiating connection");
243 }
244
245 let (event_tx, _) = broadcast::channel(EVENTS_CAPACITY);
246 let events = Arc::new(event_tx);
247 let conn_ev = Arc::clone(&events);
248
249 let conn =
250 connection::Connection::connect(client_id, addrs, options, conn_ev, trace_ctx).await?;
251 let connection = Arc::new(conn);
252
253 debug!("created connection successfully");
254
255 Ok(Client { client_id, connection, events, settings })
256 }
257
258 /// Retrieves the status of the underlying `ClickHouse` connection.
259 ///
260 /// This method returns the current [`ConnectionStatus`] of the client's connection,
261 /// indicating whether it is active, idle, or disconnected. Useful for monitoring
262 /// the health of the connection before executing queries.
263 ///
264 /// # Returns
265 /// A [`ConnectionStatus`] enum describing the connection state.
266 ///
267 /// # Examples
268 /// ```rust,ignore
269 /// use clickhouse_arrow::prelude::*;
270 ///
271 /// let client = Client::<ArrowFormat>::builder()
272 /// .with_endpoint("localhost:9000")
273 /// .build()
274 /// .await
275 /// .unwrap();
276 ///
277 /// let status = client.status();
278 /// println!("Connection status: {status:?}");
279 /// ```
280 pub fn status(&self) -> ConnectionStatus { self.connection.status() }
281
282 /// Subscribes to progress and profile events from `ClickHouse` queries.
283 ///
284 /// This method returns a [`broadcast::Receiver`] that delivers [`Event`] instances
285 /// containing progress updates ([`Progress`]) or profiling information ([`ProfileEvent`])
286 /// as queries execute. Events are generated asynchronously and can be used to monitor
287 /// query execution in real time.
288 ///
289 /// # Returns
290 /// A [`broadcast::Receiver<Event>`] for receiving `ClickHouse` events.
291 ///
292 /// # Examples
293 /// ```rust,ignore
294 /// use clickhouse_arrow::prelude::*;
295 /// use tokio::sync::broadcast::error::RecvError;
296 ///
297 /// let client = Client::builder()
298 /// .with_endpoint("localhost:9000")
299 /// .build_arrow()
300 /// .await
301 /// .unwrap();
302 ///
303 /// let mut receiver = client.subscribe_events();
304 /// let handle = tokio::spawn(async move {
305 /// while let Ok(event) = receiver.recv().await {
306 /// println!("Received event: {:?}", event);
307 /// }
308 /// });
309 ///
310 /// // Execute a query to generate events
311 /// client.query("SELECT * FROM large_table").await.unwrap();
312 /// ```
313 pub fn subscribe_events(&self) -> broadcast::Receiver<Event> { self.events.subscribe() }
314
315 /// Checks the health of the underlying `ClickHouse` connection.
316 ///
317 /// This method verifies that the connection is active and responsive. If `ping` is
318 /// `true`, it sends a lightweight ping to the `ClickHouse` server to confirm
319 /// connectivity. Otherwise, it checks the connection's internal state.
320 ///
321 /// # Parameters
322 /// - `ping`: If `true`, performs an active ping to the server; if `false`, checks the
323 /// connection state without network activity.
324 ///
325 /// # Returns
326 /// A [`Result`] indicating whether the connection is healthy.
327 ///
328 /// # Errors
329 /// - Fails if the connection is disconnected or unresponsive.
330 /// - Fails if the ping operation times out or encounters a network error.
331 ///
332 /// # Examples
333 /// ```rust,ignore
334 /// use clickhouse_arrow::prelude::*;
335 ///
336 /// let client = Client::builder()
337 /// .with_endpoint("localhost:9000")
338 /// .build::<ArrowFormat>()
339 /// .await
340 /// .unwrap();
341 ///
342 /// client.health_check(true).await.unwrap();
343 /// println!("Connection is healthy!");
344 /// ```
345 pub async fn health_check(&self, ping: bool) -> Result<()> {
346 trace!({ ATT_CID } = self.client_id, "sending health check w/ ping={ping}");
347 self.conn().await?.check_connection(ping).await
348 }
349
350 /// Shuts down the `ClickHouse` client and closes its connection.
351 ///
352 /// This method gracefully terminates the underlying connection, ensuring that any
353 /// pending operations are completed or canceled. After shutdown, the client cannot
354 /// be used for further operations.
355 ///
356 /// # Returns
357 /// A [`Result`] indicating whether the shutdown was successful.
358 ///
359 /// # Errors
360 /// - Fails if the connection cannot be closed due to network issues or internal errors.
361 ///
362 /// # Examples
363 /// ```rust,ignore
364 /// use clickhouse_arrow::prelude::*;
365 ///
366 /// let client = Client::builder()
367 /// .with_endpoint("localhost:9000")
368 /// .build::<ArrowFormat>()
369 /// .await
370 /// .unwrap();
371 ///
372 /// client.shutdown().await.unwrap();
373 /// println!("Client shut down successfully!");
374 /// ```
375 pub async fn shutdown(&self) -> Result<()> {
376 trace!("shutting down client");
377 self.conn().await?.shutdown().await
378 }
379
380 /// Inserts a block of data into `ClickHouse` using the native protocol.
381 ///
382 /// This method sends an insert query with a single block of data, formatted according to
383 /// the client's data format (`T: ClientFormat`). For [`NativeClient`], the data is a
384 /// [`Block`]; for [`ArrowClient`], it is a [`RecordBatch`]. The query is executed
385 /// asynchronously, and any response data, progress events, or errors are streamed back.
386 ///
387 /// Progress and profile events are dispatched to the client's event channel (see
388 /// [`Client::subscribe_events`]). The returned stream yields `()` on success or an
389 /// error if the insert fails.
390 ///
391 /// # Parameters
392 /// - `query`: The insert query (e.g., `"INSERT INTO my_table VALUES"`).
393 /// - `block`: The data to insert, in the format specified by `T` ([`Block`] or
394 /// [`RecordBatch`]).
395 /// - `qid`: Optional query ID for tracking and debugging.
396 ///
397 /// # Returns
398 /// A [`Result`] containing a stream of [`Result<()>`], where each item indicates
399 /// the success or failure of processing response data.
400 ///
401 /// # Errors
402 /// - Fails if the query is malformed or the data format is invalid.
403 /// - Fails if the connection to `ClickHouse` is interrupted.
404 /// - Fails if `ClickHouse` returns an exception (e.g., schema mismatch).
405 ///
406 /// # Examples
407 /// ```rust,ignore
408 /// use clickhouse_arrow::prelude::*;
409 /// use arrow::record_batch::RecordBatch;
410 ///
411 /// let client = Client::builder()
412 /// .destination("localhost:9000")
413 /// .build_arrow()
414 /// .await?;
415 ///
416 /// let qid = Qid::new();
417 /// // Assume `batch` is a valid RecordBatch
    /// let batch: RecordBatch = /* ... */;
    /// let stream = client.insert("INSERT INTO my_table VALUES", batch, Some(qid)).await?;
    /// tokio::pin!(stream);
    /// while let Some(result) = stream.next().await {
    ///     result?; // Check for errors
    /// }
423 /// ```
424 #[instrument(
425 level = "trace",
426 name = "clickhouse.insert",
        skip_all,
428 fields(
429 db.system = "clickhouse",
430 db.operation = "insert",
431 db.format = T::FORMAT,
432 clickhouse.client.id = self.client_id,
433 clickhouse.query.id
434 ),
435 )]
436 pub async fn insert(
437 &self,
438 query: impl Into<ParsedQuery>,
439 block: T::Data,
440 qid: Option<Qid>,
441 ) -> Result<impl Stream<Item = Result<()>> + '_> {
442 let (query, qid) = record_query(qid, query.into(), self.client_id);
443
444 // Create metadata channel
445 let (tx, rx) = oneshot::channel();
446 let connection = self.conn().await?;
447
448 // Send query
449 #[cfg_attr(not(feature = "inner_pool"), expect(unused_variables))]
450 let conn_idx = connection
451 .send_operation(
452 Operation::Query {
453 query,
454 settings: self.settings.clone(),
455 params: None,
456 response: tx,
457 header: None,
458 },
459 qid,
460 false,
461 )
462 .await?;
463
464 trace!({ ATT_CID } = self.client_id, { ATT_QID } = %qid, "sent query, awaiting response");
465 let responses = rx
466 .await
467 .map_err(|_| Error::Protocol(format!("Failed to receive response for query {qid}")))?
468 .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "Error receiving header"))?;
469
470 // Send data
471 let (tx, rx) = oneshot::channel();
472 let _ = connection
473 .send_operation(Operation::Insert { data: block, response: tx }, qid, true)
474 .await?;
475 rx.await.map_err(|_| {
476 Error::Protocol(format!("Failed to receive response from insert {qid}"))
477 })??;
478
479 // Decrement load balancer
480 #[cfg(feature = "inner_pool")]
481 connection.finish(conn_idx, Operation::<T::Data>::weight_insert());
482
483 Ok(self.insert_response(responses, qid))
484 }
485
486 /// Inserts multiple blocks of data into `ClickHouse` using the native protocol.
487 ///
488 /// This method sends an insert query with a collection of data blocks, formatted
489 /// according to the client's data format (`T: ClientFormat`). For [`NativeClient`],
490 /// the data is a `Vec<Block>`; for [`ArrowClient`], it is a `Vec<RecordBatch>`.
491 /// The query is executed asynchronously, and any response data, progress events,
492 /// or errors are streamed back.
493 ///
494 /// Progress and profile events are dispatched to the client's event channel (see
495 /// [`Client::subscribe_events`]). The returned stream yields `()` on success or an
496 /// error if the insert fails. Use this method when inserting multiple batches of
497 /// data to reduce overhead compared to multiple [`Client::insert`] calls.
498 ///
499 /// # Parameters
500 /// - `query`: The insert query (e.g., `"INSERT INTO my_table VALUES"`).
501 /// - `batch`: A vector of data blocks to insert, in the format specified by `T`.
502 /// - `qid`: Optional query ID for tracking and debugging.
503 ///
504 /// # Returns
505 /// A [`Result`] containing a stream of [`Result<()>`], where each item indicates
506 /// the success or failure of processing response data.
507 ///
508 /// # Errors
509 /// - Fails if the query is malformed or any data block is invalid.
510 /// - Fails if the connection to `ClickHouse` is interrupted.
511 /// - Fails if `ClickHouse` returns an exception (e.g., schema mismatch).
512 ///
513 /// # Examples
514 /// ```rust,ignore
515 /// use clickhouse_arrow::prelude::*;
516 /// use arrow::record_batch::RecordBatch;
517 ///
518 /// let client = Client::builder()
519 /// .with_endpoint("localhost:9000")
520 /// .build::<ArrowFormat>()
521 /// .await
522 /// .unwrap();
523 ///
524 /// // Assume `batches` is a Vec<RecordBatch>
525 /// let batches: Vec<RecordBatch> = vec![/* ... */];
    /// let stream = client.insert_many("INSERT INTO my_table VALUES", batches, None).await.unwrap();
    /// tokio::pin!(stream);
    /// while let Some(result) = stream.next().await {
528 /// result.unwrap(); // Check for errors
529 /// }
530 /// ```
531 #[instrument(
532 name = "clickhouse.insert_many",
533 skip_all,
534 fields(
535 db.system = "clickhouse",
536 db.operation = "insert",
537 db.format = T::FORMAT,
538 clickhouse.client.id = self.client_id,
539 clickhouse.query.id
540 ),
541 )]
542 pub async fn insert_many(
543 &self,
544 query: impl Into<ParsedQuery>,
545 batch: Vec<T::Data>,
546 qid: Option<Qid>,
547 ) -> Result<impl Stream<Item = Result<()>> + '_> {
548 let (query, qid) = record_query(qid, query.into(), self.client_id);
549
550 // Create metadata channel
551 let (tx, rx) = oneshot::channel();
552 let connection = self.conn().await?;
553
554 #[cfg_attr(not(feature = "inner_pool"), expect(unused_variables))]
555 let conn_idx = connection
556 .send_operation(
557 Operation::Query {
558 query,
559 settings: self.settings.clone(),
560 params: None,
561 response: tx,
562 header: None,
563 },
564 qid,
565 false,
566 )
567 .await?;
568
569 trace!({ ATT_CID } = self.client_id, { ATT_QID } = %qid, "sent query, awaiting response");
570 let responses = rx
571 .await
572 .map_err(|_| Error::Protocol(format!("Failed to receive response for query {qid}")))?
573 .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "Error receiving header"))?;
574
575 // Send data
576 let (tx, rx) = oneshot::channel();
577 let _ = connection
578 .send_operation(Operation::InsertMany { data: batch, response: tx }, qid, true)
579 .await?;
580 rx.await.map_err(|_| {
581 Error::Protocol(format!("Failed to receive response from insert {qid}"))
582 })??;
583
584 // Decrement load balancer
585 #[cfg(feature = "inner_pool")]
586 connection.finish(conn_idx, Operation::<T::Data>::weight_insert_many());
587
588 Ok(self.insert_response(responses, qid))
589 }
590
591 /// Executes a raw `ClickHouse` query and streams raw data in the client's format.
592 ///
593 /// This method sends a query to `ClickHouse` and returns a stream of raw data blocks
594 /// in the format specified by `T: ClientFormat` ([`Block`] for [`NativeClient`],
595 /// [`RecordBatch`] for [`ArrowClient`]). It is a low-level method suitable for
596 /// custom processing of query results. For higher-level interfaces, consider
597 /// [`Client::query`] or [`Client::query_rows`].
598 ///
599 /// Progress and profile events are dispatched to the client's event channel (see
600 /// [`Client::subscribe_events`]).
601 ///
602 /// # Parameters
603 /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM my_table"`).
604 /// - `qid`: A unique query ID for tracking and debugging.
605 ///
606 /// # Returns
607 /// A [`Result`] containing a stream of [`Result<T::Data>`], where each item is a
608 /// data block or an error.
609 ///
610 /// # Errors
611 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
612 /// - Fails if the connection to `ClickHouse` is interrupted.
613 /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
614 ///
615 /// # Examples
616 /// ```rust,ignore
617 /// use clickhouse_arrow::prelude::*;
618 ///
619 /// let client = Client::builder()
620 /// .with_endpoint("localhost:9000")
621 /// .build::<ArrowFormat>()
622 /// .await
623 /// .unwrap();
624 ///
625 /// let qid = Qid::new();
    /// let mut stream = client.query_raw("SELECT * FROM my_table".to_string(), None::<QueryParams>, qid).await.unwrap();
627 /// while let Some(block) = stream.next().await {
628 /// let batch = block.unwrap();
629 /// println!("Received batch with {} rows", batch.num_rows());
630 /// }
631 /// ```
632 #[instrument(
633 name = "clickhouse.query",
        skip_all,
635 fields(
636 db.system = "clickhouse",
637 db.operation = "query",
638 db.format = T::FORMAT,
639 clickhouse.client.id = self.client_id,
640 clickhouse.query.id = %qid
641 ),
642 )]
643 pub async fn query_raw<P: Into<QueryParams>>(
644 &self,
645 query: String,
646 params: Option<P>,
647 qid: Qid,
648 ) -> Result<impl Stream<Item = Result<T::Data>> + 'static> {
649 // Create metadata channel
650 let (tx, rx) = oneshot::channel();
651 let connection = self.conn().await?;
652
653 #[cfg_attr(not(feature = "inner_pool"), expect(unused_variables))]
654 let conn_idx = connection
655 .send_operation(
656 Operation::Query {
657 query,
658 settings: self.settings.clone(),
659 params: params.map(Into::into),
660 response: tx,
661 header: None,
662 },
663 qid,
664 true,
665 )
666 .await?;
667
668 trace!({ ATT_CID } = self.client_id, { ATT_QID } = %qid, "sent query, awaiting response");
669
670 let responses = rx
671 .await
672 .map_err(|_| Error::Protocol(format!("Failed to receive response for query {qid}")))?
673 .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "Error receiving header"))?;
675
676 // Decrement load balancer
677 #[cfg(feature = "inner_pool")]
678 connection.finish(conn_idx, Operation::<T::Data>::weight_query());
679
680 Ok(create_response_stream::<T>(responses, qid, self.client_id))
681 }
682
683 /// Executes a `ClickHouse` query and discards all returned data.
684 ///
685 /// This method sends a query to `ClickHouse` and processes the response stream to
686 /// check for errors, but discards any returned data blocks. It is useful for
687 /// queries that modify data (e.g., `INSERT`, `UPDATE`, `DELETE`) or DDL statements
688 /// where the result data is not needed. For queries that return data, use
689 /// [`Client::query`] or [`Client::query_raw`].
690 ///
691 /// # Parameters
692 /// - `query`: The SQL query to execute (e.g., `"DROP TABLE my_table"`).
693 /// - `qid`: Optional query ID for tracking and debugging.
694 ///
695 /// # Returns
696 /// A [`Result`] indicating whether the query executed successfully.
697 ///
698 /// # Errors
699 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
700 /// - Fails if the connection to `ClickHouse` is interrupted.
701 /// - Fails if `ClickHouse` returns an exception (e.g., permission denied).
702 ///
703 /// # Examples
704 /// ```rust,ignore
705 /// use clickhouse_arrow::prelude::*;
706 ///
707 /// let client = Client::builder()
708 /// .with_endpoint("localhost:9000")
709 /// .build_arrow()
710 /// .await
711 /// .unwrap();
712 ///
713 /// client.execute("DROP TABLE IF EXISTS my_table", None).await.unwrap();
714 /// println!("Table dropped successfully!");
715 /// ```
716 #[instrument(
717 name = "clickhouse.execute",
718 skip_all,
719 fields(
720 db.system = "clickhouse",
721 db.format = T::FORMAT,
722 db.operation = "query",
723 clickhouse.client.id = self.client_id,
724 clickhouse.query.id
725 )
726 )]
727 pub async fn execute(&self, query: impl Into<ParsedQuery>, qid: Option<Qid>) -> Result<()> {
728 self.execute_params(query, None::<QueryParams>, qid).await
729 }
730
731 /// Executes a `ClickHouse` query with query parameters and discards all returned data.
732 ///
733 /// # Parameters
734 /// - `query`: The SQL query to execute (e.g., `"DROP TABLE my_table"`).
735 /// - `params`: The query parameters to provide
736 /// - `qid`: Optional query ID for tracking and debugging.
737 ///
738 /// # Returns
739 /// A [`Result`] indicating whether the query executed successfully.
740 ///
741 /// # Errors
742 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
743 /// - Fails if the connection to `ClickHouse` is interrupted.
744 /// - Fails if `ClickHouse` returns an exception (e.g., permission denied).
745 ///
746 /// # Examples
747 /// ```rust,ignore
748 /// use clickhouse_arrow::prelude::*;
749 ///
750 /// let client = Client::builder()
751 /// .with_endpoint("localhost:9000")
752 /// .build_arrow()
753 /// .await
754 /// .unwrap();
755 ///
756 /// let params = Some(vec![
757 /// ("str", ParamValue::from("hello")),
758 /// ("num", ParamValue::from(42)),
759 /// ("array", ParamValue::from("['a', 'b', 'c']")),
760 /// ]);
761 /// let query = "SELECT {num:Int64}, {str:String}, {array:Array(String)}";
762 /// client.execute_params(query, params, None).await.unwrap();
    /// println!("Query executed successfully!");
764 /// ```
765 #[instrument(
766 name = "clickhouse.execute_params",
767 skip_all,
768 fields(
769 db.system = "clickhouse",
770 db.format = T::FORMAT,
771 db.operation = "query",
772 clickhouse.client.id = self.client_id,
773 clickhouse.query.id
774 )
775 )]
776 pub async fn execute_params<P: Into<QueryParams>>(
777 &self,
778 query: impl Into<ParsedQuery>,
779 params: Option<P>,
780 qid: Option<Qid>,
781 ) -> Result<()> {
782 let (query, qid) = record_query(qid, query.into(), self.client_id);
783 let stream = self.query_raw(query, params, qid).await?;
784 tokio::pin!(stream);
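        // Drain the response stream so any server-side error surfaces; returned data blocks are discarded.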
785 while let Some(next) = stream.next().await {
786 drop(next?);
787 }
788 Ok(())
789 }
790
791 /// Executes a `ClickHouse` query without processing the response stream.
792 ///
793 /// This method sends a query to `ClickHouse` and immediately discards the response
794 /// stream without checking for errors or processing data. It is a lightweight
795 /// alternative to [`Client::execute`], suitable for fire-and-forget scenarios where
796 /// the query's outcome is not critical. For safer execution, use [`Client::execute`].
797 ///
798 /// # Parameters
799 /// - `query`: The SQL query to execute (e.g., `"INSERT INTO my_table VALUES (1)"`).
800 /// - `qid`: Optional query ID for tracking and debugging.
801 ///
802 /// # Returns
803 /// A [`Result`] indicating whether the query was sent successfully.
804 ///
805 /// # Errors
806 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
807 /// - Fails if the connection to `ClickHouse` is interrupted.
808 ///
809 /// # Examples
810 /// ```rust,ignore
811 /// use clickhouse_arrow::prelude::*;
812 ///
813 /// let client = Client::builder()
814 /// .with_endpoint("localhost:9000")
815 /// .build::<ArrowFormat>()
816 /// .await
817 /// .unwrap();
818 ///
819 /// client.execute_now("INSERT INTO logs VALUES ('event')", None).await.unwrap();
820 /// println!("Log event sent!");
821 /// ```
822 #[instrument(
823 name = "clickhouse.execute_now",
824 skip_all,
825 fields(
826 db.system = "clickhouse",
827 db.format = T::FORMAT,
828 db.operation = "query",
829 clickhouse.client.id = self.client_id,
830 clickhouse.query.id
831 )
832 )]
833 pub async fn execute_now(&self, query: impl Into<ParsedQuery>, qid: Option<Qid>) -> Result<()> {
834 self.execute_now_params(query, None::<QueryParams>, qid).await
835 }
836
837 /// Executes a `ClickHouse` query with query parameters without processing the response stream.
838 ///
839 /// # Parameters
840 /// - `query`: The SQL query to execute (e.g., `"INSERT INTO my_table VALUES (1)"`).
841 /// - `params`: The query parameters to provide
842 /// - `qid`: Optional query ID for tracking and debugging.
843 ///
844 /// # Returns
845 /// A [`Result`] indicating whether the query was sent successfully.
846 ///
847 /// # Errors
848 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
849 /// - Fails if the connection to `ClickHouse` is interrupted.
850 ///
851 /// # Examples
852 /// ```rust,ignore
853 /// use clickhouse_arrow::prelude::*;
854 ///
855 /// let client = Client::builder()
856 /// .with_endpoint("localhost:9000")
857 /// .build::<ArrowFormat>()
858 /// .await
859 /// .unwrap();
    ///
862 /// let params = Some(vec![("str", ParamValue::from("hello"))]);
863 /// let query = "INSERT INTO logs VALUES ({str:String})";
864 /// client.execute_now_params(query, params, None).await.unwrap();
865 /// println!("Log event sent!");
866 /// ```
867 #[instrument(
868 name = "clickhouse.execute_now_params",
869 skip_all,
870 fields(
871 db.system = "clickhouse",
872 db.format = T::FORMAT,
873 db.operation = "query",
874 clickhouse.client.id = self.client_id,
875 clickhouse.query.id
876 )
877 )]
878 pub async fn execute_now_params<P: Into<QueryParams>>(
879 &self,
880 query: impl Into<ParsedQuery>,
881 params: Option<P>,
882 qid: Option<Qid>,
883 ) -> Result<()> {
884 let (query, qid) = record_query(qid, query.into(), self.client_id);
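        // Fire-and-forget: send the query, then drop the response stream without polling it.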
885 drop(self.query_raw(query, params, qid).await?);
886 Ok(())
887 }
888
889 /// Creates a new database in `ClickHouse` using a DDL statement.
890 ///
891 /// This method issues a `CREATE DATABASE` statement for the specified database. If no
892 /// database is provided, it uses the client's default database from the connection
893 /// metadata. The `default` database cannot be created, as it is reserved by `ClickHouse`.
894 ///
895 /// # Parameters
896 /// - `database`: Optional name of the database to create. If `None`, uses the client's default
897 /// database.
898 /// - `qid`: Optional query ID for tracking and debugging.
899 ///
900 /// # Returns
901 /// A [`Result`] indicating success or failure of the operation.
902 ///
903 /// # Errors
904 /// - Fails if the database name is invalid or reserved (e.g., `default`).
905 /// - Fails if the query execution encounters a `ClickHouse` error.
906 /// - Fails if the connection is interrupted.
907 ///
908 /// # Examples
909 /// ```rust,ignore
910 /// use clickhouse_arrow::client::{Client, ClientBuilder};
911 ///
912 /// let client = ClientBuilder::new()
913 /// .destination("localhost:9000")
914 /// .build_native()
915 /// .await?;
916 ///
917 /// client.create_database(Some("my_db"), None).await?;
918 /// ```
919 #[instrument(
920 name = "clickhouse.create_database",
        skip_all,
922 fields(db.system = "clickhouse", db.operation = "create.database")
923 )]
924 pub async fn create_database(&self, database: Option<&str>, qid: Option<Qid>) -> Result<()> {
925 let database = database.unwrap_or(self.connection.database());
926 if database.eq_ignore_ascii_case("default") {
            warn!("Skipping: cannot create the `default` database");
928 return Ok(());
929 }
930
931 let stmt = create_db_statement(database)?;
932 self.execute(stmt, qid).await?;
933 Ok(())
934 }
935
936 /// Drops a database in `ClickHouse` using a DDL statement.
937 ///
938 /// This method issues a `DROP DATABASE` statement for the specified database. The
939 /// `default` database cannot be dropped, as it is reserved by `ClickHouse`. If the client
940 /// is connected to a non-default database, dropping a different database is not allowed
941 /// to prevent accidental data loss.
942 ///
943 /// # Parameters
944 /// - `database`: Name of the database to drop.
945 /// - `sync`: If `true`, the operation waits for `ClickHouse` to complete the drop
946 /// synchronously.
947 /// - `qid`: Optional query ID for tracking and debugging.
948 ///
949 /// # Returns
950 /// A [`Result`] indicating success or failure of the operation.
951 ///
952 /// # Errors
953 /// - Fails if the database is `default` (reserved).
954 /// - Fails if the client is connected to a non-default database different from `database`.
955 /// - Fails if the query execution encounters a `ClickHouse` error.
956 ///
957 /// # Examples
958 /// ```rust,ignore
959 /// use clickhouse_arrow::prelude::*;
960 ///
961 /// let client = Client::builder()
962 /// .destination("localhost:9000")
963 /// .database("default") // Must be connected to default to drop 'other' databases
964 /// .build::<NativeFormat>()
965 /// .await?;
966 ///
967 /// client.drop_database("my_db", true, None).await?;
968 /// ```
969 #[instrument(
970 name = "clickhouse.drop_database",
        skip_all,
972 fields(db.system = "clickhouse", db.operation = "drop.database")
973 )]
974 pub async fn drop_database(&self, database: &str, sync: bool, qid: Option<Qid>) -> Result<()> {
975 if database.eq_ignore_ascii_case("default") {
            warn!("Skipping: cannot drop the `default` database");
977 return Ok(());
978 }
979
980 // TODO: Should this check remain? Or should the query writing be modified in the case
981 // of issuing DDL statements while connected to a non-default database
982 let current_database = self.connection.database();
983 if current_database != "default"
984 && !current_database.is_empty()
985 && current_database != database
986 {
987 error!("Cannot drop database {database} while connected to {current_database}");
988 return Err(Error::InsufficientDDLScope(current_database.into()));
989 }
990
991 let stmt = drop_db_statement(database, sync)?;
992 self.execute(stmt, qid).await?;
993 Ok(())
994 }
995}
996
997impl<T: ClientFormat> Client<T> {
998 /// Get a reference to the underlying connection.
999 ///
1000 /// TODO: Support reconnect.
1001 #[expect(clippy::unused_async)]
1002 async fn conn(&self) -> Result<&connection::Connection<T>> {
1003 // TODO: Add reconnection logic here if configured
1004 Ok(self.connection.as_ref())
1005 }
1006
1007 /// # Feature
1008 /// Requires the `cloud` feature to be enabled.
1009 #[cfg(feature = "cloud")]
1010 #[instrument(level = "trace", name = "clickhouse.cloud.ping")]
1011 async fn ping_cloud(
1012 domain: &str,
1013 timeout: Option<u64>,
1014 track: Option<&std::sync::atomic::AtomicBool>,
1015 ) {
1016 debug!("pinging cloud instance");
1017 if !domain.is_empty() {
1018 debug!(domain, "cloud endpoint found");
1019 // Create receiver channel to cancel ping if dropped
1020 let (_tx, rx) = oneshot::channel::<()>();
1021 cloud::ping_cloud(domain.to_string(), timeout, track, rx).await;
1022 }
1023 }
1024
1025 // Helper function to convert a receiver of data into a `ClickHouseResponse`
1026 fn insert_response(
1027 &self,
1028 rx: mpsc::Receiver<Result<T::Data>>,
1029 qid: Qid,
1030 ) -> ClickHouseResponse<()> {
1031 ClickHouseResponse::<()>::from_stream(handle_insert_response::<T>(rx, qid, self.client_id))
1032 }
1033}
1034
1035impl Client<NativeFormat> {
1036 /// Inserts rows into `ClickHouse` using the native protocol.
1037 ///
1038 /// This method sends an insert query with a collection of rows, where each row is
1039 /// a type `T` implementing [`Row`]. The rows are converted into a `ClickHouse`
1040 /// [`Block`] and sent over the native protocol. The query is executed asynchronously,
1041 /// and any response data, progress events, or errors are streamed back.
1042 ///
1043 /// Progress and profile events are dispatched to the client's event channel (see
1044 /// [`Client::subscribe_events`]). The returned [`ClickHouseResponse`] yields `()`
1045 /// on success or an error if the insert fails.
1046 ///
1047 /// # Parameters
1048 /// - `query`: The insert query (e.g., `"INSERT INTO my_table VALUES"`).
1049 /// - `blocks`: An iterator of rows to insert, where each row implements [`Row`].
1050 /// - `qid`: Optional query ID for tracking and debugging.
1051 ///
1052 /// # Returns
1053 /// A [`Result`] containing a [`ClickHouseResponse<()>`] that streams the operation's
1054 /// outcome.
1055 ///
1056 /// # Errors
1057 /// - Fails if the query is malformed or the row data is invalid.
1058 /// - Fails if the connection to `ClickHouse` is interrupted.
1059 /// - Fails if `ClickHouse` returns an exception (e.g., schema mismatch).
1060 ///
1061 /// # Examples
1062 /// ```rust,ignore
1063 /// use clickhouse_arrow::prelude::*;
1064 ///
1065 /// let client = Client::builder()
1066 /// .with_endpoint("localhost:9000")
1067 /// .build_native()
1068 /// .await
1069 /// .unwrap();
1070 ///
1071 /// // Assume `MyRow` implements `Row`
1072 /// let rows = vec![MyRow { /* ... */ }, MyRow { /* ... */ }];
    /// let mut response = client.insert_rows("INSERT INTO my_table VALUES", rows.into_iter(), None)
1074 /// .await
1075 /// .unwrap();
1076 /// while let Some(result) = response.next().await {
1077 /// result.unwrap(); // Check for errors
1078 /// }
1079 /// ```
1080 #[instrument(
1081 name = "clickhouse.insert_rows",
1082 fields(
1083 db.system = "clickhouse",
1084 db.operation = "insert",
1085 db.format = NativeFormat::FORMAT,
1086 clickhouse.client.id = self.client_id,
1087 clickhouse.query.id
1088 ),
1089 skip_all
1090 )]
1091 pub async fn insert_rows<T: Row + Send + 'static>(
1092 &self,
1093 query: impl Into<ParsedQuery>,
1094 blocks: impl Iterator<Item = T> + Send + Sync + 'static,
1095 qid: Option<Qid>,
1096 ) -> Result<ClickHouseResponse<()>> {
1097 let cid = self.client_id;
1098 let (query, qid) = record_query(qid, query.into(), cid);
1099
1100 // Create metadata channel
1101 let (tx, rx) = oneshot::channel();
1102 let (header_tx, header_rx) = oneshot::channel();
1103
1104 let connection = self.conn().await?;
1105
1106 #[cfg_attr(not(feature = "inner_pool"), expect(unused_variables))]
1107 let conn_idx = connection
1108 .send_operation(
1109 Operation::Query {
1110 query,
1111 settings: self.settings.clone(),
1112 params: None,
1113 response: tx,
1114 header: Some(header_tx),
1115 },
1116 qid,
1117 false,
1118 )
1119 .await?;
1120
1121 trace!({ ATT_CID } = cid, { ATT_QID } = %qid, "sent query, awaiting response");
1122 let responses = rx
1123 .await
1124 .map_err(|_| Error::Protocol(format!("Failed to receive response for query {qid}")))?
1125 .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "Error receiving header"))?;
1126
1127 let header = header_rx
1128 .await
1129 .map_err(|_| Error::Protocol(format!("Failed to receive header for query {qid}")))?;
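        // Build a native block from the collected rows, shaped by the header returned for the insert query.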
1130 let data = Block::from_rows(blocks.collect(), header)?;
1131
1132 let (tx, rx) = oneshot::channel();
1133 let _ =
1134 connection.send_operation(Operation::Insert { data, response: tx }, qid, true).await?;
1135 rx.await.map_err(|_| {
1136 Error::Protocol(format!("Failed to receive response from insert {qid}"))
1137 })??;
1138
1139 // Decrement load balancer
1140 #[cfg(feature = "inner_pool")]
        connection.finish(conn_idx, Operation::<Block>::weight_insert());
1142
1143 Ok(self.insert_response(responses, qid))
1144 }
1145
1146 /// Executes a `ClickHouse` query and streams deserialized rows.
1147 ///
1148 /// This method sends a query to `ClickHouse` and returns a stream of rows, where
1149 /// each row is deserialized into type `T` implementing [`Row`]. Rows are grouped
1150 /// into `ClickHouse` blocks, and the stream yields rows as they are received. Use
1151 /// this method for type-safe access to query results in native format.
1152 ///
1153 /// Progress and profile events are dispatched to the client's event channel (see
1154 /// [`Client::subscribe_events`]).
1155 ///
1156 /// # Parameters
1157 /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM my_table"`).
1158 /// - `qid`: Optional query ID for tracking and debugging.
1159 ///
1160 /// # Returns
1161 /// A [`Result`] containing a [`ClickHouseResponse<T>`] that streams deserialized
1162 /// rows of type `T`.
1163 ///
1164 /// # Errors
1165 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1166 /// - Fails if row deserialization fails (e.g., schema mismatch).
1167 /// - Fails if the connection to `ClickHouse` is interrupted.
1168 /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1169 ///
1170 /// # Examples
1171 /// ```rust,ignore
1172 /// use clickhouse_arrow::prelude::*;
1173 ///
1174 /// let client = Client::builder()
1175 /// .with_endpoint("localhost:9000")
1176 /// .build_native()
1177 /// .await
1178 /// .unwrap();
1179 ///
1180 /// // Assume `MyRow` implements `Row`
1181 /// let mut response = client.query::<MyRow>("SELECT * FROM my_table", None).await.unwrap();
1182 /// while let Some(row) = response.next().await {
1183 /// let row = row.unwrap();
1184 /// println!("Row: {:?}", row);
1185 /// }
1186 /// ```
1187 #[instrument(
1188 name = "clickhouse.query",
1189 skip_all,
1190 fields(db.system = "clickhouse", db.operation = "query", db.format = NativeFormat::FORMAT)
1191 )]
1192 pub async fn query<T: Row + Send + 'static>(
1193 &self,
1194 query: impl Into<ParsedQuery>,
1195 qid: Option<Qid>,
1196 ) -> Result<ClickHouseResponse<T>> {
1197 self.query_params(query, None, qid).await
1198 }
1199
1200 /// Executes a `ClickHouse` query with parameters and streams deserialized rows.
1201 ///
1202 /// # Parameters
1203 /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM my_table"`).
1204 /// - `params`: The query parameters to provide
1205 /// - `qid`: Optional query ID for tracking and debugging.
1206 ///
1207 /// # Returns
1208 /// A [`Result`] containing a [`ClickHouseResponse<T>`] that streams deserialized
1209 /// rows of type `T`.
1210 ///
1211 /// # Errors
1212 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1213 /// - Fails if row deserialization fails (e.g., schema mismatch).
1214 /// - Fails if the connection to `ClickHouse` is interrupted.
1215 /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1216 ///
1217 /// # Examples
1218 /// ```rust,ignore
1219 /// use clickhouse_arrow::prelude::*;
1220 ///
1221 /// let client = Client::builder()
1222 /// .with_endpoint("localhost:9000")
1223 /// .build_native()
1224 /// .await
1225 /// .unwrap();
1226 ///
1227 /// // Assume `MyRow` implements `Row`
1228 /// let params = Some(vec![("name", ParamValue::from("my_table"))]);
1229 /// let query = "SELECT * FROM {name:Identifier}";
1230 /// let mut response = client.query_params::<MyRow>(query, params, None).await.unwrap();
1231 /// while let Some(row) = response.next().await {
1232 /// let row = row.unwrap();
1233 /// println!("Row: {:?}", row);
1234 /// }
1235 /// ```
1236 #[instrument(
1237 name = "clickhouse.query_params",
1238 skip_all,
1239 fields(db.system = "clickhouse", db.operation = "query", db.format = NativeFormat::FORMAT)
1240 )]
1241 pub async fn query_params<T: Row + Send + 'static>(
1242 &self,
1243 query: impl Into<ParsedQuery>,
1244 params: Option<QueryParams>,
1245 qid: Option<Qid>,
1246 ) -> Result<ClickHouseResponse<T>> {
1247 let (query, qid) = record_query(qid, query.into(), self.client_id);
1248 let raw = self.query_raw(query, params, qid).await?;
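        // Flatten each block in the raw stream into its rows, deserializing every non-empty row into `T`.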
1249 Ok(ClickHouseResponse::new(Box::pin(raw.flat_map(|block| {
1250 match block {
1251 Ok(mut block) => stream::iter(
1252 block
1253 .take_iter_rows()
1254 .filter(|x| !x.is_empty())
1255 .map(T::deserialize_row)
1256 .map(|maybe| maybe.inspect_err(|error| error!(?error, "deserializing row")))
1257 .collect::<Vec<_>>(),
1258 ),
1259 Err(e) => stream::iter(vec![Err(e)]),
1260 }
1261 }))))
1262 }
1263
1264 /// Executes a `ClickHouse` query and returns the first row, discarding the rest.
1265 ///
1266 /// This method sends a query to `ClickHouse` and returns the first row deserialized
1267 /// into type `T` implementing [`Row`], or `None` if the result is empty. It is
1268 /// useful for queries expected to return a single row (e.g., `SELECT COUNT(*)`).
1269 /// For streaming multiple rows, use [`Client::query`].
1270 ///
1271 /// Progress and profile events are dispatched to the client's event channel (see
1272 /// [`Client::subscribe_events`]).
1273 ///
1274 /// # Parameters
1275 /// - `query`: The SQL query to execute (e.g., `"SELECT name FROM users WHERE id = 1"`).
1276 /// - `qid`: Optional query ID for tracking and debugging.
1277 ///
1278 /// # Returns
1279 /// A [`Result`] containing an `Option<T>`, where `T` is the deserialized row, or
1280 /// `None` if no rows are returned.
1281 ///
1282 /// # Errors
1283 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1284 /// - Fails if row deserialization fails (e.g., schema mismatch).
1285 /// - Fails if the connection to `ClickHouse` is interrupted.
1286 /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1287 ///
1288 /// # Examples
1289 /// ```rust,ignore
1290 /// use clickhouse_arrow::prelude::*;
1291 ///
1292 /// let client = Client::builder()
1293 /// .with_endpoint("localhost:9000")
1294 /// .build_native()
1295 /// .await
1296 /// .unwrap();
1297 ///
1298 /// // Assume `MyRow` implements `Row`
1299 /// let row = client.query_one::<MyRow>("SELECT name FROM users WHERE id = 1", None)
1300 /// .await
1301 /// .unwrap();
1302 /// if let Some(row) = row {
1303 /// println!("Found row: {:?}", row);
1304 /// }
1305 /// ```
1306 #[instrument(
1307 name = "clickhouse.query_one",
1308 skip_all,
1309 fields(
1310 db.system = "clickhouse",
1311 db.operation = "query",
1312 db.format = NativeFormat::FORMAT,
1313 clickhouse.client.id = self.client_id,
1314 clickhouse.query.id
1315 )
1316 )]
1317 pub async fn query_one<T: Row + Send + 'static>(
1318 &self,
1319 query: impl Into<ParsedQuery>,
1320 qid: Option<Qid>,
1321 ) -> Result<Option<T>> {
1322 self.query_one_params(query, None, qid).await
1323 }
1324
1325 /// Executes a `ClickHouse` query with parameters and returns the first row, discarding the
1326 /// rest.
1327 ///
1328 /// # Parameters
1329 /// - `query`: The SQL query to execute (e.g., `"SELECT name FROM users WHERE id = 1"`).
1330 /// - `params`: The query parameters to provide
1331 /// - `qid`: Optional query ID for tracking and debugging.
1332 ///
1333 /// # Returns
1334 /// A [`Result`] containing an `Option<T>`, where `T` is the deserialized row, or
1335 /// `None` if no rows are returned.
1336 ///
1337 /// # Errors
1338 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1339 /// - Fails if row deserialization fails (e.g., schema mismatch).
1340 /// - Fails if the connection to `ClickHouse` is interrupted.
1341 /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1342 ///
1343 /// # Examples
1344 /// ```rust,ignore
1345 /// use clickhouse_arrow::prelude::*;
1346 ///
1347 /// let client = Client::builder()
1348 /// .with_endpoint("localhost:9000")
1349 /// .build_native()
1350 /// .await
1351 /// .unwrap();
1352 ///
1353 /// // Assume `MyRow` implements `Row`
1354 /// let params = Some(vec![
1355 /// ("str", ParamValue::from("name")),
1356 /// ].into());
1357 /// let query = "SELECT {str:String} FROM users WHERE id = 1";
1358 /// let row = client.query_one_params::<MyRow>(query, params, None)
1359 /// .await
1360 /// .unwrap();
1361 /// if let Some(row) = row {
1362 /// println!("Found row: {:?}", row);
1363 /// }
1364 /// ```
1365 #[instrument(
1366 name = "clickhouse.query_one_params",
1367 skip_all,
1368 fields(
1369 db.system = "clickhouse",
1370 db.operation = "query",
1371 db.format = NativeFormat::FORMAT,
1372 clickhouse.client.id = self.client_id,
1373 clickhouse.query.id
1374 )
1375 )]
1376 pub async fn query_one_params<T: Row + Send + 'static>(
1377 &self,
1378 query: impl Into<ParsedQuery>,
1379 params: Option<QueryParams>,
1380 qid: Option<Qid>,
1381 ) -> Result<Option<T>> {
1382 let mut stream = self.query_params::<T>(query, params, qid).await?;
1383 stream.next().await.transpose()
1384 }
1385
1386 /// Creates a `ClickHouse` table from a Rust struct that implements the `Row` trait.
1387 ///
1388 /// This method generates and executes a `CREATE TABLE` DDL statement based on the
1389 /// structure of the provided `Row` type. The table schema is automatically derived
1390 /// from the struct fields and their types.
1391 ///
1392 /// # Arguments
1393 /// * `database` - Optional database name. If None, uses the client's default database
1394 /// * `table` - The name of the table to create
1395 /// * `options` - Table creation options including engine type, order by, and partition by
1396 /// * `query_id` - Optional query ID for tracking and debugging
1397 ///
1398 /// # Type Parameters
1399 /// * `T` - A type that implements the `Row` trait, typically a struct with the `#[derive(Row)]`
1400 /// macro
1401 ///
1402 /// # Example
1403 /// ```ignore
1404 /// #[derive(Row)]
1405 /// struct User {
1406 /// id: u32,
1407 /// name: String,
1408 /// created_at: DateTime,
1409 /// }
1410 ///
1411 /// let options = CreateOptions::new("MergeTree")
1412 /// .with_order_by(&["id"]);
1413 ///
1414 /// client.create_table::<User>(Some("analytics"), "users", &options, None).await?;
1415 /// ```
1416 ///
1417 /// # Errors
1418 /// - Returns an error if the table creation fails
1419 /// - Returns an error if the database/table names are invalid
1420 /// - Returns an error if the connection is lost
1421 #[instrument(
1422 name = "clickhouse.create_table",
        skip_all,
1424 fields(
1425 db.system = "clickhouse",
1426 db.operation = "create.table",
1427 db.format = NativeFormat::FORMAT,
1428 )
1429 )]
1430 pub async fn create_table<T: Row>(
1431 &self,
1432 database: Option<&str>,
1433 table: &str,
1434 options: &CreateOptions,
1435 qid: Option<Qid>,
1436 ) -> Result<()> {
1437 let database = database.unwrap_or(self.connection.database());
1438 let stmt = create_table_statement_from_native::<T>(Some(database), table, options)?;
1439 self.execute(stmt, qid).await?;
1440 Ok(())
1441 }
1442}
1443
1444impl Client<ArrowFormat> {
1445 /// Executes a `ClickHouse` query and streams Arrow [`RecordBatch`] results.
1446 ///
1447 /// This method sends a query to `ClickHouse` and returns a stream of [`RecordBatch`]
1448 /// instances, each containing a chunk of the query results in Apache Arrow format.
1449 /// Use this method for efficient integration with Arrow-based data processing
1450 /// pipelines. For row-based access, consider [`Client::query_rows`].
1451 ///
1452 /// Progress and profile events are dispatched to the client's event channel (see
1453 /// [`Client::subscribe_events`]).
1454 ///
1455 /// # Parameters
1456 /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM my_table"`).
1457 /// - `qid`: Optional query ID for tracking and debugging.
1458 ///
1459 /// # Returns
1460 /// A [`Result`] containing a [`ClickHouseResponse<RecordBatch>`] that streams
1461 /// query results.
1462 ///
1463 /// # Errors
1464 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1465 /// - Fails if the connection to `ClickHouse` is interrupted.
1466 /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1467 ///
1468 /// # Examples
1469 /// ```rust,ignore
1470 /// use clickhouse_arrow::prelude::*;
1471 ///
1472 /// let client = Client::builder()
1473 /// .with_endpoint("localhost:9000")
1474 /// .build_arrow()
1475 /// .await
1476 /// .unwrap();
1477 ///
1478 /// let mut response = client.query("SELECT * FROM my_table", None).await.unwrap();
1479 /// while let Some(batch) = response.next().await {
1480 /// let batch = batch.unwrap();
1481 /// println!("Received batch with {} rows", batch.num_rows());
1482 /// }
1483 /// ```
1484 #[instrument(
1485 skip_all,
1486 fields(db.system = "clickhouse", db.operation = "query", clickhouse.query.id)
1487 )]
1488 pub async fn query(
1489 &self,
1490 query: impl Into<ParsedQuery>,
1491 qid: Option<Qid>,
1492 ) -> Result<ClickHouseResponse<RecordBatch>> {
1493 self.query_params(query, None, qid).await
1494 }
1495
1496 /// Executes a `ClickHouse` query with parameters and streams Arrow [`RecordBatch`] results.
1497 ///
1498 /// # Parameters
1499 /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM my_table"`).
1500 /// - `params`: The query parameters to provide
1501 /// - `qid`: Optional query ID for tracking and debugging.
1502 ///
1503 /// # Returns
1504 /// A [`Result`] containing a [`ClickHouseResponse<RecordBatch>`] that streams
1505 /// query results.
1506 ///
1507 /// # Errors
1508 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1509 /// - Fails if the connection to `ClickHouse` is interrupted.
1510 /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1511 ///
1512 /// # Examples
1513 /// ```rust,ignore
1514 /// use clickhouse_arrow::prelude::*;
1515 ///
1516 /// let client = Client::builder()
1517 /// .with_endpoint("localhost:9000")
1518 /// .build_arrow()
1519 /// .await
1520 /// .unwrap();
1521 ///
1522 /// let params = Some(vec![("name", ParamValue::from("my_table"))].into());
1523 /// let query = "SELECT * FROM {name:Identifier}";
1524 /// let mut response = client.query_params(query, params, None).await.unwrap();
1525 /// while let Some(batch) = response.next().await {
1526 /// let batch = batch.unwrap();
1527 /// println!("Received batch with {} rows", batch.num_rows());
1528 /// }
1529 /// ```
1530 #[instrument(
1531 skip_all,
1532 fields(db.system = "clickhouse", db.operation = "query", clickhouse.query.id)
1533 )]
1534 pub async fn query_params(
1535 &self,
1536 query: impl Into<ParsedQuery>,
1537 params: Option<QueryParams>,
1538 qid: Option<Qid>,
1539 ) -> Result<ClickHouseResponse<RecordBatch>> {
1540 let (query, qid) = record_query(qid, query.into(), self.client_id);
1541 Ok(ClickHouseResponse::new(Box::pin(self.query_raw(query, params, qid).await?)))
1542 }
1543
1544 /// Executes a `ClickHouse` query and streams rows as column-major values.
1545 ///
1546 /// This method sends a query to `ClickHouse` and returns a stream of rows, where
1547 /// each row is represented as a `Vec<Value>` containing column values. The data is
1548 /// transposed from Arrow [`RecordBatch`] format to row-major format, making it
1549 /// convenient for row-based processing. For direct Arrow access, use
1550 /// [`Client::query`].
1551 ///
1552 /// Progress and profile events are dispatched to the client's event channel (see
1553 /// [`Client::subscribe_events`]).
1554 ///
1555 /// # Parameters
1556 /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM my_table"`).
1557 /// - `qid`: Optional query ID for tracking and debugging.
1558 ///
1559 /// # Returns
1560 /// A [`Result`] containing a [`ClickHouseResponse<Vec<Value>>`] that streams rows.
1561 ///
1562 /// # Errors
1563 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1564 /// - Fails if the connection to `ClickHouse` is interrupted.
1565 /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1566 ///
1567 /// # Examples
1568 /// ```rust,ignore
1569 /// use clickhouse_arrow::prelude::*;
1570 ///
1571 /// let client = Client::builder()
1572 /// .with_endpoint("localhost:9000")
1573 /// .build_arrow()
1574 /// .await
1575 /// .unwrap();
1576 ///
1577 /// let mut response = client.query_rows("SELECT * FROM my_table", None).await.unwrap();
1578 /// while let Some(row) = response.next().await {
1579 /// let row = row.unwrap();
1580 /// println!("Row values: {:?}", row);
1581 /// }
1582 /// ```
1583 #[instrument(
1584 name = "clickhouse.query_rows",
1585 fields(
1586 db.system = "clickhouse",
1587 db.operation = "query",
1588 db.format = ArrowFormat::FORMAT,
1589 clickhouse.client.id = self.client_id,
1590 clickhouse.query.id
1591 ),
1592 skip_all
1593 )]
1594 pub async fn query_rows(
1595 &self,
1596 query: impl Into<ParsedQuery>,
1597 qid: Option<Qid>,
1598 ) -> Result<ClickHouseResponse<Vec<Value>>> {
1599 let (query, qid) = record_query(qid, query.into(), self.client_id);
1600 let connection = self.conn().await?;
1601
        // Channel for receiving the query's response stream
1603 let (tx, rx) = oneshot::channel();
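        // Channel for the block header, used when transposing batches into rows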
1604 let (header_tx, header_rx) = oneshot::channel();
1605
1606 #[cfg_attr(not(feature = "inner_pool"), expect(unused_variables))]
1607 let conn_idx = connection
1608 .send_operation(
1609 Operation::Query {
1610 query,
1611 settings: self.settings.clone(),
1612 // TODO: Add arg for params
1613 params: None,
1614 response: tx,
1615 header: Some(header_tx),
1616 },
1617 qid,
1618 true,
1619 )
1620 .await?;
1621
1622 trace!({ ATT_CID } = self.client_id, { ATT_QID } = %qid, "sent query, awaiting response");
1623 let responses = rx
1624 .await
1625 .map_err(|_| Error::Protocol(format!("Failed to receive response for query {qid}")))?
1626 .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "Error receiving header"))?;
1627
1628 let header = header_rx
1629 .await
1630 .map_err(|_| Error::Protocol(format!("Failed to receive header for query {qid}")))?;
1631
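        // Transpose each Arrow `RecordBatch` into row-major `Vec<Value>` rows using the header,
        // then flatten the per-batch iterators into a single stream of rows.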
1632 let response = create_response_stream::<ArrowFormat>(responses, qid, self.client_id)
1633 .map(move |batch| (header.clone(), batch))
1634 .map(|(header, batch)| {
1635 let batch = batch?;
1636 let batch_iter = batch_to_rows(&batch, Some(&header))?;
1637 Ok::<_, Error>(stream::iter(batch_iter))
1638 })
1639 .try_flatten();
1640
1641 // Decrement load balancer
1642 #[cfg(feature = "inner_pool")]
1643 connection.finish(conn_idx, Operation::<RecordBatch>::weight_insert_many());
1644
1645 Ok(ClickHouseResponse::from_stream(response))
1646 }
1647
1648 /// Executes a `ClickHouse` query and returns the first column of the first batch.
1649 ///
1650 /// This method sends a query to `ClickHouse` and returns the first column of the
1651 /// first [`RecordBatch`] as an Arrow [`ArrayRef`], or `None` if the result is empty.
1652 /// It is useful for queries that return a single column (e.g., `SELECT id FROM
1653 /// my_table`). For full batch access, use [`Client::query`].
1654 ///
1655 /// Progress and profile events are dispatched to the client's event channel (see
1656 /// [`Client::subscribe_events`]).
1657 ///
1658 /// # Parameters
1659 /// - `query`: The SQL query to execute (e.g., `"SELECT id FROM my_table"`).
1660 /// - `qid`: Optional query ID for tracking and debugging.
1661 ///
1662 /// # Returns
1663 /// A [`Result`] containing an `Option<ArrayRef>`, representing the first column of
1664 /// the first batch, or `None` if no data is returned.
1665 ///
1666 /// # Errors
1667 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1668 /// - Fails if the connection to `ClickHouse` is interrupted.
1669 /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1670 ///
1671 /// # Examples
1672 /// ```rust,ignore
1673 /// use clickhouse_arrow::prelude::*;
1674 ///
1675 /// let client = Client::builder()
1676 /// .with_endpoint("localhost:9000")
1677 /// .build_arrow()
1678 /// .await
1679 /// .unwrap();
1680 ///
1681 /// let column = client.query_column("SELECT id FROM my_table", None)
1682 /// .await
1683 /// .unwrap();
1684 /// if let Some(col) = column {
1685 /// println!("Column data: {:?}", col);
1686 /// }
1687 /// ```
1688 #[instrument(
1689 name = "clickhouse.query_column",
1690 skip_all,
1691 fields(
1692 db.system = "clickhouse",
1693 db.operation = "query",
1694 db.format = ArrowFormat::FORMAT,
1695 clickhouse.client.id = self.client_id,
1696 clickhouse.query.id
1697 )
1698 )]
1699 pub async fn query_column(
1700 &self,
1701 query: impl Into<ParsedQuery>,
1702 qid: Option<Qid>,
1703 ) -> Result<Option<ArrayRef>> {
1704 self.query_column_params(query, None, qid).await
1705 }
1706
1707 /// Executes a `ClickHouse` query with parameters and returns the first column of the first
1708 /// batch.
1709 ///
1710 /// # Parameters
1711 /// - `query`: The SQL query to execute (e.g., `"SELECT id FROM my_table"`).
    /// - `params`: Optional query parameters to bind to placeholders in the query.
1713 /// - `qid`: Optional query ID for tracking and debugging.
1714 ///
1715 /// # Returns
1716 /// A [`Result`] containing an `Option<ArrayRef>`, representing the first column of
1717 /// the first batch, or `None` if no data is returned.
1718 ///
1719 /// # Errors
1720 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1721 /// - Fails if the connection to `ClickHouse` is interrupted.
1722 /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1723 ///
1724 /// # Examples
1725 /// ```rust,ignore
1726 /// use clickhouse_arrow::prelude::*;
1727 ///
1728 /// let client = Client::builder()
1729 /// .with_endpoint("localhost:9000")
1730 /// .build_arrow()
1731 /// .await
1732 /// .unwrap();
1733 ///
1734 /// let params = Some(vec![("name", ParamValue::from("my_table"))].into());
1735 /// let query = "SELECT id FROM {name:Identifier}";
    /// let column = client.query_column_params(query, params, None)
1737 /// .await
1738 /// .unwrap();
1739 /// if let Some(col) = column {
1740 /// println!("Column data: {:?}", col);
1741 /// }
1742 /// ```
1743 #[instrument(
1744 name = "clickhouse.query_column_params",
1745 skip_all,
1746 fields(
1747 db.system = "clickhouse",
1748 db.operation = "query",
1749 db.format = ArrowFormat::FORMAT,
1750 clickhouse.client.id = self.client_id,
1751 clickhouse.query.id
1752 )
1753 )]
1754 pub async fn query_column_params(
1755 &self,
1756 query: impl Into<ParsedQuery>,
1757 params: Option<QueryParams>,
1758 qid: Option<Qid>,
1759 ) -> Result<Option<ArrayRef>> {
1760 let mut stream = self.query_params(query, params, qid).await?;
1761 let Some(batch) = stream.next().await.transpose()? else {
1762 return Ok(None);
1763 };
1764
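        // An empty batch carries no data; otherwise return the first column of the first batch.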
1765 if batch.num_rows() == 0 { Ok(None) } else { Ok(Some(Arc::clone(batch.column(0)))) }
1766 }
1767
1768 /// Executes a `ClickHouse` query and returns the first row as a [`RecordBatch`].
1769 ///
1770 /// This method sends a query to `ClickHouse` and returns the first row of the first
1771 /// [`RecordBatch`], or `None` if the result is empty. The returned [`RecordBatch`]
1772 /// contains a single row. It is useful for queries expected to return a single row
1773 /// (e.g., `SELECT * FROM users WHERE id = 1`). For streaming multiple rows, use
1774 /// [`Client::query`].
1775 ///
1776 /// Progress and profile events are dispatched to the client's event channel (see
1777 /// [`Client::subscribe_events`]).
1778 ///
1779 /// # Parameters
1780 /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM users WHERE id = 1"`).
1781 /// - `qid`: Optional query ID for tracking and debugging.
1782 ///
1783 /// # Returns
1784 /// A [`Result`] containing an `Option<RecordBatch>`, representing the first row, or
1785 /// `None` if no rows are returned.
1786 ///
1787 /// # Errors
1788 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1789 /// - Fails if the connection to `ClickHouse` is interrupted.
1790 /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1791 ///
1792 /// # Examples
1793 /// ```rust,ignore
1794 /// use clickhouse_arrow::prelude::*;
1795 ///
1796 /// let client = Client::builder()
1797 /// .with_endpoint("localhost:9000")
1798 /// .build_arrow()
1799 /// .await
1800 /// .unwrap();
1801 ///
1802 /// let batch = client.query_one("SELECT * FROM users WHERE id = 1", None)
1803 /// .await
1804 /// .unwrap();
1805 /// if let Some(row) = batch {
1806 /// println!("Row data: {:?}", row);
1807 /// }
1808 /// ```
1809 #[instrument(
1810 name = "clickhouse.query_one",
        skip_all,
1812 fields(
1813 db.system = "clickhouse",
1814 db.operation = "query",
1815 db.format = ArrowFormat::FORMAT,
1816 clickhouse.client.id = self.client_id,
1817 clickhouse.query.id
1818 )
1819 )]
1820 pub async fn query_one(
1821 &self,
1822 query: impl Into<ParsedQuery>,
1823 qid: Option<Qid>,
1824 ) -> Result<Option<RecordBatch>> {
1825 self.query_one_params(query, None, qid).await
1826 }
1827
1828 /// Executes a `ClickHouse` query with parameters and returns the first row as a
1829 /// [`RecordBatch`].
1830 ///
1831 /// # Parameters
1832 /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM users WHERE id = 1"`).
    /// - `params`: Optional query parameters to bind to placeholders in the query.
1834 /// - `qid`: Optional query ID for tracking and debugging.
1835 ///
1836 /// # Returns
1837 /// A [`Result`] containing an `Option<RecordBatch>`, representing the first row, or
1838 /// `None` if no rows are returned.
1839 ///
1840 /// # Errors
1841 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1842 /// - Fails if the connection to `ClickHouse` is interrupted.
1843 /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1844 ///
1845 /// # Examples
1846 /// ```rust,ignore
1847 /// use clickhouse_arrow::prelude::*;
1848 ///
1849 /// let client = Client::builder()
1850 /// .with_endpoint("localhost:9000")
1851 /// .build_arrow()
1852 /// .await
1853 /// .unwrap();
1854 ///
    /// let params = Some(vec![("id", ParamValue::from(1))].into());
    /// let query = "SELECT * FROM users WHERE id = {id:UInt64}";
    /// let batch = client.query_one_params(query, params, None)
1857 /// .await
1858 /// .unwrap();
1859 /// if let Some(row) = batch {
1860 /// println!("Row data: {:?}", row);
1861 /// }
1862 /// ```
1863 #[instrument(
1864 name = "clickhouse.query_one_params",
        skip_all,
1866 fields(
1867 db.system = "clickhouse",
1868 db.operation = "query",
1869 db.format = ArrowFormat::FORMAT,
1870 clickhouse.client.id = self.client_id,
1871 clickhouse.query.id
1872 )
1873 )]
1874 pub async fn query_one_params(
1875 &self,
1876 query: impl Into<ParsedQuery>,
1877 params: Option<QueryParams>,
1878 qid: Option<Qid>,
1879 ) -> Result<Option<RecordBatch>> {
1880 let stream = self.query_params(query, params, qid).await?;
1881 tokio::pin!(stream);
1882
1883 let Some(batch) = stream.next().await.transpose()? else {
1884 return Ok(None);
1885 };
1886
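        // Slice out only the first row of the batch by taking index 0.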
1887 if batch.num_rows() == 0 {
1888 Ok(None)
1889 } else {
1890 Ok(Some(take_record_batch(&batch, &arrow::array::UInt32Array::from(vec![0]))?))
1891 }
1892 }
1893
1894 /// Fetches the list of database names (schemas) in `ClickHouse`.
1895 ///
1896 /// This method queries `ClickHouse` to retrieve the names of all databases
1897 /// accessible to the client. It is useful for exploring the database structure or
1898 /// validating database existence before performing operations.
1899 ///
1900 /// # Parameters
1901 /// - `qid`: Optional query ID for tracking and debugging.
1902 ///
1903 /// # Returns
1904 /// A [`Result`] containing a `Vec<String>` of database names.
1905 ///
1906 /// # Errors
1907 /// - Fails if the query execution encounters a `ClickHouse` error (e.g., permission denied).
1908 /// - Fails if the connection to `ClickHouse` is interrupted.
1909 ///
1910 /// # Examples
1911 /// ```rust,ignore
1912 /// use clickhouse_arrow::prelude::*;
1913 ///
1914 /// let client = Client::builder()
1915 /// .with_endpoint("localhost:9000")
1916 /// .build_arrow()
1917 /// .await
1918 /// .unwrap();
1919 ///
1920 /// let schemas = client.fetch_schemas(None).await.unwrap();
1921 /// println!("Databases: {:?}", schemas);
1922 /// ```
1923 #[instrument(
1924 name = "clickhouse.fetch_schemas",
        skip_all,
1926 fields(
1927 db.system = "clickhouse",
1928 db.operation = "query",
1929 db.format = ArrowFormat::FORMAT,
1930 clickhouse.client.id = self.client_id,
1931 clickhouse.query.id
1932 )
1933 )]
1934 pub async fn fetch_schemas(&self, qid: Option<Qid>) -> Result<Vec<String>> {
1935 crate::arrow::schema::fetch_databases(self, qid).await
1936 }
1937
1938 /// Fetches all tables across all databases in `ClickHouse`.
1939 ///
1940 /// This method queries `ClickHouse` to retrieve a mapping of database names to
1941 /// their table names. It is useful for discovering the full schema structure of
1942 /// the `ClickHouse` instance.
1943 ///
1944 /// # Parameters
1945 /// - `qid`: Optional query ID for tracking and debugging.
1946 ///
1947 /// # Returns
1948 /// A [`Result`] containing a `HashMap<String, Vec<String>>`, where each key is a
1949 /// database name and the value is a list of table names in that database.
1950 ///
1951 /// # Errors
1952 /// - Fails if the query execution encounters a `ClickHouse` error (e.g., permission denied).
1953 /// - Fails if the connection to `ClickHouse` is interrupted.
1954 ///
1955 /// # Examples
1956 /// ```rust,ignore
1957 /// use clickhouse_arrow::prelude::*;
1958 ///
1959 /// let client = Client::builder()
1960 /// .with_endpoint("localhost:9000")
1961 /// .build_arrow()
1962 /// .await
1963 /// .unwrap();
1964 ///
1965 /// let tables = client.fetch_all_tables(None).await.unwrap();
1966 /// for (db, tables) in tables {
1967 /// println!("Database {} has tables: {:?}", db, tables);
1968 /// }
1969 /// ```
1970 #[instrument(
1971 name = "clickhouse.fetch_all_tables",
        skip_all,
1973 fields(
1974 db.system = "clickhouse",
1975 db.operation = "query",
1976 db.format = ArrowFormat::FORMAT,
1977 clickhouse.client.id = self.client_id,
1978 clickhouse.query.id
1979 )
1980 )]
1981 pub async fn fetch_all_tables(&self, qid: Option<Qid>) -> Result<HashMap<String, Vec<String>>> {
1982 crate::arrow::schema::fetch_all_tables(self, qid).await
1983 }
1984
1985 /// Fetches the list of table names in a specific `ClickHouse` database.
1986 ///
1987 /// This method queries `ClickHouse` to retrieve the names of all tables in the
1988 /// specified database (or the client's default database if `None`). It is useful
1989 /// for exploring the schema of a specific database.
1990 ///
1991 /// # Parameters
1992 /// - `database`: Optional database name. If `None`, uses the client's default database.
1993 /// - `qid`: Optional query ID for tracking and debugging.
1994 ///
1995 /// # Returns
1996 /// A [`Result`] containing a `Vec<String>` of table names.
1997 ///
1998 /// # Errors
1999 /// - Fails if the database does not exist or is inaccessible.
2000 /// - Fails if the query execution encounters a `ClickHouse` error (e.g., permission denied).
2001 /// - Fails if the connection to `ClickHouse` is interrupted.
2002 ///
2003 /// # Examples
2004 /// ```rust,ignore
2005 /// use clickhouse_arrow::prelude::*;
2006 ///
2007 /// let client = Client::builder()
2008 /// .with_endpoint("localhost:9000")
2009 /// .build_arrow()
2010 /// .await
2011 /// .unwrap();
2012 ///
2013 /// let tables = client.fetch_tables(Some("my_db"), None).await.unwrap();
2014 /// println!("Tables in my_db: {:?}", tables);
2015 /// ```
2016 #[instrument(
2017 name = "clickhouse.fetch_tables",
        skip_all,
2019 fields(
2020 db.system = "clickhouse",
2021 db.operation = "query",
2022 db.format = ArrowFormat::FORMAT,
2023 clickhouse.client.id = self.client_id,
2024 clickhouse.query.id
2025 )
2026 )]
2027 pub async fn fetch_tables(
2028 &self,
2029 database: Option<&str>,
2030 qid: Option<Qid>,
2031 ) -> Result<Vec<String>> {
2032 let database = database.unwrap_or(self.connection.database());
2033 crate::arrow::schema::fetch_tables(self, database, qid).await
2034 }
2035
    /// Fetches the schemas of the specified tables in a `ClickHouse` database.
2037 ///
2038 /// This method queries `ClickHouse` to retrieve the Arrow schemas of the specified
2039 /// tables in the given database (or the client's default database if `None`). If
2040 /// the `tables` list is empty, it fetches schemas for all tables in the database.
2041 /// The result is a mapping of table names to their corresponding Arrow [`SchemaRef`].
2042 ///
2043 /// # Parameters
2044 /// - `database`: Optional database name. If `None`, uses the client's default database.
2045 /// - `tables`: A list of table names to fetch schemas for. An empty list fetches all tables.
2046 /// - `qid`: Optional query ID for tracking and debugging.
2047 ///
2048 /// # Returns
2049 /// A [`Result`] containing a `HashMap<String, SchemaRef>`, mapping table names to
2050 /// their schemas.
2051 ///
2052 /// # Errors
2053 /// - Fails if the database or any table does not exist or is inaccessible.
2054 /// - Fails if the query execution encounters a `ClickHouse` error (e.g., permission denied).
2055 /// - Fails if the connection to `ClickHouse` is interrupted.
2056 ///
2057 /// # Examples
2058 /// ```rust,ignore
2059 /// use clickhouse_arrow::prelude::*;
2060 ///
2061 /// let client = Client::builder()
2062 /// .with_endpoint("localhost:9000")
2063 /// .build_arrow()
2064 /// .await
2065 /// .unwrap();
2066 ///
2067 /// let schemas = client.fetch_schema(Some("my_db"), &["my_table"], None)
2068 /// .await
2069 /// .unwrap();
2070 /// for (table, schema) in schemas {
2071 /// println!("Table {} schema: {:?}", table, schema);
2072 /// }
2073 /// ```
2074 #[instrument(
2075 name = "clickhouse.fetch_schema",
        skip_all,
2077 fields(
2078 db.system = "clickhouse",
2079 db.operation = "query",
2080 db.format = ArrowFormat::FORMAT,
2081 clickhouse.client.id = self.client_id,
2082 clickhouse.query.id
2083 )
2084 )]
2085 pub async fn fetch_schema(
2086 &self,
2087 database: Option<&str>,
2088 tables: &[&str],
2089 qid: Option<Qid>,
2090 ) -> Result<HashMap<String, SchemaRef>> {
2091 let database = database.unwrap_or(self.connection.database());
2092 let options = self.connection.metadata().arrow_options;
2093 crate::arrow::schema::fetch_schema(self, database, tables, qid, options).await
2094 }
2095
    /// Issues a `CREATE TABLE` DDL statement for a table defined by an Arrow schema.
2097 ///
2098 /// Creates a table in the specified database (or the client's default database if
2099 /// `None`) based on the provided Arrow [`SchemaRef`]. The `options` parameter allows
2100 /// customization of table properties, such as engine type and partitioning. This
2101 /// method is specific to [`ArrowClient`] for seamless integration with Arrow-based
2102 /// data pipelines.
2103 ///
2104 /// # Parameters
2105 /// - `database`: Optional database name. If `None`, uses the client's default database.
2106 /// - `table`: Name of the table to create.
2107 /// - `schema`: The Arrow schema defining the table's structure.
2108 /// - `options`: Configuration for table creation (e.g., engine, partitioning).
2109 /// - `qid`: Optional query ID for tracking and debugging.
2110 ///
2111 /// # Returns
2112 /// A [`Result`] indicating success or failure of the operation.
2113 ///
2114 /// # Errors
2115 /// - Fails if the provided schema is invalid or incompatible with `ClickHouse`.
2116 /// - Fails if the database does not exist or is inaccessible.
2117 /// - Fails if the query execution encounters a `ClickHouse` error.
2118 ///
2119 /// # Examples
2120 /// ```rust,ignore
    /// use std::sync::Arc;
    /// use clickhouse_arrow::prelude::*;
    /// use arrow::datatypes::{Schema, SchemaRef};
2123 ///
2124 /// let client = Client::builder()
2125 /// .with_endpoint("localhost:9000")
2126 /// .build_arrow()
2127 /// .await
2128 /// .unwrap();
2129 ///
2130 /// // Assume `schema` is a valid Arrow schema
2131 /// let schema: SchemaRef = Arc::new(Schema::new(vec![/* ... */]));
2132 /// let options = CreateOptions::default();
2133 /// client.create_table(Some("my_db"), "my_table", &schema, &options, None)
2134 /// .await
2135 /// .unwrap();
2136 /// ```
2137 #[instrument(
2138 name = "clickhouse.create_table",
        skip_all,
2140 fields(
2141 db.system = "clickhouse",
2142 db.operation = "create.table",
2143 db.format = ArrowFormat::FORMAT,
2144 clickhouse.client.id = self.client_id,
2145 clickhouse.query.id
2146 )
2147 )]
2148 pub async fn create_table(
2149 &self,
2150 database: Option<&str>,
2151 table: &str,
2152 schema: &SchemaRef,
2153 options: &CreateOptions,
2154 qid: Option<Qid>,
2155 ) -> Result<()> {
2156 let database = database.unwrap_or(self.connection.database());
2157 let arrow_options = self.connection.metadata().arrow_options;
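        // Build the `CREATE TABLE` DDL from the Arrow schema, then execute it as a statement.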
2158 let stmt = create_table_statement_from_arrow(
2159 Some(database),
2160 table,
2161 schema,
2162 options,
2163 Some(arrow_options),
2164 )?;
2165 self.execute(stmt, qid).await?;
2166 Ok(())
2167 }
2168}
2169
2170impl<T: ClientFormat> Drop for Client<T> {
2171 fn drop(&mut self) {
2172 trace!({ ATT_CID } = self.client_id, "Client dropped");
2173 }
2174}
2175
/// Records the query id on the current tracing span and logs the query along with the
/// client id, returning the raw query string and the resolved [`Qid`].
2177fn record_query(qid: Option<Qid>, query: ParsedQuery, cid: u16) -> (String, Qid) {
2178 let qid = qid.unwrap_or_default();
2179 let _ = Span::current().record(ATT_QID, tracing::field::display(qid));
2180 let query = query.0;
2181 trace!(query, { ATT_CID } = cid, "Querying clickhouse");
2182 (query, qid)
2183}
2184
2185#[cfg(test)]
2186mod tests {
2187 use std::sync::Arc;
2188
2189 use arrow::array::Int32Array;
2190 use arrow::datatypes::{DataType, Field, Schema};
2191 use arrow::record_batch::RecordBatch;
2192
2193 use super::*;
2194
2195 #[test]
2196 fn test_record_query_function() {
2197 let query = ParsedQuery("SELECT 1".to_string());
2198 let cid = 123;
2199 let qid = Qid::new();
2200
2201 let (parsed_query, returned_qid) = record_query(Some(qid), query, cid);
2202
2203 assert_eq!(parsed_query, "SELECT 1");
2204 assert_eq!(returned_qid, qid);
2205 }
2206
2207 #[test]
2208 fn test_record_query_function_no_qid() {
2209 let query = ParsedQuery("SELECT 2".to_string());
2210 let cid = 456;
2211
2212 let (parsed_query, returned_qid) = record_query(None, query, cid);
2213
2214 assert_eq!(parsed_query, "SELECT 2");
2215 // Should generate a default Qid
2216 assert!(!returned_qid.to_string().is_empty());
2217 }
2218
2219 // Helper function to create a simple test RecordBatch
2220 fn create_test_record_batch() -> RecordBatch {
2221 let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
2222 let array = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
2223 RecordBatch::try_new(schema, vec![array]).unwrap()
2224 }
2225
    #[test]
    fn test_split_record_batch_function() {
2228 // Test the split_record_batch function that insert_max_rows uses
2229 let batch = create_test_record_batch();
2230 let max_rows = 2;
2231
2232 let batches = crate::arrow::utils::split_record_batch(batch, max_rows);
2233
2234 // Should split 5 rows into 3 batches: [2, 2, 1]
2235 assert_eq!(batches.len(), 3);
2236 assert_eq!(batches[0].num_rows(), 2);
2237 assert_eq!(batches[1].num_rows(), 2);
2238 assert_eq!(batches[2].num_rows(), 1);
2239 }
2240
2241 #[test]
2242 fn test_split_record_batch_edge_cases() {
2243 let batch = create_test_record_batch();
2244
2245 // Test with max_rows equal to batch size
2246 let batches = crate::arrow::utils::split_record_batch(batch.clone(), 5);
2247 assert_eq!(batches.len(), 1);
2248 assert_eq!(batches[0].num_rows(), 5);
2249
2250 // Test with max_rows larger than batch size
2251 let batches = crate::arrow::utils::split_record_batch(batch.clone(), 10);
2252 assert_eq!(batches.len(), 1);
2253 assert_eq!(batches[0].num_rows(), 5);
2254
2255 // Test with max_rows = 1
2256 let batches = crate::arrow::utils::split_record_batch(batch, 1);
2257 assert_eq!(batches.len(), 5);
2258 for batch in batches {
2259 assert_eq!(batch.num_rows(), 1);
2260 }
2261 }
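
    // A minimal additional check, assuming `split_record_batch` preserves the total row count
    // and the original row order across the produced batches (consistent with the tests above).
    #[test]
    fn test_split_record_batch_preserves_rows() {
        let batch = create_test_record_batch();
        let batches = crate::arrow::utils::split_record_batch(batch, 2);

        // The splits should cover all 5 original rows.
        let total: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total, 5);

        // Concatenating the `id` column across splits should reproduce the original values.
        let ids: Vec<i32> = batches
            .iter()
            .flat_map(|b| {
                b.column(0).as_any().downcast_ref::<Int32Array>().unwrap().values().to_vec()
            })
            .collect();
        assert_eq!(ids, vec![1, 2, 3, 4, 5]);
    }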
2262}