clickhouse_arrow/
client.rs

//! The `client` module provides the primary interface for interacting with `ClickHouse`
//! over its native protocol, with full support for Apache Arrow interoperability.
//! The main entry point is the [`Client`] struct, which supports both native `ClickHouse`
//! data formats ([`NativeClient`]) and Arrow-compatible formats ([`ArrowClient`]).
//!
//! This module is designed to be thread-safe, with [`Client`] instances that can be
//! cloned and shared across threads. It supports querying, inserting data, managing
//! database schemas, and handling `ClickHouse` events like progress and profiling.
9mod builder;
10mod chunk;
11#[cfg(feature = "cloud")]
12mod cloud;
13pub(crate) mod connection;
14mod internal;
15mod options;
16mod reader;
17mod response;
18mod tcp;
19mod writer;
20
21use std::collections::HashMap;
22use std::sync::Arc;
23use std::sync::atomic::AtomicU16;
24
25use arrow::array::{ArrayRef, RecordBatch};
26use arrow::compute::take_record_batch;
27use arrow::datatypes::SchemaRef;
28use futures_util::{Stream, StreamExt, TryStreamExt, stream};
29use strum::AsRefStr;
30use tokio::sync::{broadcast, mpsc, oneshot};
31
32pub use self::builder::*;
33pub use self::connection::ConnectionStatus;
34pub(crate) use self::internal::{Message, Operation};
35pub use self::options::*;
36pub use self::response::*;
37pub use self::tcp::Destination;
38use crate::arrow::utils::batch_to_rows;
39use crate::constants::*;
40use crate::formats::{ClientFormat, NativeFormat};
41use crate::native::block::Block;
42use crate::native::protocol::{CompressionMethod, ProfileEvent};
43use crate::prelude::*;
44use crate::query::{ParsedQuery, QueryParams};
45use crate::schema::CreateOptions;
46use crate::{Error, Progress, Result, Row};
47
/// Process-wide counter from which each new [`Client`] draws its `client_id`.
///
/// [`Client::connect`] bumps it with a relaxed `fetch_add`; because the counter is a
/// `u16`, ids wrap around and repeat after 65,536 connections.
static CLIENT_ID: AtomicU16 = AtomicU16::new(0);
49
50/// A `ClickHouse` client configured for the native format.
51///
52/// This type alias provides a client that works with `ClickHouse`'s internal
53/// data representation, useful when you need direct control over types
54/// and don't require Arrow integration.
55pub type NativeClient = Client<NativeFormat>;
56
57/// A `ClickHouse` client configured for Apache Arrow format.
58///
59/// This type alias provides a client that works with Arrow `RecordBatch`es,
60/// enabling seamless integration with the Arrow ecosystem for data processing
61/// and analytics workflows.
62pub type ArrowClient = Client<ArrowFormat>;
63
64/// Configuration for a `ClickHouse` connection, including tracing and cloud-specific settings.
65///
66/// This struct is used to pass optional context to [`Client::connect`], enabling features
67/// like distributed tracing or cloud instance tracking.
68///
69/// # Fields
70/// - `trace`: Optional tracing context for logging and monitoring.
71/// - `cloud`: Optional cloud-specific configuration (requires the `cloud` feature).
72#[derive(Debug, Clone, Default)]
73#[cfg_attr(not(feature = "cloud"), derive(Copy))]
74pub struct ConnectionContext {
75    pub trace: Option<TraceContext>,
76    #[cfg(feature = "cloud")]
77    pub cloud: Option<Arc<std::sync::atomic::AtomicBool>>,
78}
79
80/// Emitted clickhouse events from the underlying connection
81#[derive(Debug, Clone)]
82pub struct Event {
83    pub event:     ClickHouseEvent,
84    pub qid:       Qid,
85    pub client_id: u16,
86}
87
88/// Profile and progress events from clickhouse
89#[derive(Debug, Clone, AsRefStr)]
90pub enum ClickHouseEvent {
91    Progress(Progress),
92    Profile(Vec<ProfileEvent>),
93}
94
95/// A thread-safe handle for interacting with a `ClickHouse` database over its native protocol.
96///
97/// The `Client` struct is the primary interface for executing queries, inserting data, and
98/// managing database schemas. It supports two data formats:
99/// - [`NativeClient`]: Uses `ClickHouse`'s native [`Block`] format for data exchange.
100/// - [`ArrowClient`]: Uses Apache Arrow's [`RecordBatch`] for seamless interoperability with Arrow
101///   ecosystems.
102///
103/// `Client` instances are lightweight and can be cloned and shared across threads. Each instance
104/// maintains a reference to an underlying connection, which is managed automatically. The client
105/// also supports event subscription for receiving progress and profiling information from
106/// `ClickHouse`.
107///
108/// # Usage
109/// Create a `Client` using the [`ClientBuilder`] for a fluent configuration experience, or use
110/// [`Client::connect`] for direct connection setup.
111///
112/// # Examples
113/// ```rust,ignore
114/// use clickhouse_arrow::prelude::*;
115/// use clickhouse_arrow::arrow;
116/// use futures_util::StreamExt;
117///
118/// let client = Client::builder()
119///     .destination("localhost:9000")
120///     .username("default")
121///     .build::<ArrowFormat>()
122///     .await?;
123///
124/// // Execute a query
125/// let batch = client
126///     .query("SELECT 1")
127///     .await?
128///     .collect::<Vec<_>>()
129///     .await
130///     .into_iter()
131///     .collect::<Result<Vec<_>>>()?;
132/// arrow::util::pretty::print_batches(batch)?;
133/// ```
134#[derive(Clone, Debug)]
135pub struct Client<T: ClientFormat> {
136    pub client_id: u16,
137    connection:    Arc<connection::Connection<T>>,
138    events:        Arc<broadcast::Sender<Event>>,
139    settings:      Option<Arc<Settings>>,
140}
141
142impl<T: ClientFormat> Client<T> {
143    /// Get an instance of [`ClientBuilder`] which allows creating a `Client` using a builder
144    /// Creates a new [`ClientBuilder`] for configuring and building a `ClickHouse` client.
145    ///
146    /// This method provides a fluent interface to set up a `Client` with custom connection
147    /// parameters, such as the server address, credentials, TLS, and compression. The
148    /// builder can create either a single [`Client`] or a connection pool (with the `pool`
149    /// feature enabled).
150    ///
151    /// Use this method when you need fine-grained control over the client configuration.
152    /// For simple connections, you can also use [`Client::connect`] directly.
153    ///
154    /// # Returns
155    /// A [`ClientBuilder`] instance ready for configuration.
156    ///
157    /// # Examples
158    /// ```rust,ignore
159    /// use clickhouse_arrow::prelude::*;
160    ///
161    /// let builder = Client::builder()
162    ///     .with_endpoint("localhost:9000")
163    ///     .with_username("default")
164    ///     .with_password("");
165    /// ```
166    pub fn builder() -> ClientBuilder { ClientBuilder::new() }
167
168    /// Establishes a connection to a `ClickHouse` server over TCP, with optional TLS support.
169    ///
170    /// This method creates a new [`Client`] instance connected to the specified `destination`.
171    /// The connection can be configured using [`ClientOptions`], which allows setting parameters
172    /// like username, password, TLS, and compression. Optional `settings` can be provided to
173    /// customize `ClickHouse` session behavior, and a `context` can be used for tracing or
174    /// cloud-specific configurations.
175    ///
176    /// # Parameters
177    /// - `destination`: The `ClickHouse` server address (e.g., `"localhost:9000"` or a
178    ///   [`Destination`]).
179    /// - `options`: Configuration for the connection, including credentials, TLS, and cloud
180    ///   settings.
181    /// - `settings`: Optional `ClickHouse` session settings (e.g., query timeouts, max rows).
182    /// - `context`: Optional connection context for tracing or cloud-specific behavior.
183    ///
184    /// # Returns
185    /// A [`Result`] containing the connected [`Client`] instance, or an error if the connection
186    /// fails.
187    ///
188    /// # Errors
189    /// - Fails if the destination cannot be resolved or the connection cannot be established.
190    /// - Fails if authentication or TLS setup encounters an issue.
191    ///
192    /// # Examples
193    /// ```rust,ignore
194    /// use clickhouse_arrow::client::{Client, ClientOptions};
195    ///
196    /// let options = ClientOptions::default()
197    ///     .username("default")
198    ///     .password("")
199    ///     .use_tls(false);
200    ///
201    /// let client = Client::connect("localhost:9000", options, None, None).await?;
202    /// ```
203    #[instrument(
204        level = "debug",
205        name = "clickhouse.connect",
206        fields(
207            db.system = "clickhouse",
208            db.format = T::FORMAT,
209            network.transport = ?if options.use_tls { "tls" } else { "tcp" }
210        ),
211        skip_all
212    )]
213    pub async fn connect<A: Into<Destination>>(
214        destination: A,
215        options: ClientOptions,
216        settings: Option<Arc<Settings>>,
217        context: Option<ConnectionContext>,
218    ) -> Result<Self> {
219        let context = context.unwrap_or_default();
220        let trace_ctx = context.trace.unwrap_or_default();
221        let _ = trace_ctx.link(&Span::current());
222
223        let client_id = CLIENT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
224
225        // Resolve the destination
226        let destination: Destination = destination.into();
227        let addrs = destination.resolve(options.ipv4_only).await?;
228
229        #[cfg(feature = "cloud")]
230        {
231            // Ping the cloud instance if requested
232            if let Some(domain) = options.domain.as_ref().filter(|_| options.ext.cloud.wakeup) {
233                let cloud_track = context.cloud.as_deref();
234                Self::ping_cloud(domain, options.ext.cloud.timeout, cloud_track).await;
235            }
236        }
237
238        if let Some(addr) = addrs.first() {
239            let _ = Span::current()
240                .record("server.address", tracing::field::debug(&addr.ip()))
241                .record("server.port", addr.port());
242            debug!(server.address = %addr.ip(), server.port = addr.port(), "Initiating connection");
243        }
244
245        let (event_tx, _) = broadcast::channel(EVENTS_CAPACITY);
246        let events = Arc::new(event_tx);
247        let conn_ev = Arc::clone(&events);
248
249        let conn =
250            connection::Connection::connect(client_id, addrs, options, conn_ev, trace_ctx).await?;
251        let connection = Arc::new(conn);
252
253        debug!("created connection successfully");
254
255        Ok(Client { client_id, connection, events, settings })
256    }
257
258    /// Retrieves the status of the underlying `ClickHouse` connection.
259    ///
260    /// This method returns the current [`ConnectionStatus`] of the client's connection,
261    /// indicating whether it is active, idle, or disconnected. Useful for monitoring
262    /// the health of the connection before executing queries.
263    ///
264    /// # Returns
265    /// A [`ConnectionStatus`] enum describing the connection state.
266    ///
267    /// # Examples
268    /// ```rust,ignore
269    /// use clickhouse_arrow::prelude::*;
270    ///
271    /// let client = Client::<ArrowFormat>::builder()
272    ///     .with_endpoint("localhost:9000")
273    ///     .build()
274    ///     .await
275    ///     .unwrap();
276    ///
277    /// let status = client.status();
278    /// println!("Connection status: {status:?}");
279    /// ```
280    pub fn status(&self) -> ConnectionStatus { self.connection.status() }
281
282    /// Subscribes to progress and profile events from `ClickHouse` queries.
283    ///
284    /// This method returns a [`broadcast::Receiver`] that delivers [`Event`] instances
285    /// containing progress updates ([`Progress`]) or profiling information ([`ProfileEvent`])
286    /// as queries execute. Events are generated asynchronously and can be used to monitor
287    /// query execution in real time.
288    ///
289    /// # Returns
290    /// A [`broadcast::Receiver<Event>`] for receiving `ClickHouse` events.
291    ///
292    /// # Examples
293    /// ```rust,ignore
294    /// use clickhouse_arrow::prelude::*;
295    /// use tokio::sync::broadcast::error::RecvError;
296    ///
297    /// let client = Client::builder()
298    ///     .with_endpoint("localhost:9000")
299    ///     .build_arrow()
300    ///     .await
301    ///     .unwrap();
302    ///
303    /// let mut receiver = client.subscribe_events();
304    /// let handle = tokio::spawn(async move {
305    ///     while let Ok(event) = receiver.recv().await {
306    ///         println!("Received event: {:?}", event);
307    ///     }
308    /// });
309    ///
310    /// // Execute a query to generate events
311    /// client.query("SELECT * FROM large_table").await.unwrap();
312    /// ```
313    pub fn subscribe_events(&self) -> broadcast::Receiver<Event> { self.events.subscribe() }
314
315    /// Checks the health of the underlying `ClickHouse` connection.
316    ///
317    /// This method verifies that the connection is active and responsive. If `ping` is
318    /// `true`, it sends a lightweight ping to the `ClickHouse` server to confirm
319    /// connectivity. Otherwise, it checks the connection's internal state.
320    ///
321    /// # Parameters
322    /// - `ping`: If `true`, performs an active ping to the server; if `false`, checks the
323    ///   connection state without network activity.
324    ///
325    /// # Returns
326    /// A [`Result`] indicating whether the connection is healthy.
327    ///
328    /// # Errors
329    /// - Fails if the connection is disconnected or unresponsive.
330    /// - Fails if the ping operation times out or encounters a network error.
331    ///
332    /// # Examples
333    /// ```rust,ignore
334    /// use clickhouse_arrow::prelude::*;
335    ///
336    /// let client = Client::builder()
337    ///     .with_endpoint("localhost:9000")
338    ///     .build::<ArrowFormat>()
339    ///     .await
340    ///     .unwrap();
341    ///
342    /// client.health_check(true).await.unwrap();
343    /// println!("Connection is healthy!");
344    /// ```
345    pub async fn health_check(&self, ping: bool) -> Result<()> {
346        trace!({ ATT_CID } = self.client_id, "sending health check w/ ping={ping}");
347        self.conn().await?.check_connection(ping).await
348    }
349
350    /// Shuts down the `ClickHouse` client and closes its connection.
351    ///
352    /// This method gracefully terminates the underlying connection, ensuring that any
353    /// pending operations are completed or canceled. After shutdown, the client cannot
354    /// be used for further operations.
355    ///
356    /// # Returns
357    /// A [`Result`] indicating whether the shutdown was successful.
358    ///
359    /// # Errors
360    /// - Fails if the connection cannot be closed due to network issues or internal errors.
361    ///
362    /// # Examples
363    /// ```rust,ignore
364    /// use clickhouse_arrow::prelude::*;
365    ///
366    /// let client = Client::builder()
367    ///     .with_endpoint("localhost:9000")
368    ///     .build::<ArrowFormat>()
369    ///     .await
370    ///     .unwrap();
371    ///
372    /// client.shutdown().await.unwrap();
373    /// println!("Client shut down successfully!");
374    /// ```
375    pub async fn shutdown(&self) -> Result<()> {
376        trace!("shutting down client");
377        self.conn().await?.shutdown().await
378    }
379
380    /// Inserts a block of data into `ClickHouse` using the native protocol.
381    ///
382    /// This method sends an insert query with a single block of data, formatted according to
383    /// the client's data format (`T: ClientFormat`). For [`NativeClient`], the data is a
384    /// [`Block`]; for [`ArrowClient`], it is a [`RecordBatch`]. The query is executed
385    /// asynchronously, and any response data, progress events, or errors are streamed back.
386    ///
387    /// Progress and profile events are dispatched to the client's event channel (see
388    /// [`Client::subscribe_events`]). The returned stream yields `()` on success or an
389    /// error if the insert fails.
390    ///
391    /// # Parameters
392    /// - `query`: The insert query (e.g., `"INSERT INTO my_table VALUES"`).
393    /// - `block`: The data to insert, in the format specified by `T` ([`Block`] or
394    ///   [`RecordBatch`]).
395    /// - `qid`: Optional query ID for tracking and debugging.
396    ///
397    /// # Returns
398    /// A [`Result`] containing a stream of [`Result<()>`], where each item indicates
399    /// the success or failure of processing response data.
400    ///
401    /// # Errors
402    /// - Fails if the query is malformed or the data format is invalid.
403    /// - Fails if the connection to `ClickHouse` is interrupted.
404    /// - Fails if `ClickHouse` returns an exception (e.g., schema mismatch).
405    ///
406    /// # Examples
407    /// ```rust,ignore
408    /// use clickhouse_arrow::prelude::*;
409    /// use arrow::record_batch::RecordBatch;
410    ///
411    /// let client = Client::builder()
412    ///     .destination("localhost:9000")
413    ///     .build_arrow()
414    ///     .await?;
415    ///
416    /// let qid = Qid::new();
417    /// // Assume `batch` is a valid RecordBatch
418    /// let batch: RecordBatch = // ...;
419    /// let stream = client.insert("INSERT INTO my_table VALUES", batch, Some(qid)).await?;
420    /// while let Some(result) = stream.next().await {
421    ///     result?; // Check for errors
422    /// }
423    /// ```
424    #[instrument(
425        level = "trace",
426        name = "clickhouse.insert",
427        skip_all
428        fields(
429            db.system = "clickhouse",
430            db.operation = "insert",
431            db.format = T::FORMAT,
432            clickhouse.client.id = self.client_id,
433            clickhouse.query.id
434        ),
435    )]
436    pub async fn insert(
437        &self,
438        query: impl Into<ParsedQuery>,
439        block: T::Data,
440        qid: Option<Qid>,
441    ) -> Result<impl Stream<Item = Result<()>> + '_> {
442        let (query, qid) = record_query(qid, query.into(), self.client_id);
443
444        // Create metadata channel
445        let (tx, rx) = oneshot::channel();
446        let connection = self.conn().await?;
447
448        // Send query
449        #[cfg_attr(not(feature = "inner_pool"), expect(unused_variables))]
450        let conn_idx = connection
451            .send_operation(
452                Operation::Query {
453                    query,
454                    settings: self.settings.clone(),
455                    params: None,
456                    response: tx,
457                    header: None,
458                },
459                qid,
460                false,
461            )
462            .await?;
463
464        trace!({ ATT_CID } = self.client_id, { ATT_QID } = %qid, "sent query, awaiting response");
465        let responses = rx
466            .await
467            .map_err(|_| Error::Protocol(format!("Failed to receive response for query {qid}")))?
468            .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "Error receiving header"))?;
469
470        // Send data
471        let (tx, rx) = oneshot::channel();
472        let _ = connection
473            .send_operation(Operation::Insert { data: block, response: tx }, qid, true)
474            .await?;
475        rx.await.map_err(|_| {
476            Error::Protocol(format!("Failed to receive response from insert {qid}"))
477        })??;
478
479        // Decrement load balancer
480        #[cfg(feature = "inner_pool")]
481        connection.finish(conn_idx, Operation::<T::Data>::weight_insert());
482
483        Ok(self.insert_response(responses, qid))
484    }
485
486    /// Inserts multiple blocks of data into `ClickHouse` using the native protocol.
487    ///
488    /// This method sends an insert query with a collection of data blocks, formatted
489    /// according to the client's data format (`T: ClientFormat`). For [`NativeClient`],
490    /// the data is a `Vec<Block>`; for [`ArrowClient`], it is a `Vec<RecordBatch>`.
491    /// The query is executed asynchronously, and any response data, progress events,
492    /// or errors are streamed back.
493    ///
494    /// Progress and profile events are dispatched to the client's event channel (see
495    /// [`Client::subscribe_events`]). The returned stream yields `()` on success or an
496    /// error if the insert fails. Use this method when inserting multiple batches of
497    /// data to reduce overhead compared to multiple [`Client::insert`] calls.
498    ///
499    /// # Parameters
500    /// - `query`: The insert query (e.g., `"INSERT INTO my_table VALUES"`).
501    /// - `batch`: A vector of data blocks to insert, in the format specified by `T`.
502    /// - `qid`: Optional query ID for tracking and debugging.
503    ///
504    /// # Returns
505    /// A [`Result`] containing a stream of [`Result<()>`], where each item indicates
506    /// the success or failure of processing response data.
507    ///
508    /// # Errors
509    /// - Fails if the query is malformed or any data block is invalid.
510    /// - Fails if the connection to `ClickHouse` is interrupted.
511    /// - Fails if `ClickHouse` returns an exception (e.g., schema mismatch).
512    ///
513    /// # Examples
514    /// ```rust,ignore
515    /// use clickhouse_arrow::prelude::*;
516    /// use arrow::record_batch::RecordBatch;
517    ///
518    /// let client = Client::builder()
519    ///     .with_endpoint("localhost:9000")
520    ///     .build::<ArrowFormat>()
521    ///     .await
522    ///     .unwrap();
523    ///
524    /// // Assume `batches` is a Vec<RecordBatch>
525    /// let batches: Vec<RecordBatch> = vec![/* ... */];
526    /// let stream = client.insert_many("INSERT INTO my_table VALUES", batches, None).await.unwrap();
527    /// while let Some(result) = stream.next().await {
528    ///     result.unwrap(); // Check for errors
529    /// }
530    /// ```
531    #[instrument(
532        name = "clickhouse.insert_many",
533        skip_all,
534        fields(
535            db.system = "clickhouse",
536            db.operation = "insert",
537            db.format = T::FORMAT,
538            clickhouse.client.id = self.client_id,
539            clickhouse.query.id
540        ),
541    )]
542    pub async fn insert_many(
543        &self,
544        query: impl Into<ParsedQuery>,
545        batch: Vec<T::Data>,
546        qid: Option<Qid>,
547    ) -> Result<impl Stream<Item = Result<()>> + '_> {
548        let (query, qid) = record_query(qid, query.into(), self.client_id);
549
550        // Create metadata channel
551        let (tx, rx) = oneshot::channel();
552        let connection = self.conn().await?;
553
554        #[cfg_attr(not(feature = "inner_pool"), expect(unused_variables))]
555        let conn_idx = connection
556            .send_operation(
557                Operation::Query {
558                    query,
559                    settings: self.settings.clone(),
560                    params: None,
561                    response: tx,
562                    header: None,
563                },
564                qid,
565                false,
566            )
567            .await?;
568
569        trace!({ ATT_CID } = self.client_id, { ATT_QID } = %qid, "sent query, awaiting response");
570        let responses = rx
571            .await
572            .map_err(|_| Error::Protocol(format!("Failed to receive response for query {qid}")))?
573            .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "Error receiving header"))?;
574
575        // Send data
576        let (tx, rx) = oneshot::channel();
577        let _ = connection
578            .send_operation(Operation::InsertMany { data: batch, response: tx }, qid, true)
579            .await?;
580        rx.await.map_err(|_| {
581            Error::Protocol(format!("Failed to receive response from insert {qid}"))
582        })??;
583
584        // Decrement load balancer
585        #[cfg(feature = "inner_pool")]
586        connection.finish(conn_idx, Operation::<T::Data>::weight_insert_many());
587
588        Ok(self.insert_response(responses, qid))
589    }
590
591    /// Executes a raw `ClickHouse` query and streams raw data in the client's format.
592    ///
593    /// This method sends a query to `ClickHouse` and returns a stream of raw data blocks
594    /// in the format specified by `T: ClientFormat` ([`Block`] for [`NativeClient`],
595    /// [`RecordBatch`] for [`ArrowClient`]). It is a low-level method suitable for
596    /// custom processing of query results. For higher-level interfaces, consider
597    /// [`Client::query`] or [`Client::query_rows`].
598    ///
599    /// Progress and profile events are dispatched to the client's event channel (see
600    /// [`Client::subscribe_events`]).
601    ///
602    /// # Parameters
603    /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM my_table"`).
604    /// - `qid`: A unique query ID for tracking and debugging.
605    ///
606    /// # Returns
607    /// A [`Result`] containing a stream of [`Result<T::Data>`], where each item is a
608    /// data block or an error.
609    ///
610    /// # Errors
611    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
612    /// - Fails if the connection to `ClickHouse` is interrupted.
613    /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
614    ///
615    /// # Examples
616    /// ```rust,ignore
617    /// use clickhouse_arrow::prelude::*;
618    ///
619    /// let client = Client::builder()
620    ///     .with_endpoint("localhost:9000")
621    ///     .build::<ArrowFormat>()
622    ///     .await
623    ///     .unwrap();
624    ///
625    /// let qid = Qid::new();
626    /// let mut stream = client.query_raw("SELECT * FROM my_table", qid).await.unwrap();
627    /// while let Some(block) = stream.next().await {
628    ///     let batch = block.unwrap();
629    ///     println!("Received batch with {} rows", batch.num_rows());
630    /// }
631    /// ```
632    #[instrument(
633        name = "clickhouse.query",
634        skip_all
635        fields(
636            db.system = "clickhouse",
637            db.operation = "query",
638            db.format = T::FORMAT,
639            clickhouse.client.id = self.client_id,
640            clickhouse.query.id = %qid
641        ),
642     )]
643    pub async fn query_raw<P: Into<QueryParams>>(
644        &self,
645        query: String,
646        params: Option<P>,
647        qid: Qid,
648    ) -> Result<impl Stream<Item = Result<T::Data>> + 'static> {
649        // Create metadata channel
650        let (tx, rx) = oneshot::channel();
651        let connection = self.conn().await?;
652
653        #[cfg_attr(not(feature = "inner_pool"), expect(unused_variables))]
654        let conn_idx = connection
655            .send_operation(
656                Operation::Query {
657                    query,
658                    settings: self.settings.clone(),
659                    params: params.map(Into::into),
660                    response: tx,
661                    header: None,
662                },
663                qid,
664                true,
665            )
666            .await?;
667
668        trace!({ ATT_CID } = self.client_id, { ATT_QID } = %qid, "sent query, awaiting response");
669
670        let responses = rx
671            .await
672            .map_err(|_| Error::Protocol(format!("Failed to receive response for query {qid}")))?
673            .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "Error receiving header"))?;
674        trace!({ ATT_CID } = self.client_id, { ATT_QID } = %qid, "sent query, awaiting response");
675
676        // Decrement load balancer
677        #[cfg(feature = "inner_pool")]
678        connection.finish(conn_idx, Operation::<T::Data>::weight_query());
679
680        Ok(create_response_stream::<T>(responses, qid, self.client_id))
681    }
682
683    /// Executes a `ClickHouse` query and discards all returned data.
684    ///
685    /// This method sends a query to `ClickHouse` and processes the response stream to
686    /// check for errors, but discards any returned data blocks. It is useful for
687    /// queries that modify data (e.g., `INSERT`, `UPDATE`, `DELETE`) or DDL statements
688    /// where the result data is not needed. For queries that return data, use
689    /// [`Client::query`] or [`Client::query_raw`].
690    ///
691    /// # Parameters
692    /// - `query`: The SQL query to execute (e.g., `"DROP TABLE my_table"`).
693    /// - `qid`: Optional query ID for tracking and debugging.
694    ///
695    /// # Returns
696    /// A [`Result`] indicating whether the query executed successfully.
697    ///
698    /// # Errors
699    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
700    /// - Fails if the connection to `ClickHouse` is interrupted.
701    /// - Fails if `ClickHouse` returns an exception (e.g., permission denied).
702    ///
703    /// # Examples
704    /// ```rust,ignore
705    /// use clickhouse_arrow::prelude::*;
706    ///
707    /// let client = Client::builder()
708    ///     .with_endpoint("localhost:9000")
709    ///     .build_arrow()
710    ///     .await
711    ///     .unwrap();
712    ///
713    /// client.execute("DROP TABLE IF EXISTS my_table", None).await.unwrap();
714    /// println!("Table dropped successfully!");
715    /// ```
716    #[instrument(
717        name = "clickhouse.execute",
718        skip_all,
719        fields(
720            db.system = "clickhouse",
721            db.format = T::FORMAT,
722            db.operation = "query",
723            clickhouse.client.id = self.client_id,
724            clickhouse.query.id
725        )
726    )]
727    pub async fn execute(&self, query: impl Into<ParsedQuery>, qid: Option<Qid>) -> Result<()> {
728        self.execute_params(query, None::<QueryParams>, qid).await
729    }
730
731    /// Executes a `ClickHouse` query with query parameters and discards all returned data.
732    ///
733    /// # Parameters
734    /// - `query`: The SQL query to execute (e.g., `"DROP TABLE my_table"`).
735    /// - `params`: The query parameters to provide
736    /// - `qid`: Optional query ID for tracking and debugging.
737    ///
738    /// # Returns
739    /// A [`Result`] indicating whether the query executed successfully.
740    ///
741    /// # Errors
742    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
743    /// - Fails if the connection to `ClickHouse` is interrupted.
744    /// - Fails if `ClickHouse` returns an exception (e.g., permission denied).
745    ///
746    /// # Examples
747    /// ```rust,ignore
748    /// use clickhouse_arrow::prelude::*;
749    ///
750    /// let client = Client::builder()
751    ///     .with_endpoint("localhost:9000")
752    ///     .build_arrow()
753    ///     .await
754    ///     .unwrap();
755    ///
756    /// let params = Some(vec![
757    ///     ("str", ParamValue::from("hello")),
758    ///     ("num", ParamValue::from(42)),
759    ///     ("array", ParamValue::from("['a', 'b', 'c']")),
760    /// ]);
761    /// let query = "SELECT {num:Int64}, {str:String}, {array:Array(String)}";
762    /// client.execute_params(query, params, None).await.unwrap();
763    /// println!("Table dropped successfully!");
764    /// ```
765    #[instrument(
766        name = "clickhouse.execute_params",
767        skip_all,
768        fields(
769            db.system = "clickhouse",
770            db.format = T::FORMAT,
771            db.operation = "query",
772            clickhouse.client.id = self.client_id,
773            clickhouse.query.id
774        )
775    )]
776    pub async fn execute_params<P: Into<QueryParams>>(
777        &self,
778        query: impl Into<ParsedQuery>,
779        params: Option<P>,
780        qid: Option<Qid>,
781    ) -> Result<()> {
782        let (query, qid) = record_query(qid, query.into(), self.client_id);
783        let stream = self.query_raw(query, params, qid).await?;
784        tokio::pin!(stream);
785        while let Some(next) = stream.next().await {
786            drop(next?);
787        }
788        Ok(())
789    }
790
791    /// Executes a `ClickHouse` query without processing the response stream.
792    ///
793    /// This method sends a query to `ClickHouse` and immediately discards the response
794    /// stream without checking for errors or processing data. It is a lightweight
795    /// alternative to [`Client::execute`], suitable for fire-and-forget scenarios where
796    /// the query's outcome is not critical. For safer execution, use [`Client::execute`].
797    ///
798    /// # Parameters
799    /// - `query`: The SQL query to execute (e.g., `"INSERT INTO my_table VALUES (1)"`).
800    /// - `qid`: Optional query ID for tracking and debugging.
801    ///
802    /// # Returns
803    /// A [`Result`] indicating whether the query was sent successfully.
804    ///
805    /// # Errors
806    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
807    /// - Fails if the connection to `ClickHouse` is interrupted.
808    ///
809    /// # Examples
810    /// ```rust,ignore
811    /// use clickhouse_arrow::prelude::*;
812    ///
813    /// let client = Client::builder()
814    ///     .with_endpoint("localhost:9000")
815    ///     .build::<ArrowFormat>()
816    ///     .await
817    ///     .unwrap();
818    ///
819    /// client.execute_now("INSERT INTO logs VALUES ('event')", None).await.unwrap();
820    /// println!("Log event sent!");
821    /// ```
822    #[instrument(
823        name = "clickhouse.execute_now",
824        skip_all,
825        fields(
826            db.system = "clickhouse",
827            db.format = T::FORMAT,
828            db.operation = "query",
829            clickhouse.client.id = self.client_id,
830            clickhouse.query.id
831        )
832    )]
833    pub async fn execute_now(&self, query: impl Into<ParsedQuery>, qid: Option<Qid>) -> Result<()> {
834        self.execute_now_params(query, None::<QueryParams>, qid).await
835    }
836
837    /// Executes a `ClickHouse` query with query parameters without processing the response stream.
838    ///
839    /// # Parameters
840    /// - `query`: The SQL query to execute (e.g., `"INSERT INTO my_table VALUES (1)"`).
841    /// - `params`: The query parameters to provide
842    /// - `qid`: Optional query ID for tracking and debugging.
843    ///
844    /// # Returns
845    /// A [`Result`] indicating whether the query was sent successfully.
846    ///
847    /// # Errors
848    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
849    /// - Fails if the connection to `ClickHouse` is interrupted.
850    ///
851    /// # Examples
852    /// ```rust,ignore
853    /// use clickhouse_arrow::prelude::*;
854    ///
855    /// let client = Client::builder()
856    ///     .with_endpoint("localhost:9000")
857    ///     .build::<ArrowFormat>()
858    ///     .await
859    ///     .unwrap();
860    ///
861    ///
862    /// let params = Some(vec![("str", ParamValue::from("hello"))]);
863    /// let query = "INSERT INTO logs VALUES ({str:String})";
864    /// client.execute_now_params(query, params, None).await.unwrap();
865    /// println!("Log event sent!");
866    /// ```
867    #[instrument(
868        name = "clickhouse.execute_now_params",
869        skip_all,
870        fields(
871            db.system = "clickhouse",
872            db.format = T::FORMAT,
873            db.operation = "query",
874            clickhouse.client.id = self.client_id,
875            clickhouse.query.id
876        )
877    )]
878    pub async fn execute_now_params<P: Into<QueryParams>>(
879        &self,
880        query: impl Into<ParsedQuery>,
881        params: Option<P>,
882        qid: Option<Qid>,
883    ) -> Result<()> {
884        let (query, qid) = record_query(qid, query.into(), self.client_id);
885        drop(self.query_raw(query, params, qid).await?);
886        Ok(())
887    }
888
889    /// Creates a new database in `ClickHouse` using a DDL statement.
890    ///
891    /// This method issues a `CREATE DATABASE` statement for the specified database. If no
892    /// database is provided, it uses the client's default database from the connection
893    /// metadata. The `default` database cannot be created, as it is reserved by `ClickHouse`.
894    ///
895    /// # Parameters
896    /// - `database`: Optional name of the database to create. If `None`, uses the client's default
897    ///   database.
898    /// - `qid`: Optional query ID for tracking and debugging.
899    ///
900    /// # Returns
901    /// A [`Result`] indicating success or failure of the operation.
902    ///
903    /// # Errors
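    /// - Fails if the database name is invalid. Requesting the reserved `default` database is
    ///   not an error: a warning is logged and `Ok(())` is returned without executing a statement.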
905    /// - Fails if the query execution encounters a `ClickHouse` error.
906    /// - Fails if the connection is interrupted.
907    ///
908    /// # Examples
909    /// ```rust,ignore
910    /// use clickhouse_arrow::client::{Client, ClientBuilder};
911    ///
912    /// let client = ClientBuilder::new()
913    ///     .destination("localhost:9000")
914    ///     .build_native()
915    ///     .await?;
916    ///
917    /// client.create_database(Some("my_db"), None).await?;
918    /// ```
919    #[instrument(
920        name = "clickhouse.create_database",
921        skip_all
922        fields(db.system = "clickhouse", db.operation = "create.database")
923    )]
924    pub async fn create_database(&self, database: Option<&str>, qid: Option<Qid>) -> Result<()> {
925        let database = database.unwrap_or(self.connection.database());
926        let database = database.to_lowercase();
927        if &database == "default" {
928            warn!("Exiting, cannot create `default` database");
929            return Ok(());
930        }
931
932        let stmt = create_db_statement(&database)?;
933        self.execute(stmt, qid).await?;
934        Ok(())
935    }
936
937    /// Drops a database in `ClickHouse` using a DDL statement.
938    ///
939    /// This method issues a `DROP DATABASE` statement for the specified database. The
940    /// `default` database cannot be dropped, as it is reserved by `ClickHouse`. If the client
941    /// is connected to a non-default database, dropping a different database is not allowed
942    /// to prevent accidental data loss.
943    ///
944    /// # Parameters
945    /// - `database`: Name of the database to drop.
946    /// - `sync`: If `true`, the operation waits for `ClickHouse` to complete the drop
947    ///   synchronously.
948    /// - `qid`: Optional query ID for tracking and debugging.
949    ///
950    /// # Returns
951    /// A [`Result`] indicating success or failure of the operation.
952    ///
953    /// # Errors
    /// - Does not fail for `default` (reserved): a warning is logged and `Ok(())` is returned
    ///   without dropping anything.
955    /// - Fails if the client is connected to a non-default database different from `database`.
956    /// - Fails if the query execution encounters a `ClickHouse` error.
957    ///
958    /// # Examples
959    /// ```rust,ignore
960    /// use clickhouse_arrow::prelude::*;
961    ///
962    /// let client = Client::builder()
963    ///     .destination("localhost:9000")
964    ///     .database("default") // Must be connected to default to drop 'other' databases
965    ///     .build::<NativeFormat>()
966    ///     .await?;
967    ///
968    /// client.drop_database("my_db", true, None).await?;
969    /// ```
970    #[instrument(
971        name = "clickhouse.drop_database",
972        skip_all
973        fields(db.system = "clickhouse", db.operation = "drop.database")
974    )]
975    pub async fn drop_database(&self, database: &str, sync: bool, qid: Option<Qid>) -> Result<()> {
976        let database = database.to_lowercase();
977        if &database == "default" {
978            warn!("Exiting, cannot drop `default` database");
979            return Ok(());
980        }
981
982        // TODO: Should this check remain? Or should the query writing be modified in the case
983        // of issuing DDL statements while connected to a non-default database
984        let current_database = self.connection.database();
985        if current_database != "default"
986            && !current_database.is_empty()
987            && current_database != database
988        {
989            error!("Cannot drop database {database} while connected to {current_database}");
990            return Err(Error::InsufficientDDLScope(current_database.into()));
991        }
992
993        let stmt = drop_db_statement(&database, sync)?;
994        self.execute(stmt, qid).await?;
995        Ok(())
996    }
997}
998
999impl<T: ClientFormat> Client<T> {
1000    /// Get a reference to the underlying connection.
1001    ///
1002    /// TODO: Support reconnect.
1003    #[expect(clippy::unused_async)]
1004    async fn conn(&self) -> Result<&connection::Connection<T>> {
1005        // TODO: Add reconnection logic here if configured
1006        Ok(self.connection.as_ref())
1007    }
1008
1009    /// # Feature
1010    /// Requires the `cloud` feature to be enabled.
1011    #[cfg(feature = "cloud")]
1012    #[instrument(level = "trace", name = "clickhouse.cloud.ping")]
1013    async fn ping_cloud(
1014        domain: &str,
1015        timeout: Option<u64>,
1016        track: Option<&std::sync::atomic::AtomicBool>,
1017    ) {
1018        debug!("pinging cloud instance");
1019        if !domain.is_empty() {
1020            debug!(domain, "cloud endpoint found");
1021            // Create receiver channel to cancel ping if dropped
1022            let (_tx, rx) = oneshot::channel::<()>();
1023            cloud::ping_cloud(domain.to_string(), timeout, track, rx).await;
1024        }
1025    }
1026
1027    // Helper function to convert a receiver of data into a `ClickHouseResponse`
1028    fn insert_response(
1029        &self,
1030        rx: mpsc::Receiver<Result<T::Data>>,
1031        qid: Qid,
1032    ) -> ClickHouseResponse<()> {
1033        ClickHouseResponse::<()>::from_stream(handle_insert_response::<T>(rx, qid, self.client_id))
1034    }
1035}
1036
1037impl Client<NativeFormat> {
1038    /// Inserts rows into `ClickHouse` using the native protocol.
1039    ///
1040    /// This method sends an insert query with a collection of rows, where each row is
1041    /// a type `T` implementing [`Row`]. The rows are converted into a `ClickHouse`
1042    /// [`Block`] and sent over the native protocol. The query is executed asynchronously,
1043    /// and any response data, progress events, or errors are streamed back.
1044    ///
1045    /// Progress and profile events are dispatched to the client's event channel (see
1046    /// [`Client::subscribe_events`]). The returned [`ClickHouseResponse`] yields `()`
1047    /// on success or an error if the insert fails.
1048    ///
1049    /// # Parameters
1050    /// - `query`: The insert query (e.g., `"INSERT INTO my_table VALUES"`).
1051    /// - `blocks`: An iterator of rows to insert, where each row implements [`Row`].
1052    /// - `qid`: Optional query ID for tracking and debugging.
1053    ///
1054    /// # Returns
1055    /// A [`Result`] containing a [`ClickHouseResponse<()>`] that streams the operation's
1056    /// outcome.
1057    ///
1058    /// # Errors
1059    /// - Fails if the query is malformed or the row data is invalid.
1060    /// - Fails if the connection to `ClickHouse` is interrupted.
1061    /// - Fails if `ClickHouse` returns an exception (e.g., schema mismatch).
1062    ///
1063    /// # Examples
1064    /// ```rust,ignore
1065    /// use clickhouse_arrow::prelude::*;
1066    ///
1067    /// let client = Client::builder()
1068    ///     .with_endpoint("localhost:9000")
1069    ///     .build_native()
1070    ///     .await
1071    ///     .unwrap();
1072    ///
1073    /// // Assume `MyRow` implements `Row`
1074    /// let rows = vec![MyRow { /* ... */ }, MyRow { /* ... */ }];
    /// let mut response = client.insert_rows("INSERT INTO my_table VALUES", rows.into_iter(), None)
1076    ///     .await
1077    ///     .unwrap();
1078    /// while let Some(result) = response.next().await {
1079    ///     result.unwrap(); // Check for errors
1080    /// }
1081    /// ```
    #[instrument(
        name = "clickhouse.insert_rows",
        fields(
            db.system = "clickhouse",
            db.operation = "insert",
            db.format = NativeFormat::FORMAT,
            clickhouse.client.id = self.client_id,
            clickhouse.query.id
        ),
        skip_all
    )]
    pub async fn insert_rows<T: Row + Send + 'static>(
        &self,
        query: impl Into<ParsedQuery>,
        blocks: impl Iterator<Item = T> + Send + Sync + 'static,
        qid: Option<Qid>,
    ) -> Result<ClickHouseResponse<()>> {
        let cid = self.client_id;
        let (query, qid) = record_query(qid, query.into(), cid);

        // Create metadata channel
        // `rx` yields the result carrying the response receiver; `header_rx` yields the
        // header that the rows are later converted against.
        let (tx, rx) = oneshot::channel();
        let (header_tx, header_rx) = oneshot::channel();

        let connection = self.conn().await?;

        // Step 1: send the insert query itself (no data yet). The returned index is only
        // consumed by the `inner_pool` bookkeeping at the end of the function.
        // NOTE(review): the meaning of the trailing `false` / `true` flag passed to
        // `send_operation` is not visible from here — confirm against its definition.
        #[cfg_attr(not(feature = "inner_pool"), expect(unused_variables))]
        let conn_idx = connection
            .send_operation(
                Operation::Query {
                    query,
                    settings: self.settings.clone(),
                    params: None,
                    response: tx,
                    header: Some(header_tx),
                },
                qid,
                false,
            )
            .await?;

        trace!({ ATT_CID } = cid, { ATT_QID } = %qid, "sent query, awaiting response");
        // Two failure layers: the oneshot sender being dropped (mapped to `Error::Protocol`)
        // and the `Err` value delivered through the channel (logged, then propagated).
        // NOTE(review): the log text says "Error receiving header" although this is the
        // response channel, not the header channel.
        let responses = rx
            .await
            .map_err(|_| Error::Protocol(format!("Failed to receive response for query {qid}")))?
            .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "Error receiving header"))?;

        // Step 2: wait for the header, then collect every row from the iterator and
        // convert them into a single `Block` using that header.
        let header = header_rx
            .await
            .map_err(|_| Error::Protocol(format!("Failed to receive header for query {qid}")))?;
        let data = Block::from_rows(blocks.collect(), header)?;

        // Step 3: send the block and wait for the insert acknowledgement. The double `?`
        // unwraps first the channel error, then the result sent through the channel.
        let (tx, rx) = oneshot::channel();
        let _ =
            connection.send_operation(Operation::Insert { data, response: tx }, qid, true).await?;
        rx.await.map_err(|_| {
            Error::Protocol(format!("Failed to receive response from insert {qid}"))
        })??;

        // Decrement load balancer
        // NOTE(review): every early return via `?` above skips this call — confirm the
        // load accounting is released elsewhere on the error paths.
        #[cfg(feature = "inner_pool")]
        connection.finish(conn_idx, Operation::<Block>::weight_query());

        Ok(self.insert_response(responses, qid))
    }
1147
1148    /// Executes a `ClickHouse` query and streams deserialized rows.
1149    ///
1150    /// This method sends a query to `ClickHouse` and returns a stream of rows, where
1151    /// each row is deserialized into type `T` implementing [`Row`]. Rows are grouped
1152    /// into `ClickHouse` blocks, and the stream yields rows as they are received. Use
1153    /// this method for type-safe access to query results in native format.
1154    ///
1155    /// Progress and profile events are dispatched to the client's event channel (see
1156    /// [`Client::subscribe_events`]).
1157    ///
1158    /// # Parameters
1159    /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM my_table"`).
1160    /// - `qid`: Optional query ID for tracking and debugging.
1161    ///
1162    /// # Returns
1163    /// A [`Result`] containing a [`ClickHouseResponse<T>`] that streams deserialized
1164    /// rows of type `T`.
1165    ///
1166    /// # Errors
1167    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1168    /// - Fails if row deserialization fails (e.g., schema mismatch).
1169    /// - Fails if the connection to `ClickHouse` is interrupted.
1170    /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1171    ///
1172    /// # Examples
1173    /// ```rust,ignore
1174    /// use clickhouse_arrow::prelude::*;
1175    ///
1176    /// let client = Client::builder()
1177    ///     .with_endpoint("localhost:9000")
1178    ///     .build_native()
1179    ///     .await
1180    ///     .unwrap();
1181    ///
1182    /// // Assume `MyRow` implements `Row`
1183    /// let mut response = client.query::<MyRow>("SELECT * FROM my_table", None).await.unwrap();
1184    /// while let Some(row) = response.next().await {
1185    ///     let row = row.unwrap();
1186    ///     println!("Row: {:?}", row);
1187    /// }
1188    /// ```
1189    #[instrument(
1190        name = "clickhouse.query",
1191        skip_all,
1192        fields(db.system = "clickhouse", db.operation = "query", db.format = NativeFormat::FORMAT)
1193    )]
1194    pub async fn query<T: Row + Send + 'static>(
1195        &self,
1196        query: impl Into<ParsedQuery>,
1197        qid: Option<Qid>,
1198    ) -> Result<ClickHouseResponse<T>> {
1199        self.query_params(query, None, qid).await
1200    }
1201
1202    /// Executes a `ClickHouse` query with parameters and streams deserialized rows.
1203    ///
1204    /// # Parameters
1205    /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM my_table"`).
1206    /// - `params`: The query parameters to provide
1207    /// - `qid`: Optional query ID for tracking and debugging.
1208    ///
1209    /// # Returns
1210    /// A [`Result`] containing a [`ClickHouseResponse<T>`] that streams deserialized
1211    /// rows of type `T`.
1212    ///
1213    /// # Errors
1214    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1215    /// - Fails if row deserialization fails (e.g., schema mismatch).
1216    /// - Fails if the connection to `ClickHouse` is interrupted.
1217    /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1218    ///
1219    /// # Examples
1220    /// ```rust,ignore
1221    /// use clickhouse_arrow::prelude::*;
1222    ///
1223    /// let client = Client::builder()
1224    ///     .with_endpoint("localhost:9000")
1225    ///     .build_native()
1226    ///     .await
1227    ///     .unwrap();
1228    ///
1229    /// // Assume `MyRow` implements `Row`
    /// let params = Some(vec![("name", ParamValue::from("my_table"))].into());
1231    /// let query = "SELECT * FROM {name:Identifier}";
1232    /// let mut response = client.query_params::<MyRow>(query, params, None).await.unwrap();
1233    /// while let Some(row) = response.next().await {
1234    ///     let row = row.unwrap();
1235    ///     println!("Row: {:?}", row);
1236    /// }
1237    /// ```
1238    #[instrument(
1239        name = "clickhouse.query_params",
1240        skip_all,
1241        fields(db.system = "clickhouse", db.operation = "query", db.format = NativeFormat::FORMAT)
1242    )]
1243    pub async fn query_params<T: Row + Send + 'static>(
1244        &self,
1245        query: impl Into<ParsedQuery>,
1246        params: Option<QueryParams>,
1247        qid: Option<Qid>,
1248    ) -> Result<ClickHouseResponse<T>> {
1249        let (query, qid) = record_query(qid, query.into(), self.client_id);
1250        let raw = self.query_raw(query, params, qid).await?;
1251        Ok(ClickHouseResponse::new(Box::pin(raw.flat_map(|block| {
1252            match block {
1253                Ok(mut block) => stream::iter(
1254                    block
1255                        .take_iter_rows()
1256                        .filter(|x| !x.is_empty())
1257                        .map(T::deserialize_row)
1258                        .map(|maybe| maybe.inspect_err(|error| error!(?error, "deserializing row")))
1259                        .collect::<Vec<_>>(),
1260                ),
1261                Err(e) => stream::iter(vec![Err(e)]),
1262            }
1263        }))))
1264    }
1265
1266    /// Executes a `ClickHouse` query and returns the first row, discarding the rest.
1267    ///
1268    /// This method sends a query to `ClickHouse` and returns the first row deserialized
1269    /// into type `T` implementing [`Row`], or `None` if the result is empty. It is
1270    /// useful for queries expected to return a single row (e.g., `SELECT COUNT(*)`).
1271    /// For streaming multiple rows, use [`Client::query`].
1272    ///
1273    /// Progress and profile events are dispatched to the client's event channel (see
1274    /// [`Client::subscribe_events`]).
1275    ///
1276    /// # Parameters
1277    /// - `query`: The SQL query to execute (e.g., `"SELECT name FROM users WHERE id = 1"`).
1278    /// - `qid`: Optional query ID for tracking and debugging.
1279    ///
1280    /// # Returns
1281    /// A [`Result`] containing an `Option<T>`, where `T` is the deserialized row, or
1282    /// `None` if no rows are returned.
1283    ///
1284    /// # Errors
1285    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1286    /// - Fails if row deserialization fails (e.g., schema mismatch).
1287    /// - Fails if the connection to `ClickHouse` is interrupted.
1288    /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1289    ///
1290    /// # Examples
1291    /// ```rust,ignore
1292    /// use clickhouse_arrow::prelude::*;
1293    ///
1294    /// let client = Client::builder()
1295    ///     .with_endpoint("localhost:9000")
1296    ///     .build_native()
1297    ///     .await
1298    ///     .unwrap();
1299    ///
1300    /// // Assume `MyRow` implements `Row`
1301    /// let row = client.query_one::<MyRow>("SELECT name FROM users WHERE id = 1", None)
1302    ///     .await
1303    ///     .unwrap();
1304    /// if let Some(row) = row {
1305    ///     println!("Found row: {:?}", row);
1306    /// }
1307    /// ```
1308    #[instrument(
1309        name = "clickhouse.query_one",
1310        skip_all,
1311        fields(
1312            db.system = "clickhouse",
1313            db.operation = "query",
1314            db.format = NativeFormat::FORMAT,
1315            clickhouse.client.id = self.client_id,
1316            clickhouse.query.id
1317        )
1318    )]
1319    pub async fn query_one<T: Row + Send + 'static>(
1320        &self,
1321        query: impl Into<ParsedQuery>,
1322        qid: Option<Qid>,
1323    ) -> Result<Option<T>> {
1324        self.query_one_params(query, None, qid).await
1325    }
1326
1327    /// Executes a `ClickHouse` query with parameters and returns the first row, discarding the
1328    /// rest.
1329    ///
1330    /// # Parameters
1331    /// - `query`: The SQL query to execute (e.g., `"SELECT name FROM users WHERE id = 1"`).
1332    /// - `params`: The query parameters to provide
1333    /// - `qid`: Optional query ID for tracking and debugging.
1334    ///
1335    /// # Returns
1336    /// A [`Result`] containing an `Option<T>`, where `T` is the deserialized row, or
1337    /// `None` if no rows are returned.
1338    ///
1339    /// # Errors
1340    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1341    /// - Fails if row deserialization fails (e.g., schema mismatch).
1342    /// - Fails if the connection to `ClickHouse` is interrupted.
1343    /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1344    ///
1345    /// # Examples
1346    /// ```rust,ignore
1347    /// use clickhouse_arrow::prelude::*;
1348    ///
1349    /// let client = Client::builder()
1350    ///     .with_endpoint("localhost:9000")
1351    ///     .build_native()
1352    ///     .await
1353    ///     .unwrap();
1354    ///
1355    /// // Assume `MyRow` implements `Row`
1356    /// let params = Some(vec![
1357    ///     ("str", ParamValue::from("name")),
1358    /// ].into());
1359    /// let query = "SELECT {str:String} FROM users WHERE id = 1";
1360    /// let row = client.query_one_params::<MyRow>(query, params, None)
1361    ///     .await
1362    ///     .unwrap();
1363    /// if let Some(row) = row {
1364    ///     println!("Found row: {:?}", row);
1365    /// }
1366    /// ```
1367    #[instrument(
1368        name = "clickhouse.query_one_params",
1369        skip_all,
1370        fields(
1371            db.system = "clickhouse",
1372            db.operation = "query",
1373            db.format = NativeFormat::FORMAT,
1374            clickhouse.client.id = self.client_id,
1375            clickhouse.query.id
1376        )
1377    )]
1378    pub async fn query_one_params<T: Row + Send + 'static>(
1379        &self,
1380        query: impl Into<ParsedQuery>,
1381        params: Option<QueryParams>,
1382        qid: Option<Qid>,
1383    ) -> Result<Option<T>> {
1384        let mut stream = self.query_params::<T>(query, params, qid).await?;
1385        stream.next().await.transpose()
1386    }
1387
1388    /// Creates a `ClickHouse` table from a Rust struct that implements the `Row` trait.
1389    ///
1390    /// This method generates and executes a `CREATE TABLE` DDL statement based on the
1391    /// structure of the provided `Row` type. The table schema is automatically derived
1392    /// from the struct fields and their types.
1393    ///
1394    /// # Arguments
1395    /// * `database` - Optional database name. If None, uses the client's default database
1396    /// * `table` - The name of the table to create
1397    /// * `options` - Table creation options including engine type, order by, and partition by
1398    /// * `query_id` - Optional query ID for tracking and debugging
1399    ///
1400    /// # Type Parameters
1401    /// * `T` - A type that implements the `Row` trait, typically a struct with the `#[derive(Row)]`
1402    ///   macro
1403    ///
1404    /// # Example
1405    /// ```ignore
1406    /// #[derive(Row)]
1407    /// struct User {
1408    ///     id: u32,
1409    ///     name: String,
1410    ///     created_at: DateTime,
1411    /// }
1412    ///
1413    /// let options = CreateOptions::new("MergeTree")
1414    ///     .with_order_by(&["id"]);
1415    ///
1416    /// client.create_table::<User>(Some("analytics"), "users", &options, None).await?;
1417    /// ```
1418    ///
1419    /// # Errors
1420    /// - Returns an error if the table creation fails
1421    /// - Returns an error if the database/table names are invalid
1422    /// - Returns an error if the connection is lost
1423    #[instrument(
1424        name = "clickhouse.create_table",
1425        skip_all
1426        fields(
1427            db.system = "clickhouse",
1428            db.operation = "create.table",
1429            db.format = NativeFormat::FORMAT,
1430        )
1431    )]
1432    pub async fn create_table<T: Row>(
1433        &self,
1434        database: Option<&str>,
1435        table: &str,
1436        options: &CreateOptions,
1437        qid: Option<Qid>,
1438    ) -> Result<()> {
1439        let database = database.unwrap_or(self.connection.database());
1440        let stmt = create_table_statement_from_native::<T>(Some(database), table, options)?;
1441        self.execute(stmt, qid).await?;
1442        Ok(())
1443    }
1444}
1445
1446impl Client<ArrowFormat> {
1447    /// Executes a `ClickHouse` query and streams Arrow [`RecordBatch`] results.
1448    ///
1449    /// This method sends a query to `ClickHouse` and returns a stream of [`RecordBatch`]
1450    /// instances, each containing a chunk of the query results in Apache Arrow format.
1451    /// Use this method for efficient integration with Arrow-based data processing
1452    /// pipelines. For row-based access, consider [`Client::query_rows`].
1453    ///
1454    /// Progress and profile events are dispatched to the client's event channel (see
1455    /// [`Client::subscribe_events`]).
1456    ///
1457    /// # Parameters
1458    /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM my_table"`).
1459    /// - `qid`: Optional query ID for tracking and debugging.
1460    ///
1461    /// # Returns
1462    /// A [`Result`] containing a [`ClickHouseResponse<RecordBatch>`] that streams
1463    /// query results.
1464    ///
1465    /// # Errors
1466    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1467    /// - Fails if the connection to `ClickHouse` is interrupted.
1468    /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1469    ///
1470    /// # Examples
1471    /// ```rust,ignore
1472    /// use clickhouse_arrow::prelude::*;
1473    ///
1474    /// let client = Client::builder()
1475    ///     .with_endpoint("localhost:9000")
1476    ///     .build_arrow()
1477    ///     .await
1478    ///     .unwrap();
1479    ///
1480    /// let mut response = client.query("SELECT * FROM my_table", None).await.unwrap();
1481    /// while let Some(batch) = response.next().await {
1482    ///     let batch = batch.unwrap();
1483    ///     println!("Received batch with {} rows", batch.num_rows());
1484    /// }
1485    /// ```
1486    #[instrument(
1487        skip_all,
1488        fields(db.system = "clickhouse", db.operation = "query", clickhouse.query.id)
1489    )]
1490    pub async fn query(
1491        &self,
1492        query: impl Into<ParsedQuery>,
1493        qid: Option<Qid>,
1494    ) -> Result<ClickHouseResponse<RecordBatch>> {
1495        self.query_params(query, None, qid).await
1496    }
1497
1498    /// Executes a `ClickHouse` query with parameters and streams Arrow [`RecordBatch`] results.
1499    ///
1500    /// # Parameters
1501    /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM my_table"`).
1502    /// - `params`: The query parameters to provide
1503    /// - `qid`: Optional query ID for tracking and debugging.
1504    ///
1505    /// # Returns
1506    /// A [`Result`] containing a [`ClickHouseResponse<RecordBatch>`] that streams
1507    /// query results.
1508    ///
1509    /// # Errors
1510    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1511    /// - Fails if the connection to `ClickHouse` is interrupted.
1512    /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1513    ///
1514    /// # Examples
1515    /// ```rust,ignore
1516    /// use clickhouse_arrow::prelude::*;
1517    ///
1518    /// let client = Client::builder()
1519    ///     .with_endpoint("localhost:9000")
1520    ///     .build_arrow()
1521    ///     .await
1522    ///     .unwrap();
1523    ///
1524    /// let params = Some(vec![("name", ParamValue::from("my_table"))].into());
1525    /// let query = "SELECT * FROM {name:Identifier}";
1526    /// let mut response = client.query_params(query, params, None).await.unwrap();
1527    /// while let Some(batch) = response.next().await {
1528    ///     let batch = batch.unwrap();
1529    ///     println!("Received batch with {} rows", batch.num_rows());
1530    /// }
1531    /// ```
1532    #[instrument(
1533        skip_all,
1534        fields(db.system = "clickhouse", db.operation = "query", clickhouse.query.id)
1535    )]
1536    pub async fn query_params(
1537        &self,
1538        query: impl Into<ParsedQuery>,
1539        params: Option<QueryParams>,
1540        qid: Option<Qid>,
1541    ) -> Result<ClickHouseResponse<RecordBatch>> {
1542        let (query, qid) = record_query(qid, query.into(), self.client_id);
1543        Ok(ClickHouseResponse::new(Box::pin(self.query_raw(query, params, qid).await?)))
1544    }
1545
    /// Executes a `ClickHouse` query and streams rows as row-major values.
1547    ///
1548    /// This method sends a query to `ClickHouse` and returns a stream of rows, where
1549    /// each row is represented as a `Vec<Value>` containing column values. The data is
1550    /// transposed from Arrow [`RecordBatch`] format to row-major format, making it
1551    /// convenient for row-based processing. For direct Arrow access, use
1552    /// [`Client::query`].
1553    ///
1554    /// Progress and profile events are dispatched to the client's event channel (see
1555    /// [`Client::subscribe_events`]).
1556    ///
1557    /// # Parameters
1558    /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM my_table"`).
1559    /// - `qid`: Optional query ID for tracking and debugging.
1560    ///
1561    /// # Returns
1562    /// A [`Result`] containing a [`ClickHouseResponse<Vec<Value>>`] that streams rows.
1563    ///
1564    /// # Errors
1565    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1566    /// - Fails if the connection to `ClickHouse` is interrupted.
1567    /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1568    ///
1569    /// # Examples
1570    /// ```rust,ignore
1571    /// use clickhouse_arrow::prelude::*;
1572    ///
1573    /// let client = Client::builder()
1574    ///     .with_endpoint("localhost:9000")
1575    ///     .build_arrow()
1576    ///     .await
1577    ///     .unwrap();
1578    ///
1579    /// let mut response = client.query_rows("SELECT * FROM my_table", None).await.unwrap();
1580    /// while let Some(row) = response.next().await {
1581    ///     let row = row.unwrap();
1582    ///     println!("Row values: {:?}", row);
1583    /// }
1584    /// ```
1585    #[instrument(
1586        name = "clickhouse.query_rows",
1587        fields(
1588            db.system = "clickhouse",
1589            db.operation = "query",
1590            db.format = ArrowFormat::FORMAT,
1591            clickhouse.client.id = self.client_id,
1592            clickhouse.query.id
1593        ),
1594        skip_all
1595    )]
1596    pub async fn query_rows(
1597        &self,
1598        query: impl Into<ParsedQuery>,
1599        qid: Option<Qid>,
1600    ) -> Result<ClickHouseResponse<Vec<Value>>> {
1601        let (query, qid) = record_query(qid, query.into(), self.client_id);
1602        let connection = self.conn().await?;
1603
1604        // Create metadata channel
1605        let (tx, rx) = oneshot::channel();
1606        let (header_tx, header_rx) = oneshot::channel();
1607
1608        #[cfg_attr(not(feature = "inner_pool"), expect(unused_variables))]
1609        let conn_idx = connection
1610            .send_operation(
1611                Operation::Query {
1612                    query,
1613                    settings: self.settings.clone(),
1614                    // TODO: Add arg for params
1615                    params: None,
1616                    response: tx,
1617                    header: Some(header_tx),
1618                },
1619                qid,
1620                true,
1621            )
1622            .await?;
1623
1624        trace!({ ATT_CID } = self.client_id, { ATT_QID } = %qid, "sent query, awaiting response");
1625        let responses = rx
1626            .await
1627            .map_err(|_| Error::Protocol(format!("Failed to receive response for query {qid}")))?
1628            .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "Error receiving header"))?;
1629
1630        let header = header_rx
1631            .await
1632            .map_err(|_| Error::Protocol(format!("Failed to receive header for query {qid}")))?;
1633
1634        let response = create_response_stream::<ArrowFormat>(responses, qid, self.client_id)
1635            .map(move |batch| (header.clone(), batch))
1636            .map(|(header, batch)| {
1637                let batch = batch?;
1638                let batch_iter = batch_to_rows(&batch, Some(&header))?;
1639                Ok::<_, Error>(stream::iter(batch_iter))
1640            })
1641            .try_flatten();
1642
1643        // Decrement load balancer
1644        #[cfg(feature = "inner_pool")]
1645        connection.finish(conn_idx, Operation::<RecordBatch>::weight_insert_many());
1646
1647        Ok(ClickHouseResponse::from_stream(response))
1648    }
1649
1650    /// Executes a `ClickHouse` query and returns the first column of the first batch.
1651    ///
1652    /// This method sends a query to `ClickHouse` and returns the first column of the
1653    /// first [`RecordBatch`] as an Arrow [`ArrayRef`], or `None` if the result is empty.
1654    /// It is useful for queries that return a single column (e.g., `SELECT id FROM
1655    /// my_table`). For full batch access, use [`Client::query`].
1656    ///
1657    /// Progress and profile events are dispatched to the client's event channel (see
1658    /// [`Client::subscribe_events`]).
1659    ///
1660    /// # Parameters
1661    /// - `query`: The SQL query to execute (e.g., `"SELECT id FROM my_table"`).
1662    /// - `qid`: Optional query ID for tracking and debugging.
1663    ///
1664    /// # Returns
1665    /// A [`Result`] containing an `Option<ArrayRef>`, representing the first column of
1666    /// the first batch, or `None` if no data is returned.
1667    ///
1668    /// # Errors
1669    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1670    /// - Fails if the connection to `ClickHouse` is interrupted.
1671    /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1672    ///
1673    /// # Examples
1674    /// ```rust,ignore
1675    /// use clickhouse_arrow::prelude::*;
1676    ///
1677    /// let client = Client::builder()
1678    ///     .with_endpoint("localhost:9000")
1679    ///     .build_arrow()
1680    ///     .await
1681    ///     .unwrap();
1682    ///
1683    /// let column = client.query_column("SELECT id FROM my_table", None)
1684    ///     .await
1685    ///     .unwrap();
1686    /// if let Some(col) = column {
1687    ///     println!("Column data: {:?}", col);
1688    /// }
1689    /// ```
1690    #[instrument(
1691        name = "clickhouse.query_column",
1692        skip_all,
1693        fields(
1694            db.system = "clickhouse",
1695            db.operation = "query",
1696            db.format = ArrowFormat::FORMAT,
1697            clickhouse.client.id = self.client_id,
1698            clickhouse.query.id
1699        )
1700    )]
1701    pub async fn query_column(
1702        &self,
1703        query: impl Into<ParsedQuery>,
1704        qid: Option<Qid>,
1705    ) -> Result<Option<ArrayRef>> {
1706        self.query_column_params(query, None, qid).await
1707    }
1708
1709    /// Executes a `ClickHouse` query with parameters and returns the first column of the first
1710    /// batch.
1711    ///
1712    /// # Parameters
1713    /// - `query`: The SQL query to execute (e.g., `"SELECT id FROM my_table"`).
1714    /// - `params`: The query parameters to provide
1715    /// - `qid`: Optional query ID for tracking and debugging.
1716    ///
1717    /// # Returns
1718    /// A [`Result`] containing an `Option<ArrayRef>`, representing the first column of
1719    /// the first batch, or `None` if no data is returned.
1720    ///
1721    /// # Errors
1722    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1723    /// - Fails if the connection to `ClickHouse` is interrupted.
1724    /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1725    ///
1726    /// # Examples
1727    /// ```rust,ignore
1728    /// use clickhouse_arrow::prelude::*;
1729    ///
1730    /// let client = Client::builder()
1731    ///     .with_endpoint("localhost:9000")
1732    ///     .build_arrow()
1733    ///     .await
1734    ///     .unwrap();
1735    ///
1736    /// let params = Some(vec![("name", ParamValue::from("my_table"))].into());
1737    /// let query = "SELECT id FROM {name:Identifier}";
1738    /// let column = client.query_column_params("SELECT id FROM my_table", params, None)
1739    ///     .await
1740    ///     .unwrap();
1741    /// if let Some(col) = column {
1742    ///     println!("Column data: {:?}", col);
1743    /// }
1744    /// ```
1745    #[instrument(
1746        name = "clickhouse.query_column_params",
1747        skip_all,
1748        fields(
1749            db.system = "clickhouse",
1750            db.operation = "query",
1751            db.format = ArrowFormat::FORMAT,
1752            clickhouse.client.id = self.client_id,
1753            clickhouse.query.id
1754        )
1755    )]
1756    pub async fn query_column_params(
1757        &self,
1758        query: impl Into<ParsedQuery>,
1759        params: Option<QueryParams>,
1760        qid: Option<Qid>,
1761    ) -> Result<Option<ArrayRef>> {
1762        let mut stream = self.query_params(query, params, qid).await?;
1763        let Some(batch) = stream.next().await.transpose()? else {
1764            return Ok(None);
1765        };
1766
1767        if batch.num_rows() == 0 { Ok(None) } else { Ok(Some(Arc::clone(batch.column(0)))) }
1768    }
1769
1770    /// Executes a `ClickHouse` query and returns the first row as a [`RecordBatch`].
1771    ///
1772    /// This method sends a query to `ClickHouse` and returns the first row of the first
1773    /// [`RecordBatch`], or `None` if the result is empty. The returned [`RecordBatch`]
1774    /// contains a single row. It is useful for queries expected to return a single row
1775    /// (e.g., `SELECT * FROM users WHERE id = 1`). For streaming multiple rows, use
1776    /// [`Client::query`].
1777    ///
1778    /// Progress and profile events are dispatched to the client's event channel (see
1779    /// [`Client::subscribe_events`]).
1780    ///
1781    /// # Parameters
1782    /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM users WHERE id = 1"`).
1783    /// - `qid`: Optional query ID for tracking and debugging.
1784    ///
1785    /// # Returns
1786    /// A [`Result`] containing an `Option<RecordBatch>`, representing the first row, or
1787    /// `None` if no rows are returned.
1788    ///
1789    /// # Errors
1790    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1791    /// - Fails if the connection to `ClickHouse` is interrupted.
1792    /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1793    ///
1794    /// # Examples
1795    /// ```rust,ignore
1796    /// use clickhouse_arrow::prelude::*;
1797    ///
1798    /// let client = Client::builder()
1799    ///     .with_endpoint("localhost:9000")
1800    ///     .build_arrow()
1801    ///     .await
1802    ///     .unwrap();
1803    ///
1804    /// let batch = client.query_one("SELECT * FROM users WHERE id = 1", None)
1805    ///     .await
1806    ///     .unwrap();
1807    /// if let Some(row) = batch {
1808    ///     println!("Row data: {:?}", row);
1809    /// }
1810    /// ```
1811    #[instrument(
1812        name = "clickhouse.query_one",
1813        skip_all
1814        fields(
1815            db.system = "clickhouse",
1816            db.operation = "query",
1817            db.format = ArrowFormat::FORMAT,
1818            clickhouse.client.id = self.client_id,
1819            clickhouse.query.id
1820        )
1821    )]
1822    pub async fn query_one(
1823        &self,
1824        query: impl Into<ParsedQuery>,
1825        qid: Option<Qid>,
1826    ) -> Result<Option<RecordBatch>> {
1827        self.query_one_params(query, None, qid).await
1828    }
1829
1830    /// Executes a `ClickHouse` query with parameters and returns the first row as a
1831    /// [`RecordBatch`].
1832    ///
1833    /// # Parameters
1834    /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM users WHERE id = 1"`).
1835    /// - `params`: The query parameters to provide
1836    /// - `qid`: Optional query ID for tracking and debugging.
1837    ///
1838    /// # Returns
1839    /// A [`Result`] containing an `Option<RecordBatch>`, representing the first row, or
1840    /// `None` if no rows are returned.
1841    ///
1842    /// # Errors
1843    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1844    /// - Fails if the connection to `ClickHouse` is interrupted.
1845    /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1846    ///
1847    /// # Examples
1848    /// ```rust,ignore
1849    /// use clickhouse_arrow::prelude::*;
1850    ///
1851    /// let client = Client::builder()
1852    ///     .with_endpoint("localhost:9000")
1853    ///     .build_arrow()
1854    ///     .await
1855    ///     .unwrap();
1856    ///
1857    /// let params = Some(vec![("id", ParamValue::from(1))]);
1858    /// let batch = client.query_one_params("SELECT * FROM users WHERE id = {id:UInt64}", None)
1859    ///     .await
1860    ///     .unwrap();
1861    /// if let Some(row) = batch {
1862    ///     println!("Row data: {:?}", row);
1863    /// }
1864    /// ```
1865    #[instrument(
1866        name = "clickhouse.query_one_params",
1867        skip_all
1868        fields(
1869            db.system = "clickhouse",
1870            db.operation = "query",
1871            db.format = ArrowFormat::FORMAT,
1872            clickhouse.client.id = self.client_id,
1873            clickhouse.query.id
1874        )
1875    )]
1876    pub async fn query_one_params(
1877        &self,
1878        query: impl Into<ParsedQuery>,
1879        params: Option<QueryParams>,
1880        qid: Option<Qid>,
1881    ) -> Result<Option<RecordBatch>> {
1882        let stream = self.query_params(query, params, qid).await?;
1883        tokio::pin!(stream);
1884
1885        let Some(batch) = stream.next().await.transpose()? else {
1886            return Ok(None);
1887        };
1888
1889        if batch.num_rows() == 0 {
1890            Ok(None)
1891        } else {
1892            Ok(Some(take_record_batch(&batch, &arrow::array::UInt32Array::from(vec![0]))?))
1893        }
1894    }
1895
1896    /// Inserts a `RecordBatch` into `ClickHouse` by splitting it into smaller batches with a
1897    /// maximum number of rows.
1898    ///
1899    /// This method takes a `RecordBatch`, splits it into multiple `RecordBatch`es with at most
1900    /// `max` rows each, and inserts them into `ClickHouse` using the provided query. The last
1901    /// batch may have fewer than `max` rows if the total number of rows is not evenly divisible
1902    /// by `max`. It is useful for large inserts where row limits are needed to manage memory or
1903    /// server load. For unsplit inserts, use [`Client::insert`].
1904    ///
1905    /// Progress and profile events are dispatched to the client's event channel (see
1906    /// [`Client::subscribe_events`]).
1907    ///
1908    /// # Parameters
1909    /// - `query`: The SQL query to execute (e.g., `"INSERT INTO users VALUES"`).
1910    /// - `batch`: The `RecordBatch` containing the data to insert.
1911    /// - `max`: The maximum number of rows per split `RecordBatch`. Must be non-zero to avoid an
1912    ///   empty insert.
1913    /// - `qid`: Optional query ID for tracking and debugging.
1914    ///
1915    /// # Returns
1916    /// A [`Result`] containing a [`Stream`] of [`Result<()>`], where each item represents the
1917    /// completion of an individual batch insert. The stream yields `Ok(())` for each successful
1918    /// batch or an error if an insert fails.
1919    ///
1920    /// # Errors
1921    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1922    /// - Fails if the connection to `ClickHouse` is interrupted.
1923    /// - Fails if `ClickHouse` returns an exception (e.g., table not found or schema mismatch).
1924    ///
1925    /// # Examples
1926    /// ```rust,ignore
1927    /// use clickhouse_arrow::prelude::*;
1928    /// use arrow::record_batch::RecordBatch;
1929    ///
1930    /// let client = Client::builder()
1931    ///     .with_endpoint("localhost:9000")
1932    ///     .build_arrow()
1933    ///     .await
1934    ///     .unwrap();
1935    ///
1936    /// // Assume `batch` is a RecordBatch with 10 rows
1937    /// let batch: RecordBatch = // ...;
1938    /// let stream = client.insert_max_rows("INSERT INTO users VALUES", batch, 3, None)
1939    ///     .await
1940    ///     .unwrap();
1941    ///
1942    /// let mut stream = std::pin::pin!(stream);
1943    /// while let Some(result) = stream.next().await {
1944    ///     result.unwrap(); // Handle each batch insert result
1945    /// }
1946    /// ```
1947    ///
1948    /// For a `RecordBatch` with 10 rows and `max = 3`, this inserts:
1949    /// * Batch 0: 3 rows
1950    /// * Batch 1: 3 rows
1951    /// * Batch 2: 3 rows
1952    /// * Batch 3: 1 row
1953    #[instrument(
1954        name = "clickhouse.insert_max_rows",
1955        skip_all
1956        fields(
1957            db.system = "clickhouse",
1958            db.operation = "insert",
1959            db.format = ArrowFormat::FORMAT,
1960            clickhouse.client.id = self.client_id,
1961            clickhouse.query.id
1962        )
1963    )]
1964    pub async fn insert_max_rows(
1965        &self,
1966        query: impl Into<ParsedQuery>,
1967        batch: RecordBatch,
1968        max: usize,
1969        qid: Option<Qid>,
1970    ) -> Result<impl Stream<Item = Result<()>> + '_> {
1971        let (query, qid) = record_query(qid, query.into(), self.client_id);
1972        let batches = crate::arrow::utils::split_record_batch(batch, max);
1973        self.insert_many(query, batches, Some(qid)).await
1974    }
1975
1976    /// Fetches the list of database names (schemas) in `ClickHouse`.
1977    ///
1978    /// This method queries `ClickHouse` to retrieve the names of all databases
1979    /// accessible to the client. It is useful for exploring the database structure or
1980    /// validating database existence before performing operations.
1981    ///
1982    /// # Parameters
1983    /// - `qid`: Optional query ID for tracking and debugging.
1984    ///
1985    /// # Returns
1986    /// A [`Result`] containing a `Vec<String>` of database names.
1987    ///
1988    /// # Errors
1989    /// - Fails if the query execution encounters a `ClickHouse` error (e.g., permission denied).
1990    /// - Fails if the connection to `ClickHouse` is interrupted.
1991    ///
1992    /// # Examples
1993    /// ```rust,ignore
1994    /// use clickhouse_arrow::prelude::*;
1995    ///
1996    /// let client = Client::builder()
1997    ///     .with_endpoint("localhost:9000")
1998    ///     .build_arrow()
1999    ///     .await
2000    ///     .unwrap();
2001    ///
2002    /// let schemas = client.fetch_schemas(None).await.unwrap();
2003    /// println!("Databases: {:?}", schemas);
2004    /// ```
2005    #[instrument(
2006        name = "clickhouse.fetch_schemas",
2007        skip_all
2008        fields(
2009            db.system = "clickhouse",
2010            db.operation = "query",
2011            db.format = ArrowFormat::FORMAT,
2012            clickhouse.client.id = self.client_id,
2013            clickhouse.query.id
2014        )
2015    )]
2016    pub async fn fetch_schemas(&self, qid: Option<Qid>) -> Result<Vec<String>> {
2017        crate::arrow::schema::fetch_databases(self, qid).await
2018    }
2019
2020    /// Fetches all tables across all databases in `ClickHouse`.
2021    ///
2022    /// This method queries `ClickHouse` to retrieve a mapping of database names to
2023    /// their table names. It is useful for discovering the full schema structure of
2024    /// the `ClickHouse` instance.
2025    ///
2026    /// # Parameters
2027    /// - `qid`: Optional query ID for tracking and debugging.
2028    ///
2029    /// # Returns
2030    /// A [`Result`] containing a `HashMap<String, Vec<String>>`, where each key is a
2031    /// database name and the value is a list of table names in that database.
2032    ///
2033    /// # Errors
2034    /// - Fails if the query execution encounters a `ClickHouse` error (e.g., permission denied).
2035    /// - Fails if the connection to `ClickHouse` is interrupted.
2036    ///
2037    /// # Examples
2038    /// ```rust,ignore
2039    /// use clickhouse_arrow::prelude::*;
2040    ///
2041    /// let client = Client::builder()
2042    ///     .with_endpoint("localhost:9000")
2043    ///     .build_arrow()
2044    ///     .await
2045    ///     .unwrap();
2046    ///
2047    /// let tables = client.fetch_all_tables(None).await.unwrap();
2048    /// for (db, tables) in tables {
2049    ///     println!("Database {} has tables: {:?}", db, tables);
2050    /// }
2051    /// ```
2052    #[instrument(
2053        name = "clickhouse.fetch_all_tables",
2054        skip_all
2055        fields(
2056            db.system = "clickhouse",
2057            db.operation = "query",
2058            db.format = ArrowFormat::FORMAT,
2059            clickhouse.client.id = self.client_id,
2060            clickhouse.query.id
2061        )
2062    )]
2063    pub async fn fetch_all_tables(&self, qid: Option<Qid>) -> Result<HashMap<String, Vec<String>>> {
2064        crate::arrow::schema::fetch_all_tables(self, qid).await
2065    }
2066
2067    /// Fetches the list of table names in a specific `ClickHouse` database.
2068    ///
2069    /// This method queries `ClickHouse` to retrieve the names of all tables in the
2070    /// specified database (or the client's default database if `None`). It is useful
2071    /// for exploring the schema of a specific database.
2072    ///
2073    /// # Parameters
2074    /// - `database`: Optional database name. If `None`, uses the client's default database.
2075    /// - `qid`: Optional query ID for tracking and debugging.
2076    ///
2077    /// # Returns
2078    /// A [`Result`] containing a `Vec<String>` of table names.
2079    ///
2080    /// # Errors
2081    /// - Fails if the database does not exist or is inaccessible.
2082    /// - Fails if the query execution encounters a `ClickHouse` error (e.g., permission denied).
2083    /// - Fails if the connection to `ClickHouse` is interrupted.
2084    ///
2085    /// # Examples
2086    /// ```rust,ignore
2087    /// use clickhouse_arrow::prelude::*;
2088    ///
2089    /// let client = Client::builder()
2090    ///     .with_endpoint("localhost:9000")
2091    ///     .build_arrow()
2092    ///     .await
2093    ///     .unwrap();
2094    ///
2095    /// let tables = client.fetch_tables(Some("my_db"), None).await.unwrap();
2096    /// println!("Tables in my_db: {:?}", tables);
2097    /// ```
2098    #[instrument(
2099        name = "clickhouse.fetch_tables",
2100        skip_all
2101        fields(
2102            db.system = "clickhouse",
2103            db.operation = "query",
2104            db.format = ArrowFormat::FORMAT,
2105            clickhouse.client.id = self.client_id,
2106            clickhouse.query.id
2107        )
2108    )]
2109    pub async fn fetch_tables(
2110        &self,
2111        database: Option<&str>,
2112        qid: Option<Qid>,
2113    ) -> Result<Vec<String>> {
2114        let database = database.unwrap_or(self.connection.database());
2115        crate::arrow::schema::fetch_tables(self, database, qid).await
2116    }
2117
2118    /// Fetches the schema of specified tables in a `ClickHouse` database.
2119    ///
2120    /// This method queries `ClickHouse` to retrieve the Arrow schemas of the specified
2121    /// tables in the given database (or the client's default database if `None`). If
2122    /// the `tables` list is empty, it fetches schemas for all tables in the database.
2123    /// The result is a mapping of table names to their corresponding Arrow [`SchemaRef`].
2124    ///
2125    /// # Parameters
2126    /// - `database`: Optional database name. If `None`, uses the client's default database.
2127    /// - `tables`: A list of table names to fetch schemas for. An empty list fetches all tables.
2128    /// - `qid`: Optional query ID for tracking and debugging.
2129    ///
2130    /// # Returns
2131    /// A [`Result`] containing a `HashMap<String, SchemaRef>`, mapping table names to
2132    /// their schemas.
2133    ///
2134    /// # Errors
2135    /// - Fails if the database or any table does not exist or is inaccessible.
2136    /// - Fails if the query execution encounters a `ClickHouse` error (e.g., permission denied).
2137    /// - Fails if the connection to `ClickHouse` is interrupted.
2138    ///
2139    /// # Examples
2140    /// ```rust,ignore
2141    /// use clickhouse_arrow::prelude::*;
2142    ///
2143    /// let client = Client::builder()
2144    ///     .with_endpoint("localhost:9000")
2145    ///     .build_arrow()
2146    ///     .await
2147    ///     .unwrap();
2148    ///
2149    /// let schemas = client.fetch_schema(Some("my_db"), &["my_table"], None)
2150    ///     .await
2151    ///     .unwrap();
2152    /// for (table, schema) in schemas {
2153    ///     println!("Table {} schema: {:?}", table, schema);
2154    /// }
2155    /// ```
2156    #[instrument(
2157        name = "clickhouse.fetch_schema",
2158        skip_all
2159        fields(
2160            db.system = "clickhouse",
2161            db.operation = "query",
2162            db.format = ArrowFormat::FORMAT,
2163            clickhouse.client.id = self.client_id,
2164            clickhouse.query.id
2165        )
2166    )]
2167    pub async fn fetch_schema(
2168        &self,
2169        database: Option<&str>,
2170        tables: &[&str],
2171        qid: Option<Qid>,
2172    ) -> Result<HashMap<String, SchemaRef>> {
2173        let database = database.unwrap_or(self.connection.database());
2174        let options = self.connection.metadata().arrow_options;
2175        crate::arrow::schema::fetch_schema(self, database, tables, qid, options).await
2176    }
2177
2178    /// Issues a `CREATE TABLE` DDL statement for a table using Arrow schema.
2179    ///
2180    /// Creates a table in the specified database (or the client's default database if
2181    /// `None`) based on the provided Arrow [`SchemaRef`]. The `options` parameter allows
2182    /// customization of table properties, such as engine type and partitioning. This
2183    /// method is specific to [`ArrowClient`] for seamless integration with Arrow-based
2184    /// data pipelines.
2185    ///
2186    /// # Parameters
2187    /// - `database`: Optional database name. If `None`, uses the client's default database.
2188    /// - `table`: Name of the table to create.
2189    /// - `schema`: The Arrow schema defining the table's structure.
2190    /// - `options`: Configuration for table creation (e.g., engine, partitioning).
2191    /// - `qid`: Optional query ID for tracking and debugging.
2192    ///
2193    /// # Returns
2194    /// A [`Result`] indicating success or failure of the operation.
2195    ///
2196    /// # Errors
2197    /// - Fails if the provided schema is invalid or incompatible with `ClickHouse`.
2198    /// - Fails if the database does not exist or is inaccessible.
2199    /// - Fails if the query execution encounters a `ClickHouse` error.
2200    ///
2201    /// # Examples
2202    /// ```rust,ignore
2203    /// use clickhouse_arrow::prelude::*;
2204    /// use arrow::datatypes::{Schema, SchemaRef};
2205    ///
2206    /// let client = Client::builder()
2207    ///     .with_endpoint("localhost:9000")
2208    ///     .build_arrow()
2209    ///     .await
2210    ///     .unwrap();
2211    ///
2212    /// // Assume `schema` is a valid Arrow schema
2213    /// let schema: SchemaRef = Arc::new(Schema::new(vec![/* ... */]));
2214    /// let options = CreateOptions::default();
2215    /// client.create_table(Some("my_db"), "my_table", &schema, &options, None)
2216    ///     .await
2217    ///     .unwrap();
2218    /// ```
2219    #[instrument(
2220        name = "clickhouse.create_table",
2221        skip_all
2222        fields(
2223            db.system = "clickhouse",
2224            db.operation = "create.table",
2225            db.format = ArrowFormat::FORMAT,
2226            clickhouse.client.id = self.client_id,
2227            clickhouse.query.id
2228        )
2229    )]
2230    pub async fn create_table(
2231        &self,
2232        database: Option<&str>,
2233        table: &str,
2234        schema: &SchemaRef,
2235        options: &CreateOptions,
2236        qid: Option<Qid>,
2237    ) -> Result<()> {
2238        let database = database.unwrap_or(self.connection.database());
2239        let arrow_options = self.connection.metadata().arrow_options;
2240        let stmt = create_table_statement_from_arrow(
2241            Some(database),
2242            table,
2243            schema,
2244            options,
2245            Some(arrow_options),
2246        )?;
2247        self.execute(stmt, qid).await?;
2248        Ok(())
2249    }
2250}
2251
2252impl<T: ClientFormat> Drop for Client<T> {
2253    fn drop(&mut self) {
2254        trace!({ ATT_CID } = self.client_id, "Client dropped");
2255    }
2256}
2257
2258/// Simple helper to log query id and client id
2259fn record_query(qid: Option<Qid>, query: ParsedQuery, cid: u16) -> (String, Qid) {
2260    let qid = qid.unwrap_or_default();
2261    let _ = Span::current().record(ATT_QID, tracing::field::display(qid));
2262    let query = query.0;
2263    trace!(query, { ATT_CID } = cid, "Querying clickhouse");
2264    (query, qid)
2265}