1use crate::{
2 block::Block,
3 connection::{
4 Connection,
5 ConnectionOptions,
6 },
7 io::{
8 BlockReader,
9 BlockWriter,
10 },
11 protocol::{
12 ClientCode,
13 CompressionMethod,
14 ServerCode,
15 },
16 query::{
17 ClientInfo,
18 Profile,
19 Progress,
20 Query,
21 ServerInfo,
22 },
23 Error,
24 Result,
25};
26use std::time::Duration;
27use tracing::debug;
28
29#[cfg(feature = "tls")]
30use crate::ssl::SSLOptions;
31
/// A single server address: a host paired with a port.
///
/// `ClientOptions` stores a list of these to describe the servers a client
/// may connect to.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Endpoint {
    /// Host name or address of the server.
    pub host: String,
    /// Port of the server.
    pub port: u16,
}

impl Endpoint {
    /// Builds an endpoint from anything convertible into a `String` plus a
    /// port number.
    pub fn new(host: impl Into<String>, port: u16) -> Self {
        let host: String = host.into();
        Endpoint { host, port }
    }
}
47
48#[derive(Clone, Debug)]
50pub struct ClientOptions {
51 pub host: String,
53 pub port: u16,
55 pub endpoints: Vec<Endpoint>,
57 pub database: String,
59 pub user: String,
61 pub password: String,
63 pub compression: Option<CompressionMethod>,
65 pub max_compression_chunk_size: usize,
67 pub client_info: ClientInfo,
69 pub connection_options: ConnectionOptions,
71 #[cfg(feature = "tls")]
73 pub ssl_options: Option<SSLOptions>,
74 pub send_retries: u32,
76 pub retry_timeout: Duration,
78 pub ping_before_query: bool,
80 pub rethrow_exceptions: bool,
82}
83
84impl Default for ClientOptions {
85 fn default() -> Self {
86 Self {
87 host: "localhost".to_string(),
88 port: 9000,
89 endpoints: Vec::new(),
90 database: "default".to_string(),
91 user: "default".to_string(),
92 password: String::new(),
93 compression: Some(CompressionMethod::Lz4),
94 max_compression_chunk_size: 65535,
95 client_info: ClientInfo::default(),
96 connection_options: ConnectionOptions::default(),
97 #[cfg(feature = "tls")]
98 ssl_options: None,
99 send_retries: 1,
100 retry_timeout: Duration::from_secs(5),
101 ping_before_query: false,
102 rethrow_exceptions: true,
103 }
104 }
105}
106
107impl ClientOptions {
108 pub fn new(host: impl Into<String>, port: u16) -> Self {
110 Self { host: host.into(), port, ..Default::default() }
111 }
112
113 pub fn endpoints(mut self, endpoints: Vec<Endpoint>) -> Self {
115 self.endpoints = endpoints;
116 self
117 }
118
119 pub fn add_endpoint(mut self, host: impl Into<String>, port: u16) -> Self {
121 self.endpoints.push(Endpoint::new(host, port));
122 self
123 }
124
125 pub fn database(mut self, database: impl Into<String>) -> Self {
127 self.database = database.into();
128 self
129 }
130
131 pub fn user(mut self, user: impl Into<String>) -> Self {
133 self.user = user.into();
134 self
135 }
136
137 pub fn password(mut self, password: impl Into<String>) -> Self {
139 self.password = password.into();
140 self
141 }
142
143 pub fn compression(mut self, method: Option<CompressionMethod>) -> Self {
145 self.compression = method;
146 self
147 }
148
149 pub fn max_compression_chunk_size(mut self, size: usize) -> Self {
151 self.max_compression_chunk_size = size;
152 self
153 }
154
155 pub fn connection_options(mut self, options: ConnectionOptions) -> Self {
157 self.connection_options = options;
158 self
159 }
160
161 pub fn send_retries(mut self, retries: u32) -> Self {
163 self.send_retries = retries;
164 self
165 }
166
167 pub fn retry_timeout(mut self, timeout: Duration) -> Self {
169 self.retry_timeout = timeout;
170 self
171 }
172
173 pub fn ping_before_query(mut self, enabled: bool) -> Self {
175 self.ping_before_query = enabled;
176 self
177 }
178
179 pub fn rethrow_exceptions(mut self, enabled: bool) -> Self {
181 self.rethrow_exceptions = enabled;
182 self
183 }
184
185 #[cfg(feature = "tls")]
187 pub fn ssl_options(mut self, options: SSLOptions) -> Self {
188 self.ssl_options = Some(options);
189 self
190 }
191
192 pub(crate) fn get_endpoints(&self) -> Vec<Endpoint> {
194 if self.endpoints.is_empty() {
195 vec![Endpoint::new(&self.host, self.port)]
196 } else {
197 self.endpoints.clone()
198 }
199 }
200}
201
202pub struct Client {
208 conn: Connection,
209 server_info: ServerInfo,
210 block_reader: BlockReader,
211 block_writer: BlockWriter,
212 options: ClientOptions,
213}
214
215impl Client {
216 pub async fn connect(options: ClientOptions) -> Result<Self> {
218 let endpoints = options.get_endpoints();
219 let mut last_error = None;
220
221 for endpoint in &endpoints {
223 for attempt in 0..options.send_retries {
224 match Self::try_connect(
225 &endpoint.host,
226 endpoint.port,
227 &options,
228 )
229 .await
230 {
231 Ok(client) => return Ok(client),
232 Err(e) => {
233 last_error = Some(e);
234
235 if attempt + 1 < options.send_retries {
237 tokio::time::sleep(options.retry_timeout).await;
238 }
239 }
240 }
241 }
242 }
243
244 Err(last_error.unwrap_or_else(|| {
246 Error::Connection("No endpoints available".to_string())
247 }))
248 }
249
250 async fn try_connect(
252 host: &str,
253 port: u16,
254 options: &ClientOptions,
255 ) -> Result<Self> {
256 let mut conn = {
258 #[cfg(feature = "tls")]
259 {
260 if let Some(ref ssl_opts) = options.ssl_options {
261 let ssl_config = ssl_opts.build_client_config()?;
263
264 let server_name = ssl_opts
267 .server_name
268 .as_deref()
269 .or(if ssl_opts.use_sni { Some(host) } else { None });
270
271 Connection::connect_with_tls(
272 host,
273 port,
274 &options.connection_options,
275 ssl_config,
276 server_name,
277 )
278 .await?
279 } else {
280 Connection::connect_with_options(
281 host,
282 port,
283 &options.connection_options,
284 )
285 .await?
286 }
287 }
288 #[cfg(not(feature = "tls"))]
289 {
290 Connection::connect_with_options(
291 host,
292 port,
293 &options.connection_options,
294 )
295 .await?
296 }
297 };
298
299 Self::send_hello(&mut conn, options).await?;
301
302 let server_info = Self::receive_hello(&mut conn).await?;
304
305 if server_info.revision >= 54458 {
308 debug!("Sending quota key addendum (empty string)...");
309 conn.write_string("").await?;
310 conn.flush().await?;
311 debug!("Addendum sent");
312 }
313
314 let mut block_reader = BlockReader::new(server_info.revision);
316 let mut block_writer = BlockWriter::new(server_info.revision);
317
318 if let Some(compression) = options.compression {
320 block_reader = block_reader.with_compression(compression);
321 block_writer = block_writer.with_compression(compression);
322 }
323
324 Ok(Self {
325 conn,
326 server_info,
327 block_reader,
328 block_writer,
329 options: options.clone(),
330 })
331 }
332
333 async fn send_hello(
335 conn: &mut Connection,
336 options: &ClientOptions,
337 ) -> Result<()> {
338 debug!("Sending client hello...");
339 conn.write_varint(ClientCode::Hello as u64).await?;
341 debug!("Sent hello code");
342
343 conn.write_string(&options.client_info.client_name).await?;
345 debug!("Sent client name: {}", options.client_info.client_name);
346 conn.write_varint(options.client_info.client_version_major).await?;
347 conn.write_varint(options.client_info.client_version_minor).await?;
348 conn.write_varint(options.client_info.client_revision).await?;
349 debug!(
350 "Sent version: {}.{}.{}",
351 options.client_info.client_version_major,
352 options.client_info.client_version_minor,
353 options.client_info.client_revision
354 );
355
356 conn.write_string(&options.database).await?;
358 conn.write_string(&options.user).await?;
359 conn.write_string(&options.password).await?;
360 debug!("Sent credentials");
361
362 conn.flush().await?;
363 debug!("Flushed");
364 Ok(())
365 }
366
367 async fn receive_hello(conn: &mut Connection) -> Result<ServerInfo> {
369 debug!("Reading server hello...");
370 let packet_type = conn.read_varint().await?;
371 debug!("Got packet type: {}", packet_type);
372
373 if packet_type != ServerCode::Hello as u64 {
374 if packet_type == ServerCode::Exception as u64 {
375 debug!("Server sent exception during handshake!");
376 let exception = Self::read_exception_from_conn(conn).await?;
377 debug!(
378 "Exception: code={}, name={}, msg={}",
379 exception.code, exception.name, exception.display_text
380 );
381 return Err(Error::Protocol(format!(
382 "ClickHouse exception during handshake: {} (code {}): {}",
383 exception.name, exception.code, exception.display_text
384 )));
385 }
386 debug!("Unexpected packet type: {}", packet_type);
387 return Err(Error::Protocol(format!(
388 "Expected Hello packet, got {}",
389 packet_type
390 )));
391 }
392
393 debug!("Reading server info...");
395 let name = conn.read_string().await?;
396 debug!("Server name: {}", name);
397 let version_major = conn.read_varint().await?;
398 let version_minor = conn.read_varint().await?;
399 let revision = conn.read_varint().await?;
400 debug!(
401 "Server version: {}.{}, revision: {}",
402 version_major, version_minor, revision
403 );
404
405 let timezone = if revision >= 54058 {
406 debug!("Reading timezone...");
407 conn.read_string().await?
408 } else {
409 String::new()
410 };
411
412 let display_name = if revision >= 54372 {
413 debug!("Reading display name...");
414 conn.read_string().await?
415 } else {
416 String::new()
417 };
418
419 let version_patch = if revision >= 54401 {
420 debug!("Reading version patch...");
421 conn.read_varint().await?
422 } else {
423 0
424 };
425
426 debug!("Server hello complete!");
427 Ok(ServerInfo {
428 name,
429 version_major,
430 version_minor,
431 version_patch,
432 revision,
433 timezone,
434 display_name,
435 })
436 }
437
438 pub async fn execute(&mut self, query: impl Into<Query>) -> Result<()> {
460 self.execute_with_id(query, "").await
461 }
462
463 pub async fn execute_with_id(
477 &mut self,
478 query: impl Into<Query>,
479 query_id: &str,
480 ) -> Result<()> {
481 let mut query = query.into();
482 if !query_id.is_empty() {
483 query = Query::new(query.text()).with_query_id(query_id);
484 }
485 self.send_query(&query).await?;
486
487 loop {
489 let packet_type = self.conn.read_varint().await?;
490
491 match packet_type {
492 code if code == ServerCode::Data as u64 => {
493 if self.server_info.revision >= 50264 {
496 let _temp_table = self.conn.read_string().await?;
497 }
498 let _block =
499 self.block_reader.read_block(&mut self.conn).await?;
500 }
501 code if code == ServerCode::Progress as u64 => {
502 let progress = self.read_progress().await?;
503
504 if let Some(callback) = query.get_on_progress() {
506 callback(&progress);
507 }
508 }
509 code if code == ServerCode::EndOfStream as u64 => {
510 break;
511 }
512 code if code == ServerCode::Exception as u64 => {
513 let exception = self.read_exception().await?;
514
515 if let Some(callback) = query.get_on_exception() {
517 callback(&exception);
518 }
519
520 return Err(Error::Protocol(format!(
521 "ClickHouse exception: {} (code {}): {}",
522 exception.name, exception.code, exception.display_text
523 )));
524 }
525 code if code == ServerCode::ProfileInfo as u64 => {
526 let rows = self.conn.read_varint().await?;
528 let blocks = self.conn.read_varint().await?;
529 let bytes = self.conn.read_varint().await?;
530 let applied_limit = self.conn.read_u8().await?;
531 let rows_before_limit = self.conn.read_varint().await?;
532 let calculated = self.conn.read_u8().await?;
533
534 let profile = Profile {
535 rows,
536 blocks,
537 bytes,
538 applied_limit: applied_limit != 0,
539 rows_before_limit,
540 calculated_rows_before_limit: calculated != 0,
541 };
542
543 if let Some(callback) = query.get_on_profile() {
545 callback(&profile);
546 }
547 }
548 code if code == ServerCode::Log as u64 => {
549 let _log_tag = self.conn.read_string().await?;
550 let uncompressed_reader =
552 BlockReader::new(self.server_info.revision);
553 let block =
554 uncompressed_reader.read_block(&mut self.conn).await?;
555
556 if let Some(callback) = query.get_on_server_log() {
558 callback(&block);
559 }
560 }
561 code if code == ServerCode::ProfileEvents as u64 => {
562 let _table_name = self.conn.read_string().await?;
563 let uncompressed_reader =
565 BlockReader::new(self.server_info.revision);
566 let block =
567 uncompressed_reader.read_block(&mut self.conn).await?;
568
569 if let Some(callback) = query.get_on_profile_events() {
571 callback(&block);
572 }
573 }
574 code if code == ServerCode::TableColumns as u64 => {
575 let _table_name = self.conn.read_string().await?;
576 let _columns_metadata = self.conn.read_string().await?;
577 }
578 _ => {
579 return Err(Error::Protocol(format!(
580 "Unexpected packet type during execute: {}",
581 packet_type
582 )));
583 }
584 }
585 }
586
587 Ok(())
588 }
589
590 pub async fn query(
596 &mut self,
597 query: impl Into<Query>,
598 ) -> Result<QueryResult> {
599 self.query_with_id(query, "").await
600 }
601
602 pub async fn query_with_id(
616 &mut self,
617 query: impl Into<Query>,
618 query_id: &str,
619 ) -> Result<QueryResult> {
620 let mut query = query.into();
621 if !query_id.is_empty() {
622 query = Query::new(query.text()).with_query_id(query_id);
623 }
624
625 self.send_query(&query).await?;
627
628 let mut blocks = Vec::new();
630 let mut progress_info = Progress::default();
631
632 loop {
633 let packet_type = self.conn.read_varint().await?;
634 debug!("Query response packet: {}", packet_type);
635
636 match packet_type {
637 code if code == ServerCode::Data as u64 => {
638 debug!("Received data packet");
639 if self.server_info.revision >= 50264 {
642 let _temp_table = self.conn.read_string().await?;
644 }
645 let block =
646 self.block_reader.read_block(&mut self.conn).await?;
647
648 if let Some(callback) = query.get_on_data_cancelable() {
650 let should_continue = callback(&block);
651 if !should_continue {
652 debug!("Query cancelled by data callback");
653 break;
654 }
655 } else if let Some(callback) = query.get_on_data() {
656 callback(&block);
657 }
658
659 if !block.is_empty() {
660 blocks.push(block);
661 }
662 }
663 code if code == ServerCode::Progress as u64 => {
664 debug!("Received progress packet");
665 let delta = self.read_progress().await?;
666 progress_info.rows += delta.rows;
667 progress_info.bytes += delta.bytes;
668 progress_info.total_rows = delta.total_rows;
669 progress_info.written_rows += delta.written_rows;
670 progress_info.written_bytes += delta.written_bytes;
671
672 if let Some(callback) = query.get_on_progress() {
674 callback(&progress_info);
675 }
676 }
677 code if code == ServerCode::EndOfStream as u64 => {
678 debug!("Received end of stream");
679 break;
680 }
681 code if code == ServerCode::ProfileInfo as u64 => {
682 debug!("Received profile info packet");
683 let rows = self.conn.read_varint().await?;
685 let blocks = self.conn.read_varint().await?;
686 let bytes = self.conn.read_varint().await?;
687 let applied_limit = self.conn.read_u8().await? != 0;
688 let rows_before_limit = self.conn.read_varint().await?;
689 let calculated_rows_before_limit =
690 self.conn.read_u8().await? != 0;
691
692 let profile = crate::query::Profile {
693 rows,
694 blocks,
695 bytes,
696 rows_before_limit,
697 applied_limit,
698 calculated_rows_before_limit,
699 };
700
701 if let Some(callback) = query.get_on_profile() {
703 callback(&profile);
704 }
705 }
706 code if code == ServerCode::Log as u64 => {
707 debug!("Received log packet");
708 let _log_tag = self.conn.read_string().await?;
710 let uncompressed_reader =
712 BlockReader::new(self.server_info.revision);
713 let block =
714 uncompressed_reader.read_block(&mut self.conn).await?;
715
716 if let Some(callback) = query.get_on_server_log() {
718 callback(&block);
719 }
720 }
721 code if code == ServerCode::ProfileEvents as u64 => {
722 debug!("Received profile events packet");
723 let _table_name = self.conn.read_string().await?;
725 let uncompressed_reader =
727 BlockReader::new(self.server_info.revision);
728 let block =
729 uncompressed_reader.read_block(&mut self.conn).await?;
730
731 if let Some(callback) = query.get_on_profile_events() {
733 callback(&block);
734 }
735 }
736 code if code == ServerCode::TableColumns as u64 => {
737 debug!("Received table columns packet (ignoring)");
738 let _table_name = self.conn.read_string().await?;
740 let _columns_metadata = self.conn.read_string().await?;
742 }
743 code if code == ServerCode::Exception as u64 => {
744 debug!("Server returned exception during query, reading details...");
745 let exception = self.read_exception().await?;
746 debug!(
747 "Exception: code={}, name={}, msg={}",
748 exception.code, exception.name, exception.display_text
749 );
750
751 if let Some(callback) = query.get_on_exception() {
753 callback(&exception);
754 }
755
756 return Err(Error::Protocol(format!(
757 "ClickHouse exception: {} ({}): {}",
758 exception.name, exception.code, exception.display_text
759 )));
760 }
761 other => {
762 debug!("Unexpected packet type: {}", other);
763 return Err(Error::Protocol(format!(
764 "Unexpected packet type: {}",
765 other
766 )));
767 }
768 }
769 }
770
771 Ok(QueryResult { blocks, progress: progress_info })
772 }
773
774 pub async fn query_with_external_data(
798 &mut self,
799 query: impl Into<Query>,
800 external_tables: &[crate::ExternalTable],
801 ) -> Result<QueryResult> {
802 self.query_with_external_data_and_id(query, "", external_tables).await
803 }
804
805 pub async fn query_with_external_data_and_id(
825 &mut self,
826 query: impl Into<Query>,
827 query_id: &str,
828 external_tables: &[crate::ExternalTable],
829 ) -> Result<QueryResult> {
830 let mut query = query.into();
831 if !query_id.is_empty() {
832 query = Query::new(query.text()).with_query_id(query_id);
833 }
834
835 self.send_query_internal(&query, false).await?;
838
839 self.send_external_tables(external_tables).await?;
841
842 self.finalize_query().await?;
844
845 let mut blocks = Vec::new();
847 let mut progress_info = Progress::default();
848
849 loop {
850 let packet_type = self.conn.read_varint().await?;
851 debug!("Query response packet: {}", packet_type);
852
853 match packet_type {
854 code if code == ServerCode::Data as u64 => {
855 debug!("Received data packet");
856 if self.server_info.revision >= 50264 {
858 let _temp_table = self.conn.read_string().await?;
859 }
860 let block =
861 self.block_reader.read_block(&mut self.conn).await?;
862
863 if let Some(callback) = query.get_on_data_cancelable() {
865 let should_continue = callback(&block);
866 if !should_continue {
867 debug!("Query cancelled by data callback");
868 break;
869 }
870 } else if let Some(callback) = query.get_on_data() {
871 callback(&block);
872 }
873
874 if !block.is_empty() {
875 blocks.push(block);
876 }
877 }
878 code if code == ServerCode::Progress as u64 => {
879 debug!("Received progress packet");
880 let delta = self.read_progress().await?;
881 progress_info.rows += delta.rows;
882 progress_info.bytes += delta.bytes;
883 progress_info.total_rows = delta.total_rows;
884 progress_info.written_rows += delta.written_rows;
885 progress_info.written_bytes += delta.written_bytes;
886
887 if let Some(callback) = query.get_on_progress() {
889 callback(&progress_info);
890 }
891 }
892 code if code == ServerCode::EndOfStream as u64 => {
893 debug!("Received end of stream");
894 break;
895 }
896 code if code == ServerCode::ProfileInfo as u64 => {
897 debug!("Received profile info packet");
898 let rows = self.conn.read_varint().await?;
899 let blocks = self.conn.read_varint().await?;
900 let bytes = self.conn.read_varint().await?;
901 let applied_limit = self.conn.read_u8().await?;
902 let rows_before_limit = self.conn.read_varint().await?;
903 let calculated = self.conn.read_u8().await?;
904
905 let profile = Profile {
906 rows,
907 blocks,
908 bytes,
909 applied_limit: applied_limit != 0,
910 rows_before_limit,
911 calculated_rows_before_limit: calculated != 0,
912 };
913
914 if let Some(callback) = query.get_on_profile() {
916 callback(&profile);
917 }
918 }
919 code if code == ServerCode::Log as u64 => {
920 debug!("Received log packet");
921 let _log_tag = self.conn.read_string().await?;
922 let uncompressed_reader =
924 BlockReader::new(self.server_info.revision);
925 let block =
926 uncompressed_reader.read_block(&mut self.conn).await?;
927
928 if let Some(callback) = query.get_on_server_log() {
930 callback(&block);
931 }
932 }
933 code if code == ServerCode::ProfileEvents as u64 => {
934 debug!("Received profile events packet");
935 let _table_name = self.conn.read_string().await?;
936 let uncompressed_reader =
938 BlockReader::new(self.server_info.revision);
939 let block =
940 uncompressed_reader.read_block(&mut self.conn).await?;
941
942 if let Some(callback) = query.get_on_profile_events() {
944 callback(&block);
945 }
946 }
947 code if code == ServerCode::TableColumns as u64 => {
948 debug!("Received table columns packet (ignoring)");
949 let _table_name = self.conn.read_string().await?;
951 let _columns_metadata = self.conn.read_string().await?;
953 }
954 code if code == ServerCode::Exception as u64 => {
955 let exception = self.read_exception().await?;
956 debug!(
957 "Received exception: {} - {}",
958 exception.name, exception.display_text
959 );
960
961 if let Some(callback) = query.get_on_exception() {
963 callback(&exception);
964 }
965
966 return Err(Error::Protocol(format!(
967 "ClickHouse exception: {} (code {}): {}",
968 exception.name, exception.code, exception.display_text
969 )));
970 }
971 other => {
972 return Err(Error::Protocol(format!(
973 "Unexpected packet type during query: {}",
974 other
975 )));
976 }
977 }
978 }
979
980 Ok(QueryResult { blocks, progress: progress_info })
981 }
982
983 async fn send_query(&mut self, query: &Query) -> Result<()> {
985 self.send_query_internal(query, true).await
986 }
987
988 async fn send_query_internal(
990 &mut self,
991 query: &Query,
992 finalize: bool,
993 ) -> Result<()> {
994 debug!("Sending query: {}", query.text());
995 self.conn.write_varint(ClientCode::Query as u64).await?;
997
998 self.conn.write_string(query.id()).await?;
1000 debug!("Sent query ID");
1001
1002 let revision = self.server_info.revision;
1004 if revision >= 54032 {
1005 debug!("Writing client info...");
1006 let info = &self.options.client_info;
1007
1008 self.conn.write_u8(1).await?; self.conn.write_string(&info.initial_user).await?;
1011 self.conn.write_string(&info.initial_query_id).await?;
1012 self.conn.write_string("127.0.0.1:0").await?; if revision >= 54449 {
1015 self.conn.write_i64(0).await?; }
1017
1018 self.conn.write_u8(info.interface_type).await?; self.conn.write_string(&info.os_user).await?;
1020 self.conn.write_string(&info.client_hostname).await?;
1021 self.conn.write_string(&info.client_name).await?;
1022 self.conn.write_varint(info.client_version_major).await?;
1023 self.conn.write_varint(info.client_version_minor).await?;
1024 self.conn.write_varint(info.client_revision).await?;
1025
1026 if revision >= 54060 {
1027 self.conn.write_string(&info.quota_key).await?;
1028 }
1029 if revision >= 54448 {
1030 self.conn.write_varint(0).await?; }
1032 if revision >= 54401 {
1033 self.conn.write_varint(info.client_version_patch).await?;
1034 }
1035 if revision >= 54442 {
1036 if let Some(ctx) = query.tracing_context() {
1038 self.conn.write_u8(1).await?; self.conn.write_u128(ctx.trace_id).await?;
1041 self.conn.write_u64(ctx.span_id).await?;
1043 self.conn.write_string(&ctx.tracestate).await?;
1045 self.conn.write_u8(ctx.trace_flags).await?;
1047 } else {
1048 self.conn.write_u8(0).await?; }
1050 }
1051 if revision >= 54453 {
1052 self.conn.write_varint(0).await?; self.conn.write_varint(0).await?; self.conn.write_varint(0).await?; }
1056
1057 debug!("Client info sent");
1058 }
1059
1060 if revision >= 54429 {
1062 debug!("Writing settings...");
1063 for (key, field) in query.settings() {
1064 self.conn.write_string(key).await?;
1065 self.conn.write_varint(field.flags).await?;
1066 self.conn.write_string(&field.value).await?;
1067 }
1068 }
1069 self.conn.write_string("").await?;
1071 debug!("Settings sent");
1072
1073 if revision >= 54441 {
1075 self.conn.write_string("").await?; }
1077
1078 debug!("Writing query stage and text...");
1080 self.conn.write_varint(2).await?; let compression_enabled =
1083 if self.options.compression.is_some() { 1u64 } else { 0u64 };
1084 self.conn.write_varint(compression_enabled).await?;
1085 self.conn.write_string(query.text()).await?;
1086
1087 if revision >= 54459 {
1089 for (key, value) in query.parameters() {
1090 self.conn.write_string(key).await?;
1091 self.conn.write_varint(2).await?; self.conn.write_quoted_string(value).await?;
1093 }
1094 self.conn.write_string("").await?;
1096 }
1097
1098 if finalize {
1100 self.finalize_query().await?;
1101 }
1102
1103 Ok(())
1104 }
1105
1106 async fn finalize_query(&mut self) -> Result<()> {
1112 debug!("Sending empty block to finalize...");
1115 self.conn.write_varint(ClientCode::Data as u64).await?;
1116 let empty_block = Block::new();
1117 let writer = if let Some(compression) = self.options.compression {
1119 BlockWriter::new(self.server_info.revision)
1120 .with_compression(compression)
1121 } else {
1122 BlockWriter::new(self.server_info.revision)
1123 };
1124 writer.write_block(&mut self.conn, &empty_block).await?;
1125
1126 self.conn.flush().await?;
1127 debug!("Query finalized");
1128 Ok(())
1129 }
1130
1131 async fn send_external_tables(
1137 &mut self,
1138 external_tables: &[crate::ExternalTable],
1139 ) -> Result<()> {
1140 for table in external_tables {
1141 if table.data.row_count() == 0 {
1143 continue;
1144 }
1145
1146 debug!("Sending external table: {}", table.name);
1147
1148 self.conn.write_varint(ClientCode::Data as u64).await?;
1150
1151 self.conn.write_string(&table.name).await?;
1154
1155 self.block_writer
1158 .write_block_with_temp_table(
1159 &mut self.conn,
1160 &table.data,
1161 false,
1162 )
1163 .await?;
1164 }
1165
1166 self.conn.flush().await?;
1167 Ok(())
1168 }
1169
1170 async fn read_progress(&mut self) -> Result<Progress> {
1172 let rows = self.conn.read_varint().await?;
1173 let bytes = self.conn.read_varint().await?;
1174 let total_rows = self.conn.read_varint().await?;
1175
1176 let (written_rows, written_bytes) = if self.server_info.revision
1177 >= 54405
1178 {
1179 (self.conn.read_varint().await?, self.conn.read_varint().await?)
1180 } else {
1181 (0, 0)
1182 };
1183
1184 Ok(Progress { rows, bytes, total_rows, written_rows, written_bytes })
1185 }
1186
1187 fn read_exception_from_conn(
1190 conn: &mut Connection,
1191 ) -> std::pin::Pin<
1192 Box<
1193 dyn std::future::Future<Output = Result<crate::query::Exception>>
1194 + '_,
1195 >,
1196 > {
1197 use crate::query::Exception;
1198 Box::pin(async move {
1199 debug!("Reading exception code...");
1200 let code = conn.read_i32().await?;
1201 debug!("Exception code: {}", code);
1202 debug!("Reading exception name...");
1203 let name = conn.read_string().await?;
1204 debug!("Exception name: {}", name);
1205 debug!("Reading exception display_text...");
1206 let display_text = conn.read_string().await?;
1207 debug!("Exception display_text length: {}", display_text.len());
1208 debug!("Reading exception stack_trace...");
1209 let stack_trace = conn.read_string().await?;
1210 debug!("Exception stack_trace length: {}", stack_trace.len());
1211
1212 let has_nested = conn.read_u8().await?;
1214 let nested = if has_nested != 0 {
1215 Some(Box::new(Self::read_exception_from_conn(conn).await?))
1216 } else {
1217 None
1218 };
1219
1220 Ok(Exception { code, name, display_text, stack_trace, nested })
1221 })
1222 }
1223
1224 fn read_exception<'a>(
1226 &'a mut self,
1227 ) -> std::pin::Pin<
1228 Box<
1229 dyn std::future::Future<Output = Result<crate::query::Exception>>
1230 + 'a,
1231 >,
1232 > {
1233 Box::pin(async move {
1234 Self::read_exception_from_conn(&mut self.conn).await
1235 })
1236 }
1237
1238 pub async fn insert(
1246 &mut self,
1247 table_name: &str,
1248 block: Block,
1249 ) -> Result<()> {
1250 self.insert_with_id(table_name, "", block).await
1251 }
1252
1253 pub async fn insert_with_id(
1271 &mut self,
1272 table_name: &str,
1273 query_id: &str,
1274 block: Block,
1275 ) -> Result<()> {
1276 let col_names: Vec<String> = (0..block.column_count())
1279 .filter_map(|i| block.column_name(i))
1280 .map(|n| format!("`{}`", n.replace("`", "``"))) .collect();
1282
1283 if col_names.is_empty() {
1284 return Err(Error::Protocol("Block has no columns".to_string()));
1285 }
1286
1287 let query_text = format!(
1288 "INSERT INTO {} ({}) VALUES",
1289 table_name,
1290 col_names.join(", ")
1291 );
1292
1293 debug!("Sending INSERT query: {}", query_text);
1294 let query = Query::new(query_text).with_query_id(query_id);
1295
1296 self.send_query(&query).await?;
1298
1299 debug!("Waiting for server Data packet...");
1302 loop {
1303 let packet_type = self.conn.read_varint().await?;
1304 debug!("INSERT wait response packet type: {}", packet_type);
1305
1306 match packet_type {
1307 code if code == ServerCode::Data as u64 => {
1308 debug!("Received Data packet, ready to send data");
1309 if self.server_info.revision >= 50264 {
1312 let _temp_table = self.conn.read_string().await?;
1313 }
1314 let _block =
1316 self.block_reader.read_block(&mut self.conn).await?;
1317 debug!("Consumed Data packet payload, stream aligned");
1318 break;
1319 }
1320 code if code == ServerCode::Progress as u64 => {
1321 debug!("Received Progress packet");
1322 let _ = self.read_progress().await?;
1323 }
1324 code if code == ServerCode::TableColumns as u64 => {
1325 debug!("Received TableColumns packet");
1326 let _table_name = self.conn.read_string().await?;
1328 let _columns_metadata = self.conn.read_string().await?;
1330 }
1331 code if code == ServerCode::Exception as u64 => {
1332 debug!("Server returned exception before accepting data");
1333 let exception = self.read_exception().await?;
1334 return Err(Error::Protocol(format!(
1335 "ClickHouse exception: {} (code {}): {}",
1336 exception.name, exception.code, exception.display_text
1337 )));
1338 }
1339 other => {
1340 return Err(Error::Protocol(format!(
1341 "Unexpected packet type while waiting for Data: {}",
1342 other
1343 )));
1344 }
1345 }
1346 }
1347
1348 debug!("Sending data block with {} rows", block.row_count());
1350 self.conn.write_varint(ClientCode::Data as u64).await?;
1351 self.block_writer.write_block(&mut self.conn, &block).await?;
1352
1353 debug!("Sending empty block to signal end");
1355 let empty_block = Block::new();
1356 self.conn.write_varint(ClientCode::Data as u64).await?;
1357 self.block_writer.write_block(&mut self.conn, &empty_block).await?;
1358
1359 debug!("Waiting for EndOfStream...");
1361 loop {
1362 let packet_type = self.conn.read_varint().await?;
1363 debug!("INSERT final response packet type: {}", packet_type);
1364
1365 match packet_type {
1366 code if code == ServerCode::EndOfStream as u64 => {
1367 debug!("Received EndOfStream, insert complete");
1368 break;
1369 }
1370 code if code == ServerCode::Data as u64 => {
1371 debug!(
1372 "Received Data packet in INSERT response (skipping)"
1373 );
1374 if self.server_info.revision >= 50264 {
1376 let _temp_table = self.conn.read_string().await?;
1377 }
1378 let _block =
1380 self.block_reader.read_block(&mut self.conn).await?;
1381 }
1382 code if code == ServerCode::Progress as u64 => {
1383 debug!("Received Progress packet");
1384 let _ = self.read_progress().await?;
1385 }
1386 code if code == ServerCode::ProfileEvents as u64 => {
1387 debug!("Received ProfileEvents packet (skipping)");
1388 let _table_name = self.conn.read_string().await?;
1389 let uncompressed_reader =
1390 BlockReader::new(self.server_info.revision);
1391 let _block =
1392 uncompressed_reader.read_block(&mut self.conn).await?;
1393 }
1394 code if code == ServerCode::TableColumns as u64 => {
1395 debug!("Received TableColumns packet (skipping)");
1396 let _table_name = self.conn.read_string().await?;
1397 let _columns_metadata = self.conn.read_string().await?;
1398 }
1399 code if code == ServerCode::Exception as u64 => {
1400 debug!("Server returned exception after sending data");
1401 let exception = self.read_exception().await?;
1402 return Err(Error::Protocol(format!(
1403 "ClickHouse exception: {} (code {}): {}",
1404 exception.name, exception.code, exception.display_text
1405 )));
1406 }
1407 _ => {
1408 debug!("WARNING: Ignoring unexpected packet type: {} - stream may be misaligned", packet_type);
1409 }
1410 }
1411 }
1412
1413 Ok(())
1414 }
1415
1416 pub async fn ping(&mut self) -> Result<()> {
1418 debug!("Sending ping...");
1419 self.conn.write_varint(ClientCode::Ping as u64).await?;
1420 self.conn.flush().await?;
1421 debug!("Ping sent, waiting for pong...");
1422
1423 let packet_type = self.conn.read_varint().await?;
1424 debug!("Got response packet type: {}", packet_type);
1425
1426 if packet_type == ServerCode::Pong as u64 {
1427 debug!("Pong received!");
1428 Ok(())
1429 } else {
1430 debug!("Unexpected packet: {}", packet_type);
1431 Err(Error::Protocol(format!("Expected Pong, got {}", packet_type)))
1432 }
1433 }
1434
1435 pub async fn cancel(&mut self) -> Result<()> {
1442 debug!("Sending cancel...");
1443 self.conn.write_varint(ClientCode::Cancel as u64).await?;
1444 self.conn.flush().await?;
1445 debug!("Cancel sent");
1446 Ok(())
1447 }
1448
1449 pub fn server_info(&self) -> &ServerInfo {
1470 &self.server_info
1471 }
1472
1473 pub fn server_version(&self) -> (u64, u64, u64) {
1486 (
1487 self.server_info.version_major,
1488 self.server_info.version_minor,
1489 self.server_info.version_patch,
1490 )
1491 }
1492
1493 pub fn server_revision(&self) -> u64 {
1508 self.server_info.revision
1509 }
1510}
1511
1512pub struct QueryResult {
1515 pub blocks: Vec<Block>,
1517 pub progress: Progress,
1519}
1520
1521impl QueryResult {
1522 pub fn blocks(&self) -> &[Block] {
1524 &self.blocks
1525 }
1526
1527 pub fn progress(&self) -> &Progress {
1529 &self.progress
1530 }
1531
1532 pub fn total_rows(&self) -> usize {
1534 self.blocks.iter().map(|b| b.row_count()).sum()
1535 }
1536}
1537
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;

    /// Defaults point at the local server and the `default` database.
    #[test]
    fn test_client_options_default() {
        let ClientOptions { host, port, database, .. } =
            ClientOptions::default();

        assert_eq!(host, "localhost");
        assert_eq!(port, 9000);
        assert_eq!(database, "default");
    }

    /// Builder methods overwrite exactly the fields they name.
    #[test]
    fn test_client_options_builder() {
        let base = ClientOptions::new("127.0.0.1", 9000);
        let with_db = base.database("test_db");
        let with_user = with_db.user("test_user");
        let opts = with_user.password("test_pass");

        assert_eq!(opts.host, "127.0.0.1");
        assert_eq!(opts.database, "test_db");
        assert_eq!(opts.user, "test_user");
        assert_eq!(opts.password, "test_pass");
    }

    /// A result without blocks reports zero rows.
    #[test]
    fn test_query_result() {
        let empty = QueryResult {
            blocks: Vec::new(),
            progress: Progress::default(),
        };

        assert_eq!(empty.total_rows(), 0);
    }
}