1#![doc = include_str!("../README.md")]
2#![cfg_attr(docsrs, feature(doc_cfg))]
3
4pub use self::{
5 compression::Compression,
6 row::{Row, RowOwned, RowRead, RowWrite},
7};
8#[cfg(feature = "sea-ql")]
9pub use self::{
10 data_row::{ColumnIndex, DataRow, FromValue, RowBatch, TypeError},
11 insert_data_row::DataRowInsert,
12 query::DataRowCursor,
13};
14use self::{error::Result, http_client::HttpClient};
15use crate::row_metadata::{AccessType, ColumnDefaultKind, InsertMetadata, RowMetadata};
16
17#[doc = include_str!("row_derive.md")]
18pub use clickhouse_macros::Row;
19use clickhouse_types::{Column, DataTypeNode};
20
21use crate::_priv::row_insert_metadata_query;
22use std::collections::HashSet;
23use std::{collections::HashMap, fmt::Display, sync::Arc};
24use tokio::sync::RwLock;
25
26pub mod error;
27pub mod insert;
28#[cfg(feature = "sea-ql")]
29pub mod insert_data_row;
30pub mod insert_formatted;
31#[cfg(feature = "inserter")]
32pub mod inserter;
33pub mod query;
34pub mod serde;
35pub mod sql;
36#[cfg(feature = "test-util")]
37pub mod test;
38
39pub mod schema;
40pub mod types;
41
42#[cfg(feature = "arrow")]
43pub mod arrow;
44mod bytes_ext;
45mod compression;
46mod cursors;
47#[cfg(feature = "sea-ql")]
48mod data_row;
49mod headers;
50mod http_client;
51mod request_body;
52mod response;
53mod row;
54mod row_metadata;
55mod rowbinary;
56#[cfg(feature = "inserter")]
57mod ticks;
58
59#[derive(Clone)]
67pub struct Client {
68 http: Arc<dyn HttpClient>,
69
70 url: String,
71 database: Option<String>,
72 authentication: Authentication,
73 compression: Compression,
74 roles: HashSet<String>,
75 options: HashMap<String, String>,
76 headers: HashMap<String, String>,
77 products_info: Vec<ProductInfo>,
78 validation: bool,
79 insert_metadata_cache: Arc<InsertMetadataCache>,
80
81 #[cfg(feature = "test-util")]
82 mocked: bool,
83}
84
/// A product name/version pair registered through `Client::with_product_info`.
#[derive(Clone)]
struct ProductInfo {
    /// Product name, rendered before the slash.
    name: String,
    /// Product version, rendered after the slash.
    version: String,
}

/// Renders the pair as `name/version`.
impl Display for ProductInfo {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { name, version } = self;
        write!(f, "{name}/{version}")
    }
}
96
/// How the client authenticates: either user/password credentials or a JWT
/// access token. The builder methods on `Client` refuse to mix the two.
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum Authentication {
    /// User name and/or password; each part may be left unset.
    Credentials {
        user: Option<String>,
        password: Option<String>,
    },
    /// A JWT access token.
    Jwt {
        access_token: String,
    },
}

