1#![doc = include_str!("../README.md")]
2#![cfg_attr(docsrs, feature(doc_cfg))]
3
4pub use self::{
5 compression::Compression,
6 query_summary::QuerySummary,
7 row::{Row, RowOwned, RowRead, RowWrite},
8};
9#[cfg(feature = "sea-ql")]
10pub use self::{
11 data_row::{ColumnIndex, DataRow, FromValue, RowBatch, TypeError},
12 insert_data_row::DataRowInsert,
13 query::DataRowCursor,
14};
15use self::{error::Result, http_client::HttpClient};
16use crate::row_metadata::{AccessType, ColumnDefaultKind, InsertMetadata, RowMetadata};
17
18#[doc = include_str!("row_derive.md")]
19pub use clickhouse_macros::Row;
20use clickhouse_types::{Column, DataTypeNode};
21
22use crate::error::Error;
23use std::collections::HashSet;
24use std::{collections::HashMap, fmt::Display, sync::Arc};
25use tokio::sync::RwLock;
26
27pub mod error;
28pub mod insert;
29#[cfg(feature = "sea-ql")]
30pub mod insert_data_row;
31pub mod insert_formatted;
32#[cfg(feature = "inserter")]
33pub mod inserter;
34pub mod query;
35pub mod serde;
36pub mod sql;
37#[cfg(feature = "test-util")]
38pub mod test;
39
40pub mod schema;
41pub mod types;
42
43#[cfg(feature = "arrow")]
44pub mod arrow;
45mod bytes_ext;
46mod compression;
47mod cursors;
48#[cfg(feature = "sea-ql")]
49mod data_row;
50mod headers;
51mod http_client;
52mod query_summary;
53mod request_body;
54mod response;
55mod row;
56mod row_metadata;
57mod rowbinary;
58#[cfg(feature = "inserter")]
59mod ticks;
60
61#[derive(Clone)]
69pub struct Client {
70 http: Arc<dyn HttpClient>,
71
72 url: String,
73 database: Option<String>,
74 authentication: Authentication,
75 compression: Compression,
76 roles: HashSet<String>,
77 settings: HashMap<String, String>,
78 headers: HashMap<String, String>,
79 products_info: Vec<ProductInfo>,
80 validation: bool,
81 insert_metadata_cache: Arc<InsertMetadataCache>,
82
83 #[cfg(feature = "test-util")]
84 mocked: bool,
85}
86
/// A product name/version pair, as recorded by `Client::with_product_info`.
#[derive(Clone)]
struct ProductInfo {
    name: String,
    version: String,
}

/// Renders the pair as `name/version`.
impl Display for ProductInfo {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { name, version } = self;
        write!(f, "{name}/{version}")
    }
}
98
/// How the client authenticates: optional user/password credentials, or a
/// JWT access token. The builder methods on `Client` refuse to mix the two.
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum Authentication {
    /// User name and/or password; either part may be absent.
    Credentials {
        user: Option<String>,
        password: Option<String>,
    },
    /// A JWT access token.
    Jwt {
        access_token: String,
    },
}

