// clickhouse_arrow/client.rs
//! The `client` module provides the primary interface for interacting with `ClickHouse`
//! over its native protocol, with full support for Apache Arrow interoperability.
//! The main entry point is the [`Client`] struct, which supports both native `ClickHouse`
//! data formats ([`NativeClient`]) and Arrow-compatible formats ([`ArrowClient`]).
//!
//! This module is designed to be thread-safe, with [`Client`] instances that can be
//! cloned and shared across threads. It supports querying, inserting data, managing
//! database schemas, and handling `ClickHouse` events like progress and profiling.
9mod builder;
10mod chunk;
11#[cfg(feature = "cloud")]
12mod cloud;
13pub(crate) mod connection;
14mod internal;
15mod options;
16mod reader;
17mod response;
18mod tcp;
19mod writer;
20
21use std::collections::HashMap;
22use std::sync::Arc;
23use std::sync::atomic::AtomicU16;
24
25use arrow::array::{ArrayRef, RecordBatch};
26use arrow::compute::take_record_batch;
27use arrow::datatypes::SchemaRef;
28use futures_util::{Stream, StreamExt, TryStreamExt, stream};
29use strum::AsRefStr;
30use tokio::sync::{broadcast, mpsc, oneshot};
31
32pub use self::builder::*;
33pub use self::connection::ConnectionStatus;
34pub(crate) use self::internal::{Message, Operation};
35pub use self::options::*;
36pub use self::response::*;
37pub use self::tcp::Destination;
38use crate::arrow::utils::batch_to_rows;
39use crate::constants::*;
40use crate::formats::{ClientFormat, NativeFormat};
41use crate::native::block::Block;
42use crate::native::protocol::{CompressionMethod, ProfileEvent};
43use crate::prelude::*;
44use crate::query::{ParsedQuery, QueryParams};
45use crate::schema::CreateOptions;
46use crate::{Error, Progress, Result, Row};
47
/// Process-wide counter handing out a unique id to each new [`Client`].
/// Wraps on `u16` overflow; ids are for logging/tracing correlation only.
static CLIENT_ID: AtomicU16 = AtomicU16::new(0);
49
/// A `ClickHouse` client configured for the native format.
///
/// This type alias provides a client that works with `ClickHouse`'s internal
/// data representation (the [`Block`] type), useful when you need direct
/// control over types and don't require Arrow integration.
pub type NativeClient = Client<NativeFormat>;
56
/// A `ClickHouse` client configured for Apache Arrow format.
///
/// This type alias provides a client that works with Arrow [`RecordBatch`]es,
/// enabling seamless integration with the Arrow ecosystem for data processing
/// and analytics workflows.
pub type ArrowClient = Client<ArrowFormat>;
63
/// Configuration for a `ClickHouse` connection, including tracing and cloud-specific settings.
///
/// This struct is used to pass optional context to [`Client::connect`], enabling features
/// like distributed tracing or cloud instance tracking.
#[derive(Debug, Clone, Default)]
#[cfg_attr(not(feature = "cloud"), derive(Copy))]
pub struct ConnectionContext {
    /// Optional tracing context linked to the connect span for logging/monitoring.
    pub trace: Option<TraceContext>,
    /// Shared flag passed to the cloud wake-up ping (requires the `cloud` feature).
    // NOTE(review): presumably records whether the cloud instance was woken —
    // confirm against `Client::ping_cloud`.
    #[cfg(feature = "cloud")]
    pub cloud: Option<Arc<std::sync::atomic::AtomicBool>>,
}
79
/// Emitted clickhouse events from the underlying connection
#[derive(Debug, Clone)]
pub struct Event {
    /// The event payload (progress or profile data).
    pub event: ClickHouseEvent,
    /// Id of the query that produced this event.
    pub qid: Qid,
    /// Id of the client whose connection emitted this event.
    pub client_id: u16,
}
87
/// Profile and progress events from clickhouse
#[derive(Debug, Clone, AsRefStr)]
pub enum ClickHouseEvent {
    /// Incremental query progress reported by the server.
    Progress(Progress),
    /// Server-side profiling counters for a query.
    Profile(Vec<ProfileEvent>),
}
94
/// A thread-safe handle for interacting with a `ClickHouse` database over its native protocol.
///
/// The `Client` struct is the primary interface for executing queries, inserting data, and
/// managing database schemas. It supports two data formats:
/// - [`NativeClient`]: Uses `ClickHouse`'s native [`Block`] format for data exchange.
/// - [`ArrowClient`]: Uses Apache Arrow's [`RecordBatch`] for seamless interoperability with Arrow
///   ecosystems.
///
/// `Client` instances are lightweight and can be cloned and shared across threads. Each instance
/// maintains a reference to an underlying connection, which is managed automatically. The client
/// also supports event subscription for receiving progress and profiling information from
/// `ClickHouse`.
///
/// # Usage
/// Create a `Client` using the [`ClientBuilder`] for a fluent configuration experience, or use
/// [`Client::connect`] for direct connection setup.
///
/// # Examples
/// ```rust,ignore
/// use clickhouse_arrow::prelude::*;
/// use clickhouse_arrow::arrow;
/// use futures_util::StreamExt;
///
/// let client = Client::builder()
///     .destination("localhost:9000")
///     .username("default")
///     .build::<ArrowFormat>()
///     .await?;
///
/// // Execute a query
/// let batch = client
///     .query("SELECT 1")
///     .await?
///     .collect::<Vec<_>>()
///     .await
///     .into_iter()
///     .collect::<Result<Vec<_>>>()?;
/// arrow::util::pretty::print_batches(batch)?;
/// ```
#[derive(Clone, Debug)]
pub struct Client<T: ClientFormat> {
    /// Unique id assigned at connect time (from the `CLIENT_ID` counter).
    pub client_id: u16,
    /// Shared handle to the underlying connection; cloning the client clones this `Arc`.
    connection: Arc<connection::Connection<T>>,
    /// Broadcast sender fanning out progress/profile [`Event`]s to subscribers.
    events: Arc<broadcast::Sender<Event>>,
    /// Optional session settings applied to every query sent by this client.
    settings: Option<Arc<Settings>>,
}
141
142impl<T: ClientFormat> Client<T> {
    /// Creates a new [`ClientBuilder`] for configuring and building a `ClickHouse` client.
    ///
    /// This method provides a fluent interface to set up a `Client` with custom connection
    /// parameters, such as the server address, credentials, TLS, and compression. The
    /// builder can create either a single [`Client`] or a connection pool (with the `pool`
    /// feature enabled).
    ///
    /// Use this method when you need fine-grained control over the client configuration.
    /// For simple connections, you can also use [`Client::connect`] directly.
    ///
    /// # Returns
    /// A [`ClientBuilder`] instance ready for configuration.
    ///
    /// # Examples
    /// ```rust,ignore
    /// use clickhouse_arrow::prelude::*;
    ///
    /// let builder = Client::builder()
    ///     .with_endpoint("localhost:9000")
    ///     .with_username("default")
    ///     .with_password("");
    /// ```
    pub fn builder() -> ClientBuilder { ClientBuilder::new() }
167
    /// Establishes a connection to a `ClickHouse` server over TCP, with optional TLS support.
    ///
    /// This method creates a new [`Client`] instance connected to the specified `destination`.
    /// The connection can be configured using [`ClientOptions`], which allows setting parameters
    /// like username, password, TLS, and compression. Optional `settings` can be provided to
    /// customize `ClickHouse` session behavior, and a `context` can be used for tracing or
    /// cloud-specific configurations.
    ///
    /// # Parameters
    /// - `destination`: The `ClickHouse` server address (e.g., `"localhost:9000"` or a
    ///   [`Destination`]).
    /// - `options`: Configuration for the connection, including credentials, TLS, and cloud
    ///   settings.
    /// - `settings`: Optional `ClickHouse` session settings (e.g., query timeouts, max rows).
    /// - `context`: Optional connection context for tracing or cloud-specific behavior.
    ///
    /// # Returns
    /// A [`Result`] containing the connected [`Client`] instance, or an error if the connection
    /// fails.
    ///
    /// # Errors
    /// - Fails if the destination cannot be resolved or the connection cannot be established.
    /// - Fails if authentication or TLS setup encounters an issue.
    ///
    /// # Examples
    /// ```rust,ignore
    /// use clickhouse_arrow::client::{Client, ClientOptions};
    ///
    /// let options = ClientOptions::default()
    ///     .username("default")
    ///     .password("")
    ///     .use_tls(false);
    ///
    /// let client = Client::connect("localhost:9000", options, None, None).await?;
    /// ```
    #[instrument(
        level = "debug",
        name = "clickhouse.connect",
        fields(
            db.system = "clickhouse",
            db.format = T::FORMAT,
            network.transport = ?if options.use_tls { "tls" } else { "tcp" }
        ),
        skip_all
    )]
    pub async fn connect<A: Into<Destination>>(
        destination: A,
        options: ClientOptions,
        settings: Option<Arc<Settings>>,
        context: Option<ConnectionContext>,
    ) -> Result<Self> {
        let context = context.unwrap_or_default();
        let trace_ctx = context.trace.unwrap_or_default();
        let _ = trace_ctx.link(&Span::current());

        // Relaxed is sufficient: the id only needs to be unique-ish for
        // logging/tracing, no ordering with other memory operations is required.
        let client_id = CLIENT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        // Resolve the destination
        let destination: Destination = destination.into();
        let addrs = destination.resolve(options.ipv4_only).await?;

        #[cfg(feature = "cloud")]
        {
            // Ping the cloud instance if requested
            if let Some(domain) = options.domain.as_ref().filter(|_| options.ext.cloud.wakeup) {
                let cloud_track = context.cloud.as_deref();
                Self::ping_cloud(domain, options.ext.cloud.timeout, cloud_track).await;
            }
        }

        // Record the first resolved address on the current span for observability.
        if let Some(addr) = addrs.first() {
            let _ = Span::current()
                .record("server.address", tracing::field::debug(&addr.ip()))
                .record("server.port", addr.port());
            debug!(server.address = %addr.ip(), server.port = addr.port(), "Initiating connection");
        }

        // Broadcast channel shared by this client (and its clones) for
        // progress/profile events; the connection task holds the sender clone.
        let (event_tx, _) = broadcast::channel(EVENTS_CAPACITY);
        let events = Arc::new(event_tx);
        let conn_ev = Arc::clone(&events);

        let conn =
            connection::Connection::connect(client_id, addrs, options, conn_ev, trace_ctx).await?;
        let connection = Arc::new(conn);

        debug!("created connection successfully");

        Ok(Client { client_id, connection, events, settings })
    }