/// The default is `Credentials` with neither a user nor a password set.
impl Default for Authentication {
    fn default() -> Self {
        let user = None;
        let password = None;
        Self::Credentials { user, password }
    }
}
116
117impl Default for Client {
118 fn default() -> Self {
119 Self::with_http_client(http_client::default())
120 }
121}
122
123#[derive(Default)]
126pub(crate) struct InsertMetadataCache(RwLock<HashMap<String, Arc<InsertMetadata>>>);
127
128impl Client {
129 pub fn with_http_client(client: impl HttpClient) -> Self {
133 Self {
134 http: Arc::new(client),
135 url: String::new(),
136 database: None,
137 authentication: Authentication::default(),
138 compression: Compression::default(),
139 roles: HashSet::new(),
140 options: HashMap::new(),
141 headers: HashMap::new(),
142 products_info: Vec::default(),
143 validation: true,
144 insert_metadata_cache: Arc::new(InsertMetadataCache::default()),
145 #[cfg(feature = "test-util")]
146 mocked: false,
147 }
148 }
149
150 pub fn with_url(mut self, url: impl Into<String>) -> Self {
161 self.url = url.into();
162
163 #[cfg(feature = "test-util")]
166 if let Some(url) = test::Mock::mocked_url_to_real(&self.url) {
167 self.url = url;
168 self.mocked = true;
169 }
170
171 self.insert_metadata_cache = Default::default();
173
174 self
175 }
176
177 pub fn with_database(mut self, database: impl Into<String>) -> Self {
188 self.database = Some(database.into());
189
190 self.insert_metadata_cache = Default::default();
192
193 self
194 }
195
196 pub fn with_user(mut self, user: impl Into<String>) -> Self {
207 match self.authentication {
208 Authentication::Jwt { .. } => {
209 panic!("`user` cannot be set together with `access_token`");
210 }
211 Authentication::Credentials { password, .. } => {
212 self.authentication = Authentication::Credentials {
213 user: Some(user.into()),
214 password,
215 };
216 }
217 }
218 self
219 }
220
221 pub fn with_password(mut self, password: impl Into<String>) -> Self {
232 match self.authentication {
233 Authentication::Jwt { .. } => {
234 panic!("`password` cannot be set together with `access_token`");
235 }
236 Authentication::Credentials { user, .. } => {
237 self.authentication = Authentication::Credentials {
238 user,
239 password: Some(password.into()),
240 };
241 }
242 }
243 self
244 }
245
246 pub fn with_roles(mut self, roles: impl IntoIterator<Item = impl Into<String>>) -> Self {
268 self.set_roles(roles);
269 self
270 }
271
272 pub fn with_default_roles(mut self) -> Self {
278 self.clear_roles();
279 self
280 }
281
282 pub fn with_access_token(mut self, access_token: impl Into<String>) -> Self {
296 match self.authentication {
297 Authentication::Credentials { user, password }
298 if user.is_some() || password.is_some() =>
299 {
300 panic!("`access_token` cannot be set together with `user` or `password`");
301 }
302 _ => {
303 self.authentication = Authentication::Jwt {
304 access_token: access_token.into(),
305 }
306 }
307 }
308 self
309 }
310
311 pub fn with_compression(mut self, compression: Compression) -> Self {
321 self.compression = compression;
322 self
323 }
324
325 pub fn with_option(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
333 self.options.insert(name.into(), value.into());
334 self
335 }
336
337 pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
345 self.headers.insert(name.into(), value.into());
346 self
347 }
348
349 pub fn with_product_info(
388 mut self,
389 product_name: impl Into<String>,
390 product_version: impl Into<String>,
391 ) -> Self {
392 self.products_info.push(ProductInfo {
393 name: product_name.into(),
394 version: product_version.into(),
395 });
396 self
397 }
398
399 pub fn set_option(
403 &mut self,
404 name: impl Into<String>,
405 value: impl Into<String>,
406 ) -> Option<String> {
407 self.options.insert(name.into(), value.into())
408 }
409
410 pub fn get_option(&self, name: impl AsRef<str>) -> Option<&str> {
412 self.options.get(name.as_ref()).map(String::as_str)
413 }
414
415 pub async fn insert<T: Row>(&self, table: &str) -> Result<insert::Insert<T>> {
434 if self.get_validation() {
435 let metadata = self.get_insert_metadata(table).await?;
436 let row = metadata.to_row::<T>()?;
437 return Ok(insert::Insert::new(self, table, Some(row)));
438 }
439 Ok(insert::Insert::new(self, table, None))
440 }
441
442 pub async fn insert_any<T: Row>(&self, table: &str) -> Result<insert::Insert<()>> {
454 Ok(self.insert::<T>(table).await?.into_any())
455 }
456
457 #[cfg(feature = "inserter")]
459 pub fn inserter<T: Row>(&self, table: &str) -> inserter::Inserter<T> {
460 inserter::Inserter::new(self, table)
461 }
462
463 pub fn insert_formatted_with(
480 &self,
481 sql: impl Into<String>,
482 ) -> insert_formatted::InsertFormatted {
483 insert_formatted::InsertFormatted::new(self, sql.into())
484 }
485
486 pub fn query(&self, query: &str) -> query::Query {
488 query::Query::new(self, query)
489 }
490
491 pub fn with_validation(mut self, enabled: bool) -> Self {
519 self.validation = enabled;
520 self
521 }
522
523 pub async fn clear_cached_metadata(&self) {
536 self.insert_metadata_cache.0.write().await.clear();
537 }
538
539 #[inline]
542 pub(crate) fn get_validation(&self) -> bool {
543 #[cfg(feature = "test-util")]
544 if self.mocked {
545 return false;
546 }
547 self.validation
548 }
549
550 pub(crate) fn set_roles(&mut self, roles: impl IntoIterator<Item = impl Into<String>>) {
551 self.clear_roles();
552 self.roles.extend(roles.into_iter().map(Into::into));
553 }
554
555 #[inline]
556 pub(crate) fn clear_roles(&mut self) {
557 self.options.remove(settings::ROLE);
559 self.roles.clear();
560 }
561
562 #[cfg(feature = "test-util")]
570 pub fn with_mock(mut self, mock: &test::Mock) -> Self {
571 self.url = mock.real_url().to_string();
572 self.mocked = true;
573 self
574 }
575
576 async fn get_insert_metadata(&self, table_name: &str) -> Result<Arc<InsertMetadata>> {
577 {
578 let read_lock = self.insert_metadata_cache.0.read().await;
579
580 if let Some(metadata) = read_lock.get(table_name) {
582 return Ok(metadata.clone());
583 }
584 }
585
586 let mut write_lock = self.insert_metadata_cache.0.write().await;
588 let db = match self.database {
589 Some(ref db) => db,
590 None => "default",
591 };
592
593 let mut columns_cursor = self
594 .query(&row_insert_metadata_query(db, table_name))
595 .fetch::<(String, String, String)>()?;
596
597 let mut columns = Vec::new();
598 let mut column_default_kinds = Vec::new();
599 let mut column_lookup = HashMap::new();
600
601 while let Some((name, type_, default_kind)) = columns_cursor.next().await? {
602 let data_type = DataTypeNode::new(&type_)?;
603 let default_kind = default_kind.parse::<ColumnDefaultKind>()?;
604
605 column_lookup.insert(name.clone(), columns.len());
606
607 columns.push(Column { name, data_type });
608
609 column_default_kinds.push(default_kind);
610 }
611
612 let metadata = Arc::new(InsertMetadata {
613 row_metadata: RowMetadata {
614 columns,
615 access_type: AccessType::WithSeqAccess, },
617 column_default_kinds,
618 column_lookup,
619 });
620
621 write_lock.insert(table_name.to_string(), metadata.clone());
622 Ok(metadata)
623 }
624}
625
/// Names of the data formats referenced by this crate.
mod formats {
    /// The `RowBinary` format name.
    pub(crate) const ROW_BINARY: &str = "RowBinary";