/// The default is `Credentials` with neither a user nor a password set.
impl Default for Authentication {
    fn default() -> Self {
        let (user, password) = (None, None);
        Authentication::Credentials { user, password }
    }
}
118
119impl Default for Client {
120 fn default() -> Self {
121 Self::with_http_client(http_client::default())
122 }
123}
124
125#[derive(Default)]
128pub(crate) struct InsertMetadataCache(RwLock<HashMap<String, Arc<InsertMetadata>>>);
129
130impl Client {
131 pub fn with_http_client(client: impl HttpClient) -> Self {
135 Self {
136 http: Arc::new(client),
137 url: String::new(),
138 database: None,
139 authentication: Authentication::default(),
140 compression: Compression::default(),
141 roles: HashSet::new(),
142 settings: HashMap::new(),
143 headers: HashMap::new(),
144 products_info: Vec::default(),
145 validation: true,
146 insert_metadata_cache: Arc::new(InsertMetadataCache::default()),
147 #[cfg(feature = "test-util")]
148 mocked: false,
149 }
150 }
151
152 pub fn with_url(mut self, url: impl Into<String>) -> Self {
163 self.url = url.into();
164
165 #[cfg(feature = "test-util")]
168 if let Some(url) = test::Mock::mocked_url_to_real(&self.url) {
169 self.url = url;
170 self.mocked = true;
171 }
172
173 self.insert_metadata_cache = Default::default();
175
176 self
177 }
178
179 pub fn with_database(mut self, database: impl Into<String>) -> Self {
190 self.database = Some(database.into());
191
192 self.insert_metadata_cache = Default::default();
194
195 self
196 }
197
198 pub fn with_user(mut self, user: impl Into<String>) -> Self {
209 match self.authentication {
210 Authentication::Jwt { .. } => {
211 panic!("`user` cannot be set together with `access_token`");
212 }
213 Authentication::Credentials { password, .. } => {
214 self.authentication = Authentication::Credentials {
215 user: Some(user.into()),
216 password,
217 };
218 }
219 }
220 self
221 }
222
223 pub fn with_password(mut self, password: impl Into<String>) -> Self {
234 match self.authentication {
235 Authentication::Jwt { .. } => {
236 panic!("`password` cannot be set together with `access_token`");
237 }
238 Authentication::Credentials { user, .. } => {
239 self.authentication = Authentication::Credentials {
240 user,
241 password: Some(password.into()),
242 };
243 }
244 }
245 self
246 }
247
248 pub fn with_roles(mut self, roles: impl IntoIterator<Item = impl Into<String>>) -> Self {
270 self.set_roles(roles);
271 self
272 }
273
274 pub fn with_default_roles(mut self) -> Self {
280 self.clear_roles();
281 self
282 }
283
284 pub fn with_access_token(mut self, access_token: impl Into<String>) -> Self {
298 match self.authentication {
299 Authentication::Credentials { user, password }
300 if user.is_some() || password.is_some() =>
301 {
302 panic!("`access_token` cannot be set together with `user` or `password`");
303 }
304 _ => {
305 self.authentication = Authentication::Jwt {
306 access_token: access_token.into(),
307 }
308 }
309 }
310 self
311 }
312
313 pub fn with_compression(mut self, compression: Compression) -> Self {
325 self.compression = compression;
326 self
327 }
328
329 #[deprecated(since = "0.14.3", note = "please use `with_setting` instead")]
337 pub fn with_option(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
338 self.settings.insert(name.into(), value.into());
339 self
340 }
341
342 pub fn with_setting(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
350 self.settings.insert(name.into(), value.into());
351 self
352 }
353
354 pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
362 self.headers.insert(name.into(), value.into());
363 self
364 }
365
366 pub fn with_product_info(
405 mut self,
406 product_name: impl Into<String>,
407 product_version: impl Into<String>,
408 ) -> Self {
409 self.products_info.push(ProductInfo {
410 name: product_name.into(),
411 version: product_version.into(),
412 });
413 self
414 }
415
416 #[deprecated(since = "0.14.3", note = "please use `set_setting` instead")]
420 pub fn set_option(
421 &mut self,
422 name: impl Into<String>,
423 value: impl Into<String>,
424 ) -> Option<String> {
425 self.settings.insert(name.into(), value.into())
426 }
427
428 pub fn set_setting(
432 &mut self,
433 name: impl Into<String>,
434 value: impl Into<String>,
435 ) -> Option<String> {
436 self.settings.insert(name.into(), value.into())
437 }
438
439 #[deprecated(since = "0.14.3", note = "please use `get_setting` instead")]
441 pub fn get_option(&self, name: impl AsRef<str>) -> Option<&str> {
442 self.settings.get(name.as_ref()).map(String::as_str)
443 }
444
445 pub fn get_setting(&self, name: impl AsRef<str>) -> Option<&str> {
447 self.settings.get(name.as_ref()).map(String::as_str)
448 }
449
450 pub async fn insert<T: Row>(&self, table: &str) -> Result<insert::Insert<T>> {
473 let mut escaped_table_name = String::new();
474 sql::escape::identifier(table, &mut escaped_table_name)
475 .map_err(|e| Error::Other(format!("error escaping table name: {e:?}").into()))?;
477
478 self.insert_unescaped(&escaped_table_name).await
479 }
480
481 pub async fn insert_unescaped<T: Row>(
485 &self,
486 raw_table_name: &str,
487 ) -> Result<insert::Insert<T>> {
488 if self.get_validation() {
489 let metadata = self.get_insert_metadata(raw_table_name).await?;
490 let row = metadata.to_row::<T>()?;
491 return Ok(insert::Insert::new(self, raw_table_name, Some(row)));
492 }
493 Ok(insert::Insert::new(self, raw_table_name, None))
494 }
495
496 pub async fn insert_any<T: Row>(&self, table: &str) -> Result<insert::Insert<()>> {
508 Ok(self.insert::<T>(table).await?.into_any())
509 }
510
/// Creates an `Inserter` for `T` rows targeting `table`.
///
/// Only available with the `inserter` feature.
#[cfg(feature = "inserter")]
pub fn inserter<T: Row>(&self, table: &str) -> inserter::Inserter<T> {
    inserter::Inserter::<T>::new(self, table)
}
516
517 pub fn insert_formatted_with(
534 &self,
535 sql: impl Into<String>,
536 ) -> insert_formatted::InsertFormatted {
537 insert_formatted::InsertFormatted::new(self, sql.into(), None)
539 }
540
541 pub fn query(&self, query: &str) -> query::Query {
543 query::Query::new(self, query)
544 }
545
546 pub fn with_validation(mut self, enabled: bool) -> Self {
574 self.validation = enabled;
575 self
576 }
577
578 pub async fn clear_cached_metadata(&self) {
591 self.insert_metadata_cache.0.write().await.clear();
592 }
593
594 #[inline]
597 pub(crate) fn get_validation(&self) -> bool {
598 #[cfg(feature = "test-util")]
599 if self.mocked {
600 return false;
601 }
602 self.validation
603 }
604
605 pub(crate) fn set_roles(&mut self, roles: impl IntoIterator<Item = impl Into<String>>) {
606 self.clear_roles();
607 self.roles.extend(roles.into_iter().map(Into::into));
608 }
609
610 #[inline]
611 pub(crate) fn clear_roles(&mut self) {
612 self.settings.remove(settings::ROLE);
614 self.roles.clear();
615 }
616
/// Points the client at the given mock server and marks it as mocked.
///
/// Only available with the `test-util` feature. A mocked client reports
/// validation as disabled (see `get_validation`).
#[cfg(feature = "test-util")]
pub fn with_mock(mut self, mock: &test::Mock) -> Self {
    self.mocked = true;
    self.url = mock.real_url().to_string();
    self
}
630
631 async fn get_insert_metadata(&self, raw_table_name: &str) -> Result<Arc<InsertMetadata>> {
632 #[derive(::serde::Deserialize, clickhouse_macros::Row)]
633 #[clickhouse(crate = "self")]
634 #[expect(dead_code)]
636 struct DescribeColumn {
637 name: String,
638 r#type: String,
639 default_type: String,
640 default_expression: String,
641 comment: String,
642 codec_expression: String,
643 ttl_expression: String,
644 }
645
646 {
647 let read_lock = self.insert_metadata_cache.0.read().await;
648
649 if let Some(metadata) = read_lock.get(raw_table_name) {
650 return Ok(metadata.clone());
651 }
652 }
653
654 let mut write_lock = self.insert_metadata_cache.0.write().await;
656
657 let mut columns_cursor = self
658 .query(&_priv::row_insert_metadata_query(raw_table_name))
659 .with_setting("describe_include_subcolumns", "0")
660 .fetch::<DescribeColumn>()?;
661
662 let mut columns = Vec::new();
663 let mut column_default_kinds = Vec::new();
664 let mut column_lookup = HashMap::new();
665
666 while let Some(column) = columns_cursor.next().await? {
667 let data_type = DataTypeNode::new(&column.r#type)?;
668 let default_kind = column.default_type.parse::<ColumnDefaultKind>()?;
669
670 column_lookup.insert(column.name.clone(), columns.len());
671
672 columns.push(Column {
673 name: column.name,
674 data_type,
675 });
676
677 column_default_kinds.push(default_kind);
678 }
679
680 let metadata = Arc::new(InsertMetadata {
681 row_metadata: RowMetadata {
682 columns,
683 access_type: AccessType::WithSeqAccess, },
685 column_default_kinds,
686 column_lookup,
687 });
688
689 write_lock.insert(raw_table_name.to_string(), metadata.clone());
690 Ok(metadata)
691 }
692}
693
/// Format name strings used by the crate.
mod formats {
    /// The `RowBinary` format name.
    pub(crate) const ROW_BINARY: &'static str = "RowBinary";
    /// The `RowBinaryWithNamesAndTypes` format name.
    pub(crate) const ROW_BINARY_WITH_NAMES_AND_TYPES: &'static str =
        "RowBinaryWithNamesAndTypes";
}
698
/// Setting/parameter name strings used by the crate.
mod settings {
    pub(crate) const DATABASE: &'static str = "database";
    pub(crate) const DEFAULT_FORMAT: &'static str = "default_format";
    pub(crate) const COMPRESS: &'static str = "compress";
    pub(crate) const DECOMPRESS: &'static str = "decompress";
    /// Only defined when the `zstd` feature is enabled.
    #[cfg(feature = "zstd")]
    pub(crate) const ENABLE_HTTP_COMPRESSION: &'static str = "enable_http_compression";
    /// Key that `Client::clear_roles` removes from the settings map.
    pub(crate) const ROLE: &'static str = "role";
    pub(crate) const QUERY: &'static str = "query";
    pub(crate) const QUERY_ID: &'static str = "query_id";
    pub(crate) const SESSION_ID: &'static str = "session_id";
}
711
712#[doc(hidden)]
715pub mod _priv {
716 pub use crate::row::RowKind;
717
718 #[cfg(feature = "lz4")]
719 pub fn lz4_compress(uncompressed: &[u8]) -> super::Result<bytes::Bytes> {
720 crate::compression::lz4::compress(uncompressed)
721 }
722
723 #[cfg(feature = "zstd")]
724 pub fn zstd_compress(uncompressed: &[u8]) -> super::Result<bytes::Bytes> {
725 crate::compression::zstd::compress(uncompressed, None)
726 }
727
728 pub fn row_insert_metadata_query(raw_table: &str) -> String {
730 format!("DESCRIBE TABLE {raw_table}")
731 }
732}
733
/// Unit tests for the `Client` builder: authentication rules, validation
/// flag, settings handling, and row metadata construction.
#[cfg(test)]
mod client_tests {
    use crate::_priv::RowKind;
    use crate::row_metadata::{AccessType, RowMetadata};
    use crate::{Authentication, Client, Row};
    use clickhouse_types::{Column, DataTypeNode};