257
258 /// Retrieves the status of the underlying `ClickHouse` connection.
259 ///
260 /// This method returns the current [`ConnectionStatus`] of the client's connection,
261 /// indicating whether it is active, idle, or disconnected. Useful for monitoring
262 /// the health of the connection before executing queries.
263 ///
264 /// # Returns
265 /// A [`ConnectionStatus`] enum describing the connection state.
266 ///
267 /// # Examples
268 /// ```rust,ignore
269 /// use clickhouse_arrow::prelude::*;
270 ///
271 /// let client = Client::<ArrowFormat>::builder()
272 /// .with_endpoint("localhost:9000")
273 /// .build()
274 /// .await
275 /// .unwrap();
276 ///
277 /// let status = client.status();
278 /// println!("Connection status: {status:?}");
279 /// ```
280 pub fn status(&self) -> ConnectionStatus { self.connection.status() }
281
282 /// Subscribes to progress and profile events from `ClickHouse` queries.
283 ///
284 /// This method returns a [`broadcast::Receiver`] that delivers [`Event`] instances
285 /// containing progress updates ([`Progress`]) or profiling information ([`ProfileEvent`])
286 /// as queries execute. Events are generated asynchronously and can be used to monitor
287 /// query execution in real time.
288 ///
289 /// # Returns
290 /// A [`broadcast::Receiver<Event>`] for receiving `ClickHouse` events.
291 ///
292 /// # Examples
293 /// ```rust,ignore
294 /// use clickhouse_arrow::prelude::*;
295 /// use tokio::sync::broadcast::error::RecvError;
296 ///
297 /// let client = Client::builder()
298 /// .with_endpoint("localhost:9000")
299 /// .build_arrow()
300 /// .await
301 /// .unwrap();
302 ///
303 /// let mut receiver = client.subscribe_events();
304 /// let handle = tokio::spawn(async move {
305 /// while let Ok(event) = receiver.recv().await {
306 /// println!("Received event: {:?}", event);
307 /// }
308 /// });
309 ///
310 /// // Execute a query to generate events
311 /// client.query("SELECT * FROM large_table").await.unwrap();
312 /// ```
313 pub fn subscribe_events(&self) -> broadcast::Receiver<Event> { self.events.subscribe() }
314
315 /// Checks the health of the underlying `ClickHouse` connection.
316 ///
317 /// This method verifies that the connection is active and responsive. If `ping` is
318 /// `true`, it sends a lightweight ping to the `ClickHouse` server to confirm
319 /// connectivity. Otherwise, it checks the connection's internal state.
320 ///
321 /// # Parameters
322 /// - `ping`: If `true`, performs an active ping to the server; if `false`, checks the
323 /// connection state without network activity.
324 ///
325 /// # Returns
326 /// A [`Result`] indicating whether the connection is healthy.
327 ///
328 /// # Errors
329 /// - Fails if the connection is disconnected or unresponsive.
330 /// - Fails if the ping operation times out or encounters a network error.
331 ///
332 /// # Examples
333 /// ```rust,ignore
334 /// use clickhouse_arrow::prelude::*;
335 ///
336 /// let client = Client::builder()
337 /// .with_endpoint("localhost:9000")
338 /// .build::<ArrowFormat>()
339 /// .await
340 /// .unwrap();
341 ///
342 /// client.health_check(true).await.unwrap();
343 /// println!("Connection is healthy!");
344 /// ```
345 pub async fn health_check(&self, ping: bool) -> Result<()> {
346 trace!({ ATT_CID } = self.client_id, "sending health check w/ ping={ping}");
347 self.conn().await?.check_connection(ping).await
348 }
349
350 /// Shuts down the `ClickHouse` client and closes its connection.
351 ///
352 /// This method gracefully terminates the underlying connection, ensuring that any
353 /// pending operations are completed or canceled. After shutdown, the client cannot
354 /// be used for further operations.
355 ///
356 /// # Returns
357 /// A [`Result`] indicating whether the shutdown was successful.
358 ///
359 /// # Errors
360 /// - Fails if the connection cannot be closed due to network issues or internal errors.
361 ///
362 /// # Examples
363 /// ```rust,ignore
364 /// use clickhouse_arrow::prelude::*;
365 ///
366 /// let client = Client::builder()
367 /// .with_endpoint("localhost:9000")
368 /// .build::<ArrowFormat>()
369 /// .await
370 /// .unwrap();
371 ///
372 /// client.shutdown().await.unwrap();
373 /// println!("Client shut down successfully!");
374 /// ```
375 pub async fn shutdown(&self) -> Result<()> {
376 trace!("shutting down client");
377 self.conn().await?.shutdown().await
378 }
379
380 /// Inserts a block of data into `ClickHouse` using the native protocol.
381 ///
382 /// This method sends an insert query with a single block of data, formatted according to
383 /// the client's data format (`T: ClientFormat`). For [`NativeClient`], the data is a
384 /// [`Block`]; for [`ArrowClient`], it is a [`RecordBatch`]. The query is executed
385 /// asynchronously, and any response data, progress events, or errors are streamed back.
386 ///
387 /// Progress and profile events are dispatched to the client's event channel (see
388 /// [`Client::subscribe_events`]). The returned stream yields `()` on success or an
389 /// error if the insert fails.
390 ///
391 /// # Parameters
392 /// - `query`: The insert query (e.g., `"INSERT INTO my_table VALUES"`).
393 /// - `block`: The data to insert, in the format specified by `T` ([`Block`] or
394 /// [`RecordBatch`]).
395 /// - `qid`: Optional query ID for tracking and debugging.
396 ///
397 /// # Returns
398 /// A [`Result`] containing a stream of [`Result<()>`], where each item indicates
399 /// the success or failure of processing response data.
400 ///
401 /// # Errors
402 /// - Fails if the query is malformed or the data format is invalid.
403 /// - Fails if the connection to `ClickHouse` is interrupted.
404 /// - Fails if `ClickHouse` returns an exception (e.g., schema mismatch).
405 ///
406 /// # Examples
407 /// ```rust,ignore
408 /// use clickhouse_arrow::prelude::*;
409 /// use arrow::record_batch::RecordBatch;
410 ///
411 /// let client = Client::builder()
412 /// .destination("localhost:9000")
413 /// .build_arrow()
414 /// .await?;
415 ///
416 /// let qid = Qid::new();
417 /// // Assume `batch` is a valid RecordBatch
418 /// let batch: RecordBatch = // ...;
419 /// let stream = client.insert("INSERT INTO my_table VALUES", batch, Some(qid)).await?;
420 /// while let Some(result) = stream.next().await {
421 /// result?; // Check for errors
422 /// }
423 /// ```
424 #[instrument(
425 level = "trace",
426 name = "clickhouse.insert",
427 skip_all
428 fields(
429 db.system = "clickhouse",
430 db.operation = "insert",
431 db.format = T::FORMAT,
432 clickhouse.client.id = self.client_id,
433 clickhouse.query.id
434 ),
435 )]
436 pub async fn insert(
437 &self,
438 query: impl Into<ParsedQuery>,
439 block: T::Data,
440 qid: Option<Qid>,
441 ) -> Result<impl Stream<Item = Result<()>> + '_> {
442 let (query, qid) = record_query(qid, query.into(), self.client_id);
443
444 // Create metadata channel
445 let (tx, rx) = oneshot::channel();
446 let connection = self.conn().await?;
447
448 // Send query
449 #[cfg_attr(not(feature = "inner_pool"), expect(unused_variables))]
450 let conn_idx = connection
451 .send_operation(
452 Operation::Query {
453 query,
454 settings: self.settings.clone(),
455 params: None,
456 response: tx,
457 header: None,
458 },
459 qid,
460 false,
461 )
462 .await?;
463
464 trace!({ ATT_CID } = self.client_id, { ATT_QID } = %qid, "sent query, awaiting response");
465 let responses = rx
466 .await
467 .map_err(|_| Error::Protocol(format!("Failed to receive response for query {qid}")))?
468 .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "Error receiving header"))?;
469
470 // Send data
471 let (tx, rx) = oneshot::channel();
472 let _ = connection
473 .send_operation(Operation::Insert { data: block, response: tx }, qid, true)
474 .await?;
475 rx.await.map_err(|_| {
476 Error::Protocol(format!("Failed to receive response from insert {qid}"))
477 })??;
478
479 // Decrement load balancer
480 #[cfg(feature = "inner_pool")]
481 connection.finish(conn_idx, Operation::<T::Data>::weight_insert());
482
483 Ok(self.insert_response(responses, qid))
484 }
485
    /// Inserts multiple blocks of data into `ClickHouse` using the native protocol.
    ///
    /// This method sends an insert query with a collection of data blocks, formatted
    /// according to the client's data format (`T: ClientFormat`). For [`NativeClient`],
    /// the data is a `Vec<Block>`; for [`ArrowClient`], it is a `Vec<RecordBatch>`.
    /// The query is executed asynchronously, and any response data, progress events,
    /// or errors are streamed back.
    ///
    /// Progress and profile events are dispatched to the client's event channel (see
    /// [`Client::subscribe_events`]). The returned stream yields `()` on success or an
    /// error if the insert fails. Use this method when inserting multiple batches of
    /// data to reduce overhead compared to multiple [`Client::insert`] calls.
    ///
    /// # Parameters
    /// - `query`: The insert query (e.g., `"INSERT INTO my_table VALUES"`).
    /// - `batch`: A vector of data blocks to insert, in the format specified by `T`.
    /// - `qid`: Optional query ID for tracking and debugging.
    ///
    /// # Returns
    /// A [`Result`] containing a stream of [`Result<()>`], where each item indicates
    /// the success or failure of processing response data.
    ///
    /// # Errors
    /// - Fails if the query is malformed or any data block is invalid.
    /// - Fails if the connection to `ClickHouse` is interrupted.
    /// - Fails if `ClickHouse` returns an exception (e.g., schema mismatch).
    ///
    /// # Examples
    /// ```rust,ignore
    /// use clickhouse_arrow::prelude::*;
    /// use arrow::record_batch::RecordBatch;
    ///
    /// let client = Client::builder()
    ///     .with_endpoint("localhost:9000")
    ///     .build::<ArrowFormat>()
    ///     .await
    ///     .unwrap();
    ///
    /// // Assume `batches` is a Vec<RecordBatch>
    /// let batches: Vec<RecordBatch> = vec![/* ... */];
    /// let stream = client.insert_many("INSERT INTO my_table VALUES", batches, None).await.unwrap();
    /// while let Some(result) = stream.next().await {
    ///     result.unwrap(); // Check for errors
    /// }
    /// ```
    #[instrument(
        name = "clickhouse.insert_many",
        skip_all,
        fields(
            db.system = "clickhouse",
            db.operation = "insert",
            db.format = T::FORMAT,
            clickhouse.client.id = self.client_id,
            clickhouse.query.id
        ),
    )]
    pub async fn insert_many(
        &self,
        query: impl Into<ParsedQuery>,
        batch: Vec<T::Data>,
        qid: Option<Qid>,
    ) -> Result<impl Stream<Item = Result<()>> + '_> {
        let (query, qid) = record_query(qid, query.into(), self.client_id);

        // Create metadata channel
        let (tx, rx) = oneshot::channel();
        let connection = self.conn().await?;

        // Send the insert query first; the server replies with the table
        // header before any data blocks may be written.
        #[cfg_attr(not(feature = "inner_pool"), expect(unused_variables))]
        let conn_idx = connection
            .send_operation(
                Operation::Query {
                    query,
                    settings: self.settings.clone(),
                    params: None,
                    response: tx,
                    header: None,
                },
                qid,
                false,
            )
            .await?;

        trace!({ ATT_CID } = self.client_id, { ATT_QID } = %qid, "sent query, awaiting response");
        let responses = rx
            .await
            .map_err(|_| Error::Protocol(format!("Failed to receive response for query {qid}")))?
            .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "Error receiving header"))?;

        // Send data
        let (tx, rx) = oneshot::channel();
        let _ = connection
            .send_operation(Operation::InsertMany { data: batch, response: tx }, qid, true)
            .await?;
        rx.await.map_err(|_| {
            Error::Protocol(format!("Failed to receive response from insert {qid}"))
        })??;

        // Decrement load balancer
        #[cfg(feature = "inner_pool")]
        connection.finish(conn_idx, Operation::<T::Data>::weight_insert_many());

        Ok(self.insert_response(responses, qid))
    }