    /// The `RowBinaryWithNamesAndTypes` format name.
    pub(crate) const ROW_BINARY_WITH_NAMES_AND_TYPES: &str = "RowBinaryWithNamesAndTypes";
}
630
/// Names of well-known settings.
///
/// NOTE(review): presumably these are the parameter names sent to the server
/// with each request — confirm against the request-building code.
mod settings {
    /// The `database` setting name.
    pub(crate) const DATABASE: &str = "database";

    /// The `default_format` setting name.
    pub(crate) const DEFAULT_FORMAT: &str = "default_format";

    /// The `compress` setting name.
    pub(crate) const COMPRESS: &str = "compress";

    /// The `decompress` setting name.
    pub(crate) const DECOMPRESS: &str = "decompress";

    /// The `role` setting name; an option stored under this name is removed
    /// whenever the client's roles are cleared.
    pub(crate) const ROLE: &str = "role";

    /// The `query` setting name.
    pub(crate) const QUERY: &str = "query";
}
639
640#[doc(hidden)]
643pub mod _priv {
644 pub use crate::row::RowKind;
645
646 #[cfg(feature = "lz4")]
647 pub fn lz4_compress(uncompressed: &[u8]) -> super::Result<bytes::Bytes> {
648 crate::compression::lz4::compress(uncompressed)
649 }
650
651 pub fn row_insert_metadata_query(db: &str, table: &str) -> String {
653 let mut out = "SELECT \
654 name, \
655 type, \
656 default_kind \
657 FROM system.columns \
658 WHERE database = "
659 .to_string();
660
661 crate::sql::escape::string(db, &mut out).unwrap();
662
663 out.push_str(" AND table = ");
664
665 crate::sql::escape::string(table, &mut out).unwrap();
666
667 out
668 }
669}
670
/// Unit tests for the `Client` builder methods and for row metadata.
#[cfg(test)]
mod client_tests {
    use crate::_priv::RowKind;
    use crate::row_metadata::{AccessType, RowMetadata};
    use crate::{Authentication, Client, Row};
    use clickhouse_types::{Column, DataTypeNode};