    /// Builds the `Credentials` variant the assertions compare against.
    fn credentials(user: Option<&str>, password: Option<&str>) -> Authentication {
        Authentication::Credentials {
            user: user.map(String::from),
            password: password.map(String::from),
        }
    }

    /// Builds the `Jwt` variant the assertions compare against.
    fn jwt(access_token: &str) -> Authentication {
        Authentication::Jwt {
            access_token: access_token.to_string(),
        }
    }

    #[test]
    fn it_can_use_credentials_auth() {
        let client = Client::default().with_user("bob").with_password("secret");
        assert_eq!(
            client.authentication,
            credentials(Some("bob"), Some("secret"))
        );
    }

    #[test]
    fn it_can_use_credentials_auth_user_only() {
        let client = Client::default().with_user("alice");
        assert_eq!(client.authentication, credentials(Some("alice"), None));
    }

    #[test]
    fn it_can_use_credentials_auth_password_only() {
        let client = Client::default().with_password("secret");
        assert_eq!(client.authentication, credentials(None, Some("secret")));
    }

    #[test]
    fn it_can_override_credentials_auth() {
        let client = Client::default()
            .with_user("bob")
            .with_password("secret")
            .with_user("alice")
            .with_password("something_else");
        assert_eq!(
            client.authentication,
            credentials(Some("alice"), Some("something_else"))
        );
    }