590
591 /// Executes a raw `ClickHouse` query and streams raw data in the client's format.
592 ///
593 /// This method sends a query to `ClickHouse` and returns a stream of raw data blocks
594 /// in the format specified by `T: ClientFormat` ([`Block`] for [`NativeClient`],
595 /// [`RecordBatch`] for [`ArrowClient`]). It is a low-level method suitable for
596 /// custom processing of query results. For higher-level interfaces, consider
597 /// [`Client::query`] or [`Client::query_rows`].
598 ///
599 /// Progress and profile events are dispatched to the client's event channel (see
600 /// [`Client::subscribe_events`]).
601 ///
602 /// # Parameters
603 /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM my_table"`).
604 /// - `qid`: A unique query ID for tracking and debugging.
605 ///
606 /// # Returns
607 /// A [`Result`] containing a stream of [`Result<T::Data>`], where each item is a
608 /// data block or an error.
609 ///
610 /// # Errors
611 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
612 /// - Fails if the connection to `ClickHouse` is interrupted.
613 /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
614 ///
615 /// # Examples
616 /// ```rust,ignore
617 /// use clickhouse_arrow::prelude::*;
618 ///
619 /// let client = Client::builder()
620 /// .with_endpoint("localhost:9000")
621 /// .build::<ArrowFormat>()
622 /// .await
623 /// .unwrap();
624 ///
625 /// let qid = Qid::new();
626 /// let mut stream = client.query_raw("SELECT * FROM my_table", qid).await.unwrap();
627 /// while let Some(block) = stream.next().await {
628 /// let batch = block.unwrap();
629 /// println!("Received batch with {} rows", batch.num_rows());
630 /// }
631 /// ```
632 #[instrument(
633 name = "clickhouse.query",
634 skip_all
635 fields(
636 db.system = "clickhouse",
637 db.operation = "query",
638 db.format = T::FORMAT,
639 clickhouse.client.id = self.client_id,
640 clickhouse.query.id = %qid
641 ),
642 )]
643 pub async fn query_raw<P: Into<QueryParams>>(
644 &self,
645 query: String,
646 params: Option<P>,
647 qid: Qid,
648 ) -> Result<impl Stream<Item = Result<T::Data>> + 'static> {
649 // Create metadata channel
650 let (tx, rx) = oneshot::channel();
651 let connection = self.conn().await?;
652
653 #[cfg_attr(not(feature = "inner_pool"), expect(unused_variables))]
654 let conn_idx = connection
655 .send_operation(
656 Operation::Query {
657 query,
658 settings: self.settings.clone(),
659 params: params.map(Into::into),
660 response: tx,
661 header: None,
662 },
663 qid,
664 true,
665 )
666 .await?;
667
668 trace!({ ATT_CID } = self.client_id, { ATT_QID } = %qid, "sent query, awaiting response");
669
670 let responses = rx
671 .await
672 .map_err(|_| Error::Protocol(format!("Failed to receive response for query {qid}")))?
673 .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "Error receiving header"))?;
674 trace!({ ATT_CID } = self.client_id, { ATT_QID } = %qid, "sent query, awaiting response");
675
676 // Decrement load balancer
677 #[cfg(feature = "inner_pool")]
678 connection.finish(conn_idx, Operation::<T::Data>::weight_query());
679
680 Ok(create_response_stream::<T>(responses, qid, self.client_id))
681 }
682
683 /// Executes a `ClickHouse` query and discards all returned data.
684 ///
685 /// This method sends a query to `ClickHouse` and processes the response stream to
686 /// check for errors, but discards any returned data blocks. It is useful for
687 /// queries that modify data (e.g., `INSERT`, `UPDATE`, `DELETE`) or DDL statements
688 /// where the result data is not needed. For queries that return data, use
689 /// [`Client::query`] or [`Client::query_raw`].
690 ///
691 /// # Parameters
692 /// - `query`: The SQL query to execute (e.g., `"DROP TABLE my_table"`).
693 /// - `qid`: Optional query ID for tracking and debugging.
694 ///
695 /// # Returns
696 /// A [`Result`] indicating whether the query executed successfully.
697 ///
698 /// # Errors
699 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
700 /// - Fails if the connection to `ClickHouse` is interrupted.
701 /// - Fails if `ClickHouse` returns an exception (e.g., permission denied).
702 ///
703 /// # Examples
704 /// ```rust,ignore
705 /// use clickhouse_arrow::prelude::*;
706 ///
707 /// let client = Client::builder()
708 /// .with_endpoint("localhost:9000")
709 /// .build_arrow()
710 /// .await
711 /// .unwrap();
712 ///
713 /// client.execute("DROP TABLE IF EXISTS my_table", None).await.unwrap();
714 /// println!("Table dropped successfully!");
715 /// ```
716 #[instrument(
717 name = "clickhouse.execute",
718 skip_all,
719 fields(
720 db.system = "clickhouse",
721 db.format = T::FORMAT,
722 db.operation = "query",
723 clickhouse.client.id = self.client_id,
724 clickhouse.query.id
725 )
726 )]
727 pub async fn execute(&self, query: impl Into<ParsedQuery>, qid: Option<Qid>) -> Result<()> {
728 self.execute_params(query, None::<QueryParams>, qid).await
729 }
730
    /// Executes a `ClickHouse` query with query parameters and discards all returned data.
    ///
    /// # Parameters
    /// - `query`: The SQL query to execute (e.g., `"DROP TABLE my_table"`).
    /// - `params`: The query parameters to provide
    /// - `qid`: Optional query ID for tracking and debugging.
    ///
    /// # Returns
    /// A [`Result`] indicating whether the query executed successfully.
    ///
    /// # Errors
    /// - Fails if the query is malformed or unsupported by `ClickHouse`.
    /// - Fails if the connection to `ClickHouse` is interrupted.
    /// - Fails if `ClickHouse` returns an exception (e.g., permission denied).
    ///
    /// # Examples
    /// ```rust,ignore
    /// use clickhouse_arrow::prelude::*;
    ///
    /// let client = Client::builder()
    ///     .with_endpoint("localhost:9000")
    ///     .build_arrow()
    ///     .await
    ///     .unwrap();
    ///
    /// let params = Some(vec![
    ///     ("str", ParamValue::from("hello")),
    ///     ("num", ParamValue::from(42)),
    ///     ("array", ParamValue::from("['a', 'b', 'c']")),
    /// ]);
    /// let query = "SELECT {num:Int64}, {str:String}, {array:Array(String)}";
    /// client.execute_params(query, params, None).await.unwrap();
    /// println!("Query executed successfully!");
    /// ```
    #[instrument(
        name = "clickhouse.execute_params",
        skip_all,
        fields(
            db.system = "clickhouse",
            db.format = T::FORMAT,
            db.operation = "query",
            clickhouse.client.id = self.client_id,
            clickhouse.query.id
        )
    )]
    pub async fn execute_params<P: Into<QueryParams>>(
        &self,
        query: impl Into<ParsedQuery>,
        params: Option<P>,
        qid: Option<Qid>,
    ) -> Result<()> {
        let (query, qid) = record_query(qid, query.into(), self.client_id);
        let stream = self.query_raw(query, params, qid).await?;
        tokio::pin!(stream);
        // Drain the full stream so server-side errors are surfaced via `?`,
        // discarding the data blocks themselves.
        while let Some(next) = stream.next().await {
            drop(next?);
        }
        Ok(())
    }