    /// Builds the expected `Authentication::Credentials` value.
    fn credentials(user: Option<&str>, password: Option<&str>) -> Authentication {
        Authentication::Credentials {
            user: user.map(String::from),
            password: password.map(String::from),
        }
    }

    /// Builds the expected `Authentication::Jwt` value.
    fn jwt(access_token: &str) -> Authentication {
        Authentication::Jwt {
            access_token: String::from(access_token),
        }
    }

    #[test]
    fn it_can_use_credentials_auth() {
        let client = Client::default().with_user("bob").with_password("secret");
        assert_eq!(
            client.authentication,
            credentials(Some("bob"), Some("secret"))
        );
    }

    #[test]
    fn it_can_use_credentials_auth_user_only() {
        let client = Client::default().with_user("alice");
        assert_eq!(client.authentication, credentials(Some("alice"), None));
    }

    #[test]
    fn it_can_use_credentials_auth_password_only() {
        let client = Client::default().with_password("secret");
        assert_eq!(client.authentication, credentials(None, Some("secret")));
    }

    #[test]
    fn it_can_override_credentials_auth() {
        let first = Client::default().with_user("bob").with_password("secret");
        let second = first.with_user("alice").with_password("something_else");
        assert_eq!(
            second.authentication,
            credentials(Some("alice"), Some("something_else"))
        );
    }

    #[test]
    fn it_can_use_jwt_auth() {
        let client = Client::default().with_access_token("my_jwt");
        assert_eq!(client.authentication, jwt("my_jwt"));
    }

    #[test]
    fn it_can_override_jwt_auth() {
        let first = Client::default().with_access_token("my_jwt");
        let second = first.with_access_token("my_jwt_2");
        assert_eq!(second.authentication, jwt("my_jwt_2"));
    }

    #[test]
    #[should_panic(expected = "`access_token` cannot be set together with `user` or `password`")]
    fn it_cannot_use_jwt_after_with_user() {
        let client = Client::default().with_user("bob");
        let _ = client.with_access_token("my_jwt");
    }

    #[test]
    #[should_panic(expected = "`access_token` cannot be set together with `user` or `password`")]
    fn it_cannot_use_jwt_after_with_password() {
        let client = Client::default().with_password("secret");
        let _ = client.with_access_token("my_jwt");
    }

