Skip to main content

clickhouse_native_client/
client.rs

1use crate::{
2    block::Block,
3    connection::{
4        Connection,
5        ConnectionOptions,
6    },
7    io::{
8        BlockReader,
9        BlockWriter,
10    },
11    protocol::{
12        ClientCode,
13        CompressionMethod,
14        ServerCode,
15    },
16    query::{
17        ClientInfo,
18        Profile,
19        Progress,
20        Query,
21        ServerInfo,
22    },
23    Error,
24    Result,
25};
26use std::time::Duration;
27use tracing::debug;
28
29#[cfg(feature = "tls")]
30use crate::ssl::SSLOptions;
31
32/// Endpoint configuration (host + port)
33#[derive(Clone, Debug, PartialEq, Eq)]
34pub struct Endpoint {
35    /// Server host
36    pub host: String,
37    /// Server port
38    pub port: u16,
39}
40
41impl Endpoint {
42    /// Create a new endpoint
43    pub fn new(host: impl Into<String>, port: u16) -> Self {
44        Self { host: host.into(), port }
45    }
46}
47
48/// Client options
49#[derive(Clone, Debug)]
50pub struct ClientOptions {
51    /// Server host (used if endpoints is empty)
52    pub host: String,
53    /// Server port (used if endpoints is empty)
54    pub port: u16,
55    /// Multiple endpoints for failover (if empty, uses host+port)
56    pub endpoints: Vec<Endpoint>,
57    /// Database name
58    pub database: String,
59    /// Username
60    pub user: String,
61    /// Password
62    pub password: String,
63    /// Compression method
64    pub compression: Option<CompressionMethod>,
65    /// Maximum compression chunk size (default: 65535)
66    pub max_compression_chunk_size: usize,
67    /// Client information
68    pub client_info: ClientInfo,
69    /// Connection timeout and TCP options
70    pub connection_options: ConnectionOptions,
71    /// SSL/TLS options (requires 'tls' feature)
72    #[cfg(feature = "tls")]
73    pub ssl_options: Option<SSLOptions>,
74    /// Number of send retries (default: 1, no retry)
75    pub send_retries: u32,
76    /// Timeout between retry attempts (default: 5 seconds)
77    pub retry_timeout: Duration,
78    /// Send ping before each query (default: false)
79    pub ping_before_query: bool,
80    /// Rethrow server exceptions (default: true)
81    pub rethrow_exceptions: bool,
82}
83
84impl Default for ClientOptions {
85    fn default() -> Self {
86        Self {
87            host: "localhost".to_string(),
88            port: 9000,
89            endpoints: Vec::new(),
90            database: "default".to_string(),
91            user: "default".to_string(),
92            password: String::new(),
93            compression: Some(CompressionMethod::Lz4),
94            max_compression_chunk_size: 65535,
95            client_info: ClientInfo::default(),
96            connection_options: ConnectionOptions::default(),
97            #[cfg(feature = "tls")]
98            ssl_options: None,
99            send_retries: 1,
100            retry_timeout: Duration::from_secs(5),
101            ping_before_query: false,
102            rethrow_exceptions: true,
103        }
104    }
105}
106
107impl ClientOptions {
108    /// Create new client options with host and port
109    pub fn new(host: impl Into<String>, port: u16) -> Self {
110        Self { host: host.into(), port, ..Default::default() }
111    }
112
113    /// Set multiple endpoints for failover
114    pub fn endpoints(mut self, endpoints: Vec<Endpoint>) -> Self {
115        self.endpoints = endpoints;
116        self
117    }
118
119    /// Add an endpoint for failover
120    pub fn add_endpoint(mut self, host: impl Into<String>, port: u16) -> Self {
121        self.endpoints.push(Endpoint::new(host, port));
122        self
123    }
124
125    /// Set the database
126    pub fn database(mut self, database: impl Into<String>) -> Self {
127        self.database = database.into();
128        self
129    }
130
131    /// Set the username
132    pub fn user(mut self, user: impl Into<String>) -> Self {
133        self.user = user.into();
134        self
135    }
136
137    /// Set the password
138    pub fn password(mut self, password: impl Into<String>) -> Self {
139        self.password = password.into();
140        self
141    }
142
143    /// Set compression method
144    pub fn compression(mut self, method: Option<CompressionMethod>) -> Self {
145        self.compression = method;
146        self
147    }
148
149    /// Set maximum compression chunk size
150    pub fn max_compression_chunk_size(mut self, size: usize) -> Self {
151        self.max_compression_chunk_size = size;
152        self
153    }
154
155    /// Set connection options (timeouts, TCP settings)
156    pub fn connection_options(mut self, options: ConnectionOptions) -> Self {
157        self.connection_options = options;
158        self
159    }
160
161    /// Set number of send retries
162    pub fn send_retries(mut self, retries: u32) -> Self {
163        self.send_retries = retries;
164        self
165    }
166
167    /// Set retry timeout
168    pub fn retry_timeout(mut self, timeout: Duration) -> Self {
169        self.retry_timeout = timeout;
170        self
171    }
172
173    /// Enable/disable ping before query
174    pub fn ping_before_query(mut self, enabled: bool) -> Self {
175        self.ping_before_query = enabled;
176        self
177    }
178
179    /// Enable/disable exception rethrowing
180    pub fn rethrow_exceptions(mut self, enabled: bool) -> Self {
181        self.rethrow_exceptions = enabled;
182        self
183    }
184
185    /// Set SSL/TLS options (requires 'tls' feature)
186    #[cfg(feature = "tls")]
187    pub fn ssl_options(mut self, options: SSLOptions) -> Self {
188        self.ssl_options = Some(options);
189        self
190    }
191
192    /// Get all endpoints (including host+port if endpoints is empty)
193    pub(crate) fn get_endpoints(&self) -> Vec<Endpoint> {
194        if self.endpoints.is_empty() {
195            vec![Endpoint::new(&self.host, self.port)]
196        } else {
197            self.endpoints.clone()
198        }
199    }
200}
201
202/// Async ClickHouse client using the native TCP protocol.
203///
204/// Create a client by calling [`Client::connect`] with [`ClientOptions`].
205/// The client holds a single TCP connection and is not `Clone`; for
206/// concurrent access, create multiple client instances.
207pub struct Client {
208    conn: Connection,
209    server_info: ServerInfo,
210    block_reader: BlockReader,
211    block_writer: BlockWriter,
212    options: ClientOptions,
213}
214
215impl Client {
216    /// Connect to ClickHouse server with retry and endpoint failover
217    pub async fn connect(options: ClientOptions) -> Result<Self> {
218        let endpoints = options.get_endpoints();
219        let mut last_error = None;
220
221        // Try each endpoint with retries
222        for endpoint in &endpoints {
223            for attempt in 0..options.send_retries {
224                match Self::try_connect(
225                    &endpoint.host,
226                    endpoint.port,
227                    &options,
228                )
229                .await
230                {
231                    Ok(client) => return Ok(client),
232                    Err(e) => {
233                        last_error = Some(e);
234
235                        // Wait before retry (except for last attempt)
236                        if attempt + 1 < options.send_retries {
237                            tokio::time::sleep(options.retry_timeout).await;
238                        }
239                    }
240                }
241            }
242        }
243
244        // All endpoints and retries failed
245        Err(last_error.unwrap_or_else(|| {
246            Error::Connection("No endpoints available".to_string())
247        }))
248    }
249
250    /// Try to connect to a specific endpoint
251    async fn try_connect(
252        host: &str,
253        port: u16,
254        options: &ClientOptions,
255    ) -> Result<Self> {
256        // Connect with or without TLS based on options
257        let mut conn = {
258            #[cfg(feature = "tls")]
259            {
260                if let Some(ref ssl_opts) = options.ssl_options {
261                    // Build SSL client config
262                    let ssl_config = ssl_opts.build_client_config()?;
263
264                    // Use server name from SSL options if provided, otherwise
265                    // use host
266                    let server_name = ssl_opts
267                        .server_name
268                        .as_deref()
269                        .or(if ssl_opts.use_sni { Some(host) } else { None });
270
271                    Connection::connect_with_tls(
272                        host,
273                        port,
274                        &options.connection_options,
275                        ssl_config,
276                        server_name,
277                    )
278                    .await?
279                } else {
280                    Connection::connect_with_options(
281                        host,
282                        port,
283                        &options.connection_options,
284                    )
285                    .await?
286                }
287            }
288            #[cfg(not(feature = "tls"))]
289            {
290                Connection::connect_with_options(
291                    host,
292                    port,
293                    &options.connection_options,
294                )
295                .await?
296            }
297        };
298
299        // Send hello
300        Self::send_hello(&mut conn, options).await?;
301
302        // Receive hello
303        let server_info = Self::receive_hello(&mut conn).await?;
304
305        // Send addendum (quota key) if server supports it
306        // DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUM = 54458
307        if server_info.revision >= 54458 {
308            debug!("Sending quota key addendum (empty string)...");
309            conn.write_string("").await?;
310            conn.flush().await?;
311            debug!("Addendum sent");
312        }
313
314        // Create block reader/writer with compression
315        let mut block_reader = BlockReader::new(server_info.revision);
316        let mut block_writer = BlockWriter::new(server_info.revision);
317
318        // Enable compression on both reader and writer
319        if let Some(compression) = options.compression {
320            block_reader = block_reader.with_compression(compression);
321            block_writer = block_writer.with_compression(compression);
322        }
323
324        Ok(Self {
325            conn,
326            server_info,
327            block_reader,
328            block_writer,
329            options: options.clone(),
330        })
331    }
332
333    /// Send hello packet
334    async fn send_hello(
335        conn: &mut Connection,
336        options: &ClientOptions,
337    ) -> Result<()> {
338        debug!("Sending client hello...");
339        // Write client hello code
340        conn.write_varint(ClientCode::Hello as u64).await?;
341        debug!("Sent hello code");
342
343        // Write client name and version
344        conn.write_string(&options.client_info.client_name).await?;
345        debug!("Sent client name: {}", options.client_info.client_name);
346        conn.write_varint(options.client_info.client_version_major).await?;
347        conn.write_varint(options.client_info.client_version_minor).await?;
348        conn.write_varint(options.client_info.client_revision).await?;
349        debug!(
350            "Sent version: {}.{}.{}",
351            options.client_info.client_version_major,
352            options.client_info.client_version_minor,
353            options.client_info.client_revision
354        );
355
356        // Write database, user, password
357        conn.write_string(&options.database).await?;
358        conn.write_string(&options.user).await?;
359        conn.write_string(&options.password).await?;
360        debug!("Sent credentials");
361
362        conn.flush().await?;
363        debug!("Flushed");
364        Ok(())
365    }
366
367    /// Receive hello packet from server
368    async fn receive_hello(conn: &mut Connection) -> Result<ServerInfo> {
369        debug!("Reading server hello...");
370        let packet_type = conn.read_varint().await?;
371        debug!("Got packet type: {}", packet_type);
372
373        if packet_type != ServerCode::Hello as u64 {
374            if packet_type == ServerCode::Exception as u64 {
375                debug!("Server sent exception during handshake!");
376                let exception = Self::read_exception_from_conn(conn).await?;
377                debug!(
378                    "Exception: code={}, name={}, msg={}",
379                    exception.code, exception.name, exception.display_text
380                );
381                return Err(Error::Protocol(format!(
382                    "ClickHouse exception during handshake: {} (code {}): {}",
383                    exception.name, exception.code, exception.display_text
384                )));
385            }
386            debug!("Unexpected packet type: {}", packet_type);
387            return Err(Error::Protocol(format!(
388                "Expected Hello packet, got {}",
389                packet_type
390            )));
391        }
392
393        // Read server info
394        debug!("Reading server info...");
395        let name = conn.read_string().await?;
396        debug!("Server name: {}", name);
397        let version_major = conn.read_varint().await?;
398        let version_minor = conn.read_varint().await?;
399        let revision = conn.read_varint().await?;
400        debug!(
401            "Server version: {}.{}, revision: {}",
402            version_major, version_minor, revision
403        );
404
405        let timezone = if revision >= 54058 {
406            debug!("Reading timezone...");
407            conn.read_string().await?
408        } else {
409            String::new()
410        };
411
412        let display_name = if revision >= 54372 {
413            debug!("Reading display name...");
414            conn.read_string().await?
415        } else {
416            String::new()
417        };
418
419        let version_patch = if revision >= 54401 {
420            debug!("Reading version patch...");
421            conn.read_varint().await?
422        } else {
423            0
424        };
425
426        debug!("Server hello complete!");
427        Ok(ServerInfo {
428            name,
429            version_major,
430            version_minor,
431            version_patch,
432            revision,
433            timezone,
434            display_name,
435        })
436    }
437
438    /// Execute a DDL/DML query without returning data
439    ///
440    /// Use this for queries that don't return result sets:
441    /// - CREATE/DROP TABLE, DATABASE
442    /// - ALTER TABLE
443    /// - TRUNCATE
444    /// - Other DDL/DML operations
445    ///
446    /// For SELECT queries, use `query()` instead.
447    /// For query tracing, use `execute_with_id()`.
448    ///
449    /// # Example
450    /// ```no_run
451    /// # use clickhouse_native_client::{Client, ClientOptions};
452    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
453    /// # let mut client = Client::connect(ClientOptions::default()).await?;
454    /// client.execute("CREATE TABLE test (id UInt32) ENGINE = Memory").await?;
455    /// client.execute("DROP TABLE test").await?;
456    /// # Ok(())
457    /// # }
458    /// ```
459    pub async fn execute(&mut self, query: impl Into<Query>) -> Result<()> {
460        self.execute_with_id(query, "").await
461    }
462
463    /// Execute a DDL/DML query with a specific query ID
464    ///
465    /// The query ID is useful for query tracing and debugging.
466    ///
467    /// # Example
468    /// ```no_run
469    /// # use clickhouse_native_client::{Client, ClientOptions};
470    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
471    /// # let mut client = Client::connect(ClientOptions::default()).await?;
472    /// client.execute_with_id("CREATE TABLE test (id UInt32) ENGINE = Memory", "create-123").await?;
473    /// # Ok(())
474    /// # }
475    /// ```
476    pub async fn execute_with_id(
477        &mut self,
478        query: impl Into<Query>,
479        query_id: &str,
480    ) -> Result<()> {
481        let mut query = query.into();
482        if !query_id.is_empty() {
483            query = Query::new(query.text()).with_query_id(query_id);
484        }
485        self.send_query(&query).await?;
486
487        // Read responses until EndOfStream, but don't collect blocks
488        loop {
489            let packet_type = self.conn.read_varint().await?;
490
491            match packet_type {
492                code if code == ServerCode::Data as u64 => {
493                    // Skip data blocks (shouldn't happen for DDL, but handle
494                    // gracefully)
495                    if self.server_info.revision >= 50264 {
496                        let _temp_table = self.conn.read_string().await?;
497                    }
498                    let _block =
499                        self.block_reader.read_block(&mut self.conn).await?;
500                }
501                code if code == ServerCode::Progress as u64 => {
502                    let progress = self.read_progress().await?;
503
504                    // Invoke progress callback if present
505                    if let Some(callback) = query.get_on_progress() {
506                        callback(&progress);
507                    }
508                }
509                code if code == ServerCode::EndOfStream as u64 => {
510                    break;
511                }
512                code if code == ServerCode::Exception as u64 => {
513                    let exception = self.read_exception().await?;
514
515                    // Invoke exception callback if present
516                    if let Some(callback) = query.get_on_exception() {
517                        callback(&exception);
518                    }
519
520                    return Err(Error::Protocol(format!(
521                        "ClickHouse exception: {} (code {}): {}",
522                        exception.name, exception.code, exception.display_text
523                    )));
524                }
525                code if code == ServerCode::ProfileInfo as u64 => {
526                    // Read profile info
527                    let rows = self.conn.read_varint().await?;
528                    let blocks = self.conn.read_varint().await?;
529                    let bytes = self.conn.read_varint().await?;
530                    let applied_limit = self.conn.read_u8().await?;
531                    let rows_before_limit = self.conn.read_varint().await?;
532                    let calculated = self.conn.read_u8().await?;
533
534                    let profile = Profile {
535                        rows,
536                        blocks,
537                        bytes,
538                        applied_limit: applied_limit != 0,
539                        rows_before_limit,
540                        calculated_rows_before_limit: calculated != 0,
541                    };
542
543                    // Invoke profile callback if present
544                    if let Some(callback) = query.get_on_profile() {
545                        callback(&profile);
546                    }
547                }
548                code if code == ServerCode::Log as u64 => {
549                    let _log_tag = self.conn.read_string().await?;
550                    // Log blocks are sent uncompressed
551                    let uncompressed_reader =
552                        BlockReader::new(self.server_info.revision);
553                    let block =
554                        uncompressed_reader.read_block(&mut self.conn).await?;
555
556                    // Invoke server log callback if present
557                    if let Some(callback) = query.get_on_server_log() {
558                        callback(&block);
559                    }
560                }
561                code if code == ServerCode::ProfileEvents as u64 => {
562                    let _table_name = self.conn.read_string().await?;
563                    // ProfileEvents blocks are sent uncompressed
564                    let uncompressed_reader =
565                        BlockReader::new(self.server_info.revision);
566                    let block =
567                        uncompressed_reader.read_block(&mut self.conn).await?;
568
569                    // Invoke profile events callback if present
570                    if let Some(callback) = query.get_on_profile_events() {
571                        callback(&block);
572                    }
573                }
574                code if code == ServerCode::TableColumns as u64 => {
575                    let _table_name = self.conn.read_string().await?;
576                    let _columns_metadata = self.conn.read_string().await?;
577                }
578                _ => {
579                    return Err(Error::Protocol(format!(
580                        "Unexpected packet type during execute: {}",
581                        packet_type
582                    )));
583                }
584            }
585        }
586
587        Ok(())
588    }
589
590    /// Execute a query and return results
591    ///
592    /// For INSERT operations, use `insert()` instead.
593    /// For DDL/DML without results, use `execute()` instead.
594    /// For query tracing, use `query_with_id()`.
595    pub async fn query(
596        &mut self,
597        query: impl Into<Query>,
598    ) -> Result<QueryResult> {
599        self.query_with_id(query, "").await
600    }
601
602    /// Execute a query with a specific query ID and return results
603    ///
604    /// The query ID is useful for query tracing and debugging.
605    ///
606    /// # Example
607    /// ```no_run
608    /// # use clickhouse_native_client::{Client, ClientOptions};
609    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
610    /// # let mut client = Client::connect(ClientOptions::default()).await?;
611    /// let result = client.query_with_id("SELECT 1", "select-123").await?;
612    /// # Ok(())
613    /// # }
614    /// ```
615    pub async fn query_with_id(
616        &mut self,
617        query: impl Into<Query>,
618        query_id: &str,
619    ) -> Result<QueryResult> {
620        let mut query = query.into();
621        if !query_id.is_empty() {
622            query = Query::new(query.text()).with_query_id(query_id);
623        }
624
625        // Send query
626        self.send_query(&query).await?;
627
628        // Receive results
629        let mut blocks = Vec::new();
630        let mut progress_info = Progress::default();
631
632        loop {
633            let packet_type = self.conn.read_varint().await?;
634            debug!("Query response packet: {}", packet_type);
635
636            match packet_type {
637                code if code == ServerCode::Data as u64 => {
638                    debug!("Received data packet");
639                    // Skip temp table name if protocol supports it (matches
640                    // C++ ReceiveData)
641                    if self.server_info.revision >= 50264 {
642                        // DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES
643                        let _temp_table = self.conn.read_string().await?;
644                    }
645                    let block =
646                        self.block_reader.read_block(&mut self.conn).await?;
647
648                    // Invoke data callback if present
649                    if let Some(callback) = query.get_on_data_cancelable() {
650                        let should_continue = callback(&block);
651                        if !should_continue {
652                            debug!("Query cancelled by data callback");
653                            break;
654                        }
655                    } else if let Some(callback) = query.get_on_data() {
656                        callback(&block);
657                    }
658
659                    if !block.is_empty() {
660                        blocks.push(block);
661                    }
662                }
663                code if code == ServerCode::Progress as u64 => {
664                    debug!("Received progress packet");
665                    let delta = self.read_progress().await?;
666                    progress_info.rows += delta.rows;
667                    progress_info.bytes += delta.bytes;
668                    progress_info.total_rows = delta.total_rows;
669                    progress_info.written_rows += delta.written_rows;
670                    progress_info.written_bytes += delta.written_bytes;
671
672                    // Invoke progress callback if present
673                    if let Some(callback) = query.get_on_progress() {
674                        callback(&progress_info);
675                    }
676                }
677                code if code == ServerCode::EndOfStream as u64 => {
678                    debug!("Received end of stream");
679                    break;
680                }
681                code if code == ServerCode::ProfileInfo as u64 => {
682                    debug!("Received profile info packet");
683                    // Read ProfileInfo fields directly
684                    let rows = self.conn.read_varint().await?;
685                    let blocks = self.conn.read_varint().await?;
686                    let bytes = self.conn.read_varint().await?;
687                    let applied_limit = self.conn.read_u8().await? != 0;
688                    let rows_before_limit = self.conn.read_varint().await?;
689                    let calculated_rows_before_limit =
690                        self.conn.read_u8().await? != 0;
691
692                    let profile = crate::query::Profile {
693                        rows,
694                        blocks,
695                        bytes,
696                        rows_before_limit,
697                        applied_limit,
698                        calculated_rows_before_limit,
699                    };
700
701                    // Invoke profile callback if present
702                    if let Some(callback) = query.get_on_profile() {
703                        callback(&profile);
704                    }
705                }
706                code if code == ServerCode::Log as u64 => {
707                    debug!("Received log packet");
708                    // Skip string first (log tag)
709                    let _log_tag = self.conn.read_string().await?;
710                    // Read the log block (sent uncompressed)
711                    let uncompressed_reader =
712                        BlockReader::new(self.server_info.revision);
713                    let block =
714                        uncompressed_reader.read_block(&mut self.conn).await?;
715
716                    // Invoke server log callback if present
717                    if let Some(callback) = query.get_on_server_log() {
718                        callback(&block);
719                    }
720                }
721                code if code == ServerCode::ProfileEvents as u64 => {
722                    debug!("Received profile events packet");
723                    // Skip string first (matches C++ implementation)
724                    let _table_name = self.conn.read_string().await?;
725                    // Read ProfileEvents block (sent uncompressed)
726                    let uncompressed_reader =
727                        BlockReader::new(self.server_info.revision);
728                    let block =
729                        uncompressed_reader.read_block(&mut self.conn).await?;
730
731                    // Invoke profile events callback if present
732                    if let Some(callback) = query.get_on_profile_events() {
733                        callback(&block);
734                    }
735                }
736                code if code == ServerCode::TableColumns as u64 => {
737                    debug!("Received table columns packet (ignoring)");
738                    // Skip external table name
739                    let _table_name = self.conn.read_string().await?;
740                    // Skip columns metadata string
741                    let _columns_metadata = self.conn.read_string().await?;
742                }
743                code if code == ServerCode::Exception as u64 => {
744                    debug!("Server returned exception during query, reading details...");
745                    let exception = self.read_exception().await?;
746                    debug!(
747                        "Exception: code={}, name={}, msg={}",
748                        exception.code, exception.name, exception.display_text
749                    );
750
751                    // Invoke exception callback if present
752                    if let Some(callback) = query.get_on_exception() {
753                        callback(&exception);
754                    }
755
756                    return Err(Error::Protocol(format!(
757                        "ClickHouse exception: {} ({}): {}",
758                        exception.name, exception.code, exception.display_text
759                    )));
760                }
761                other => {
762                    debug!("Unexpected packet type: {}", other);
763                    return Err(Error::Protocol(format!(
764                        "Unexpected packet type: {}",
765                        other
766                    )));
767                }
768            }
769        }
770
771        Ok(QueryResult { blocks, progress: progress_info })
772    }
773
774    /// Execute a SELECT query with external tables for JOIN operations
775    ///
776    /// External tables allow passing temporary in-memory data to queries for
777    /// JOINs without creating actual tables in ClickHouse.
778    ///
779    /// # Example
780    /// ```no_run
781    /// # use clickhouse_native_client::{Client, ClientOptions, Block, ExternalTable};
782    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
783    /// # let mut client = Client::connect(ClientOptions::default()).await?;
784    /// // Create a block with temporary data
785    /// let mut block = Block::new();
786    /// // ... populate block with data ...
787    ///
788    /// // Create external table
789    /// let ext_table = ExternalTable::new("temp_table", block);
790    ///
791    /// // Use in query with JOIN
792    /// let query = "SELECT * FROM my_table JOIN temp_table ON my_table.id = temp_table.id";
793    /// let result = client.query_with_external_data(query, &[ext_table]).await?;
794    /// # Ok(())
795    /// # }
796    /// ```
797    pub async fn query_with_external_data(
798        &mut self,
799        query: impl Into<Query>,
800        external_tables: &[crate::ExternalTable],
801    ) -> Result<QueryResult> {
802        self.query_with_external_data_and_id(query, "", external_tables).await
803    }
804
805    /// Execute a SELECT query with external tables and a specific query ID
806    ///
807    /// Combines external table support with query ID tracing.
808    ///
809    /// # Example
810    /// ```no_run
811    /// # use clickhouse_native_client::{Client, ClientOptions, Block, ExternalTable};
812    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
813    /// # let mut client = Client::connect(ClientOptions::default()).await?;
814    /// # let mut block = Block::new();
815    /// let ext_table = ExternalTable::new("temp_table", block);
816    /// let result = client.query_with_external_data_and_id(
817    ///     "SELECT * FROM my_table JOIN temp_table ON my_table.id = temp_table.id",
818    ///     "query-123",
819    ///     &[ext_table]
820    /// ).await?;
821    /// # Ok(())
822    /// # }
823    /// ```
824    pub async fn query_with_external_data_and_id(
825        &mut self,
826        query: impl Into<Query>,
827        query_id: &str,
828        external_tables: &[crate::ExternalTable],
829    ) -> Result<QueryResult> {
830        let mut query = query.into();
831        if !query_id.is_empty() {
832            query = Query::new(query.text()).with_query_id(query_id);
833        }
834
835        // Send query WITHOUT finalization (we'll finalize after external
836        // tables)
837        self.send_query_internal(&query, false).await?;
838
839        // Send external tables data (before finalization)
840        self.send_external_tables(external_tables).await?;
841
842        // Now finalize the query with empty block
843        self.finalize_query().await?;
844
845        // Receive results (same as regular query)
846        let mut blocks = Vec::new();
847        let mut progress_info = Progress::default();
848
849        loop {
850            let packet_type = self.conn.read_varint().await?;
851            debug!("Query response packet: {}", packet_type);
852
853            match packet_type {
854                code if code == ServerCode::Data as u64 => {
855                    debug!("Received data packet");
856                    // Skip temp table name if protocol supports it
857                    if self.server_info.revision >= 50264 {
858                        let _temp_table = self.conn.read_string().await?;
859                    }
860                    let block =
861                        self.block_reader.read_block(&mut self.conn).await?;
862
863                    // Invoke data callback if present
864                    if let Some(callback) = query.get_on_data_cancelable() {
865                        let should_continue = callback(&block);
866                        if !should_continue {
867                            debug!("Query cancelled by data callback");
868                            break;
869                        }
870                    } else if let Some(callback) = query.get_on_data() {
871                        callback(&block);
872                    }
873
874                    if !block.is_empty() {
875                        blocks.push(block);
876                    }
877                }
878                code if code == ServerCode::Progress as u64 => {
879                    debug!("Received progress packet");
880                    let delta = self.read_progress().await?;
881                    progress_info.rows += delta.rows;
882                    progress_info.bytes += delta.bytes;
883                    progress_info.total_rows = delta.total_rows;
884                    progress_info.written_rows += delta.written_rows;
885                    progress_info.written_bytes += delta.written_bytes;
886
887                    // Invoke progress callback if present
888                    if let Some(callback) = query.get_on_progress() {
889                        callback(&progress_info);
890                    }
891                }
892                code if code == ServerCode::EndOfStream as u64 => {
893                    debug!("Received end of stream");
894                    break;
895                }
896                code if code == ServerCode::ProfileInfo as u64 => {
897                    debug!("Received profile info packet");
898                    let rows = self.conn.read_varint().await?;
899                    let blocks = self.conn.read_varint().await?;
900                    let bytes = self.conn.read_varint().await?;
901                    let applied_limit = self.conn.read_u8().await?;
902                    let rows_before_limit = self.conn.read_varint().await?;
903                    let calculated = self.conn.read_u8().await?;
904
905                    let profile = Profile {
906                        rows,
907                        blocks,
908                        bytes,
909                        applied_limit: applied_limit != 0,
910                        rows_before_limit,
911                        calculated_rows_before_limit: calculated != 0,
912                    };
913
914                    // Invoke profile callback if present
915                    if let Some(callback) = query.get_on_profile() {
916                        callback(&profile);
917                    }
918                }
919                code if code == ServerCode::Log as u64 => {
920                    debug!("Received log packet");
921                    let _log_tag = self.conn.read_string().await?;
922                    // Log blocks are sent uncompressed
923                    let uncompressed_reader =
924                        BlockReader::new(self.server_info.revision);
925                    let block =
926                        uncompressed_reader.read_block(&mut self.conn).await?;
927
928                    // Invoke server log callback if present
929                    if let Some(callback) = query.get_on_server_log() {
930                        callback(&block);
931                    }
932                }
933                code if code == ServerCode::ProfileEvents as u64 => {
934                    debug!("Received profile events packet");
935                    let _table_name = self.conn.read_string().await?;
936                    // ProfileEvents blocks are sent uncompressed
937                    let uncompressed_reader =
938                        BlockReader::new(self.server_info.revision);
939                    let block =
940                        uncompressed_reader.read_block(&mut self.conn).await?;
941
942                    // Invoke profile events callback if present
943                    if let Some(callback) = query.get_on_profile_events() {
944                        callback(&block);
945                    }
946                }
947                code if code == ServerCode::TableColumns as u64 => {
948                    debug!("Received table columns packet (ignoring)");
949                    // Skip external table name
950                    let _table_name = self.conn.read_string().await?;
951                    // Skip columns metadata string
952                    let _columns_metadata = self.conn.read_string().await?;
953                }
954                code if code == ServerCode::Exception as u64 => {
955                    let exception = self.read_exception().await?;
956                    debug!(
957                        "Received exception: {} - {}",
958                        exception.name, exception.display_text
959                    );
960
961                    // Invoke exception callback if present
962                    if let Some(callback) = query.get_on_exception() {
963                        callback(&exception);
964                    }
965
966                    return Err(Error::Protocol(format!(
967                        "ClickHouse exception: {} (code {}): {}",
968                        exception.name, exception.code, exception.display_text
969                    )));
970                }
971                other => {
972                    return Err(Error::Protocol(format!(
973                        "Unexpected packet type during query: {}",
974                        other
975                    )));
976                }
977            }
978        }
979
980        Ok(QueryResult { blocks, progress: progress_info })
981    }
982
983    /// Send a query packet (always finalized)
984    async fn send_query(&mut self, query: &Query) -> Result<()> {
985        self.send_query_internal(query, true).await
986    }
987
988    /// Send a query packet (internal with finalization control)
989    async fn send_query_internal(
990        &mut self,
991        query: &Query,
992        finalize: bool,
993    ) -> Result<()> {
994        debug!("Sending query: {}", query.text());
995        // Write query code
996        self.conn.write_varint(ClientCode::Query as u64).await?;
997
998        // Write query ID
999        self.conn.write_string(query.id()).await?;
1000        debug!("Sent query ID");
1001
1002        // Client info
1003        let revision = self.server_info.revision;
1004        if revision >= 54032 {
1005            debug!("Writing client info...");
1006            let info = &self.options.client_info;
1007
1008            // Write client info fields in the correct order
1009            self.conn.write_u8(1).await?; // query_kind = 1 (initial query)
1010            self.conn.write_string(&info.initial_user).await?;
1011            self.conn.write_string(&info.initial_query_id).await?;
1012            self.conn.write_string("127.0.0.1:0").await?; // initial_address (client address:port)
1013
1014            if revision >= 54449 {
1015                self.conn.write_i64(0).await?; // initial_query_start_time
1016            }
1017
1018            self.conn.write_u8(info.interface_type).await?; // interface type (1 = TCP)
1019            self.conn.write_string(&info.os_user).await?;
1020            self.conn.write_string(&info.client_hostname).await?;
1021            self.conn.write_string(&info.client_name).await?;
1022            self.conn.write_varint(info.client_version_major).await?;
1023            self.conn.write_varint(info.client_version_minor).await?;
1024            self.conn.write_varint(info.client_revision).await?;
1025
1026            if revision >= 54060 {
1027                self.conn.write_string(&info.quota_key).await?;
1028            }
1029            if revision >= 54448 {
1030                self.conn.write_varint(0).await?; // distributed_depth
1031            }
1032            if revision >= 54401 {
1033                self.conn.write_varint(info.client_version_patch).await?;
1034            }
1035            if revision >= 54442 {
1036                // OpenTelemetry tracing context
1037                if let Some(ctx) = query.tracing_context() {
1038                    self.conn.write_u8(1).await?; // have OpenTelemetry
1039                                                  // Write trace_id (128-bit)
1040                    self.conn.write_u128(ctx.trace_id).await?;
1041                    // Write span_id (64-bit)
1042                    self.conn.write_u64(ctx.span_id).await?;
1043                    // Write tracestate
1044                    self.conn.write_string(&ctx.tracestate).await?;
1045                    // Write trace_flags
1046                    self.conn.write_u8(ctx.trace_flags).await?;
1047                } else {
1048                    self.conn.write_u8(0).await?; // no OpenTelemetry
1049                }
1050            }
1051            if revision >= 54453 {
1052                self.conn.write_varint(0).await?; // collaborate_with_initiator
1053                self.conn.write_varint(0).await?; // count_participating_replicas
1054                self.conn.write_varint(0).await?; // number_of_current_replica
1055            }
1056
1057            debug!("Client info sent");
1058        }
1059
1060        // Settings
1061        if revision >= 54429 {
1062            debug!("Writing settings...");
1063            for (key, field) in query.settings() {
1064                self.conn.write_string(key).await?;
1065                self.conn.write_varint(field.flags).await?;
1066                self.conn.write_string(&field.value).await?;
1067            }
1068        }
1069        // Empty string to mark end of settings
1070        self.conn.write_string("").await?;
1071        debug!("Settings sent");
1072
1073        // Interserver secret (for servers >= 54441)
1074        if revision >= 54441 {
1075            self.conn.write_string("").await?; // empty interserver secret
1076        }
1077
1078        // Query stage, compression, text
1079        debug!("Writing query stage and text...");
1080        self.conn.write_varint(2).await?; // Stage = Complete
1081                                          // Enable compression if we have it configured
1082        let compression_enabled =
1083            if self.options.compression.is_some() { 1u64 } else { 0u64 };
1084        self.conn.write_varint(compression_enabled).await?;
1085        self.conn.write_string(query.text()).await?;
1086
1087        // Query parameters (for servers >= 54459)
1088        if revision >= 54459 {
1089            for (key, value) in query.parameters() {
1090                self.conn.write_string(key).await?;
1091                self.conn.write_varint(2).await?; // Custom type
1092                self.conn.write_quoted_string(value).await?;
1093            }
1094            // Empty string to mark end of parameters
1095            self.conn.write_string("").await?;
1096        }
1097
1098        // Conditionally finalize based on parameter
1099        if finalize {
1100            self.finalize_query().await?;
1101        }
1102
1103        Ok(())
1104    }
1105
1106    /// Finalize query by sending empty block marker
1107    ///
1108    /// Must be called after send_query_internal() to complete the query
1109    /// protocol. For most queries, use send_query() which handles this
1110    /// automatically. Only split for special cases like external tables.
1111    async fn finalize_query(&mut self) -> Result<()> {
1112        // Send empty block to finalize query (as per C++ client)
1113        // This block must respect the compression setting we told the server
1114        debug!("Sending empty block to finalize...");
1115        self.conn.write_varint(ClientCode::Data as u64).await?;
1116        let empty_block = Block::new();
1117        // Create writer that matches the compression setting
1118        let writer = if let Some(compression) = self.options.compression {
1119            BlockWriter::new(self.server_info.revision)
1120                .with_compression(compression)
1121        } else {
1122            BlockWriter::new(self.server_info.revision)
1123        };
1124        writer.write_block(&mut self.conn, &empty_block).await?;
1125
1126        self.conn.flush().await?;
1127        debug!("Query finalized");
1128        Ok(())
1129    }
1130
1131    /// Send external tables data
1132    ///
1133    /// External tables are sent as Data packets after the initial query
1134    /// packet. Each table is sent with its name and block data.
1135    /// Empty blocks are skipped to keep the connection in a consistent state.
1136    async fn send_external_tables(
1137        &mut self,
1138        external_tables: &[crate::ExternalTable],
1139    ) -> Result<()> {
1140        for table in external_tables {
1141            // Skip empty blocks to keep connection consistent
1142            if table.data.row_count() == 0 {
1143                continue;
1144            }
1145
1146            debug!("Sending external table: {}", table.name);
1147
1148            // Send Data packet type
1149            self.conn.write_varint(ClientCode::Data as u64).await?;
1150
1151            // Send table name (this serves as the temp table name for this
1152            // Data packet)
1153            self.conn.write_string(&table.name).await?;
1154
1155            // Send block data WITHOUT temp table name prefix (we already wrote
1156            // it above)
1157            self.block_writer
1158                .write_block_with_temp_table(
1159                    &mut self.conn,
1160                    &table.data,
1161                    false,
1162                )
1163                .await?;
1164        }
1165
1166        self.conn.flush().await?;
1167        Ok(())
1168    }
1169
1170    /// Read progress info
1171    async fn read_progress(&mut self) -> Result<Progress> {
1172        let rows = self.conn.read_varint().await?;
1173        let bytes = self.conn.read_varint().await?;
1174        let total_rows = self.conn.read_varint().await?;
1175
1176        let (written_rows, written_bytes) = if self.server_info.revision
1177            >= 54405
1178        {
1179            (self.conn.read_varint().await?, self.conn.read_varint().await?)
1180        } else {
1181            (0, 0)
1182        };
1183
1184        Ok(Progress { rows, bytes, total_rows, written_rows, written_bytes })
1185    }
1186
1187    /// Read exception from connection (static helper for use in contexts
1188    /// without self)
1189    fn read_exception_from_conn(
1190        conn: &mut Connection,
1191    ) -> std::pin::Pin<
1192        Box<
1193            dyn std::future::Future<Output = Result<crate::query::Exception>>
1194                + '_,
1195        >,
1196    > {
1197        use crate::query::Exception;
1198        Box::pin(async move {
1199            debug!("Reading exception code...");
1200            let code = conn.read_i32().await?;
1201            debug!("Exception code: {}", code);
1202            debug!("Reading exception name...");
1203            let name = conn.read_string().await?;
1204            debug!("Exception name: {}", name);
1205            debug!("Reading exception display_text...");
1206            let display_text = conn.read_string().await?;
1207            debug!("Exception display_text length: {}", display_text.len());
1208            debug!("Reading exception stack_trace...");
1209            let stack_trace = conn.read_string().await?;
1210            debug!("Exception stack_trace length: {}", stack_trace.len());
1211
1212            // Check for nested exception
1213            let has_nested = conn.read_u8().await?;
1214            let nested = if has_nested != 0 {
1215                Some(Box::new(Self::read_exception_from_conn(conn).await?))
1216            } else {
1217                None
1218            };
1219
1220            Ok(Exception { code, name, display_text, stack_trace, nested })
1221        })
1222    }
1223
1224    /// Read exception from server
1225    fn read_exception<'a>(
1226        &'a mut self,
1227    ) -> std::pin::Pin<
1228        Box<
1229            dyn std::future::Future<Output = Result<crate::query::Exception>>
1230                + 'a,
1231        >,
1232    > {
1233        Box::pin(async move {
1234            Self::read_exception_from_conn(&mut self.conn).await
1235        })
1236    }
1237
1238    /// Insert data into a table
1239    ///
1240    /// This method constructs an INSERT query from the block's column names
1241    /// and sends the data. Example: `client.insert("my_database.my_table",
1242    /// block).await?`
1243    ///
1244    /// For query tracing, use `insert_with_id()` to specify a query ID.
1245    pub async fn insert(
1246        &mut self,
1247        table_name: &str,
1248        block: Block,
1249    ) -> Result<()> {
1250        self.insert_with_id(table_name, "", block).await
1251    }
1252
1253    /// Insert data into a table with a specific query ID
1254    ///
1255    /// The query ID is useful for:
1256    /// - Query tracing and debugging
1257    /// - Correlating queries with logs
1258    /// - OpenTelemetry integration
1259    ///
1260    /// # Example
1261    /// ```no_run
1262    /// # use clickhouse_native_client::{Client, ClientOptions, Block};
1263    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1264    /// # let mut client = Client::connect(ClientOptions::default()).await?;
1265    /// # let block = Block::new();
1266    /// client.insert_with_id("my_table", "trace-id-12345", block).await?;
1267    /// # Ok(())
1268    /// # }
1269    /// ```
1270    pub async fn insert_with_id(
1271        &mut self,
1272        table_name: &str,
1273        query_id: &str,
1274        block: Block,
1275    ) -> Result<()> {
1276        // Build query with column names from block (matches C++
1277        // implementation)
1278        let col_names: Vec<String> = (0..block.column_count())
1279            .filter_map(|i| block.column_name(i))
1280            .map(|n| format!("`{}`", n.replace("`", "``"))) // Escape backticks
1281            .collect();
1282
1283        if col_names.is_empty() {
1284            return Err(Error::Protocol("Block has no columns".to_string()));
1285        }
1286
1287        let query_text = format!(
1288            "INSERT INTO {} ({}) VALUES",
1289            table_name,
1290            col_names.join(", ")
1291        );
1292
1293        debug!("Sending INSERT query: {}", query_text);
1294        let query = Query::new(query_text).with_query_id(query_id);
1295
1296        // Send query
1297        self.send_query(&query).await?;
1298
1299        // Wait for server to respond with Data packet (matches C++ Insert
1300        // flow)
1301        debug!("Waiting for server Data packet...");
1302        loop {
1303            let packet_type = self.conn.read_varint().await?;
1304            debug!("INSERT wait response packet type: {}", packet_type);
1305
1306            match packet_type {
1307                code if code == ServerCode::Data as u64 => {
1308                    debug!("Received Data packet, ready to send data");
1309                    // CRITICAL: Must consume the Data packet's payload to keep
1310                    // stream aligned! Skip temp table name
1311                    if self.server_info.revision >= 50264 {
1312                        let _temp_table = self.conn.read_string().await?;
1313                    }
1314                    // Read the block (likely empty, but must consume it)
1315                    let _block =
1316                        self.block_reader.read_block(&mut self.conn).await?;
1317                    debug!("Consumed Data packet payload, stream aligned");
1318                    break;
1319                }
1320                code if code == ServerCode::Progress as u64 => {
1321                    debug!("Received Progress packet");
1322                    let _ = self.read_progress().await?;
1323                }
1324                code if code == ServerCode::TableColumns as u64 => {
1325                    debug!("Received TableColumns packet");
1326                    // Skip external table name
1327                    let _table_name = self.conn.read_string().await?;
1328                    // Skip columns metadata string
1329                    let _columns_metadata = self.conn.read_string().await?;
1330                }
1331                code if code == ServerCode::Exception as u64 => {
1332                    debug!("Server returned exception before accepting data");
1333                    let exception = self.read_exception().await?;
1334                    return Err(Error::Protocol(format!(
1335                        "ClickHouse exception: {} (code {}): {}",
1336                        exception.name, exception.code, exception.display_text
1337                    )));
1338                }
1339                other => {
1340                    return Err(Error::Protocol(format!(
1341                        "Unexpected packet type while waiting for Data: {}",
1342                        other
1343                    )));
1344                }
1345            }
1346        }
1347
1348        // Now send our data block
1349        debug!("Sending data block with {} rows", block.row_count());
1350        self.conn.write_varint(ClientCode::Data as u64).await?;
1351        self.block_writer.write_block(&mut self.conn, &block).await?;
1352
1353        // Send empty block to signal end
1354        debug!("Sending empty block to signal end");
1355        let empty_block = Block::new();
1356        self.conn.write_varint(ClientCode::Data as u64).await?;
1357        self.block_writer.write_block(&mut self.conn, &empty_block).await?;
1358
1359        // Wait for EndOfStream (matches C++ flow)
1360        debug!("Waiting for EndOfStream...");
1361        loop {
1362            let packet_type = self.conn.read_varint().await?;
1363            debug!("INSERT final response packet type: {}", packet_type);
1364
1365            match packet_type {
1366                code if code == ServerCode::EndOfStream as u64 => {
1367                    debug!("Received EndOfStream, insert complete");
1368                    break;
1369                }
1370                code if code == ServerCode::Data as u64 => {
1371                    debug!(
1372                        "Received Data packet in INSERT response (skipping)"
1373                    );
1374                    // Skip temp table name if protocol supports it
1375                    if self.server_info.revision >= 50264 {
1376                        let _temp_table = self.conn.read_string().await?;
1377                    }
1378                    // Read and discard the block
1379                    let _block =
1380                        self.block_reader.read_block(&mut self.conn).await?;
1381                }
1382                code if code == ServerCode::Progress as u64 => {
1383                    debug!("Received Progress packet");
1384                    let _ = self.read_progress().await?;
1385                }
1386                code if code == ServerCode::ProfileEvents as u64 => {
1387                    debug!("Received ProfileEvents packet (skipping)");
1388                    let _table_name = self.conn.read_string().await?;
1389                    let uncompressed_reader =
1390                        BlockReader::new(self.server_info.revision);
1391                    let _block =
1392                        uncompressed_reader.read_block(&mut self.conn).await?;
1393                }
1394                code if code == ServerCode::TableColumns as u64 => {
1395                    debug!("Received TableColumns packet (skipping)");
1396                    let _table_name = self.conn.read_string().await?;
1397                    let _columns_metadata = self.conn.read_string().await?;
1398                }
1399                code if code == ServerCode::Exception as u64 => {
1400                    debug!("Server returned exception after sending data");
1401                    let exception = self.read_exception().await?;
1402                    return Err(Error::Protocol(format!(
1403                        "ClickHouse exception: {} (code {}): {}",
1404                        exception.name, exception.code, exception.display_text
1405                    )));
1406                }
1407                _ => {
1408                    debug!("WARNING: Ignoring unexpected packet type: {} - stream may be misaligned", packet_type);
1409                }
1410            }
1411        }
1412
1413        Ok(())
1414    }
1415
1416    /// Ping the server
1417    pub async fn ping(&mut self) -> Result<()> {
1418        debug!("Sending ping...");
1419        self.conn.write_varint(ClientCode::Ping as u64).await?;
1420        self.conn.flush().await?;
1421        debug!("Ping sent, waiting for pong...");
1422
1423        let packet_type = self.conn.read_varint().await?;
1424        debug!("Got response packet type: {}", packet_type);
1425
1426        if packet_type == ServerCode::Pong as u64 {
1427            debug!("Pong received!");
1428            Ok(())
1429        } else {
1430            debug!("Unexpected packet: {}", packet_type);
1431            Err(Error::Protocol(format!("Expected Pong, got {}", packet_type)))
1432        }
1433    }
1434
1435    /// Cancel the current query
1436    ///
1437    /// Sends a cancel packet to the server to stop any currently running
1438    /// query. Note: This is most useful when called with a cancelable
1439    /// callback, or when you need to cancel a long-running query from
1440    /// outside the query execution flow.
1441    pub async fn cancel(&mut self) -> Result<()> {
1442        debug!("Sending cancel...");
1443        self.conn.write_varint(ClientCode::Cancel as u64).await?;
1444        self.conn.flush().await?;
1445        debug!("Cancel sent");
1446        Ok(())
1447    }
1448
1449    /// Get server info
1450    ///
1451    /// Returns information about the connected ClickHouse server including
1452    /// name, version, revision, timezone, and display name.
1453    ///
1454    /// # Example
1455    /// ```no_run
1456    /// # use clickhouse_native_client::{Client, ClientOptions};
1457    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1458    /// # let client = Client::connect(ClientOptions::default()).await?;
1459    /// let info = client.server_info();
1460    /// println!("Server: {} v{}.{}.{}",
1461    ///     info.name,
1462    ///     info.version_major,
1463    ///     info.version_minor,
1464    ///     info.version_patch
1465    /// );
1466    /// # Ok(())
1467    /// # }
1468    /// ```
1469    pub fn server_info(&self) -> &ServerInfo {
1470        &self.server_info
1471    }
1472
1473    /// Get server version as a tuple (major, minor, patch)
1474    ///
1475    /// # Example
1476    /// ```no_run
1477    /// # use clickhouse_native_client::{Client, ClientOptions};
1478    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1479    /// # let client = Client::connect(ClientOptions::default()).await?;
1480    /// let (major, minor, patch) = client.server_version();
1481    /// println!("Server version: {}.{}.{}", major, minor, patch);
1482    /// # Ok(())
1483    /// # }
1484    /// ```
1485    pub fn server_version(&self) -> (u64, u64, u64) {
1486        (
1487            self.server_info.version_major,
1488            self.server_info.version_minor,
1489            self.server_info.version_patch,
1490        )
1491    }
1492
1493    /// Get server revision number
1494    ///
1495    /// The revision number is used for protocol feature negotiation.
1496    ///
1497    /// # Example
1498    /// ```no_run
1499    /// # use clickhouse_native_client::{Client, ClientOptions};
1500    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1501    /// # let client = Client::connect(ClientOptions::default()).await?;
1502    /// let revision = client.server_revision();
1503    /// println!("Server revision: {}", revision);
1504    /// # Ok(())
1505    /// # }
1506    /// ```
1507    pub fn server_revision(&self) -> u64 {
1508        self.server_info.revision
1509    }
1510}
1511
1512/// Result of a `SELECT` query, containing data blocks and progress
1513/// information.
1514pub struct QueryResult {
1515    /// Result blocks
1516    pub blocks: Vec<Block>,
1517    /// Progress information
1518    pub progress: Progress,
1519}
1520
1521impl QueryResult {
1522    /// Get all blocks
1523    pub fn blocks(&self) -> &[Block] {
1524        &self.blocks
1525    }
1526
1527    /// Get progress info
1528    pub fn progress(&self) -> &Progress {
1529        &self.progress
1530    }
1531
1532    /// Get total number of rows across all blocks
1533    pub fn total_rows(&self) -> usize {
1534        self.blocks.iter().map(|b| b.row_count()).sum()
1535    }
1536}
1537
1538#[cfg(test)]
1539#[cfg_attr(coverage_nightly, coverage(off))]
1540mod tests {
1541    use super::*;
1542
1543    #[test]
1544    fn test_client_options_default() {
1545        let opts = ClientOptions::default();
1546        assert_eq!(opts.host, "localhost");
1547        assert_eq!(opts.port, 9000);
1548        assert_eq!(opts.database, "default");
1549    }
1550
1551    #[test]
1552    fn test_client_options_builder() {
1553        let opts = ClientOptions::new("127.0.0.1", 9000)
1554            .database("test_db")
1555            .user("test_user")
1556            .password("test_pass");
1557
1558        assert_eq!(opts.host, "127.0.0.1");
1559        assert_eq!(opts.database, "test_db");
1560        assert_eq!(opts.user, "test_user");
1561        assert_eq!(opts.password, "test_pass");
1562    }
1563
1564    #[test]
1565    fn test_query_result() {
1566        let result =
1567            QueryResult { blocks: vec![], progress: Progress::default() };
1568
1569        assert_eq!(result.total_rows(), 0);
1570    }
1571}