790
791 /// Executes a `ClickHouse` query without processing the response stream.
792 ///
793 /// This method sends a query to `ClickHouse` and immediately discards the response
794 /// stream without checking for errors or processing data. It is a lightweight
795 /// alternative to [`Client::execute`], suitable for fire-and-forget scenarios where
796 /// the query's outcome is not critical. For safer execution, use [`Client::execute`].
797 ///
798 /// # Parameters
799 /// - `query`: The SQL query to execute (e.g., `"INSERT INTO my_table VALUES (1)"`).
800 /// - `qid`: Optional query ID for tracking and debugging.
801 ///
802 /// # Returns
803 /// A [`Result`] indicating whether the query was sent successfully.
804 ///
805 /// # Errors
806 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
807 /// - Fails if the connection to `ClickHouse` is interrupted.
808 ///
809 /// # Examples
810 /// ```rust,ignore
811 /// use clickhouse_arrow::prelude::*;
812 ///
813 /// let client = Client::builder()
814 /// .with_endpoint("localhost:9000")
815 /// .build::<ArrowFormat>()
816 /// .await
817 /// .unwrap();
818 ///
819 /// client.execute_now("INSERT INTO logs VALUES ('event')", None).await.unwrap();
820 /// println!("Log event sent!");
821 /// ```
822 #[instrument(
823 name = "clickhouse.execute_now",
824 skip_all,
825 fields(
826 db.system = "clickhouse",
827 db.format = T::FORMAT,
828 db.operation = "query",
829 clickhouse.client.id = self.client_id,
830 clickhouse.query.id
831 )
832 )]
833 pub async fn execute_now(&self, query: impl Into<ParsedQuery>, qid: Option<Qid>) -> Result<()> {
834 self.execute_now_params(query, None::<QueryParams>, qid).await
835 }
836
837 /// Executes a `ClickHouse` query with query parameters without processing the response stream.
838 ///
839 /// # Parameters
840 /// - `query`: The SQL query to execute (e.g., `"INSERT INTO my_table VALUES (1)"`).
841 /// - `params`: The query parameters to provide
842 /// - `qid`: Optional query ID for tracking and debugging.
843 ///
844 /// # Returns
845 /// A [`Result`] indicating whether the query was sent successfully.
846 ///
847 /// # Errors
848 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
849 /// - Fails if the connection to `ClickHouse` is interrupted.
850 ///
851 /// # Examples
852 /// ```rust,ignore
853 /// use clickhouse_arrow::prelude::*;
854 ///
855 /// let client = Client::builder()
856 /// .with_endpoint("localhost:9000")
857 /// .build::<ArrowFormat>()
858 /// .await
859 /// .unwrap();
860 ///
861 ///
862 /// let params = Some(vec![("str", ParamValue::from("hello"))]);
863 /// let query = "INSERT INTO logs VALUES ({str:String})";
864 /// client.execute_now_params(query, params, None).await.unwrap();
865 /// println!("Log event sent!");
866 /// ```
867 #[instrument(
868 name = "clickhouse.execute_now_params",
869 skip_all,
870 fields(
871 db.system = "clickhouse",
872 db.format = T::FORMAT,
873 db.operation = "query",
874 clickhouse.client.id = self.client_id,
875 clickhouse.query.id
876 )
877 )]
878 pub async fn execute_now_params<P: Into<QueryParams>>(
879 &self,
880 query: impl Into<ParsedQuery>,
881 params: Option<P>,
882 qid: Option<Qid>,
883 ) -> Result<()> {
884 let (query, qid) = record_query(qid, query.into(), self.client_id);
885 drop(self.query_raw(query, params, qid).await?);
886 Ok(())
887 }
888
889 /// Creates a new database in `ClickHouse` using a DDL statement.
890 ///
891 /// This method issues a `CREATE DATABASE` statement for the specified database. If no
892 /// database is provided, it uses the client's default database from the connection
893 /// metadata. The `default` database cannot be created, as it is reserved by `ClickHouse`.
894 ///
895 /// # Parameters
896 /// - `database`: Optional name of the database to create. If `None`, uses the client's default
897 /// database.
898 /// - `qid`: Optional query ID for tracking and debugging.
899 ///
900 /// # Returns
901 /// A [`Result`] indicating success or failure of the operation.
902 ///
903 /// # Errors
904 /// - Fails if the database name is invalid or reserved (e.g., `default`).
905 /// - Fails if the query execution encounters a `ClickHouse` error.
906 /// - Fails if the connection is interrupted.
907 ///
908 /// # Examples
909 /// ```rust,ignore
910 /// use clickhouse_arrow::client::{Client, ClientBuilder};
911 ///
912 /// let client = ClientBuilder::new()
913 /// .destination("localhost:9000")
914 /// .build_native()
915 /// .await?;
916 ///
917 /// client.create_database(Some("my_db"), None).await?;
918 /// ```
919 #[instrument(
920 name = "clickhouse.create_database",
921 skip_all
922 fields(db.system = "clickhouse", db.operation = "create.database")
923 )]
924 pub async fn create_database(&self, database: Option<&str>, qid: Option<Qid>) -> Result<()> {
925 let database = database.unwrap_or(self.connection.database());
926 let database = database.to_lowercase();
927 if &database == "default" {
928 warn!("Exiting, cannot create `default` database");
929 return Ok(());
930 }
931
932 let stmt = create_db_statement(&database)?;
933 self.execute(stmt, qid).await?;
934 Ok(())
935 }
936
937 /// Drops a database in `ClickHouse` using a DDL statement.
938 ///
939 /// This method issues a `DROP DATABASE` statement for the specified database. The
940 /// `default` database cannot be dropped, as it is reserved by `ClickHouse`. If the client
941 /// is connected to a non-default database, dropping a different database is not allowed
942 /// to prevent accidental data loss.
943 ///
944 /// # Parameters
945 /// - `database`: Name of the database to drop.
946 /// - `sync`: If `true`, the operation waits for `ClickHouse` to complete the drop
947 /// synchronously.
948 /// - `qid`: Optional query ID for tracking and debugging.
949 ///
950 /// # Returns
951 /// A [`Result`] indicating success or failure of the operation.
952 ///
953 /// # Errors
954 /// - Fails if the database is `default` (reserved).
955 /// - Fails if the client is connected to a non-default database different from `database`.
956 /// - Fails if the query execution encounters a `ClickHouse` error.
957 ///
958 /// # Examples
959 /// ```rust,ignore
960 /// use clickhouse_arrow::prelude::*;
961 ///
962 /// let client = Client::builder()
963 /// .destination("localhost:9000")
964 /// .database("default") // Must be connected to default to drop 'other' databases
965 /// .build::<NativeFormat>()
966 /// .await?;
967 ///
968 /// client.drop_database("my_db", true, None).await?;
969 /// ```
970 #[instrument(
971 name = "clickhouse.drop_database",
972 skip_all
973 fields(db.system = "clickhouse", db.operation = "drop.database")
974 )]
975 pub async fn drop_database(&self, database: &str, sync: bool, qid: Option<Qid>) -> Result<()> {
976 let database = database.to_lowercase();
977 if &database == "default" {
978 warn!("Exiting, cannot drop `default` database");
979 return Ok(());
980 }
981
982 // TODO: Should this check remain? Or should the query writing be modified in the case
983 // of issuing DDL statements while connected to a non-default database
984 let current_database = self.connection.database();
985 if current_database != "default"
986 && !current_database.is_empty()
987 && current_database != database
988 {
989 error!("Cannot drop database {database} while connected to {current_database}");
990 return Err(Error::InsufficientDDLScope(current_database.into()));
991 }
992
993 let stmt = drop_db_statement(&database, sync)?;
994 self.execute(stmt, qid).await?;
995 Ok(())
996 }
997}
998
999impl<T: ClientFormat> Client<T> {
1000 /// Get a reference to the underlying connection.
1001 ///
1002 /// TODO: Support reconnect.
1003 #[expect(clippy::unused_async)]
1004 async fn conn(&self) -> Result<&connection::Connection<T>> {
1005 // TODO: Add reconnection logic here if configured
1006 Ok(self.connection.as_ref())
1007 }
1008
1009 /// # Feature
1010 /// Requires the `cloud` feature to be enabled.
1011 #[cfg(feature = "cloud")]
1012 #[instrument(level = "trace", name = "clickhouse.cloud.ping")]
1013 async fn ping_cloud(
1014 domain: &str,
1015 timeout: Option<u64>,
1016 track: Option<&std::sync::atomic::AtomicBool>,
1017 ) {
1018 debug!("pinging cloud instance");
1019 if !domain.is_empty() {
1020 debug!(domain, "cloud endpoint found");
1021 // Create receiver channel to cancel ping if dropped
1022 let (_tx, rx) = oneshot::channel::<()>();
1023 cloud::ping_cloud(domain.to_string(), timeout, track, rx).await;
1024 }
1025 }
1026
1027 // Helper function to convert a receiver of data into a `ClickHouseResponse`
1028 fn insert_response(
1029 &self,
1030 rx: mpsc::Receiver<Result<T::Data>>,
1031 qid: Qid,
1032 ) -> ClickHouseResponse<()> {
1033 ClickHouseResponse::<()>::from_stream(handle_insert_response::<T>(rx, qid, self.client_id))
1034 }
1035}
1036
1037impl Client<NativeFormat> {
1038 /// Inserts rows into `ClickHouse` using the native protocol.
1039 ///
1040 /// This method sends an insert query with a collection of rows, where each row is
1041 /// a type `T` implementing [`Row`]. The rows are converted into a `ClickHouse`
1042 /// [`Block`] and sent over the native protocol. The query is executed asynchronously,
1043 /// and any response data, progress events, or errors are streamed back.
1044 ///
1045 /// Progress and profile events are dispatched to the client's event channel (see
1046 /// [`Client::subscribe_events`]). The returned [`ClickHouseResponse`] yields `()`
1047 /// on success or an error if the insert fails.
1048 ///
1049 /// # Parameters
1050 /// - `query`: The insert query (e.g., `"INSERT INTO my_table VALUES"`).
1051 /// - `blocks`: An iterator of rows to insert, where each row implements [`Row`].
1052 /// - `qid`: Optional query ID for tracking and debugging.
1053 ///
1054 /// # Returns
1055 /// A [`Result`] containing a [`ClickHouseResponse<()>`] that streams the operation's
1056 /// outcome.
1057 ///
1058 /// # Errors
1059 /// - Fails if the query is malformed or the row data is invalid.
1060 /// - Fails if the connection to `ClickHouse` is interrupted.
1061 /// - Fails if `ClickHouse` returns an exception (e.g., schema mismatch).
1062 ///
1063 /// # Examples
1064 /// ```rust,ignore
1065 /// use clickhouse_arrow::prelude::*;
1066 ///
1067 /// let client = Client::builder()
1068 /// .with_endpoint("localhost:9000")
1069 /// .build_native()
1070 /// .await
1071 /// .unwrap();
1072 ///
1073 /// // Assume `MyRow` implements `Row`
1074 /// let rows = vec![MyRow { /* ... */ }, MyRow { /* ... */ }];
1075 /// let response = client.insert_rows("INSERT INTO my_table VALUES", rows.into_iter(), None)
1076 /// .await
1077 /// .unwrap();
1078 /// while let Some(result) = response.next().await {
1079 /// result.unwrap(); // Check for errors
1080 /// }
1081 /// ```
    #[instrument(
        name = "clickhouse.insert_rows",
        fields(
            db.system = "clickhouse",
            db.operation = "insert",
            db.format = NativeFormat::FORMAT,
            clickhouse.client.id = self.client_id,
            clickhouse.query.id
        ),
        skip_all
    )]
    pub async fn insert_rows<T: Row + Send + 'static>(
        &self,
        query: impl Into<ParsedQuery>,
        blocks: impl Iterator<Item = T> + Send + Sync + 'static,
        qid: Option<Qid>,
    ) -> Result<ClickHouseResponse<()>> {
        let cid = self.client_id;
        let (query, qid) = record_query(qid, query.into(), cid);

        // Create metadata channel
        // `rx` carries the server's response stream; `header_rx` carries the
        // header block the server sends back describing the target table.
        let (tx, rx) = oneshot::channel();
        let (header_tx, header_rx) = oneshot::channel();

        let connection = self.conn().await?;

        // Phase 1: send the INSERT query itself. `conn_idx` is only consumed by
        // the load-balancer `finish` call under the `inner_pool` feature.
        #[cfg_attr(not(feature = "inner_pool"), expect(unused_variables))]
        let conn_idx = connection
            .send_operation(
                Operation::Query {
                    query,
                    settings: self.settings.clone(),
                    params: None,
                    response: tx,
                    header: Some(header_tx),
                },
                qid,
                false,
            )
            .await?;

        trace!({ ATT_CID } = cid, { ATT_QID } = %qid, "sent query, awaiting response");
        // A dropped sender means the connection task went away before replying.
        let responses = rx
            .await
            .map_err(|_| Error::Protocol(format!("Failed to receive response for query {qid}")))?
            .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "Error receiving header"))?;

        // The header must arrive before the data can be shaped into a block.
        let header = header_rx
            .await
            .map_err(|_| Error::Protocol(format!("Failed to receive header for query {qid}")))?;
        // Materialize all rows into a single native Block matching the header.
        let data = Block::from_rows(blocks.collect(), header)?;

        // Phase 2: send the data block and wait for acknowledgement. The double
        // `??` unwraps the channel-recv error first, then the server-side error.
        let (tx, rx) = oneshot::channel();
        let _ =
            connection.send_operation(Operation::Insert { data, response: tx }, qid, true).await?;
        rx.await.map_err(|_| {
            Error::Protocol(format!("Failed to receive response from insert {qid}"))
        })??;

        // Decrement load balancer
        #[cfg(feature = "inner_pool")]
        connection.finish(conn_idx, Operation::<Block>::weight_query());

        Ok(self.insert_response(responses, qid))
    }
