clickhouse_arrow/
client.rs

1//! The `client` module provides the primary interface for interacting with `ClickHouse`
2//! over its native protocol, with full support for Apache Arrow interoperability.
3//! The main entry point is the [`Client`] struct, which supports both native `ClickHouse`
4//! data formats ([`NativeClient`]) and Arrow-compatible formats ([`ArrowClient`]).
5//!
6//! This module is designed to be thread-safe, with [`Client`] instances that can be
7//! cloned and shared across threads. It supports querying, inserting data, managing
8//! database schemas, and handling `ClickHouse` events like progress and profiling.
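//!
//! # Examples
//! A minimal sketch of the typical flow; the table, the `batch` variable, and the connection
//! details are assumptions for illustration:
//! ```rust,ignore
//! use clickhouse_arrow::prelude::*;
//! use futures_util::StreamExt;
//!
//! let client = Client::builder()
//!     .destination("localhost:9000")
//!     .username("default")
//!     .build::<ArrowFormat>()
//!     .await?;
//!
//! // Statements whose results can be discarded
//! client.execute("CREATE DATABASE IF NOT EXISTS my_db", None).await?;
//!
//! // Insert an Arrow `RecordBatch` (assumed to match the target table's schema)
//! let stream = client.insert("INSERT INTO my_db.my_table VALUES", batch, None).await?;
//! tokio::pin!(stream);
//! while let Some(result) = stream.next().await { result?; }
//! ```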
9mod builder;
10mod chunk;
11#[cfg(feature = "cloud")]
12mod cloud;
13pub(crate) mod connection;
14mod internal;
15mod options;
16mod reader;
17mod response;
18mod tcp;
19mod writer;
20
21use std::collections::HashMap;
22use std::sync::Arc;
23use std::sync::atomic::AtomicU16;
24
25use arrow::array::{ArrayRef, RecordBatch};
26use arrow::compute::take_record_batch;
27use arrow::datatypes::SchemaRef;
28use futures_util::{Stream, StreamExt, TryStreamExt, stream};
29use strum::AsRefStr;
30use tokio::sync::{broadcast, mpsc, oneshot};
31
32pub use self::builder::*;
33pub use self::connection::ConnectionStatus;
34pub(crate) use self::internal::{Message, Operation};
35pub use self::options::*;
36pub use self::response::*;
37pub use self::tcp::Destination;
38use crate::arrow::utils::batch_to_rows;
39use crate::constants::*;
40use crate::formats::{ClientFormat, NativeFormat};
41use crate::native::block::Block;
42use crate::native::protocol::{CompressionMethod, ProfileEvent};
43use crate::prelude::*;
44use crate::query::{ParsedQuery, QueryParams};
45use crate::schema::CreateOptions;
46use crate::{Error, Progress, Result, Row};
47
48static CLIENT_ID: AtomicU16 = AtomicU16::new(0);
49
50/// A `ClickHouse` client configured for the native format.
51///
52/// This type alias provides a client that works with `ClickHouse`'s internal
53/// data representation, useful when you need direct control over types
54/// and don't require Arrow integration.
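///
/// # Examples
/// A minimal sketch of building a [`NativeClient`] with the builder; the endpoint is an
/// assumption:
/// ```rust,ignore
/// use clickhouse_arrow::prelude::*;
///
/// let client: NativeClient = Client::builder()
///     .destination("localhost:9000")
///     .build_native()
///     .await?;
/// ```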
55pub type NativeClient = Client<NativeFormat>;
56
57/// A `ClickHouse` client configured for Apache Arrow format.
58///
59/// This type alias provides a client that works with Arrow `RecordBatch`es,
60/// enabling seamless integration with the Arrow ecosystem for data processing
61/// and analytics workflows.
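///
/// # Examples
/// A minimal sketch of building an [`ArrowClient`] with the builder; the endpoint is an
/// assumption:
/// ```rust,ignore
/// use clickhouse_arrow::prelude::*;
///
/// let client: ArrowClient = Client::builder()
///     .destination("localhost:9000")
///     .build_arrow()
///     .await?;
/// ```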
62pub type ArrowClient = Client<ArrowFormat>;
63
64/// Configuration for a `ClickHouse` connection, including tracing and cloud-specific settings.
65///
66/// This struct is used to pass optional context to [`Client::connect`], enabling features
67/// like distributed tracing or cloud instance tracking.
68///
69/// # Fields
70/// - `trace`: Optional tracing context for logging and monitoring.
71/// - `cloud`: Optional cloud-specific configuration (requires the `cloud` feature).
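///
/// # Examples
/// A minimal sketch of passing a context to [`Client::connect`]; the endpoint and the use of
/// `TraceContext::default()` are assumptions for illustration:
/// ```rust,ignore
/// use clickhouse_arrow::prelude::*;
///
/// let context = ConnectionContext {
///     trace: Some(TraceContext::default()),
///     ..Default::default()
/// };
/// let client = Client::<ArrowFormat>::connect(
///     "localhost:9000",
///     ClientOptions::default(),
///     None,
///     Some(context),
/// )
/// .await?;
/// ```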
72#[derive(Debug, Clone, Default)]
73#[cfg_attr(not(feature = "cloud"), derive(Copy))]
74pub struct ConnectionContext {
75    pub trace: Option<TraceContext>,
76    #[cfg(feature = "cloud")]
77    pub cloud: Option<Arc<std::sync::atomic::AtomicBool>>,
78}
79
80/// `ClickHouse` events emitted by the underlying connection.
81#[derive(Debug, Clone)]
82pub struct Event {
83    pub event:     ClickHouseEvent,
84    pub qid:       Qid,
85    pub client_id: u16,
86}
87
88/// Profile and progress events from `ClickHouse`.
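///
/// # Examples
/// A sketch of consuming these events via [`Client::subscribe_events`]; the `client` is
/// assumed to already exist:
/// ```rust,ignore
/// let mut events = client.subscribe_events();
/// while let Ok(Event { event, qid, client_id }) = events.recv().await {
///     match event {
///         ClickHouseEvent::Progress(progress) => {
///             println!("[{client_id}/{qid}] progress: {progress:?}");
///         }
///         ClickHouseEvent::Profile(profile) => {
///             println!("[{client_id}/{qid}] {} profile events", profile.len());
///         }
///     }
/// }
/// ```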
89#[derive(Debug, Clone, AsRefStr)]
90pub enum ClickHouseEvent {
91    Progress(Progress),
92    Profile(Vec<ProfileEvent>),
93}
94
95/// A thread-safe handle for interacting with a `ClickHouse` database over its native protocol.
96///
97/// The `Client` struct is the primary interface for executing queries, inserting data, and
98/// managing database schemas. It supports two data formats:
99/// - [`NativeClient`]: Uses `ClickHouse`'s native [`Block`] format for data exchange.
100/// - [`ArrowClient`]: Uses Apache Arrow's [`RecordBatch`] for seamless interoperability with Arrow
101///   ecosystems.
102///
103/// `Client` instances are lightweight and can be cloned and shared across threads. Each instance
104/// maintains a reference to an underlying connection, which is managed automatically. The client
105/// also supports event subscription for receiving progress and profiling information from
106/// `ClickHouse`.
107///
108/// # Usage
109/// Create a `Client` using the [`ClientBuilder`] for a fluent configuration experience, or use
110/// [`Client::connect`] for direct connection setup.
111///
112/// # Examples
113/// ```rust,ignore
114/// use clickhouse_arrow::prelude::*;
115/// use clickhouse_arrow::arrow;
116/// use futures_util::StreamExt;
117///
118/// let client = Client::builder()
119///     .destination("localhost:9000")
120///     .username("default")
121///     .build::<ArrowFormat>()
122///     .await?;
123///
124/// // Execute a query
125/// let batch = client
126///     .query("SELECT 1")
127///     .await?
128///     .collect::<Vec<_>>()
129///     .await
130///     .into_iter()
131///     .collect::<Result<Vec<_>>>()?;
132/// arrow::util::pretty::print_batches(&batch)?;
133/// ```
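///
/// Because cloning shares the underlying connection, fanning work out across tasks only
/// requires cloning the handle. A minimal sketch (the statement is illustrative):
/// ```rust,ignore
/// let worker = client.clone();
/// let handle = tokio::spawn(async move {
///     worker.execute("OPTIMIZE TABLE my_table FINAL", None).await
/// });
/// handle.await??;
/// ```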
134#[derive(Clone, Debug)]
135pub struct Client<T: ClientFormat> {
136    pub client_id: u16,
137    connection:    Arc<connection::Connection<T>>,
138    events:        Arc<broadcast::Sender<Event>>,
139    settings:      Option<Arc<Settings>>,
140}
141
142impl<T: ClientFormat> Client<T> {
143    /// Creates a new [`ClientBuilder`] for configuring and building a `ClickHouse` client.
145    ///
146    /// This method provides a fluent interface to set up a `Client` with custom connection
147    /// parameters, such as the server address, credentials, TLS, and compression. The
148    /// builder can create either a single [`Client`] or a connection pool (with the `pool`
149    /// feature enabled).
150    ///
151    /// Use this method when you need fine-grained control over the client configuration.
152    /// For simple connections, you can also use [`Client::connect`] directly.
153    ///
154    /// # Returns
155    /// A [`ClientBuilder`] instance ready for configuration.
156    ///
157    /// # Examples
158    /// ```rust,ignore
159    /// use clickhouse_arrow::prelude::*;
160    ///
161    /// let builder = Client::builder()
162    ///     .with_endpoint("localhost:9000")
163    ///     .with_username("default")
164    ///     .with_password("");
165    /// ```
166    pub fn builder() -> ClientBuilder { ClientBuilder::new() }
167
168    /// Establishes a connection to a `ClickHouse` server over TCP, with optional TLS support.
169    ///
170    /// This method creates a new [`Client`] instance connected to the specified `destination`.
171    /// The connection can be configured using [`ClientOptions`], which allows setting parameters
172    /// like username, password, TLS, and compression. Optional `settings` can be provided to
173    /// customize `ClickHouse` session behavior, and a `context` can be used for tracing or
174    /// cloud-specific configurations.
175    ///
176    /// # Parameters
177    /// - `destination`: The `ClickHouse` server address (e.g., `"localhost:9000"` or a
178    ///   [`Destination`]).
179    /// - `options`: Configuration for the connection, including credentials, TLS, and cloud
180    ///   settings.
181    /// - `settings`: Optional `ClickHouse` session settings (e.g., query timeouts, max rows).
182    /// - `context`: Optional connection context for tracing or cloud-specific behavior.
183    ///
184    /// # Returns
185    /// A [`Result`] containing the connected [`Client`] instance, or an error if the connection
186    /// fails.
187    ///
188    /// # Errors
189    /// - Fails if the destination cannot be resolved or the connection cannot be established.
190    /// - Fails if authentication or TLS setup encounters an issue.
191    ///
192    /// # Examples
193    /// ```rust,ignore
194    /// use clickhouse_arrow::client::{Client, ClientOptions};
195    ///
196    /// let options = ClientOptions::default()
197    ///     .username("default")
198    ///     .password("")
199    ///     .use_tls(false);
200    ///
201    /// let client = Client::connect("localhost:9000", options, None, None).await?;
202    /// ```
203    #[instrument(
204        level = "debug",
205        name = "clickhouse.connect",
206        fields(
207            db.system = "clickhouse",
208            db.format = T::FORMAT,
209            network.transport = ?if options.use_tls { "tls" } else { "tcp" }
210        ),
211        skip_all
212    )]
213    pub async fn connect<A: Into<Destination>>(
214        destination: A,
215        options: ClientOptions,
216        settings: Option<Arc<Settings>>,
217        context: Option<ConnectionContext>,
218    ) -> Result<Self> {
219        let context = context.unwrap_or_default();
220        let trace_ctx = context.trace.unwrap_or_default();
221        let _ = trace_ctx.link(&Span::current());
222
223        let client_id = CLIENT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
224
225        // Resolve the destination
226        let destination: Destination = destination.into();
227        let addrs = destination.resolve(options.ipv4_only).await?;
228
229        #[cfg(feature = "cloud")]
230        {
231            // Ping the cloud instance if requested
232            if let Some(domain) = options.domain.as_ref().filter(|_| options.ext.cloud.wakeup) {
233                let cloud_track = context.cloud.as_deref();
234                Self::ping_cloud(domain, options.ext.cloud.timeout, cloud_track).await;
235            }
236        }
237
238        if let Some(addr) = addrs.first() {
239            let _ = Span::current()
240                .record("server.address", tracing::field::debug(&addr.ip()))
241                .record("server.port", addr.port());
242            debug!(server.address = %addr.ip(), server.port = addr.port(), "Initiating connection");
243        }
244
245        let (event_tx, _) = broadcast::channel(EVENTS_CAPACITY);
246        let events = Arc::new(event_tx);
247        let conn_ev = Arc::clone(&events);
248
249        let conn =
250            connection::Connection::connect(client_id, addrs, options, conn_ev, trace_ctx).await?;
251        let connection = Arc::new(conn);
252
253        debug!("created connection successfully");
254
255        Ok(Client { client_id, connection, events, settings })
256    }
257
258    /// Retrieves the status of the underlying `ClickHouse` connection.
259    ///
260    /// This method returns the current [`ConnectionStatus`] of the client's connection,
261    /// indicating whether it is active, idle, or disconnected. Useful for monitoring
262    /// the health of the connection before executing queries.
263    ///
264    /// # Returns
265    /// A [`ConnectionStatus`] enum describing the connection state.
266    ///
267    /// # Examples
268    /// ```rust,ignore
269    /// use clickhouse_arrow::prelude::*;
270    ///
271    /// let client = Client::<ArrowFormat>::builder()
272    ///     .with_endpoint("localhost:9000")
273    ///     .build()
274    ///     .await
275    ///     .unwrap();
276    ///
277    /// let status = client.status();
278    /// println!("Connection status: {status:?}");
279    /// ```
280    pub fn status(&self) -> ConnectionStatus { self.connection.status() }
281
282    /// Subscribes to progress and profile events from `ClickHouse` queries.
283    ///
284    /// This method returns a [`broadcast::Receiver`] that delivers [`Event`] instances
285    /// containing progress updates ([`Progress`]) or profiling information ([`ProfileEvent`])
286    /// as queries execute. Events are generated asynchronously and can be used to monitor
287    /// query execution in real time.
288    ///
289    /// # Returns
290    /// A [`broadcast::Receiver<Event>`] for receiving `ClickHouse` events.
291    ///
292    /// # Examples
293    /// ```rust,ignore
294    /// use clickhouse_arrow::prelude::*;
295    /// use tokio::sync::broadcast::error::RecvError;
296    ///
297    /// let client = Client::builder()
298    ///     .with_endpoint("localhost:9000")
299    ///     .build_arrow()
300    ///     .await
301    ///     .unwrap();
302    ///
303    /// let mut receiver = client.subscribe_events();
304    /// let handle = tokio::spawn(async move {
305    ///     while let Ok(event) = receiver.recv().await {
306    ///         println!("Received event: {:?}", event);
307    ///     }
308    /// });
309    ///
310    /// // Execute a query to generate events
311    /// client.query("SELECT * FROM large_table").await.unwrap();
312    /// ```
313    pub fn subscribe_events(&self) -> broadcast::Receiver<Event> { self.events.subscribe() }
314
315    /// Checks the health of the underlying `ClickHouse` connection.
316    ///
317    /// This method verifies that the connection is active and responsive. If `ping` is
318    /// `true`, it sends a lightweight ping to the `ClickHouse` server to confirm
319    /// connectivity. Otherwise, it checks the connection's internal state.
320    ///
321    /// # Parameters
322    /// - `ping`: If `true`, performs an active ping to the server; if `false`, checks the
323    ///   connection state without network activity.
324    ///
325    /// # Returns
326    /// A [`Result`] indicating whether the connection is healthy.
327    ///
328    /// # Errors
329    /// - Fails if the connection is disconnected or unresponsive.
330    /// - Fails if the ping operation times out or encounters a network error.
331    ///
332    /// # Examples
333    /// ```rust,ignore
334    /// use clickhouse_arrow::prelude::*;
335    ///
336    /// let client = Client::builder()
337    ///     .with_endpoint("localhost:9000")
338    ///     .build::<ArrowFormat>()
339    ///     .await
340    ///     .unwrap();
341    ///
342    /// client.health_check(true).await.unwrap();
343    /// println!("Connection is healthy!");
344    /// ```
345    pub async fn health_check(&self, ping: bool) -> Result<()> {
346        trace!({ ATT_CID } = self.client_id, "sending health check w/ ping={ping}");
347        self.conn().await?.check_connection(ping).await
348    }
349
350    /// Shuts down the `ClickHouse` client and closes its connection.
351    ///
352    /// This method gracefully terminates the underlying connection, ensuring that any
353    /// pending operations are completed or canceled. After shutdown, the client cannot
354    /// be used for further operations.
355    ///
356    /// # Returns
357    /// A [`Result`] indicating whether the shutdown was successful.
358    ///
359    /// # Errors
360    /// - Fails if the connection cannot be closed due to network issues or internal errors.
361    ///
362    /// # Examples
363    /// ```rust,ignore
364    /// use clickhouse_arrow::prelude::*;
365    ///
366    /// let client = Client::builder()
367    ///     .with_endpoint("localhost:9000")
368    ///     .build::<ArrowFormat>()
369    ///     .await
370    ///     .unwrap();
371    ///
372    /// client.shutdown().await.unwrap();
373    /// println!("Client shut down successfully!");
374    /// ```
375    pub async fn shutdown(&self) -> Result<()> {
376        trace!("shutting down client");
377        self.conn().await?.shutdown().await
378    }
379
380    /// Inserts a block of data into `ClickHouse` using the native protocol.
381    ///
382    /// This method sends an insert query with a single block of data, formatted according to
383    /// the client's data format (`T: ClientFormat`). For [`NativeClient`], the data is a
384    /// [`Block`]; for [`ArrowClient`], it is a [`RecordBatch`]. The query is executed
385    /// asynchronously, and any response data, progress events, or errors are streamed back.
386    ///
387    /// Progress and profile events are dispatched to the client's event channel (see
388    /// [`Client::subscribe_events`]). The returned stream yields `()` on success or an
389    /// error if the insert fails.
390    ///
391    /// # Parameters
392    /// - `query`: The insert query (e.g., `"INSERT INTO my_table VALUES"`).
393    /// - `block`: The data to insert, in the format specified by `T` ([`Block`] or
394    ///   [`RecordBatch`]).
395    /// - `qid`: Optional query ID for tracking and debugging.
396    ///
397    /// # Returns
398    /// A [`Result`] containing a stream of [`Result<()>`], where each item indicates
399    /// the success or failure of processing response data.
400    ///
401    /// # Errors
402    /// - Fails if the query is malformed or the data format is invalid.
403    /// - Fails if the connection to `ClickHouse` is interrupted.
404    /// - Fails if `ClickHouse` returns an exception (e.g., schema mismatch).
405    ///
406    /// # Examples
407    /// ```rust,ignore
408    /// use clickhouse_arrow::prelude::*;
409    /// use arrow::record_batch::RecordBatch;
410    ///
411    /// let client = Client::builder()
412    ///     .destination("localhost:9000")
413    ///     .build_arrow()
414    ///     .await?;
415    ///
416    /// let qid = Qid::new();
417    /// // Assume `batch` is a valid RecordBatch
418    /// let batch: RecordBatch = // ...;
419    /// let mut stream = client.insert("INSERT INTO my_table VALUES", batch, Some(qid)).await?;
420    /// while let Some(result) = stream.next().await {
421    ///     result?; // Check for errors
422    /// }
423    /// ```
424    #[instrument(
425        level = "trace",
426        name = "clickhouse.insert",
427        skip_all,
428        fields(
429            db.system = "clickhouse",
430            db.operation = "insert",
431            db.format = T::FORMAT,
432            clickhouse.client.id = self.client_id,
433            clickhouse.query.id
434        ),
435    )]
436    pub async fn insert(
437        &self,
438        query: impl Into<ParsedQuery>,
439        block: T::Data,
440        qid: Option<Qid>,
441    ) -> Result<impl Stream<Item = Result<()>> + '_> {
442        let (query, qid) = record_query(qid, query.into(), self.client_id);
443
444        // Create metadata channel
445        let (tx, rx) = oneshot::channel();
446        let connection = self.conn().await?;
447
448        // Send query
449        #[cfg_attr(not(feature = "inner_pool"), expect(unused_variables))]
450        let conn_idx = connection
451            .send_operation(
452                Operation::Query {
453                    query,
454                    settings: self.settings.clone(),
455                    params: None,
456                    response: tx,
457                    header: None,
458                },
459                qid,
460                false,
461            )
462            .await?;
463
464        trace!({ ATT_CID } = self.client_id, { ATT_QID } = %qid, "sent query, awaiting response");
465        let responses = rx
466            .await
467            .map_err(|_| Error::Protocol(format!("Failed to receive response for query {qid}")))?
468            .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "Error receiving header"))?;
469
470        // Send data
471        let (tx, rx) = oneshot::channel();
472        let _ = connection
473            .send_operation(Operation::Insert { data: block, response: tx }, qid, true)
474            .await?;
475        rx.await.map_err(|_| {
476            Error::Protocol(format!("Failed to receive response from insert {qid}"))
477        })??;
478
479        // Decrement load balancer
480        #[cfg(feature = "inner_pool")]
481        connection.finish(conn_idx, Operation::<T::Data>::weight_insert());
482
483        Ok(self.insert_response(responses, qid))
484    }
485
486    /// Inserts multiple blocks of data into `ClickHouse` using the native protocol.
487    ///
488    /// This method sends an insert query with a collection of data blocks, formatted
489    /// according to the client's data format (`T: ClientFormat`). For [`NativeClient`],
490    /// the data is a `Vec<Block>`; for [`ArrowClient`], it is a `Vec<RecordBatch>`.
491    /// The query is executed asynchronously, and any response data, progress events,
492    /// or errors are streamed back.
493    ///
494    /// Progress and profile events are dispatched to the client's event channel (see
495    /// [`Client::subscribe_events`]). The returned stream yields `()` on success or an
496    /// error if the insert fails. Use this method when inserting multiple batches of
497    /// data to reduce overhead compared to multiple [`Client::insert`] calls.
498    ///
499    /// # Parameters
500    /// - `query`: The insert query (e.g., `"INSERT INTO my_table VALUES"`).
501    /// - `batch`: A vector of data blocks to insert, in the format specified by `T`.
502    /// - `qid`: Optional query ID for tracking and debugging.
503    ///
504    /// # Returns
505    /// A [`Result`] containing a stream of [`Result<()>`], where each item indicates
506    /// the success or failure of processing response data.
507    ///
508    /// # Errors
509    /// - Fails if the query is malformed or any data block is invalid.
510    /// - Fails if the connection to `ClickHouse` is interrupted.
511    /// - Fails if `ClickHouse` returns an exception (e.g., schema mismatch).
512    ///
513    /// # Examples
514    /// ```rust,ignore
515    /// use clickhouse_arrow::prelude::*;
516    /// use arrow::record_batch::RecordBatch;
517    ///
518    /// let client = Client::builder()
519    ///     .with_endpoint("localhost:9000")
520    ///     .build::<ArrowFormat>()
521    ///     .await
522    ///     .unwrap();
523    ///
524    /// // Assume `batches` is a Vec<RecordBatch>
525    /// let batches: Vec<RecordBatch> = vec![/* ... */];
526    /// let mut stream = client.insert_many("INSERT INTO my_table VALUES", batches, None).await.unwrap();
527    /// while let Some(result) = stream.next().await {
528    ///     result.unwrap(); // Check for errors
529    /// }
530    /// ```
531    #[instrument(
532        name = "clickhouse.insert_many",
533        skip_all,
534        fields(
535            db.system = "clickhouse",
536            db.operation = "insert",
537            db.format = T::FORMAT,
538            clickhouse.client.id = self.client_id,
539            clickhouse.query.id
540        ),
541    )]
542    pub async fn insert_many(
543        &self,
544        query: impl Into<ParsedQuery>,
545        batch: Vec<T::Data>,
546        qid: Option<Qid>,
547    ) -> Result<impl Stream<Item = Result<()>> + '_> {
548        let (query, qid) = record_query(qid, query.into(), self.client_id);
549
550        // Create metadata channel
551        let (tx, rx) = oneshot::channel();
552        let connection = self.conn().await?;
553
554        #[cfg_attr(not(feature = "inner_pool"), expect(unused_variables))]
555        let conn_idx = connection
556            .send_operation(
557                Operation::Query {
558                    query,
559                    settings: self.settings.clone(),
560                    params: None,
561                    response: tx,
562                    header: None,
563                },
564                qid,
565                false,
566            )
567            .await?;
568
569        trace!({ ATT_CID } = self.client_id, { ATT_QID } = %qid, "sent query, awaiting response");
570        let responses = rx
571            .await
572            .map_err(|_| Error::Protocol(format!("Failed to receive response for query {qid}")))?
573            .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "Error receiving header"))?;
574
575        // Send data
576        let (tx, rx) = oneshot::channel();
577        let _ = connection
578            .send_operation(Operation::InsertMany { data: batch, response: tx }, qid, true)
579            .await?;
580        rx.await.map_err(|_| {
581            Error::Protocol(format!("Failed to receive response from insert {qid}"))
582        })??;
583
584        // Decrement load balancer
585        #[cfg(feature = "inner_pool")]
586        connection.finish(conn_idx, Operation::<T::Data>::weight_insert_many());
587
588        Ok(self.insert_response(responses, qid))
589    }
590
591    /// Executes a raw `ClickHouse` query and streams raw data in the client's format.
592    ///
593    /// This method sends a query to `ClickHouse` and returns a stream of raw data blocks
594    /// in the format specified by `T: ClientFormat` ([`Block`] for [`NativeClient`],
595    /// [`RecordBatch`] for [`ArrowClient`]). It is a low-level method suitable for
596    /// custom processing of query results. For higher-level interfaces, consider
597    /// [`Client::query`] or [`Client::query_rows`].
598    ///
599    /// Progress and profile events are dispatched to the client's event channel (see
600    /// [`Client::subscribe_events`]).
601    ///
602    /// # Parameters
603    /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM my_table"`).
    /// - `params`: Optional query parameters to bind into the query.
604    /// - `qid`: A unique query ID for tracking and debugging.
605    ///
606    /// # Returns
607    /// A [`Result`] containing a stream of [`Result<T::Data>`], where each item is a
608    /// data block or an error.
609    ///
610    /// # Errors
611    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
612    /// - Fails if the connection to `ClickHouse` is interrupted.
613    /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
614    ///
615    /// # Examples
616    /// ```rust,ignore
617    /// use clickhouse_arrow::prelude::*;
618    ///
619    /// let client = Client::builder()
620    ///     .with_endpoint("localhost:9000")
621    ///     .build::<ArrowFormat>()
622    ///     .await
623    ///     .unwrap();
624    ///
625    /// let qid = Qid::new();
626    /// let stream = client
    ///     .query_raw("SELECT * FROM my_table".to_string(), None::<QueryParams>, qid)
    ///     .await
    ///     .unwrap();
    /// tokio::pin!(stream);
627    /// while let Some(block) = stream.next().await {
628    ///     let batch = block.unwrap();
629    ///     println!("Received batch with {} rows", batch.num_rows());
630    /// }
631    /// ```
632    #[instrument(
633        name = "clickhouse.query",
634        skip_all,
635        fields(
636            db.system = "clickhouse",
637            db.operation = "query",
638            db.format = T::FORMAT,
639            clickhouse.client.id = self.client_id,
640            clickhouse.query.id = %qid
641        ),
642    )]
643    pub async fn query_raw<P: Into<QueryParams>>(
644        &self,
645        query: String,
646        params: Option<P>,
647        qid: Qid,
648    ) -> Result<impl Stream<Item = Result<T::Data>> + 'static> {
649        // Create metadata channel
650        let (tx, rx) = oneshot::channel();
651        let connection = self.conn().await?;
652
653        #[cfg_attr(not(feature = "inner_pool"), expect(unused_variables))]
654        let conn_idx = connection
655            .send_operation(
656                Operation::Query {
657                    query,
658                    settings: self.settings.clone(),
659                    params: params.map(Into::into),
660                    response: tx,
661                    header: None,
662                },
663                qid,
664                true,
665            )
666            .await?;
667
668        trace!({ ATT_CID } = self.client_id, { ATT_QID } = %qid, "sent query, awaiting response");
669
670        let responses = rx
671            .await
672            .map_err(|_| Error::Protocol(format!("Failed to receive response for query {qid}")))?
673            .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "Error receiving header"))?;
674        trace!({ ATT_CID } = self.client_id, { ATT_QID } = %qid, "response received, streaming data");
675
676        // Decrement load balancer
677        #[cfg(feature = "inner_pool")]
678        connection.finish(conn_idx, Operation::<T::Data>::weight_query());
679
680        Ok(create_response_stream::<T>(responses, qid, self.client_id))
681    }
682
683    /// Executes a `ClickHouse` query and discards all returned data.
684    ///
685    /// This method sends a query to `ClickHouse` and processes the response stream to
686    /// check for errors, but discards any returned data blocks. It is useful for
687    /// queries that modify data (e.g., `INSERT`, `UPDATE`, `DELETE`) or DDL statements
688    /// where the result data is not needed. For queries that return data, use
689    /// [`Client::query`] or [`Client::query_raw`].
690    ///
691    /// # Parameters
692    /// - `query`: The SQL query to execute (e.g., `"DROP TABLE my_table"`).
693    /// - `qid`: Optional query ID for tracking and debugging.
694    ///
695    /// # Returns
696    /// A [`Result`] indicating whether the query executed successfully.
697    ///
698    /// # Errors
699    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
700    /// - Fails if the connection to `ClickHouse` is interrupted.
701    /// - Fails if `ClickHouse` returns an exception (e.g., permission denied).
702    ///
703    /// # Examples
704    /// ```rust,ignore
705    /// use clickhouse_arrow::prelude::*;
706    ///
707    /// let client = Client::builder()
708    ///     .with_endpoint("localhost:9000")
709    ///     .build_arrow()
710    ///     .await
711    ///     .unwrap();
712    ///
713    /// client.execute("DROP TABLE IF EXISTS my_table", None).await.unwrap();
714    /// println!("Table dropped successfully!");
715    /// ```
716    #[instrument(
717        name = "clickhouse.execute",
718        skip_all,
719        fields(
720            db.system = "clickhouse",
721            db.format = T::FORMAT,
722            db.operation = "query",
723            clickhouse.client.id = self.client_id,
724            clickhouse.query.id
725        )
726    )]
727    pub async fn execute(&self, query: impl Into<ParsedQuery>, qid: Option<Qid>) -> Result<()> {
728        self.execute_params(query, None::<QueryParams>, qid).await
729    }
730
731    /// Executes a `ClickHouse` query with query parameters and discards all returned data.
732    ///
733    /// # Parameters
734    /// - `query`: The SQL query to execute (e.g., `"DROP TABLE my_table"`).
735    /// - `params`: The query parameters to provide
736    /// - `qid`: Optional query ID for tracking and debugging.
737    ///
738    /// # Returns
739    /// A [`Result`] indicating whether the query executed successfully.
740    ///
741    /// # Errors
742    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
743    /// - Fails if the connection to `ClickHouse` is interrupted.
744    /// - Fails if `ClickHouse` returns an exception (e.g., permission denied).
745    ///
746    /// # Examples
747    /// ```rust,ignore
748    /// use clickhouse_arrow::prelude::*;
749    ///
750    /// let client = Client::builder()
751    ///     .with_endpoint("localhost:9000")
752    ///     .build_arrow()
753    ///     .await
754    ///     .unwrap();
755    ///
756    /// let params = Some(vec![
757    ///     ("str", ParamValue::from("hello")),
758    ///     ("num", ParamValue::from(42)),
759    ///     ("array", ParamValue::from("['a', 'b', 'c']")),
760    /// ]);
761    /// let query = "SELECT {num:Int64}, {str:String}, {array:Array(String)}";
762    /// client.execute_params(query, params, None).await.unwrap();
763    /// println!("Query executed successfully!");
764    /// ```
765    #[instrument(
766        name = "clickhouse.execute_params",
767        skip_all,
768        fields(
769            db.system = "clickhouse",
770            db.format = T::FORMAT,
771            db.operation = "query",
772            clickhouse.client.id = self.client_id,
773            clickhouse.query.id
774        )
775    )]
776    pub async fn execute_params<P: Into<QueryParams>>(
777        &self,
778        query: impl Into<ParsedQuery>,
779        params: Option<P>,
780        qid: Option<Qid>,
781    ) -> Result<()> {
782        let (query, qid) = record_query(qid, query.into(), self.client_id);
783        let stream = self.query_raw(query, params, qid).await?;
784        tokio::pin!(stream);
785        while let Some(next) = stream.next().await {
786            drop(next?);
787        }
788        Ok(())
789    }
790
791    /// Executes a `ClickHouse` query without processing the response stream.
792    ///
793    /// This method sends a query to `ClickHouse` and immediately discards the response
794    /// stream without checking for errors or processing data. It is a lightweight
795    /// alternative to [`Client::execute`], suitable for fire-and-forget scenarios where
796    /// the query's outcome is not critical. For safer execution, use [`Client::execute`].
797    ///
798    /// # Parameters
799    /// - `query`: The SQL query to execute (e.g., `"INSERT INTO my_table VALUES (1)"`).
800    /// - `qid`: Optional query ID for tracking and debugging.
801    ///
802    /// # Returns
803    /// A [`Result`] indicating whether the query was sent successfully.
804    ///
805    /// # Errors
806    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
807    /// - Fails if the connection to `ClickHouse` is interrupted.
808    ///
809    /// # Examples
810    /// ```rust,ignore
811    /// use clickhouse_arrow::prelude::*;
812    ///
813    /// let client = Client::builder()
814    ///     .with_endpoint("localhost:9000")
815    ///     .build::<ArrowFormat>()
816    ///     .await
817    ///     .unwrap();
818    ///
819    /// client.execute_now("INSERT INTO logs VALUES ('event')", None).await.unwrap();
820    /// println!("Log event sent!");
821    /// ```
822    #[instrument(
823        name = "clickhouse.execute_now",
824        skip_all,
825        fields(
826            db.system = "clickhouse",
827            db.format = T::FORMAT,
828            db.operation = "query",
829            clickhouse.client.id = self.client_id,
830            clickhouse.query.id
831        )
832    )]
833    pub async fn execute_now(&self, query: impl Into<ParsedQuery>, qid: Option<Qid>) -> Result<()> {
834        self.execute_now_params(query, None::<QueryParams>, qid).await
835    }
836
837    /// Executes a `ClickHouse` query with query parameters without processing the response stream.
838    ///
839    /// # Parameters
840    /// - `query`: The SQL query to execute (e.g., `"INSERT INTO my_table VALUES (1)"`).
841    /// - `params`: The query parameters to provide
842    /// - `qid`: Optional query ID for tracking and debugging.
843    ///
844    /// # Returns
845    /// A [`Result`] indicating whether the query was sent successfully.
846    ///
847    /// # Errors
848    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
849    /// - Fails if the connection to `ClickHouse` is interrupted.
850    ///
851    /// # Examples
852    /// ```rust,ignore
853    /// use clickhouse_arrow::prelude::*;
854    ///
855    /// let client = Client::builder()
856    ///     .with_endpoint("localhost:9000")
857    ///     .build::<ArrowFormat>()
858    ///     .await
859    ///     .unwrap();
860    ///
862    /// let params = Some(vec![("str", ParamValue::from("hello"))]);
863    /// let query = "INSERT INTO logs VALUES ({str:String})";
864    /// client.execute_now_params(query, params, None).await.unwrap();
865    /// println!("Log event sent!");
866    /// ```
867    #[instrument(
868        name = "clickhouse.execute_now_params",
869        skip_all,
870        fields(
871            db.system = "clickhouse",
872            db.format = T::FORMAT,
873            db.operation = "query",
874            clickhouse.client.id = self.client_id,
875            clickhouse.query.id
876        )
877    )]
878    pub async fn execute_now_params<P: Into<QueryParams>>(
879        &self,
880        query: impl Into<ParsedQuery>,
881        params: Option<P>,
882        qid: Option<Qid>,
883    ) -> Result<()> {
884        let (query, qid) = record_query(qid, query.into(), self.client_id);
885        drop(self.query_raw(query, params, qid).await?);
886        Ok(())
887    }
888
889    /// Creates a new database in `ClickHouse` using a DDL statement.
890    ///
891    /// This method issues a `CREATE DATABASE` statement for the specified database. If no
892    /// database is provided, it uses the client's default database from the connection
893    /// metadata. The `default` database cannot be created, as it is reserved by `ClickHouse`.
894    ///
895    /// # Parameters
896    /// - `database`: Optional name of the database to create. If `None`, uses the client's default
897    ///   database.
898    /// - `qid`: Optional query ID for tracking and debugging.
899    ///
900    /// # Returns
901    /// A [`Result`] indicating success or failure of the operation.
902    ///
903    /// # Errors
904    /// - Fails if the database name is invalid or reserved (e.g., `default`).
905    /// - Fails if the query execution encounters a `ClickHouse` error.
906    /// - Fails if the connection is interrupted.
907    ///
908    /// # Examples
909    /// ```rust,ignore
910    /// use clickhouse_arrow::client::{Client, ClientBuilder};
911    ///
912    /// let client = ClientBuilder::new()
913    ///     .destination("localhost:9000")
914    ///     .build_native()
915    ///     .await?;
916    ///
917    /// client.create_database(Some("my_db"), None).await?;
918    /// ```
919    #[instrument(
920        name = "clickhouse.create_database",
921        skip_all,
922        fields(db.system = "clickhouse", db.operation = "create.database")
923    )]
924    pub async fn create_database(&self, database: Option<&str>, qid: Option<Qid>) -> Result<()> {
925        let database = database.unwrap_or(self.connection.database());
926        if database.eq_ignore_ascii_case("default") {
927            warn!("Exiting, cannot create `default` database");
928            return Ok(());
929        }
930
931        let stmt = create_db_statement(database)?;
932        self.execute(stmt, qid).await?;
933        Ok(())
934    }
935
936    /// Drops a database in `ClickHouse` using a DDL statement.
937    ///
938    /// This method issues a `DROP DATABASE` statement for the specified database. The
939    /// `default` database cannot be dropped, as it is reserved by `ClickHouse`. If the client
940    /// is connected to a non-default database, dropping a different database is not allowed
941    /// to prevent accidental data loss.
942    ///
943    /// # Parameters
944    /// - `database`: Name of the database to drop.
945    /// - `sync`: If `true`, the operation waits for `ClickHouse` to complete the drop
946    ///   synchronously.
947    /// - `qid`: Optional query ID for tracking and debugging.
948    ///
949    /// # Returns
950    /// A [`Result`] indicating success or failure of the operation.
951    ///
952    /// # Errors
953    /// - Fails if the database is `default` (reserved).
954    /// - Fails if the client is connected to a non-default database different from `database`.
955    /// - Fails if the query execution encounters a `ClickHouse` error.
956    ///
957    /// # Examples
958    /// ```rust,ignore
959    /// use clickhouse_arrow::prelude::*;
960    ///
961    /// let client = Client::builder()
962    ///     .destination("localhost:9000")
963    ///     .database("default") // Must be connected to default to drop 'other' databases
964    ///     .build::<NativeFormat>()
965    ///     .await?;
966    ///
967    /// client.drop_database("my_db", true, None).await?;
968    /// ```
969    #[instrument(
970        name = "clickhouse.drop_database",
971        skip_all,
972        fields(db.system = "clickhouse", db.operation = "drop.database")
973    )]
974    pub async fn drop_database(&self, database: &str, sync: bool, qid: Option<Qid>) -> Result<()> {
975        if database.eq_ignore_ascii_case("default") {
976            warn!("Exiting, cannot drop `default` database");
977            return Ok(());
978        }
979
980        // TODO: Should this check remain? Or should the query writing be modified in the case
981        // of issuing DDL statements while connected to a non-default database
982        let current_database = self.connection.database();
983        if current_database != "default"
984            && !current_database.is_empty()
985            && current_database != database
986        {
987            error!("Cannot drop database {database} while connected to {current_database}");
988            return Err(Error::InsufficientDDLScope(current_database.into()));
989        }
990
991        let stmt = drop_db_statement(database, sync)?;
992        self.execute(stmt, qid).await?;
993        Ok(())
994    }
995}
996
997impl<T: ClientFormat> Client<T> {
998    /// Get a reference to the underlying connection.
999    ///
1000    /// TODO: Support reconnect.
1001    #[expect(clippy::unused_async)]
1002    async fn conn(&self) -> Result<&connection::Connection<T>> {
1003        // TODO: Add reconnection logic here if configured
1004        Ok(self.connection.as_ref())
1005    }
1006
1007    /// # Feature
1008    /// Requires the `cloud` feature to be enabled.
1009    #[cfg(feature = "cloud")]
1010    #[instrument(level = "trace", name = "clickhouse.cloud.ping")]
1011    async fn ping_cloud(
1012        domain: &str,
1013        timeout: Option<u64>,
1014        track: Option<&std::sync::atomic::AtomicBool>,
1015    ) {
1016        debug!("pinging cloud instance");
1017        if !domain.is_empty() {
1018            debug!(domain, "cloud endpoint found");
1019            // Create receiver channel to cancel ping if dropped
1020            let (_tx, rx) = oneshot::channel::<()>();
1021            cloud::ping_cloud(domain.to_string(), timeout, track, rx).await;
1022        }
1023    }
1024
1025    // Helper function to convert a receiver of data into a `ClickHouseResponse`
1026    fn insert_response(
1027        &self,
1028        rx: mpsc::Receiver<Result<T::Data>>,
1029        qid: Qid,
1030    ) -> ClickHouseResponse<()> {
1031        ClickHouseResponse::<()>::from_stream(handle_insert_response::<T>(rx, qid, self.client_id))
1032    }
1033}
1034
1035impl Client<NativeFormat> {
1036    /// Inserts rows into `ClickHouse` using the native protocol.
1037    ///
1038    /// This method sends an insert query with a collection of rows, where each row is
1039    /// a type `T` implementing [`Row`]. The rows are converted into a `ClickHouse`
1040    /// [`Block`] and sent over the native protocol. The query is executed asynchronously,
1041    /// and any response data, progress events, or errors are streamed back.
1042    ///
1043    /// Progress and profile events are dispatched to the client's event channel (see
1044    /// [`Client::subscribe_events`]). The returned [`ClickHouseResponse`] yields `()`
1045    /// on success or an error if the insert fails.
1046    ///
1047    /// # Parameters
1048    /// - `query`: The insert query (e.g., `"INSERT INTO my_table VALUES"`).
1049    /// - `blocks`: An iterator of rows to insert, where each row implements [`Row`].
1050    /// - `qid`: Optional query ID for tracking and debugging.
1051    ///
1052    /// # Returns
1053    /// A [`Result`] containing a [`ClickHouseResponse<()>`] that streams the operation's
1054    /// outcome.
1055    ///
1056    /// # Errors
1057    /// - Fails if the query is malformed or the row data is invalid.
1058    /// - Fails if the connection to `ClickHouse` is interrupted.
1059    /// - Fails if `ClickHouse` returns an exception (e.g., schema mismatch).
1060    ///
1061    /// # Examples
1062    /// ```rust,ignore
1063    /// use clickhouse_arrow::prelude::*;
1064    ///
1065    /// let client = Client::builder()
1066    ///     .with_endpoint("localhost:9000")
1067    ///     .build_native()
1068    ///     .await
1069    ///     .unwrap();
1070    ///
1071    /// // Assume `MyRow` implements `Row`
1072    /// let rows = vec![MyRow { /* ... */ }, MyRow { /* ... */ }];
1073    /// let mut response = client.insert_rows("INSERT INTO my_table VALUES", rows.into_iter(), None)
1074    ///     .await
1075    ///     .unwrap();
1076    /// while let Some(result) = response.next().await {
1077    ///     result.unwrap(); // Check for errors
1078    /// }
1079    /// ```
1080    #[instrument(
1081        name = "clickhouse.insert_rows",
1082        fields(
1083            db.system = "clickhouse",
1084            db.operation = "insert",
1085            db.format = NativeFormat::FORMAT,
1086            clickhouse.client.id = self.client_id,
1087            clickhouse.query.id
1088        ),
1089        skip_all
1090    )]
1091    pub async fn insert_rows<T: Row + Send + 'static>(
1092        &self,
1093        query: impl Into<ParsedQuery>,
1094        blocks: impl Iterator<Item = T> + Send + Sync + 'static,
1095        qid: Option<Qid>,
1096    ) -> Result<ClickHouseResponse<()>> {
1097        let cid = self.client_id;
1098        let (query, qid) = record_query(qid, query.into(), cid);
1099
1100        // Create metadata channel
1101        let (tx, rx) = oneshot::channel();
1102        let (header_tx, header_rx) = oneshot::channel();
1103
1104        let connection = self.conn().await?;
1105
1106        #[cfg_attr(not(feature = "inner_pool"), expect(unused_variables))]
1107        let conn_idx = connection
1108            .send_operation(
1109                Operation::Query {
1110                    query,
1111                    settings: self.settings.clone(),
1112                    params: None,
1113                    response: tx,
1114                    header: Some(header_tx),
1115                },
1116                qid,
1117                false,
1118            )
1119            .await?;
1120
1121        trace!({ ATT_CID } = cid, { ATT_QID } = %qid, "sent query, awaiting response");
1122        let responses = rx
1123            .await
1124            .map_err(|_| Error::Protocol(format!("Failed to receive response for query {qid}")))?
1125            .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "Error receiving header"))?;
1126
1127        let header = header_rx
1128            .await
1129            .map_err(|_| Error::Protocol(format!("Failed to receive header for query {qid}")))?;
1130        let data = Block::from_rows(blocks.collect(), header)?;
1131
1132        let (tx, rx) = oneshot::channel();
1133        let _ =
1134            connection.send_operation(Operation::Insert { data, response: tx }, qid, true).await?;
1135        rx.await.map_err(|_| {
1136            Error::Protocol(format!("Failed to receive response from insert {qid}"))
1137        })??;
1138
1139        // Decrement load balancer
1140        #[cfg(feature = "inner_pool")]
1141        connection.finish(conn_idx, Operation::<Block>::weight_query());
1142
1143        Ok(self.insert_response(responses, qid))
1144    }
1145
1146    /// Executes a `ClickHouse` query and streams deserialized rows.
1147    ///
1148    /// This method sends a query to `ClickHouse` and returns a stream of rows, where
1149    /// each row is deserialized into type `T` implementing [`Row`]. Rows are grouped
1150    /// into `ClickHouse` blocks, and the stream yields rows as they are received. Use
1151    /// this method for type-safe access to query results in native format.
1152    ///
1153    /// Progress and profile events are dispatched to the client's event channel (see
1154    /// [`Client::subscribe_events`]).
1155    ///
1156    /// # Parameters
1157    /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM my_table"`).
1158    /// - `qid`: Optional query ID for tracking and debugging.
1159    ///
1160    /// # Returns
1161    /// A [`Result`] containing a [`ClickHouseResponse<T>`] that streams deserialized
1162    /// rows of type `T`.
1163    ///
1164    /// # Errors
1165    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1166    /// - Fails if row deserialization fails (e.g., schema mismatch).
1167    /// - Fails if the connection to `ClickHouse` is interrupted.
1168    /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1169    ///
1170    /// # Examples
1171    /// ```rust,ignore
1172    /// use clickhouse_arrow::prelude::*;
1173    ///
1174    /// let client = Client::builder()
1175    ///     .with_endpoint("localhost:9000")
1176    ///     .build_native()
1177    ///     .await
1178    ///     .unwrap();
1179    ///
1180    /// // Assume `MyRow` implements `Row`
1181    /// let mut response = client.query::<MyRow>("SELECT * FROM my_table", None).await.unwrap();
1182    /// while let Some(row) = response.next().await {
1183    ///     let row = row.unwrap();
1184    ///     println!("Row: {:?}", row);
1185    /// }
1186    /// ```
1187    #[instrument(
1188        name = "clickhouse.query",
1189        skip_all,
1190        fields(db.system = "clickhouse", db.operation = "query", db.format = NativeFormat::FORMAT)
1191    )]
1192    pub async fn query<T: Row + Send + 'static>(
1193        &self,
1194        query: impl Into<ParsedQuery>,
1195        qid: Option<Qid>,
1196    ) -> Result<ClickHouseResponse<T>> {
1197        self.query_params(query, None, qid).await
1198    }
1199
1200    /// Executes a `ClickHouse` query with parameters and streams deserialized rows.
1201    ///
1202    /// # Parameters
1203    /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM my_table"`).
1204    /// - `params`: The query parameters to provide
1205    /// - `qid`: Optional query ID for tracking and debugging.
1206    ///
1207    /// # Returns
1208    /// A [`Result`] containing a [`ClickHouseResponse<T>`] that streams deserialized
1209    /// rows of type `T`.
1210    ///
1211    /// # Errors
1212    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1213    /// - Fails if row deserialization fails (e.g., schema mismatch).
1214    /// - Fails if the connection to `ClickHouse` is interrupted.
1215    /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1216    ///
1217    /// # Examples
1218    /// ```rust,ignore
1219    /// use clickhouse_arrow::prelude::*;
1220    ///
1221    /// let client = Client::builder()
1222    ///     .with_endpoint("localhost:9000")
1223    ///     .build_native()
1224    ///     .await
1225    ///     .unwrap();
1226    ///
1227    /// // Assume `MyRow` implements `Row`
1228    /// let params = Some(vec![("name", ParamValue::from("my_table"))]);
1229    /// let query = "SELECT * FROM {name:Identifier}";
1230    /// let mut response = client.query_params::<MyRow>(query, params, None).await.unwrap();
1231    /// while let Some(row) = response.next().await {
1232    ///     let row = row.unwrap();
1233    ///     println!("Row: {:?}", row);
1234    /// }
1235    /// ```
1236    #[instrument(
1237        name = "clickhouse.query_params",
1238        skip_all,
1239        fields(db.system = "clickhouse", db.operation = "query", db.format = NativeFormat::FORMAT)
1240    )]
1241    pub async fn query_params<T: Row + Send + 'static>(
1242        &self,
1243        query: impl Into<ParsedQuery>,
1244        params: Option<QueryParams>,
1245        qid: Option<Qid>,
1246    ) -> Result<ClickHouseResponse<T>> {
1247        let (query, qid) = record_query(qid, query.into(), self.client_id);
1248        let raw = self.query_raw(query, params, qid).await?;
1249        Ok(ClickHouseResponse::new(Box::pin(raw.flat_map(|block| {
1250            match block {
1251                Ok(mut block) => stream::iter(
1252                    block
1253                        .take_iter_rows()
1254                        .filter(|x| !x.is_empty())
1255                        .map(T::deserialize_row)
1256                        .map(|maybe| maybe.inspect_err(|error| error!(?error, "deserializing row")))
1257                        .collect::<Vec<_>>(),
1258                ),
1259                Err(e) => stream::iter(vec![Err(e)]),
1260            }
1261        }))))
1262    }
1263
1264    /// Executes a `ClickHouse` query and returns the first row, discarding the rest.
1265    ///
1266    /// This method sends a query to `ClickHouse` and returns the first row deserialized
1267    /// into type `T` implementing [`Row`], or `None` if the result is empty. It is
1268    /// useful for queries expected to return a single row (e.g., `SELECT COUNT(*)`).
1269    /// For streaming multiple rows, use [`Client::query`].
1270    ///
1271    /// Progress and profile events are dispatched to the client's event channel (see
1272    /// [`Client::subscribe_events`]).
1273    ///
1274    /// # Parameters
1275    /// - `query`: The SQL query to execute (e.g., `"SELECT name FROM users WHERE id = 1"`).
1276    /// - `qid`: Optional query ID for tracking and debugging.
1277    ///
1278    /// # Returns
1279    /// A [`Result`] containing an `Option<T>`, where `T` is the deserialized row, or
1280    /// `None` if no rows are returned.
1281    ///
1282    /// # Errors
1283    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1284    /// - Fails if row deserialization fails (e.g., schema mismatch).
1285    /// - Fails if the connection to `ClickHouse` is interrupted.
1286    /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1287    ///
1288    /// # Examples
1289    /// ```rust,ignore
1290    /// use clickhouse_arrow::prelude::*;
1291    ///
1292    /// let client = Client::builder()
1293    ///     .with_endpoint("localhost:9000")
1294    ///     .build_native()
1295    ///     .await
1296    ///     .unwrap();
1297    ///
1298    /// // Assume `MyRow` implements `Row`
1299    /// let row = client.query_one::<MyRow>("SELECT name FROM users WHERE id = 1", None)
1300    ///     .await
1301    ///     .unwrap();
1302    /// if let Some(row) = row {
1303    ///     println!("Found row: {:?}", row);
1304    /// }
1305    /// ```
1306    #[instrument(
1307        name = "clickhouse.query_one",
1308        skip_all,
1309        fields(
1310            db.system = "clickhouse",
1311            db.operation = "query",
1312            db.format = NativeFormat::FORMAT,
1313            clickhouse.client.id = self.client_id,
1314            clickhouse.query.id
1315        )
1316    )]
1317    pub async fn query_one<T: Row + Send + 'static>(
1318        &self,
1319        query: impl Into<ParsedQuery>,
1320        qid: Option<Qid>,
1321    ) -> Result<Option<T>> {
1322        self.query_one_params(query, None, qid).await
1323    }
1324
1325    /// Executes a `ClickHouse` query with parameters and returns the first row, discarding the
1326    /// rest.
1327    ///
1328    /// # Parameters
1329    /// - `query`: The SQL query to execute (e.g., `"SELECT name FROM users WHERE id = 1"`).
1330    /// - `params`: The query parameters to provide
1331    /// - `qid`: Optional query ID for tracking and debugging.
1332    ///
1333    /// # Returns
1334    /// A [`Result`] containing an `Option<T>`, where `T` is the deserialized row, or
1335    /// `None` if no rows are returned.
1336    ///
1337    /// # Errors
1338    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1339    /// - Fails if row deserialization fails (e.g., schema mismatch).
1340    /// - Fails if the connection to `ClickHouse` is interrupted.
1341    /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1342    ///
1343    /// # Examples
1344    /// ```rust,ignore
1345    /// use clickhouse_arrow::prelude::*;
1346    ///
1347    /// let client = Client::builder()
1348    ///     .with_endpoint("localhost:9000")
1349    ///     .build_native()
1350    ///     .await
1351    ///     .unwrap();
1352    ///
1353    /// // Assume `MyRow` implements `Row`
1354    /// let params = Some(vec![
1355    ///     ("str", ParamValue::from("name")),
1356    /// ].into());
1357    /// let query = "SELECT {str:String} FROM users WHERE id = 1";
1358    /// let row = client.query_one_params::<MyRow>(query, params, None)
1359    ///     .await
1360    ///     .unwrap();
1361    /// if let Some(row) = row {
1362    ///     println!("Found row: {:?}", row);
1363    /// }
1364    /// ```
1365    #[instrument(
1366        name = "clickhouse.query_one_params",
1367        skip_all,
1368        fields(
1369            db.system = "clickhouse",
1370            db.operation = "query",
1371            db.format = NativeFormat::FORMAT,
1372            clickhouse.client.id = self.client_id,
1373            clickhouse.query.id
1374        )
1375    )]
1376    pub async fn query_one_params<T: Row + Send + 'static>(
1377        &self,
1378        query: impl Into<ParsedQuery>,
1379        params: Option<QueryParams>,
1380        qid: Option<Qid>,
1381    ) -> Result<Option<T>> {
1382        let mut stream = self.query_params::<T>(query, params, qid).await?;
1383        stream.next().await.transpose()
1384    }
1385
1386    /// Creates a `ClickHouse` table from a Rust struct that implements the `Row` trait.
1387    ///
1388    /// This method generates and executes a `CREATE TABLE` DDL statement based on the
1389    /// structure of the provided `Row` type. The table schema is automatically derived
1390    /// from the struct fields and their types.
1391    ///
1392    /// # Arguments
1393    /// * `database` - Optional database name. If None, uses the client's default database
1394    /// * `table` - The name of the table to create
1395    /// * `options` - Table creation options including engine type, order by, and partition by
1396    /// * `qid` - Optional query ID for tracking and debugging
1397    ///
1398    /// # Type Parameters
1399    /// * `T` - A type that implements the `Row` trait, typically a struct with the `#[derive(Row)]`
1400    ///   macro
1401    ///
1402    /// # Example
1403    /// ```ignore
1404    /// #[derive(Row)]
1405    /// struct User {
1406    ///     id: u32,
1407    ///     name: String,
1408    ///     created_at: DateTime,
1409    /// }
1410    ///
1411    /// let options = CreateOptions::new("MergeTree")
1412    ///     .with_order_by(&["id"]);
1413    ///
1414    /// client.create_table::<User>(Some("analytics"), "users", &options, None).await?;
1415    /// ```
1416    ///
1417    /// # Errors
1418    /// - Returns an error if the table creation fails
1419    /// - Returns an error if the database/table names are invalid
1420    /// - Returns an error if the connection is lost
1421    #[instrument(
1422        name = "clickhouse.create_table",
1423        skip_all,
1424        fields(
1425            db.system = "clickhouse",
1426            db.operation = "create.table",
1427            db.format = NativeFormat::FORMAT,
1428        )
1429    )]
1430    pub async fn create_table<T: Row>(
1431        &self,
1432        database: Option<&str>,
1433        table: &str,
1434        options: &CreateOptions,
1435        qid: Option<Qid>,
1436    ) -> Result<()> {
1437        let database = database.unwrap_or(self.connection.database());
1438        let stmt = create_table_statement_from_native::<T>(Some(database), table, options)?;
1439        self.execute(stmt, qid).await?;
1440        Ok(())
1441    }
1442}
1443
1444impl Client<ArrowFormat> {
1445    /// Executes a `ClickHouse` query and streams Arrow [`RecordBatch`] results.
1446    ///
1447    /// This method sends a query to `ClickHouse` and returns a stream of [`RecordBatch`]
1448    /// instances, each containing a chunk of the query results in Apache Arrow format.
1449    /// Use this method for efficient integration with Arrow-based data processing
1450    /// pipelines. For row-based access, consider [`Client::query_rows`].
1451    ///
1452    /// Progress and profile events are dispatched to the client's event channel (see
1453    /// [`Client::subscribe_events`]).
1454    ///
1455    /// # Parameters
1456    /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM my_table"`).
1457    /// - `qid`: Optional query ID for tracking and debugging.
1458    ///
1459    /// # Returns
1460    /// A [`Result`] containing a [`ClickHouseResponse<RecordBatch>`] that streams
1461    /// query results.
1462    ///
1463    /// # Errors
1464    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1465    /// - Fails if the connection to `ClickHouse` is interrupted.
1466    /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1467    ///
1468    /// # Examples
1469    /// ```rust,ignore
1470    /// use clickhouse_arrow::prelude::*;
1471    ///
1472    /// let client = Client::builder()
1473    ///     .with_endpoint("localhost:9000")
1474    ///     .build_arrow()
1475    ///     .await
1476    ///     .unwrap();
1477    ///
1478    /// let mut response = client.query("SELECT * FROM my_table", None).await.unwrap();
1479    /// while let Some(batch) = response.next().await {
1480    ///     let batch = batch.unwrap();
1481    ///     println!("Received batch with {} rows", batch.num_rows());
1482    /// }
1483    /// ```
1484    #[instrument(
1485        skip_all,
1486        fields(db.system = "clickhouse", db.operation = "query", clickhouse.query.id)
1487    )]
1488    pub async fn query(
1489        &self,
1490        query: impl Into<ParsedQuery>,
1491        qid: Option<Qid>,
1492    ) -> Result<ClickHouseResponse<RecordBatch>> {
1493        self.query_params(query, None, qid).await
1494    }
1495
1496    /// Executes a `ClickHouse` query with parameters and streams Arrow [`RecordBatch`] results.
1497    ///
1498    /// # Parameters
1499    /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM my_table"`).
1500    /// - `params`: The query parameters to bind to the query, if any.
1501    /// - `qid`: Optional query ID for tracking and debugging.
1502    ///
1503    /// # Returns
1504    /// A [`Result`] containing a [`ClickHouseResponse<RecordBatch>`] that streams
1505    /// query results.
1506    ///
1507    /// # Errors
1508    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1509    /// - Fails if the connection to `ClickHouse` is interrupted.
1510    /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1511    ///
1512    /// # Examples
1513    /// ```rust,ignore
1514    /// use clickhouse_arrow::prelude::*;
1515    ///
1516    /// let client = Client::builder()
1517    ///     .with_endpoint("localhost:9000")
1518    ///     .build_arrow()
1519    ///     .await
1520    ///     .unwrap();
1521    ///
1522    /// let params = Some(vec![("name", ParamValue::from("my_table"))].into());
1523    /// let query = "SELECT * FROM {name:Identifier}";
1524    /// let mut response = client.query_params(query, params, None).await.unwrap();
1525    /// while let Some(batch) = response.next().await {
1526    ///     let batch = batch.unwrap();
1527    ///     println!("Received batch with {} rows", batch.num_rows());
1528    /// }
1529    /// ```
1530    #[instrument(
1531        skip_all,
1532        fields(db.system = "clickhouse", db.operation = "query", clickhouse.query.id)
1533    )]
1534    pub async fn query_params(
1535        &self,
1536        query: impl Into<ParsedQuery>,
1537        params: Option<QueryParams>,
1538        qid: Option<Qid>,
1539    ) -> Result<ClickHouseResponse<RecordBatch>> {
1540        let (query, qid) = record_query(qid, query.into(), self.client_id);
1541        Ok(ClickHouseResponse::new(Box::pin(self.query_raw(query, params, qid).await?)))
1542    }
1543
1544    /// Executes a `ClickHouse` query and streams each row as a `Vec<Value>` of column values.
1545    ///
1546    /// This method sends a query to `ClickHouse` and returns a stream of rows, where
1547    /// each row is represented as a `Vec<Value>` containing column values. The data is
1548    /// transposed from Arrow [`RecordBatch`] format to row-major format, making it
1549    /// convenient for row-based processing. For direct Arrow access, use
1550    /// [`Client::query`].
1551    ///
1552    /// Progress and profile events are dispatched to the client's event channel (see
1553    /// [`Client::subscribe_events`]).
1554    ///
1555    /// # Parameters
1556    /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM my_table"`).
1557    /// - `qid`: Optional query ID for tracking and debugging.
1558    ///
1559    /// # Returns
1560    /// A [`Result`] containing a [`ClickHouseResponse<Vec<Value>>`] that streams rows.
1561    ///
1562    /// # Errors
1563    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1564    /// - Fails if the connection to `ClickHouse` is interrupted.
1565    /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1566    ///
1567    /// # Examples
1568    /// ```rust,ignore
1569    /// use clickhouse_arrow::prelude::*;
1570    ///
1571    /// let client = Client::builder()
1572    ///     .with_endpoint("localhost:9000")
1573    ///     .build_arrow()
1574    ///     .await
1575    ///     .unwrap();
1576    ///
1577    /// let mut response = client.query_rows("SELECT * FROM my_table", None).await.unwrap();
1578    /// while let Some(row) = response.next().await {
1579    ///     let row = row.unwrap();
1580    ///     println!("Row values: {:?}", row);
1581    /// }
1582    /// ```
1583    #[instrument(
1584        name = "clickhouse.query_rows",
1585        fields(
1586            db.system = "clickhouse",
1587            db.operation = "query",
1588            db.format = ArrowFormat::FORMAT,
1589            clickhouse.client.id = self.client_id,
1590            clickhouse.query.id
1591        ),
1592        skip_all
1593    )]
1594    pub async fn query_rows(
1595        &self,
1596        query: impl Into<ParsedQuery>,
1597        qid: Option<Qid>,
1598    ) -> Result<ClickHouseResponse<Vec<Value>>> {
1599        let (query, qid) = record_query(qid, query.into(), self.client_id);
1600        let connection = self.conn().await?;
1601
1602        // Create the response and header channels
1603        let (tx, rx) = oneshot::channel();
1604        let (header_tx, header_rx) = oneshot::channel();
1605
1606        #[cfg_attr(not(feature = "inner_pool"), expect(unused_variables))]
1607        let conn_idx = connection
1608            .send_operation(
1609                Operation::Query {
1610                    query,
1611                    settings: self.settings.clone(),
1612                    // TODO: Add arg for params
1613                    params: None,
1614                    response: tx,
1615                    header: Some(header_tx),
1616                },
1617                qid,
1618                true,
1619            )
1620            .await?;
1621
1622        trace!({ ATT_CID } = self.client_id, { ATT_QID } = %qid, "sent query, awaiting response");
1623        let responses = rx
1624            .await
1625            .map_err(|_| Error::Protocol(format!("Failed to receive response for query {qid}")))?
1626            .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "Error receiving header"))?;
1627
1628        let header = header_rx
1629            .await
1630            .map_err(|_| Error::Protocol(format!("Failed to receive header for query {qid}")))?;
1631
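        // Pair each incoming batch with the header, transpose it into row-major values via
        // `batch_to_rows`, and flatten the per-batch row iterators into a single row stream.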
1632        let response = create_response_stream::<ArrowFormat>(responses, qid, self.client_id)
1633            .map(move |batch| (header.clone(), batch))
1634            .map(|(header, batch)| {
1635                let batch = batch?;
1636                let batch_iter = batch_to_rows(&batch, Some(&header))?;
1637                Ok::<_, Error>(stream::iter(batch_iter))
1638            })
1639            .try_flatten();
1640
1641        // Decrement load balancer
1642        #[cfg(feature = "inner_pool")]
1643        connection.finish(conn_idx, Operation::<RecordBatch>::weight_insert_many());
1644
1645        Ok(ClickHouseResponse::from_stream(response))
1646    }
1647
1648    /// Executes a `ClickHouse` query and returns the first column of the first batch.
1649    ///
1650    /// This method sends a query to `ClickHouse` and returns the first column of the
1651    /// first [`RecordBatch`] as an Arrow [`ArrayRef`], or `None` if the result is empty.
1652    /// It is useful for queries that return a single column (e.g., `SELECT id FROM
1653    /// my_table`). For full batch access, use [`Client::query`].
1654    ///
1655    /// Progress and profile events are dispatched to the client's event channel (see
1656    /// [`Client::subscribe_events`]).
1657    ///
1658    /// # Parameters
1659    /// - `query`: The SQL query to execute (e.g., `"SELECT id FROM my_table"`).
1660    /// - `qid`: Optional query ID for tracking and debugging.
1661    ///
1662    /// # Returns
1663    /// A [`Result`] containing an `Option<ArrayRef>`, representing the first column of
1664    /// the first batch, or `None` if no data is returned.
1665    ///
1666    /// # Errors
1667    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1668    /// - Fails if the connection to `ClickHouse` is interrupted.
1669    /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1670    ///
1671    /// # Examples
1672    /// ```rust,ignore
1673    /// use clickhouse_arrow::prelude::*;
1674    ///
1675    /// let client = Client::builder()
1676    ///     .with_endpoint("localhost:9000")
1677    ///     .build_arrow()
1678    ///     .await
1679    ///     .unwrap();
1680    ///
1681    /// let column = client.query_column("SELECT id FROM my_table", None)
1682    ///     .await
1683    ///     .unwrap();
1684    /// if let Some(col) = column {
1685    ///     println!("Column data: {:?}", col);
1686    /// }
1687    /// ```
1688    #[instrument(
1689        name = "clickhouse.query_column",
1690        skip_all,
1691        fields(
1692            db.system = "clickhouse",
1693            db.operation = "query",
1694            db.format = ArrowFormat::FORMAT,
1695            clickhouse.client.id = self.client_id,
1696            clickhouse.query.id
1697        )
1698    )]
1699    pub async fn query_column(
1700        &self,
1701        query: impl Into<ParsedQuery>,
1702        qid: Option<Qid>,
1703    ) -> Result<Option<ArrayRef>> {
1704        self.query_column_params(query, None, qid).await
1705    }
1706
1707    /// Executes a `ClickHouse` query with parameters and returns the first column of the first
1708    /// batch.
1709    ///
1710    /// # Parameters
1711    /// - `query`: The SQL query to execute (e.g., `"SELECT id FROM my_table"`).
1712    /// - `params`: The query parameters to bind to the query, if any.
1713    /// - `qid`: Optional query ID for tracking and debugging.
1714    ///
1715    /// # Returns
1716    /// A [`Result`] containing an `Option<ArrayRef>`, representing the first column of
1717    /// the first batch, or `None` if no data is returned.
1718    ///
1719    /// # Errors
1720    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1721    /// - Fails if the connection to `ClickHouse` is interrupted.
1722    /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1723    ///
1724    /// # Examples
1725    /// ```rust,ignore
1726    /// use clickhouse_arrow::prelude::*;
1727    ///
1728    /// let client = Client::builder()
1729    ///     .with_endpoint("localhost:9000")
1730    ///     .build_arrow()
1731    ///     .await
1732    ///     .unwrap();
1733    ///
1734    /// let params = Some(vec![("name", ParamValue::from("my_table"))].into());
1735    /// let query = "SELECT id FROM {name:Identifier}";
1736    /// let column = client.query_column_params(query, params, None)
1737    ///     .await
1738    ///     .unwrap();
1739    /// if let Some(col) = column {
1740    ///     println!("Column data: {:?}", col);
1741    /// }
1742    /// ```
1743    #[instrument(
1744        name = "clickhouse.query_column_params",
1745        skip_all,
1746        fields(
1747            db.system = "clickhouse",
1748            db.operation = "query",
1749            db.format = ArrowFormat::FORMAT,
1750            clickhouse.client.id = self.client_id,
1751            clickhouse.query.id
1752        )
1753    )]
1754    pub async fn query_column_params(
1755        &self,
1756        query: impl Into<ParsedQuery>,
1757        params: Option<QueryParams>,
1758        qid: Option<Qid>,
1759    ) -> Result<Option<ArrayRef>> {
1760        let mut stream = self.query_params(query, params, qid).await?;
1761        let Some(batch) = stream.next().await.transpose()? else {
1762            return Ok(None);
1763        };
1764
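        // Return the first column of the first batch; a batch with zero rows is treated as
        // no result.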
1765        if batch.num_rows() == 0 { Ok(None) } else { Ok(Some(Arc::clone(batch.column(0)))) }
1766    }
1767
1768    /// Executes a `ClickHouse` query and returns the first row as a [`RecordBatch`].
1769    ///
1770    /// This method sends a query to `ClickHouse` and returns the first row of the first
1771    /// [`RecordBatch`], or `None` if the result is empty. The returned [`RecordBatch`]
1772    /// contains a single row. It is useful for queries expected to return a single row
1773    /// (e.g., `SELECT * FROM users WHERE id = 1`). For streaming multiple rows, use
1774    /// [`Client::query`].
1775    ///
1776    /// Progress and profile events are dispatched to the client's event channel (see
1777    /// [`Client::subscribe_events`]).
1778    ///
1779    /// # Parameters
1780    /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM users WHERE id = 1"`).
1781    /// - `qid`: Optional query ID for tracking and debugging.
1782    ///
1783    /// # Returns
1784    /// A [`Result`] containing an `Option<RecordBatch>`, representing the first row, or
1785    /// `None` if no rows are returned.
1786    ///
1787    /// # Errors
1788    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1789    /// - Fails if the connection to `ClickHouse` is interrupted.
1790    /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1791    ///
1792    /// # Examples
1793    /// ```rust,ignore
1794    /// use clickhouse_arrow::prelude::*;
1795    ///
1796    /// let client = Client::builder()
1797    ///     .with_endpoint("localhost:9000")
1798    ///     .build_arrow()
1799    ///     .await
1800    ///     .unwrap();
1801    ///
1802    /// let batch = client.query_one("SELECT * FROM users WHERE id = 1", None)
1803    ///     .await
1804    ///     .unwrap();
1805    /// if let Some(row) = batch {
1806    ///     println!("Row data: {:?}", row);
1807    /// }
1808    /// ```
1809    #[instrument(
1810        name = "clickhouse.query_one",
1811        skip_all,
1812        fields(
1813            db.system = "clickhouse",
1814            db.operation = "query",
1815            db.format = ArrowFormat::FORMAT,
1816            clickhouse.client.id = self.client_id,
1817            clickhouse.query.id
1818        )
1819    )]
1820    pub async fn query_one(
1821        &self,
1822        query: impl Into<ParsedQuery>,
1823        qid: Option<Qid>,
1824    ) -> Result<Option<RecordBatch>> {
1825        self.query_one_params(query, None, qid).await
1826    }
1827
1828    /// Executes a `ClickHouse` query with parameters and returns the first row as a
1829    /// [`RecordBatch`].
1830    ///
1831    /// # Parameters
1832    /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM users WHERE id = 1"`).
1833    /// - `params`: The query parameters to bind to the query, if any.
1834    /// - `qid`: Optional query ID for tracking and debugging.
1835    ///
1836    /// # Returns
1837    /// A [`Result`] containing an `Option<RecordBatch>`, representing the first row, or
1838    /// `None` if no rows are returned.
1839    ///
1840    /// # Errors
1841    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1842    /// - Fails if the connection to `ClickHouse` is interrupted.
1843    /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1844    ///
1845    /// # Examples
1846    /// ```rust,ignore
1847    /// use clickhouse_arrow::prelude::*;
1848    ///
1849    /// let client = Client::builder()
1850    ///     .with_endpoint("localhost:9000")
1851    ///     .build_arrow()
1852    ///     .await
1853    ///     .unwrap();
1854    ///
1855    /// let params = Some(vec![("id", ParamValue::from(1))].into());
1856    /// let batch = client.query_one_params("SELECT * FROM users WHERE id = {id:UInt64}", params, None)
1857    ///     .await
1858    ///     .unwrap();
1859    /// if let Some(row) = batch {
1860    ///     println!("Row data: {:?}", row);
1861    /// }
1862    /// ```
1863    #[instrument(
1864        name = "clickhouse.query_one_params",
1865        skip_all,
1866        fields(
1867            db.system = "clickhouse",
1868            db.operation = "query",
1869            db.format = ArrowFormat::FORMAT,
1870            clickhouse.client.id = self.client_id,
1871            clickhouse.query.id
1872        )
1873    )]
1874    pub async fn query_one_params(
1875        &self,
1876        query: impl Into<ParsedQuery>,
1877        params: Option<QueryParams>,
1878        qid: Option<Qid>,
1879    ) -> Result<Option<RecordBatch>> {
1880        let stream = self.query_params(query, params, qid).await?;
1881        tokio::pin!(stream);
1882
1883        let Some(batch) = stream.next().await.transpose()? else {
1884            return Ok(None);
1885        };
1886
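        // Extract the first row as a single-row batch via `take_record_batch`; a batch with
        // zero rows is treated as no result.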
1887        if batch.num_rows() == 0 {
1888            Ok(None)
1889        } else {
1890            Ok(Some(take_record_batch(&batch, &arrow::array::UInt32Array::from(vec![0]))?))
1891        }
1892    }
1893
1894    /// Fetches the list of database names (schemas) in `ClickHouse`.
1895    ///
1896    /// This method queries `ClickHouse` to retrieve the names of all databases
1897    /// accessible to the client. It is useful for exploring the database structure or
1898    /// validating database existence before performing operations.
1899    ///
1900    /// # Parameters
1901    /// - `qid`: Optional query ID for tracking and debugging.
1902    ///
1903    /// # Returns
1904    /// A [`Result`] containing a `Vec<String>` of database names.
1905    ///
1906    /// # Errors
1907    /// - Fails if the query execution encounters a `ClickHouse` error (e.g., permission denied).
1908    /// - Fails if the connection to `ClickHouse` is interrupted.
1909    ///
1910    /// # Examples
1911    /// ```rust,ignore
1912    /// use clickhouse_arrow::prelude::*;
1913    ///
1914    /// let client = Client::builder()
1915    ///     .with_endpoint("localhost:9000")
1916    ///     .build_arrow()
1917    ///     .await
1918    ///     .unwrap();
1919    ///
1920    /// let schemas = client.fetch_schemas(None).await.unwrap();
1921    /// println!("Databases: {:?}", schemas);
1922    /// ```
1923    #[instrument(
1924        name = "clickhouse.fetch_schemas",
1925        skip_all,
1926        fields(
1927            db.system = "clickhouse",
1928            db.operation = "query",
1929            db.format = ArrowFormat::FORMAT,
1930            clickhouse.client.id = self.client_id,
1931            clickhouse.query.id
1932        )
1933    )]
1934    pub async fn fetch_schemas(&self, qid: Option<Qid>) -> Result<Vec<String>> {
1935        crate::arrow::schema::fetch_databases(self, qid).await
1936    }
1937
1938    /// Fetches all tables across all databases in `ClickHouse`.
1939    ///
1940    /// This method queries `ClickHouse` to retrieve a mapping of database names to
1941    /// their table names. It is useful for discovering the full schema structure of
1942    /// the `ClickHouse` instance.
1943    ///
1944    /// # Parameters
1945    /// - `qid`: Optional query ID for tracking and debugging.
1946    ///
1947    /// # Returns
1948    /// A [`Result`] containing a `HashMap<String, Vec<String>>`, where each key is a
1949    /// database name and the value is a list of table names in that database.
1950    ///
1951    /// # Errors
1952    /// - Fails if the query execution encounters a `ClickHouse` error (e.g., permission denied).
1953    /// - Fails if the connection to `ClickHouse` is interrupted.
1954    ///
1955    /// # Examples
1956    /// ```rust,ignore
1957    /// use clickhouse_arrow::prelude::*;
1958    ///
1959    /// let client = Client::builder()
1960    ///     .with_endpoint("localhost:9000")
1961    ///     .build_arrow()
1962    ///     .await
1963    ///     .unwrap();
1964    ///
1965    /// let tables = client.fetch_all_tables(None).await.unwrap();
1966    /// for (db, tables) in tables {
1967    ///     println!("Database {} has tables: {:?}", db, tables);
1968    /// }
1969    /// ```
1970    #[instrument(
1971        name = "clickhouse.fetch_all_tables",
1972        skip_all,
1973        fields(
1974            db.system = "clickhouse",
1975            db.operation = "query",
1976            db.format = ArrowFormat::FORMAT,
1977            clickhouse.client.id = self.client_id,
1978            clickhouse.query.id
1979        )
1980    )]
1981    pub async fn fetch_all_tables(&self, qid: Option<Qid>) -> Result<HashMap<String, Vec<String>>> {
1982        crate::arrow::schema::fetch_all_tables(self, qid).await
1983    }
1984
1985    /// Fetches the list of table names in a specific `ClickHouse` database.
1986    ///
1987    /// This method queries `ClickHouse` to retrieve the names of all tables in the
1988    /// specified database (or the client's default database if `None`). It is useful
1989    /// for exploring the schema of a specific database.
1990    ///
1991    /// # Parameters
1992    /// - `database`: Optional database name. If `None`, uses the client's default database.
1993    /// - `qid`: Optional query ID for tracking and debugging.
1994    ///
1995    /// # Returns
1996    /// A [`Result`] containing a `Vec<String>` of table names.
1997    ///
1998    /// # Errors
1999    /// - Fails if the database does not exist or is inaccessible.
2000    /// - Fails if the query execution encounters a `ClickHouse` error (e.g., permission denied).
2001    /// - Fails if the connection to `ClickHouse` is interrupted.
2002    ///
2003    /// # Examples
2004    /// ```rust,ignore
2005    /// use clickhouse_arrow::prelude::*;
2006    ///
2007    /// let client = Client::builder()
2008    ///     .with_endpoint("localhost:9000")
2009    ///     .build_arrow()
2010    ///     .await
2011    ///     .unwrap();
2012    ///
2013    /// let tables = client.fetch_tables(Some("my_db"), None).await.unwrap();
2014    /// println!("Tables in my_db: {:?}", tables);
2015    /// ```
2016    #[instrument(
2017        name = "clickhouse.fetch_tables",
2018        skip_all,
2019        fields(
2020            db.system = "clickhouse",
2021            db.operation = "query",
2022            db.format = ArrowFormat::FORMAT,
2023            clickhouse.client.id = self.client_id,
2024            clickhouse.query.id
2025        )
2026    )]
2027    pub async fn fetch_tables(
2028        &self,
2029        database: Option<&str>,
2030        qid: Option<Qid>,
2031    ) -> Result<Vec<String>> {
2032        let database = database.unwrap_or(self.connection.database());
2033        crate::arrow::schema::fetch_tables(self, database, qid).await
2034    }
2035
2036    /// Fetches the schema of specified tables in a `ClickHouse` database.
2037    ///
2038    /// This method queries `ClickHouse` to retrieve the Arrow schemas of the specified
2039    /// tables in the given database (or the client's default database if `None`). If
2040    /// the `tables` list is empty, it fetches schemas for all tables in the database.
2041    /// The result is a mapping of table names to their corresponding Arrow [`SchemaRef`].
2042    ///
2043    /// # Parameters
2044    /// - `database`: Optional database name. If `None`, uses the client's default database.
2045    /// - `tables`: A list of table names to fetch schemas for. An empty list fetches all tables.
2046    /// - `qid`: Optional query ID for tracking and debugging.
2047    ///
2048    /// # Returns
2049    /// A [`Result`] containing a `HashMap<String, SchemaRef>`, mapping table names to
2050    /// their schemas.
2051    ///
2052    /// # Errors
2053    /// - Fails if the database or any table does not exist or is inaccessible.
2054    /// - Fails if the query execution encounters a `ClickHouse` error (e.g., permission denied).
2055    /// - Fails if the connection to `ClickHouse` is interrupted.
2056    ///
2057    /// # Examples
2058    /// ```rust,ignore
2059    /// use clickhouse_arrow::prelude::*;
2060    ///
2061    /// let client = Client::builder()
2062    ///     .with_endpoint("localhost:9000")
2063    ///     .build_arrow()
2064    ///     .await
2065    ///     .unwrap();
2066    ///
2067    /// let schemas = client.fetch_schema(Some("my_db"), &["my_table"], None)
2068    ///     .await
2069    ///     .unwrap();
2070    /// for (table, schema) in schemas {
2071    ///     println!("Table {} schema: {:?}", table, schema);
2072    /// }
2073    /// ```
2074    #[instrument(
2075        name = "clickhouse.fetch_schema",
2076        skip_all,
2077        fields(
2078            db.system = "clickhouse",
2079            db.operation = "query",
2080            db.format = ArrowFormat::FORMAT,
2081            clickhouse.client.id = self.client_id,
2082            clickhouse.query.id
2083        )
2084    )]
2085    pub async fn fetch_schema(
2086        &self,
2087        database: Option<&str>,
2088        tables: &[&str],
2089        qid: Option<Qid>,
2090    ) -> Result<HashMap<String, SchemaRef>> {
2091        let database = database.unwrap_or(self.connection.database());
2092        let options = self.connection.metadata().arrow_options;
2093        crate::arrow::schema::fetch_schema(self, database, tables, qid, options).await
2094    }
2095
2096    /// Issues a `CREATE TABLE` DDL statement for a table using an Arrow schema.
2097    ///
2098    /// Creates a table in the specified database (or the client's default database if
2099    /// `None`) based on the provided Arrow [`SchemaRef`]. The `options` parameter allows
2100    /// customization of table properties, such as engine type and partitioning. This
2101    /// method is specific to [`ArrowClient`] for seamless integration with Arrow-based
2102    /// data pipelines.
2103    ///
2104    /// # Parameters
2105    /// - `database`: Optional database name. If `None`, uses the client's default database.
2106    /// - `table`: Name of the table to create.
2107    /// - `schema`: The Arrow schema defining the table's structure.
2108    /// - `options`: Configuration for table creation (e.g., engine, partitioning).
2109    /// - `qid`: Optional query ID for tracking and debugging.
2110    ///
2111    /// # Returns
2112    /// A [`Result`] indicating success or failure of the operation.
2113    ///
2114    /// # Errors
2115    /// - Fails if the provided schema is invalid or incompatible with `ClickHouse`.
2116    /// - Fails if the database does not exist or is inaccessible.
2117    /// - Fails if the query execution encounters a `ClickHouse` error.
2118    ///
2119    /// # Examples
2120    /// ```rust,ignore
2121    /// use clickhouse_arrow::prelude::*;
2122    /// use arrow::datatypes::{Schema, SchemaRef};
2123    ///
2124    /// let client = Client::builder()
2125    ///     .with_endpoint("localhost:9000")
2126    ///     .build_arrow()
2127    ///     .await
2128    ///     .unwrap();
2129    ///
2130    /// // Assume `schema` is a valid Arrow schema
2131    /// let schema: SchemaRef = Arc::new(Schema::new(vec![/* ... */]));
2132    /// let options = CreateOptions::default();
2133    /// client.create_table(Some("my_db"), "my_table", &schema, &options, None)
2134    ///     .await
2135    ///     .unwrap();
2136    /// ```
2137    #[instrument(
2138        name = "clickhouse.create_table",
2139        skip_all,
2140        fields(
2141            db.system = "clickhouse",
2142            db.operation = "create.table",
2143            db.format = ArrowFormat::FORMAT,
2144            clickhouse.client.id = self.client_id,
2145            clickhouse.query.id
2146        )
2147    )]
2148    pub async fn create_table(
2149        &self,
2150        database: Option<&str>,
2151        table: &str,
2152        schema: &SchemaRef,
2153        options: &CreateOptions,
2154        qid: Option<Qid>,
2155    ) -> Result<()> {
2156        let database = database.unwrap_or(self.connection.database());
2157        let arrow_options = self.connection.metadata().arrow_options;
2158        let stmt = create_table_statement_from_arrow(
2159            Some(database),
2160            table,
2161            schema,
2162            options,
2163            Some(arrow_options),
2164        )?;
2165        self.execute(stmt, qid).await?;
2166        Ok(())
2167    }
2168}
2169
2170impl<T: ClientFormat> Drop for Client<T> {
2171    fn drop(&mut self) {
2172        trace!({ ATT_CID } = self.client_id, "Client dropped");
2173    }
2174}
2175
2176/// Helper that records the query id on the current span and logs the query with the client id.
2177fn record_query(qid: Option<Qid>, query: ParsedQuery, cid: u16) -> (String, Qid) {
2178    let qid = qid.unwrap_or_default();
2179    let _ = Span::current().record(ATT_QID, tracing::field::display(qid));
2180    let query = query.0;
2181    trace!(query, { ATT_CID } = cid, "Querying clickhouse");
2182    (query, qid)
2183}
2184
2185#[cfg(test)]
2186mod tests {
2187    use std::sync::Arc;
2188
2189    use arrow::array::Int32Array;
2190    use arrow::datatypes::{DataType, Field, Schema};
2191    use arrow::record_batch::RecordBatch;
2192
2193    use super::*;
2194
2195    #[test]
2196    fn test_record_query_function() {
2197        let query = ParsedQuery("SELECT 1".to_string());
2198        let cid = 123;
2199        let qid = Qid::new();
2200
2201        let (parsed_query, returned_qid) = record_query(Some(qid), query, cid);
2202
2203        assert_eq!(parsed_query, "SELECT 1");
2204        assert_eq!(returned_qid, qid);
2205    }
2206
2207    #[test]
2208    fn test_record_query_function_no_qid() {
2209        let query = ParsedQuery("SELECT 2".to_string());
2210        let cid = 456;
2211
2212        let (parsed_query, returned_qid) = record_query(None, query, cid);
2213
2214        assert_eq!(parsed_query, "SELECT 2");
2215        // Should generate a default Qid
2216        assert!(!returned_qid.to_string().is_empty());
2217    }
2218
2219    // Helper function to create a simple test RecordBatch
2220    fn create_test_record_batch() -> RecordBatch {
2221        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
2222        let array = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
2223        RecordBatch::try_new(schema, vec![array]).unwrap()
2224    }
2225
2226    #[tokio::test]
2227    async fn test_split_record_batch_function() {
2228        // Test the split_record_batch function that insert_max_rows uses
2229        let batch = create_test_record_batch();
2230        let max_rows = 2;
2231
2232        let batches = crate::arrow::utils::split_record_batch(batch, max_rows);
2233
2234        // Should split 5 rows into 3 batches: [2, 2, 1]
2235        assert_eq!(batches.len(), 3);
2236        assert_eq!(batches[0].num_rows(), 2);
2237        assert_eq!(batches[1].num_rows(), 2);
2238        assert_eq!(batches[2].num_rows(), 1);
2239    }
2240
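    // A minimal sketch of the column-extraction guard used by `query_column_params` above:
    // an empty batch yields `None`, otherwise the first column is cloned out. Only arrow
    // APIs are exercised here; no ClickHouse connection is involved.
    #[test]
    fn test_first_column_extraction_guard() {
        let batch = create_test_record_batch();

        // Non-empty batch: the first column is returned.
        let column =
            if batch.num_rows() == 0 { None } else { Some(Arc::clone(batch.column(0))) };
        assert_eq!(column.expect("expected a column").len(), 5);

        // A zero-row slice mirrors the empty-result case and yields `None`.
        let empty = batch.slice(0, 0);
        let column =
            if empty.num_rows() == 0 { None } else { Some(Arc::clone(empty.column(0))) };
        assert!(column.is_none());
    }
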
2241    #[test]
2242    fn test_split_record_batch_edge_cases() {
2243        let batch = create_test_record_batch();
2244
2245        // Test with max_rows equal to batch size
2246        let batches = crate::arrow::utils::split_record_batch(batch.clone(), 5);
2247        assert_eq!(batches.len(), 1);
2248        assert_eq!(batches[0].num_rows(), 5);
2249
2250        // Test with max_rows larger than batch size
2251        let batches = crate::arrow::utils::split_record_batch(batch.clone(), 10);
2252        assert_eq!(batches.len(), 1);
2253        assert_eq!(batches[0].num_rows(), 5);
2254
2255        // Test with max_rows = 1
2256        let batches = crate::arrow::utils::split_record_batch(batch, 1);
2257        assert_eq!(batches.len(), 5);
2258        for batch in batches {
2259            assert_eq!(batch.num_rows(), 1);
2260        }
2261    }
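
    // A minimal sketch of the single-row extraction used by `query_one_params` for the Arrow
    // client: `take_record_batch` with index `[0]` produces a one-row batch. Only arrow
    // compute APIs are exercised here; no ClickHouse connection is involved.
    #[test]
    fn test_first_row_extraction_via_take() {
        let batch = create_test_record_batch();
        let first_row =
            take_record_batch(&batch, &arrow::array::UInt32Array::from(vec![0])).unwrap();
        assert_eq!(first_row.num_rows(), 1);
        assert_eq!(first_row.num_columns(), 1);
    }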
2262}