    #[test]
    fn it_can_use_jwt_auth() {
        let client = Client::default().with_access_token("my_jwt");
        assert_eq!(client.authentication, jwt("my_jwt"));
    }

    #[test]
    fn it_can_override_jwt_auth() {
        let client = Client::default()
            .with_access_token("my_jwt")
            .with_access_token("my_jwt_2");
        assert_eq!(client.authentication, jwt("my_jwt_2"));
    }

    #[test]
    #[should_panic(expected = "`access_token` cannot be set together with `user` or `password`")]
    fn it_cannot_use_jwt_after_with_user() {
        let with_user = Client::default().with_user("bob");
        let _ = with_user.with_access_token("my_jwt");
    }

    #[test]
    #[should_panic(expected = "`access_token` cannot be set together with `user` or `password`")]
    fn it_cannot_use_jwt_after_with_password() {
        let with_password = Client::default().with_password("secret");
        let _ = with_password.with_access_token("my_jwt");
    }

    #[test]
    #[should_panic(expected = "`access_token` cannot be set together with `user` or `password`")]
    fn it_cannot_use_jwt_after_both_with_user_and_with_password() {
        let with_credentials = Client::default()
            .with_user("alice")
            .with_password("secret");
        let _ = with_credentials.with_access_token("my_jwt");
    }