1147
1148 /// Executes a `ClickHouse` query and streams deserialized rows.
1149 ///
1150 /// This method sends a query to `ClickHouse` and returns a stream of rows, where
1151 /// each row is deserialized into type `T` implementing [`Row`]. Rows are grouped
1152 /// into `ClickHouse` blocks, and the stream yields rows as they are received. Use
1153 /// this method for type-safe access to query results in native format.
1154 ///
1155 /// Progress and profile events are dispatched to the client's event channel (see
1156 /// [`Client::subscribe_events`]).
1157 ///
1158 /// # Parameters
1159 /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM my_table"`).
1160 /// - `qid`: Optional query ID for tracking and debugging.
1161 ///
1162 /// # Returns
1163 /// A [`Result`] containing a [`ClickHouseResponse<T>`] that streams deserialized
1164 /// rows of type `T`.
1165 ///
1166 /// # Errors
1167 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1168 /// - Fails if row deserialization fails (e.g., schema mismatch).
1169 /// - Fails if the connection to `ClickHouse` is interrupted.
1170 /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1171 ///
1172 /// # Examples
1173 /// ```rust,ignore
1174 /// use clickhouse_arrow::prelude::*;
1175 ///
1176 /// let client = Client::builder()
1177 /// .with_endpoint("localhost:9000")
1178 /// .build_native()
1179 /// .await
1180 /// .unwrap();
1181 ///
1182 /// // Assume `MyRow` implements `Row`
1183 /// let mut response = client.query::<MyRow>("SELECT * FROM my_table", None).await.unwrap();
1184 /// while let Some(row) = response.next().await {
1185 /// let row = row.unwrap();
1186 /// println!("Row: {:?}", row);
1187 /// }
1188 /// ```
1189 #[instrument(
1190 name = "clickhouse.query",
1191 skip_all,
1192 fields(db.system = "clickhouse", db.operation = "query", db.format = NativeFormat::FORMAT)
1193 )]
1194 pub async fn query<T: Row + Send + 'static>(
1195 &self,
1196 query: impl Into<ParsedQuery>,
1197 qid: Option<Qid>,
1198 ) -> Result<ClickHouseResponse<T>> {
1199 self.query_params(query, None, qid).await
1200 }
1201
1202 /// Executes a `ClickHouse` query with parameters and streams deserialized rows.
1203 ///
1204 /// # Parameters
1205 /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM my_table"`).
1206 /// - `params`: The query parameters to provide
1207 /// - `qid`: Optional query ID for tracking and debugging.
1208 ///
1209 /// # Returns
1210 /// A [`Result`] containing a [`ClickHouseResponse<T>`] that streams deserialized
1211 /// rows of type `T`.
1212 ///
1213 /// # Errors
1214 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1215 /// - Fails if row deserialization fails (e.g., schema mismatch).
1216 /// - Fails if the connection to `ClickHouse` is interrupted.
1217 /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1218 ///
1219 /// # Examples
1220 /// ```rust,ignore
1221 /// use clickhouse_arrow::prelude::*;
1222 ///
1223 /// let client = Client::builder()
1224 /// .with_endpoint("localhost:9000")
1225 /// .build_native()
1226 /// .await
1227 /// .unwrap();
1228 ///
1229 /// // Assume `MyRow` implements `Row`
1230 /// let params = Some(vec![("name", ParamValue::from("my_table"))]);
1231 /// let query = "SELECT * FROM {name:Identifier}";
1232 /// let mut response = client.query_params::<MyRow>(query, params, None).await.unwrap();
1233 /// while let Some(row) = response.next().await {
1234 /// let row = row.unwrap();
1235 /// println!("Row: {:?}", row);
1236 /// }
1237 /// ```
1238 #[instrument(
1239 name = "clickhouse.query_params",
1240 skip_all,
1241 fields(db.system = "clickhouse", db.operation = "query", db.format = NativeFormat::FORMAT)
1242 )]
1243 pub async fn query_params<T: Row + Send + 'static>(
1244 &self,
1245 query: impl Into<ParsedQuery>,
1246 params: Option<QueryParams>,
1247 qid: Option<Qid>,
1248 ) -> Result<ClickHouseResponse<T>> {
1249 let (query, qid) = record_query(qid, query.into(), self.client_id);
1250 let raw = self.query_raw(query, params, qid).await?;
1251 Ok(ClickHouseResponse::new(Box::pin(raw.flat_map(|block| {
1252 match block {
1253 Ok(mut block) => stream::iter(
1254 block
1255 .take_iter_rows()
1256 .filter(|x| !x.is_empty())
1257 .map(T::deserialize_row)
1258 .map(|maybe| maybe.inspect_err(|error| error!(?error, "deserializing row")))
1259 .collect::<Vec<_>>(),
1260 ),
1261 Err(e) => stream::iter(vec![Err(e)]),
1262 }
1263 }))))
1264 }
1265
1266 /// Executes a `ClickHouse` query and returns the first row, discarding the rest.
1267 ///
1268 /// This method sends a query to `ClickHouse` and returns the first row deserialized
1269 /// into type `T` implementing [`Row`], or `None` if the result is empty. It is
1270 /// useful for queries expected to return a single row (e.g., `SELECT COUNT(*)`).
1271 /// For streaming multiple rows, use [`Client::query`].
1272 ///
1273 /// Progress and profile events are dispatched to the client's event channel (see
1274 /// [`Client::subscribe_events`]).
1275 ///
1276 /// # Parameters
1277 /// - `query`: The SQL query to execute (e.g., `"SELECT name FROM users WHERE id = 1"`).
1278 /// - `qid`: Optional query ID for tracking and debugging.
1279 ///
1280 /// # Returns
1281 /// A [`Result`] containing an `Option<T>`, where `T` is the deserialized row, or
1282 /// `None` if no rows are returned.
1283 ///
1284 /// # Errors
1285 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1286 /// - Fails if row deserialization fails (e.g., schema mismatch).
1287 /// - Fails if the connection to `ClickHouse` is interrupted.
1288 /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1289 ///
1290 /// # Examples
1291 /// ```rust,ignore
1292 /// use clickhouse_arrow::prelude::*;
1293 ///
1294 /// let client = Client::builder()
1295 /// .with_endpoint("localhost:9000")
1296 /// .build_native()
1297 /// .await
1298 /// .unwrap();
1299 ///
1300 /// // Assume `MyRow` implements `Row`
1301 /// let row = client.query_one::<MyRow>("SELECT name FROM users WHERE id = 1", None)
1302 /// .await
1303 /// .unwrap();
1304 /// if let Some(row) = row {
1305 /// println!("Found row: {:?}", row);
1306 /// }
1307 /// ```
1308 #[instrument(
1309 name = "clickhouse.query_one",
1310 skip_all,
1311 fields(
1312 db.system = "clickhouse",
1313 db.operation = "query",
1314 db.format = NativeFormat::FORMAT,
1315 clickhouse.client.id = self.client_id,
1316 clickhouse.query.id
1317 )
1318 )]
1319 pub async fn query_one<T: Row + Send + 'static>(
1320 &self,
1321 query: impl Into<ParsedQuery>,
1322 qid: Option<Qid>,
1323 ) -> Result<Option<T>> {
1324 self.query_one_params(query, None, qid).await
1325 }
1326
1327 /// Executes a `ClickHouse` query with parameters and returns the first row, discarding the
1328 /// rest.
1329 ///
1330 /// # Parameters
1331 /// - `query`: The SQL query to execute (e.g., `"SELECT name FROM users WHERE id = 1"`).
1332 /// - `params`: The query parameters to provide
1333 /// - `qid`: Optional query ID for tracking and debugging.
1334 ///
1335 /// # Returns
1336 /// A [`Result`] containing an `Option<T>`, where `T` is the deserialized row, or
1337 /// `None` if no rows are returned.
1338 ///
1339 /// # Errors
1340 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1341 /// - Fails if row deserialization fails (e.g., schema mismatch).
1342 /// - Fails if the connection to `ClickHouse` is interrupted.
1343 /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1344 ///
1345 /// # Examples
1346 /// ```rust,ignore
1347 /// use clickhouse_arrow::prelude::*;
1348 ///
1349 /// let client = Client::builder()
1350 /// .with_endpoint("localhost:9000")
1351 /// .build_native()
1352 /// .await
1353 /// .unwrap();
1354 ///
1355 /// // Assume `MyRow` implements `Row`
1356 /// let params = Some(vec![
1357 /// ("str", ParamValue::from("name")),
1358 /// ].into());
1359 /// let query = "SELECT {str:String} FROM users WHERE id = 1";
1360 /// let row = client.query_one_params::<MyRow>(query, params, None)
1361 /// .await
1362 /// .unwrap();
1363 /// if let Some(row) = row {
1364 /// println!("Found row: {:?}", row);
1365 /// }
1366 /// ```
1367 #[instrument(
1368 name = "clickhouse.query_one_params",
1369 skip_all,
1370 fields(
1371 db.system = "clickhouse",
1372 db.operation = "query",
1373 db.format = NativeFormat::FORMAT,
1374 clickhouse.client.id = self.client_id,
1375 clickhouse.query.id
1376 )
1377 )]
1378 pub async fn query_one_params<T: Row + Send + 'static>(
1379 &self,
1380 query: impl Into<ParsedQuery>,
1381 params: Option<QueryParams>,
1382 qid: Option<Qid>,
1383 ) -> Result<Option<T>> {
1384 let mut stream = self.query_params::<T>(query, params, qid).await?;
1385 stream.next().await.transpose()
1386 }
1387
1388 /// Creates a `ClickHouse` table from a Rust struct that implements the `Row` trait.
1389 ///
1390 /// This method generates and executes a `CREATE TABLE` DDL statement based on the
1391 /// structure of the provided `Row` type. The table schema is automatically derived
1392 /// from the struct fields and their types.
1393 ///
1394 /// # Arguments
1395 /// * `database` - Optional database name. If None, uses the client's default database
1396 /// * `table` - The name of the table to create
1397 /// * `options` - Table creation options including engine type, order by, and partition by
1398 /// * `query_id` - Optional query ID for tracking and debugging
1399 ///
1400 /// # Type Parameters
1401 /// * `T` - A type that implements the `Row` trait, typically a struct with the `#[derive(Row)]`
1402 /// macro
1403 ///
1404 /// # Example
1405 /// ```ignore
1406 /// #[derive(Row)]
1407 /// struct User {
1408 /// id: u32,
1409 /// name: String,
1410 /// created_at: DateTime,
1411 /// }
1412 ///
1413 /// let options = CreateOptions::new("MergeTree")
1414 /// .with_order_by(&["id"]);
1415 ///
1416 /// client.create_table::<User>(Some("analytics"), "users", &options, None).await?;
1417 /// ```
1418 ///
1419 /// # Errors
1420 /// - Returns an error if the table creation fails
1421 /// - Returns an error if the database/table names are invalid
1422 /// - Returns an error if the connection is lost
1423 #[instrument(
1424 name = "clickhouse.create_table",
1425 skip_all
1426 fields(
1427 db.system = "clickhouse",
1428 db.operation = "create.table",
1429 db.format = NativeFormat::FORMAT,
1430 )
1431 )]
1432 pub async fn create_table<T: Row>(
1433 &self,
1434 database: Option<&str>,
1435 table: &str,
1436 options: &CreateOptions,
1437 qid: Option<Qid>,
1438 ) -> Result<()> {
1439 let database = database.unwrap_or(self.connection.database());
1440 let stmt = create_table_statement_from_native::<T>(Some(database), table, options)?;
1441 self.execute(stmt, qid).await?;
1442 Ok(())
1443 }
1444}
1445
1446impl Client<ArrowFormat> {
1447 /// Executes a `ClickHouse` query and streams Arrow [`RecordBatch`] results.
1448 ///
1449 /// This method sends a query to `ClickHouse` and returns a stream of [`RecordBatch`]
1450 /// instances, each containing a chunk of the query results in Apache Arrow format.
1451 /// Use this method for efficient integration with Arrow-based data processing
1452 /// pipelines. For row-based access, consider [`Client::query_rows`].
1453 ///
1454 /// Progress and profile events are dispatched to the client's event channel (see
1455 /// [`Client::subscribe_events`]).
1456 ///
1457 /// # Parameters
1458 /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM my_table"`).
1459 /// - `qid`: Optional query ID for tracking and debugging.
1460 ///
1461 /// # Returns
1462 /// A [`Result`] containing a [`ClickHouseResponse<RecordBatch>`] that streams
1463 /// query results.
1464 ///
1465 /// # Errors
1466 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1467 /// - Fails if the connection to `ClickHouse` is interrupted.
1468 /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1469 ///
1470 /// # Examples
1471 /// ```rust,ignore
1472 /// use clickhouse_arrow::prelude::*;
1473 ///
1474 /// let client = Client::builder()
1475 /// .with_endpoint("localhost:9000")
1476 /// .build_arrow()
1477 /// .await
1478 /// .unwrap();
1479 ///
1480 /// let mut response = client.query("SELECT * FROM my_table", None).await.unwrap();
1481 /// while let Some(batch) = response.next().await {
1482 /// let batch = batch.unwrap();
1483 /// println!("Received batch with {} rows", batch.num_rows());
1484 /// }
1485 /// ```
1486 #[instrument(
1487 skip_all,
1488 fields(db.system = "clickhouse", db.operation = "query", clickhouse.query.id)
1489 )]
1490 pub async fn query(
1491 &self,
1492 query: impl Into<ParsedQuery>,
1493 qid: Option<Qid>,
1494 ) -> Result<ClickHouseResponse<RecordBatch>> {
1495 self.query_params(query, None, qid).await
1496 }
1497
1498 /// Executes a `ClickHouse` query with parameters and streams Arrow [`RecordBatch`] results.
1499 ///
1500 /// # Parameters
1501 /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM my_table"`).
1502 /// - `params`: The query parameters to provide
1503 /// - `qid`: Optional query ID for tracking and debugging.
1504 ///
1505 /// # Returns
1506 /// A [`Result`] containing a [`ClickHouseResponse<RecordBatch>`] that streams
1507 /// query results.
1508 ///
1509 /// # Errors
1510 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1511 /// - Fails if the connection to `ClickHouse` is interrupted.
1512 /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1513 ///
1514 /// # Examples
1515 /// ```rust,ignore
1516 /// use clickhouse_arrow::prelude::*;
1517 ///
1518 /// let client = Client::builder()
1519 /// .with_endpoint("localhost:9000")
1520 /// .build_arrow()
1521 /// .await
1522 /// .unwrap();
1523 ///
1524 /// let params = Some(vec![("name", ParamValue::from("my_table"))].into());
1525 /// let query = "SELECT * FROM {name:Identifier}";
1526 /// let mut response = client.query_params(query, params, None).await.unwrap();
1527 /// while let Some(batch) = response.next().await {
1528 /// let batch = batch.unwrap();
1529 /// println!("Received batch with {} rows", batch.num_rows());
1530 /// }
1531 /// ```
1532 #[instrument(
1533 skip_all,
1534 fields(db.system = "clickhouse", db.operation = "query", clickhouse.query.id)
1535 )]
1536 pub async fn query_params(
1537 &self,
1538 query: impl Into<ParsedQuery>,
1539 params: Option<QueryParams>,
1540 qid: Option<Qid>,
1541 ) -> Result<ClickHouseResponse<RecordBatch>> {
1542 let (query, qid) = record_query(qid, query.into(), self.client_id);
1543 Ok(ClickHouseResponse::new(Box::pin(self.query_raw(query, params, qid).await?)))
1544 }
1545
1546 /// Executes a `ClickHouse` query and streams rows as column-major values.
1547 ///
1548 /// This method sends a query to `ClickHouse` and returns a stream of rows, where
1549 /// each row is represented as a `Vec<Value>` containing column values. The data is
1550 /// transposed from Arrow [`RecordBatch`] format to row-major format, making it
1551 /// convenient for row-based processing. For direct Arrow access, use
1552 /// [`Client::query`].
1553 ///
1554 /// Progress and profile events are dispatched to the client's event channel (see
1555 /// [`Client::subscribe_events`]).
1556 ///
1557 /// # Parameters
1558 /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM my_table"`).
1559 /// - `qid`: Optional query ID for tracking and debugging.
1560 ///
1561 /// # Returns
1562 /// A [`Result`] containing a [`ClickHouseResponse<Vec<Value>>`] that streams rows.
1563 ///
1564 /// # Errors
1565 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1566 /// - Fails if the connection to `ClickHouse` is interrupted.
1567 /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1568 ///
1569 /// # Examples
1570 /// ```rust,ignore
1571 /// use clickhouse_arrow::prelude::*;
1572 ///
1573 /// let client = Client::builder()
1574 /// .with_endpoint("localhost:9000")
1575 /// .build_arrow()
1576 /// .await
1577 /// .unwrap();
1578 ///
1579 /// let mut response = client.query_rows("SELECT * FROM my_table", None).await.unwrap();
1580 /// while let Some(row) = response.next().await {
1581 /// let row = row.unwrap();
1582 /// println!("Row values: {:?}", row);
1583 /// }
1584 /// ```
1585 #[instrument(
1586 name = "clickhouse.query_rows",
1587 fields(
1588 db.system = "clickhouse",
1589 db.operation = "query",
1590 db.format = ArrowFormat::FORMAT,
1591 clickhouse.client.id = self.client_id,
1592 clickhouse.query.id
1593 ),
1594 skip_all
1595 )]
1596 pub async fn query_rows(
1597 &self,
1598 query: impl Into<ParsedQuery>,
1599 qid: Option<Qid>,
1600 ) -> Result<ClickHouseResponse<Vec<Value>>> {
1601 let (query, qid) = record_query(qid, query.into(), self.client_id);
1602 let connection = self.conn().await?;
1603
1604 // Create metadata channel
1605 let (tx, rx) = oneshot::channel();
1606 let (header_tx, header_rx) = oneshot::channel();
1607
1608 #[cfg_attr(not(feature = "inner_pool"), expect(unused_variables))]
1609 let conn_idx = connection
1610 .send_operation(
1611 Operation::Query {
1612 query,
1613 settings: self.settings.clone(),
1614 // TODO: Add arg for params
1615 params: None,
1616 response: tx,
1617 header: Some(header_tx),
1618 },
1619 qid,
1620 true,
1621 )
1622 .await?;
1623
1624 trace!({ ATT_CID } = self.client_id, { ATT_QID } = %qid, "sent query, awaiting response");
1625 let responses = rx
1626 .await
1627 .map_err(|_| Error::Protocol(format!("Failed to receive response for query {qid}")))?
1628 .inspect_err(|error| error!(?error, { ATT_QID } = %qid, "Error receiving header"))?;
1629
1630 let header = header_rx
1631 .await
1632 .map_err(|_| Error::Protocol(format!("Failed to receive header for query {qid}")))?;
1633
1634 let response = create_response_stream::<ArrowFormat>(responses, qid, self.client_id)
1635 .map(move |batch| (header.clone(), batch))
1636 .map(|(header, batch)| {
1637 let batch = batch?;
1638 let batch_iter = batch_to_rows(&batch, Some(&header))?;
1639 Ok::<_, Error>(stream::iter(batch_iter))
1640 })
1641 .try_flatten();
1642
1643 // Decrement load balancer
1644 #[cfg(feature = "inner_pool")]
1645 connection.finish(conn_idx, Operation::<RecordBatch>::weight_insert_many());
1646
1647 Ok(ClickHouseResponse::from_stream(response))
1648 }
1649
1650 /// Executes a `ClickHouse` query and returns the first column of the first batch.
1651 ///
1652 /// This method sends a query to `ClickHouse` and returns the first column of the
1653 /// first [`RecordBatch`] as an Arrow [`ArrayRef`], or `None` if the result is empty.
1654 /// It is useful for queries that return a single column (e.g., `SELECT id FROM
1655 /// my_table`). For full batch access, use [`Client::query`].
1656 ///
1657 /// Progress and profile events are dispatched to the client's event channel (see
1658 /// [`Client::subscribe_events`]).
1659 ///
1660 /// # Parameters
1661 /// - `query`: The SQL query to execute (e.g., `"SELECT id FROM my_table"`).
1662 /// - `qid`: Optional query ID for tracking and debugging.
1663 ///
1664 /// # Returns
1665 /// A [`Result`] containing an `Option<ArrayRef>`, representing the first column of
1666 /// the first batch, or `None` if no data is returned.
1667 ///
1668 /// # Errors
1669 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1670 /// - Fails if the connection to `ClickHouse` is interrupted.
1671 /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1672 ///
1673 /// # Examples
1674 /// ```rust,ignore
1675 /// use clickhouse_arrow::prelude::*;
1676 ///
1677 /// let client = Client::builder()
1678 /// .with_endpoint("localhost:9000")
1679 /// .build_arrow()
1680 /// .await
1681 /// .unwrap();
1682 ///
1683 /// let column = client.query_column("SELECT id FROM my_table", None)
1684 /// .await
1685 /// .unwrap();
1686 /// if let Some(col) = column {
1687 /// println!("Column data: {:?}", col);
1688 /// }
1689 /// ```
1690 #[instrument(
1691 name = "clickhouse.query_column",
1692 skip_all,
1693 fields(
1694 db.system = "clickhouse",
1695 db.operation = "query",
1696 db.format = ArrowFormat::FORMAT,
1697 clickhouse.client.id = self.client_id,
1698 clickhouse.query.id
1699 )
1700 )]
1701 pub async fn query_column(
1702 &self,
1703 query: impl Into<ParsedQuery>,
1704 qid: Option<Qid>,
1705 ) -> Result<Option<ArrayRef>> {
1706 self.query_column_params(query, None, qid).await
1707 }
1708
1709 /// Executes a `ClickHouse` query with parameters and returns the first column of the first
1710 /// batch.
1711 ///
1712 /// # Parameters
1713 /// - `query`: The SQL query to execute (e.g., `"SELECT id FROM my_table"`).
1714 /// - `params`: The query parameters to provide
1715 /// - `qid`: Optional query ID for tracking and debugging.
1716 ///
1717 /// # Returns
1718 /// A [`Result`] containing an `Option<ArrayRef>`, representing the first column of
1719 /// the first batch, or `None` if no data is returned.
1720 ///
1721 /// # Errors
1722 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1723 /// - Fails if the connection to `ClickHouse` is interrupted.
1724 /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1725 ///
1726 /// # Examples
1727 /// ```rust,ignore
1728 /// use clickhouse_arrow::prelude::*;
1729 ///
1730 /// let client = Client::builder()
1731 /// .with_endpoint("localhost:9000")
1732 /// .build_arrow()
1733 /// .await
1734 /// .unwrap();
1735 ///
1736 /// let params = Some(vec![("name", ParamValue::from("my_table"))].into());
1737 /// let query = "SELECT id FROM {name:Identifier}";
    /// let column = client.query_column_params(query, params, None)