    #[test]
    #[should_panic(expected = "`access_token` cannot be set together with `user` or `password`")]
    fn it_cannot_use_jwt_after_both_with_user_and_with_password() {
        let client = Client::default().with_user("alice").with_password("secret");
        let _ = client.with_access_token("my_jwt");
    }

    #[test]
    #[should_panic(expected = "`user` cannot be set together with `access_token`")]
    fn it_cannot_use_with_user_after_jwt() {
        let client = Client::default().with_access_token("my_jwt");
        let _ = client.with_user("alice");
    }

    #[test]
    #[should_panic(expected = "`password` cannot be set together with `access_token`")]
    fn it_cannot_use_with_password_after_jwt() {
        let client = Client::default().with_access_token("my_jwt");
        let _ = client.with_password("secret");
    }

    #[test]
    fn it_sets_validation_mode() {
        let initial = Client::default();
        assert!(initial.validation);

        let disabled = initial.with_validation(false);
        assert!(!disabled.validation);

        let re_enabled = disabled.with_validation(true);
        assert!(re_enabled.validation);
    }

    /// Hand-written `Row` implementation used by the metadata test.
    #[derive(Debug, Clone, PartialEq)]
    struct SystemRolesRow {
        name: String,
        id: uuid::Uuid,
        storage: String,
    }

    impl SystemRolesRow {
        /// Columns in the same order as the struct fields.
        fn columns() -> Vec<Column> {
            vec![
                Column::new("name".to_string(), DataTypeNode::String),
                Column::new("id".to_string(), DataTypeNode::UUID),
                Column::new("storage".to_string(), DataTypeNode::String),
            ]
        }
    }

    impl Row for SystemRolesRow {
        const NAME: &'static str = "SystemRolesRow";
        const KIND: RowKind = RowKind::Struct;
        const COLUMN_COUNT: usize = 3;
        const COLUMN_NAMES: &'static [&'static str] = &["name", "id", "storage"];
        type Value<'a> = SystemRolesRow;
    }

    #[test]
    fn get_row_metadata() {
        // Columns in struct order are read sequentially.
        let in_order = SystemRolesRow::columns();
        let sequential =
            RowMetadata::new_for_cursor::<SystemRolesRow>(in_order.clone()).unwrap();
        assert_eq!(sequential.columns, in_order);
        assert_eq!(sequential.access_type, AccessType::WithSeqAccess);

        // Columns in a different order produce an index mapping instead.
        let reordered = vec![
            Column::new("id".to_string(), DataTypeNode::UUID),
            Column::new("storage".to_string(), DataTypeNode::String),
            Column::new("name".to_string(), DataTypeNode::String),
        ];
        let mapped = RowMetadata::new_for_cursor::<SystemRolesRow>(reordered.clone()).unwrap();
        assert_eq!(mapped.columns, reordered);
        assert_eq!(mapped.access_type, AccessType::WithMapAccess(vec![1, 2, 0]));
    }

    #[test]
    fn it_does_follow_previous_configuration() {
        let original = Client::default().with_option("async_insert", "1");
        let copy = original.clone();
        assert_eq!(original.options, copy.options);
    }

    #[test]
    fn it_does_not_follow_future_configuration() {
        let original = Client::default();
        let copy = original.clone();
        let reconfigured = original.with_option("async_insert", "1");
        assert_ne!(reconfigured.options, copy.options);
    }

    #[test]
    fn it_gets_and_sets_options() {
        let mut client = Client::default();

        // First write: nothing was stored under either name before.
        for key in ["foo", "bar"] {
            assert_eq!(client.set_option(key, key), None);
        }

        for key in ["foo", "bar"] {
            assert_eq!(client.get_option(key), Some(key));
        }
        assert_eq!(client.get_option("baz"), None);

        // Second write: the previous value is handed back.
        for key in ["foo", "bar"] {
            assert_eq!(
                client.set_option(key, format!("{key}_2")),
                Some(key.to_string())
            );
        }
    }
}