    #[test]
    #[should_panic(expected = "`user` cannot be set together with `access_token`")]
    fn it_cannot_use_with_user_after_jwt() {
        let with_token = Client::default().with_access_token("my_jwt");
        let _ = with_token.with_user("alice");
    }

    #[test]
    #[should_panic(expected = "`password` cannot be set together with `access_token`")]
    fn it_cannot_use_with_password_after_jwt() {
        let with_token = Client::default().with_access_token("my_jwt");
        let _ = with_token.with_password("secret");
    }

    #[test]
    fn it_sets_validation_mode() {
        // Enabled by default, then toggled off and back on.
        let mut client = Client::default();
        assert!(client.validation);

        for enabled in [false, true] {
            client = client.with_validation(enabled);
            assert_eq!(client.validation, enabled);
        }
    }

    /// Hand-written `Row` implementation used by `get_row_metadata`.
    #[derive(Debug, Clone, PartialEq)]
    struct SystemRolesRow {
        name: String,
        id: uuid::Uuid,
        storage: String,
    }

    impl SystemRolesRow {
        /// Columns in the same order as `COLUMN_NAMES`.
        fn columns() -> Vec<Column> {
            vec![
                Column::new("name".to_string(), DataTypeNode::String),
                Column::new("id".to_string(), DataTypeNode::UUID),
                Column::new("storage".to_string(), DataTypeNode::String),
            ]
        }
    }

    impl Row for SystemRolesRow {
        const NAME: &'static str = "SystemRolesRow";
        const KIND: RowKind = RowKind::Struct;
        const COLUMN_COUNT: usize = 3;
        const COLUMN_NAMES: &'static [&'static str] = &["name", "id", "storage"];
        type Value<'a> = SystemRolesRow;
    }

    #[test]
    fn get_row_metadata() {
        // Columns in declaration order yield sequential access.
        let in_order = SystemRolesRow::columns();
        let metadata = RowMetadata::new_for_cursor::<SystemRolesRow>(in_order).unwrap();
        assert_eq!(metadata.columns, SystemRolesRow::columns());
        assert_eq!(metadata.access_type, AccessType::WithSeqAccess);

        // Columns in a different order yield map access with an index list.
        let shuffled = vec![
            Column::new("id".to_string(), DataTypeNode::UUID),
            Column::new("storage".to_string(), DataTypeNode::String),
            Column::new("name".to_string(), DataTypeNode::String),
        ];
        let metadata = RowMetadata::new_for_cursor::<SystemRolesRow>(shuffled.clone()).unwrap();
        assert_eq!(metadata.columns, shuffled);
        assert_eq!(
            metadata.access_type,
            AccessType::WithMapAccess(vec![1, 2, 0])
        );
    }

    #[test]
    fn it_does_follow_previous_configuration() {
        let client = Client::default().with_setting("async_insert", "1");
        let copy = client.clone();
        assert_eq!(client.settings, copy.settings);
    }

    #[test]
    fn it_does_not_follow_future_configuration() {
        let original = Client::default();
        let snapshot = original.clone();
        let updated = original.with_setting("async_insert", "1");
        assert_ne!(updated.settings, snapshot.settings);
    }

    #[test]
    fn it_gets_and_sets_settings() {
        let mut client = Client::default();

        // First insertion of each key reports no previous value.
        for key in ["foo", "bar"] {
            assert_eq!(client.set_setting(key, key), None);
        }

        for key in ["foo", "bar"] {
            assert_eq!(client.get_setting(key), Some(key));
        }
        assert_eq!(client.get_setting("baz"), None);

        // Overwriting returns the value stored before.
        for key in ["foo", "bar"] {
            let replacement = format!("{key}_2");
            assert_eq!(client.set_setting(key, replacement), Some(key.to_string()));
        }
    }
}