1739 /// .await
1740 /// .unwrap();
1741 /// if let Some(col) = column {
1742 /// println!("Column data: {:?}", col);
1743 /// }
1744 /// ```
1745 #[instrument(
1746 name = "clickhouse.query_column_params",
1747 skip_all,
1748 fields(
1749 db.system = "clickhouse",
1750 db.operation = "query",
1751 db.format = ArrowFormat::FORMAT,
1752 clickhouse.client.id = self.client_id,
1753 clickhouse.query.id
1754 )
1755 )]
1756 pub async fn query_column_params(
1757 &self,
1758 query: impl Into<ParsedQuery>,
1759 params: Option<QueryParams>,
1760 qid: Option<Qid>,
1761 ) -> Result<Option<ArrayRef>> {
1762 let mut stream = self.query_params(query, params, qid).await?;
1763 let Some(batch) = stream.next().await.transpose()? else {
1764 return Ok(None);
1765 };
1766
1767 if batch.num_rows() == 0 { Ok(None) } else { Ok(Some(Arc::clone(batch.column(0)))) }
1768 }
1769
1770 /// Executes a `ClickHouse` query and returns the first row as a [`RecordBatch`].
1771 ///
1772 /// This method sends a query to `ClickHouse` and returns the first row of the first
1773 /// [`RecordBatch`], or `None` if the result is empty. The returned [`RecordBatch`]
1774 /// contains a single row. It is useful for queries expected to return a single row
1775 /// (e.g., `SELECT * FROM users WHERE id = 1`). For streaming multiple rows, use
1776 /// [`Client::query`].
1777 ///
1778 /// Progress and profile events are dispatched to the client's event channel (see
1779 /// [`Client::subscribe_events`]).
1780 ///
1781 /// # Parameters
1782 /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM users WHERE id = 1"`).
1783 /// - `qid`: Optional query ID for tracking and debugging.
1784 ///
1785 /// # Returns
1786 /// A [`Result`] containing an `Option<RecordBatch>`, representing the first row, or
1787 /// `None` if no rows are returned.
1788 ///
1789 /// # Errors
1790 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1791 /// - Fails if the connection to `ClickHouse` is interrupted.
1792 /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1793 ///
1794 /// # Examples
1795 /// ```rust,ignore
1796 /// use clickhouse_arrow::prelude::*;
1797 ///
1798 /// let client = Client::builder()
1799 /// .with_endpoint("localhost:9000")
1800 /// .build_arrow()
1801 /// .await
1802 /// .unwrap();
1803 ///
1804 /// let batch = client.query_one("SELECT * FROM users WHERE id = 1", None)
1805 /// .await
1806 /// .unwrap();
1807 /// if let Some(row) = batch {
1808 /// println!("Row data: {:?}", row);
1809 /// }
1810 /// ```
1811 #[instrument(
1812 name = "clickhouse.query_one",
1813 skip_all
1814 fields(
1815 db.system = "clickhouse",
1816 db.operation = "query",
1817 db.format = ArrowFormat::FORMAT,
1818 clickhouse.client.id = self.client_id,
1819 clickhouse.query.id
1820 )
1821 )]
1822 pub async fn query_one(
1823 &self,
1824 query: impl Into<ParsedQuery>,
1825 qid: Option<Qid>,
1826 ) -> Result<Option<RecordBatch>> {
1827 self.query_one_params(query, None, qid).await
1828 }
1829
1830 /// Executes a `ClickHouse` query with parameters and returns the first row as a
1831 /// [`RecordBatch`].
1832 ///
1833 /// # Parameters
1834 /// - `query`: The SQL query to execute (e.g., `"SELECT * FROM users WHERE id = 1"`).
1835 /// - `params`: The query parameters to provide
1836 /// - `qid`: Optional query ID for tracking and debugging.
1837 ///
1838 /// # Returns
1839 /// A [`Result`] containing an `Option<RecordBatch>`, representing the first row, or
1840 /// `None` if no rows are returned.
1841 ///
1842 /// # Errors
1843 /// - Fails if the query is malformed or unsupported by `ClickHouse`.
1844 /// - Fails if the connection to `ClickHouse` is interrupted.
1845 /// - Fails if `ClickHouse` returns an exception (e.g., table not found).
1846 ///
1847 /// # Examples
1848 /// ```rust,ignore
1849 /// use clickhouse_arrow::prelude::*;
1850 ///
1851 /// let client = Client::builder()
1852 /// .with_endpoint("localhost:9000")
1853 /// .build_arrow()
1854 /// .await
1855 /// .unwrap();
1856 ///
1857 /// let params = Some(vec![("id", ParamValue::from(1))]);
1858 /// let batch = client.query_one_params("SELECT * FROM users WHERE id = {id:UInt64}", None)
1859 /// .await
1860 /// .unwrap();
1861 /// if let Some(row) = batch {
1862 /// println!("Row data: {:?}", row);
1863 /// }
1864 /// ```
1865 #[instrument(
1866 name = "clickhouse.query_one_params",
1867 skip_all
1868 fields(
1869 db.system = "clickhouse",
1870 db.operation = "query",
1871 db.format = ArrowFormat::FORMAT,
1872 clickhouse.client.id = self.client_id,
1873 clickhouse.query.id
1874 )
1875 )]
1876 pub async fn query_one_params(
1877 &self,
1878 query: impl Into<ParsedQuery>,
1879 params: Option<QueryParams>,
1880 qid: Option<Qid>,
1881 ) -> Result<Option<RecordBatch>> {
1882 let stream = self.query_params(query, params, qid).await?;
1883 tokio::pin!(stream);
1884
1885 let Some(batch) = stream.next().await.transpose()? else {
1886 return Ok(None);
1887 };
1888
1889 if batch.num_rows() == 0 {
1890 Ok(None)
1891 } else {
1892 Ok(Some(take_record_batch(&batch, &arrow::array::UInt32Array::from(vec![0]))?))
1893 }
1894 }
1895
1896 /// Fetches the list of database names (schemas) in `ClickHouse`.
1897 ///
1898 /// This method queries `ClickHouse` to retrieve the names of all databases
1899 /// accessible to the client. It is useful for exploring the database structure or
1900 /// validating database existence before performing operations.
1901 ///
1902 /// # Parameters
1903 /// - `qid`: Optional query ID for tracking and debugging.
1904 ///
1905 /// # Returns
1906 /// A [`Result`] containing a `Vec<String>` of database names.
1907 ///
1908 /// # Errors
1909 /// - Fails if the query execution encounters a `ClickHouse` error (e.g., permission denied).
1910 /// - Fails if the connection to `ClickHouse` is interrupted.
1911 ///
1912 /// # Examples
1913 /// ```rust,ignore
1914 /// use clickhouse_arrow::prelude::*;
1915 ///
1916 /// let client = Client::builder()
1917 /// .with_endpoint("localhost:9000")
1918 /// .build_arrow()
1919 /// .await
1920 /// .unwrap();
1921 ///
1922 /// let schemas = client.fetch_schemas(None).await.unwrap();
1923 /// println!("Databases: {:?}", schemas);
1924 /// ```
1925 #[instrument(
1926 name = "clickhouse.fetch_schemas",
1927 skip_all
1928 fields(
1929 db.system = "clickhouse",
1930 db.operation = "query",
1931 db.format = ArrowFormat::FORMAT,
1932 clickhouse.client.id = self.client_id,
1933 clickhouse.query.id
1934 )
1935 )]
1936 pub async fn fetch_schemas(&self, qid: Option<Qid>) -> Result<Vec<String>> {
1937 crate::arrow::schema::fetch_databases(self, qid).await
1938 }
1939
1940 /// Fetches all tables across all databases in `ClickHouse`.
1941 ///
1942 /// This method queries `ClickHouse` to retrieve a mapping of database names to
1943 /// their table names. It is useful for discovering the full schema structure of
1944 /// the `ClickHouse` instance.
1945 ///
1946 /// # Parameters
1947 /// - `qid`: Optional query ID for tracking and debugging.
1948 ///
1949 /// # Returns
1950 /// A [`Result`] containing a `HashMap<String, Vec<String>>`, where each key is a
1951 /// database name and the value is a list of table names in that database.
1952 ///
1953 /// # Errors
1954 /// - Fails if the query execution encounters a `ClickHouse` error (e.g., permission denied).
1955 /// - Fails if the connection to `ClickHouse` is interrupted.
1956 ///
1957 /// # Examples
1958 /// ```rust,ignore
1959 /// use clickhouse_arrow::prelude::*;
1960 ///
1961 /// let client = Client::builder()
1962 /// .with_endpoint("localhost:9000")
1963 /// .build_arrow()
1964 /// .await
1965 /// .unwrap();
1966 ///
1967 /// let tables = client.fetch_all_tables(None).await.unwrap();
1968 /// for (db, tables) in tables {
1969 /// println!("Database {} has tables: {:?}", db, tables);
1970 /// }
1971 /// ```
1972 #[instrument(
1973 name = "clickhouse.fetch_all_tables",
1974 skip_all
1975 fields(
1976 db.system = "clickhouse",
1977 db.operation = "query",
1978 db.format = ArrowFormat::FORMAT,
1979 clickhouse.client.id = self.client_id,
1980 clickhouse.query.id
1981 )
1982 )]
1983 pub async fn fetch_all_tables(&self, qid: Option<Qid>) -> Result<HashMap<String, Vec<String>>> {
1984 crate::arrow::schema::fetch_all_tables(self, qid).await
1985 }
1986
1987 /// Fetches the list of table names in a specific `ClickHouse` database.
1988 ///
1989 /// This method queries `ClickHouse` to retrieve the names of all tables in the
1990 /// specified database (or the client's default database if `None`). It is useful
1991 /// for exploring the schema of a specific database.
1992 ///
1993 /// # Parameters
1994 /// - `database`: Optional database name. If `None`, uses the client's default database.
1995 /// - `qid`: Optional query ID for tracking and debugging.
1996 ///
1997 /// # Returns
1998 /// A [`Result`] containing a `Vec<String>` of table names.
1999 ///
2000 /// # Errors
2001 /// - Fails if the database does not exist or is inaccessible.
2002 /// - Fails if the query execution encounters a `ClickHouse` error (e.g., permission denied).
2003 /// - Fails if the connection to `ClickHouse` is interrupted.
2004 ///
2005 /// # Examples
2006 /// ```rust,ignore
2007 /// use clickhouse_arrow::prelude::*;
2008 ///
2009 /// let client = Client::builder()
2010 /// .with_endpoint("localhost:9000")
2011 /// .build_arrow()
2012 /// .await
2013 /// .unwrap();
2014 ///
2015 /// let tables = client.fetch_tables(Some("my_db"), None).await.unwrap();
2016 /// println!("Tables in my_db: {:?}", tables);
2017 /// ```
2018 #[instrument(
2019 name = "clickhouse.fetch_tables",
2020 skip_all
2021 fields(
2022 db.system = "clickhouse",
2023 db.operation = "query",
2024 db.format = ArrowFormat::FORMAT,
2025 clickhouse.client.id = self.client_id,
2026 clickhouse.query.id
2027 )
2028 )]
2029 pub async fn fetch_tables(
2030 &self,
2031 database: Option<&str>,
2032 qid: Option<Qid>,
2033 ) -> Result<Vec<String>> {
2034 let database = database.unwrap_or(self.connection.database());
2035 crate::arrow::schema::fetch_tables(self, database, qid).await
2036 }
2037
2038 /// Fetches the schema of specified tables in a `ClickHouse` database.
2039 ///
2040 /// This method queries `ClickHouse` to retrieve the Arrow schemas of the specified
2041 /// tables in the given database (or the client's default database if `None`). If
2042 /// the `tables` list is empty, it fetches schemas for all tables in the database.
2043 /// The result is a mapping of table names to their corresponding Arrow [`SchemaRef`].
2044 ///
2045 /// # Parameters
2046 /// - `database`: Optional database name. If `None`, uses the client's default database.
2047 /// - `tables`: A list of table names to fetch schemas for. An empty list fetches all tables.
2048 /// - `qid`: Optional query ID for tracking and debugging.
2049 ///
2050 /// # Returns
2051 /// A [`Result`] containing a `HashMap<String, SchemaRef>`, mapping table names to
2052 /// their schemas.
2053 ///
2054 /// # Errors
2055 /// - Fails if the database or any table does not exist or is inaccessible.
2056 /// - Fails if the query execution encounters a `ClickHouse` error (e.g., permission denied).
2057 /// - Fails if the connection to `ClickHouse` is interrupted.
2058 ///
2059 /// # Examples
2060 /// ```rust,ignore
2061 /// use clickhouse_arrow::prelude::*;
2062 ///
2063 /// let client = Client::builder()
2064 /// .with_endpoint("localhost:9000")
2065 /// .build_arrow()
2066 /// .await
2067 /// .unwrap();
2068 ///
2069 /// let schemas = client.fetch_schema(Some("my_db"), &["my_table"], None)
2070 /// .await
2071 /// .unwrap();
2072 /// for (table, schema) in schemas {
2073 /// println!("Table {} schema: {:?}", table, schema);
2074 /// }
2075 /// ```
2076 #[instrument(
2077 name = "clickhouse.fetch_schema",
2078 skip_all
2079 fields(
2080 db.system = "clickhouse",
2081 db.operation = "query",
2082 db.format = ArrowFormat::FORMAT,
2083 clickhouse.client.id = self.client_id,
2084 clickhouse.query.id
2085 )
2086 )]
2087 pub async fn fetch_schema(
2088 &self,
2089 database: Option<&str>,
2090 tables: &[&str],
2091 qid: Option<Qid>,
2092 ) -> Result<HashMap<String, SchemaRef>> {
2093 let database = database.unwrap_or(self.connection.database());
2094 let options = self.connection.metadata().arrow_options;
2095 crate::arrow::schema::fetch_schema(self, database, tables, qid, options).await
2096 }
2097
2098 /// Issues a `CREATE TABLE` DDL statement for a table using Arrow schema.
2099 ///
2100 /// Creates a table in the specified database (or the client's default database if
2101 /// `None`) based on the provided Arrow [`SchemaRef`]. The `options` parameter allows
2102 /// customization of table properties, such as engine type and partitioning. This
2103 /// method is specific to [`ArrowClient`] for seamless integration with Arrow-based
2104 /// data pipelines.
2105 ///
2106 /// # Parameters
2107 /// - `database`: Optional database name. If `None`, uses the client's default database.
2108 /// - `table`: Name of the table to create.
2109 /// - `schema`: The Arrow schema defining the table's structure.
2110 /// - `options`: Configuration for table creation (e.g., engine, partitioning).
2111 /// - `qid`: Optional query ID for tracking and debugging.
2112 ///
2113 /// # Returns
2114 /// A [`Result`] indicating success or failure of the operation.
2115 ///
2116 /// # Errors
2117 /// - Fails if the provided schema is invalid or incompatible with `ClickHouse`.
2118 /// - Fails if the database does not exist or is inaccessible.
2119 /// - Fails if the query execution encounters a `ClickHouse` error.
2120 ///
2121 /// # Examples
2122 /// ```rust,ignore
2123 /// use clickhouse_arrow::prelude::*;
2124 /// use arrow::datatypes::{Schema, SchemaRef};
2125 ///
2126 /// let client = Client::builder()
2127 /// .with_endpoint("localhost:9000")
2128 /// .build_arrow()
2129 /// .await
2130 /// .unwrap();
2131 ///
2132 /// // Assume `schema` is a valid Arrow schema
2133 /// let schema: SchemaRef = Arc::new(Schema::new(vec![/* ... */]));
2134 /// let options = CreateOptions::default();
2135 /// client.create_table(Some("my_db"), "my_table", &schema, &options, None)
2136 /// .await
2137 /// .unwrap();
2138 /// ```
2139 #[instrument(
2140 name = "clickhouse.create_table",
2141 skip_all
2142 fields(
2143 db.system = "clickhouse",
2144 db.operation = "create.table",
2145 db.format = ArrowFormat::FORMAT,
2146 clickhouse.client.id = self.client_id,
2147 clickhouse.query.id
2148 )
2149 )]
2150 pub async fn create_table(
2151 &self,
2152 database: Option<&str>,
2153 table: &str,
2154 schema: &SchemaRef,
2155 options: &CreateOptions,
2156 qid: Option<Qid>,
2157 ) -> Result<()> {
2158 let database = database.unwrap_or(self.connection.database());
2159 let arrow_options = self.connection.metadata().arrow_options;
2160 let stmt = create_table_statement_from_arrow(
2161 Some(database),
2162 table,
2163 schema,
2164 options,
2165 Some(arrow_options),
2166 )?;
2167 self.execute(stmt, qid).await?;
2168 Ok(())
2169 }
2170}
2171
2172impl<T: ClientFormat> Drop for Client<T> {
2173 fn drop(&mut self) {
2174 trace!({ ATT_CID } = self.client_id, "Client dropped");
2175 }
2176}
2177
2178/// Simple helper to log query id and client id
2179fn record_query(qid: Option<Qid>, query: ParsedQuery, cid: u16) -> (String, Qid) {
2180 let qid = qid.unwrap_or_default();
2181 let _ = Span::current().record(ATT_QID, tracing::field::display(qid));
2182 let query = query.0;
2183 trace!(query, { ATT_CID } = cid, "Querying clickhouse");
2184 (query, qid)
2185}
2186
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    use super::*;

    // Builds a single-column Int32 batch with five rows for the splitting tests.
    fn create_test_record_batch() -> RecordBatch {
        let fields = vec![Field::new("id", DataType::Int32, false)];
        let values = Int32Array::from(vec![1, 2, 3, 4, 5]);
        RecordBatch::try_new(Arc::new(Schema::new(fields)), vec![Arc::new(values)]).unwrap()
    }

    #[test]
    fn test_record_query_function() {
        let expected_qid = Qid::new();
        let (sql, returned_qid) =
            record_query(Some(expected_qid), ParsedQuery("SELECT 1".to_string()), 123);

        assert_eq!(sql, "SELECT 1");
        assert_eq!(returned_qid, expected_qid);
    }

    #[test]
    fn test_record_query_function_no_qid() {
        let (sql, returned_qid) = record_query(None, ParsedQuery("SELECT 2".to_string()), 456);

        assert_eq!(sql, "SELECT 2");
        // A default Qid should have been generated in place of the missing one.
        assert!(!returned_qid.to_string().is_empty());
    }

    #[tokio::test]
    async fn test_split_record_batch_function() {
        // 5 rows with max_rows = 2 should yield chunks of [2, 2, 1].
        let batches = crate::arrow::utils::split_record_batch(create_test_record_batch(), 2);
        let sizes: Vec<usize> = batches.iter().map(RecordBatch::num_rows).collect();

        assert_eq!(sizes, vec![2, 2, 1]);
    }

    #[test]
    fn test_split_record_batch_edge_cases() {
        let batch = create_test_record_batch();

        // max_rows exactly the batch size: a single untouched chunk.
        let batches = crate::arrow::utils::split_record_batch(batch.clone(), 5);
        assert_eq!(batches.iter().map(RecordBatch::num_rows).collect::<Vec<_>>(), vec![5]);

        // max_rows beyond the batch size: still a single chunk.
        let batches = crate::arrow::utils::split_record_batch(batch.clone(), 10);
        assert_eq!(batches.iter().map(RecordBatch::num_rows).collect::<Vec<_>>(), vec![5]);

        // max_rows of one: every row becomes its own chunk.
        let batches = crate::arrow::utils::split_record_batch(batch, 1);
        assert_eq!(batches.len(), 5);
        assert!(batches.iter().all(|b| b.num_rows() == 1));
    }
}