1use std::collections::HashMap;
2use std::fmt::Write as _;
3use std::sync::Arc;
4
5use arrow::datatypes::SchemaRef;
6use tracing::error;
7
8use super::settings::{SettingValue, Settings};
9use crate::arrow::types::{SchemaConversions, schema_conversion};
10use crate::{ArrowOptions, ColumnDefinition, Error, Result, Row, Type};
11
/// ClickHouse table engines known by name, plus a catch-all for everything else.
///
/// Values are usually built from strings through the `From<S: Into<String>>` impl, which
/// matches names case-insensitively. They are rendered back with `Display` using the
/// CamelCase spelling of each variant. Strings that are not exactly one of the known names
/// end up in [`ClickHouseEngine::Other`] verbatim. This includes names carrying an argument
/// list such as `ReplacingMergeTree(ver)`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ClickHouseEngine {
    MergeTree,
    AggregatingMergeTree,
    CollapsingMergeTree,
    ReplacingMergeTree,
    SummingMergeTree,
    Memory,
    Log,
    StripeLog,
    TinyLog,
    /// Any engine string not matching a variant above, stored exactly as given.
    Other(String),
}
28
29impl<S> From<S> for ClickHouseEngine
30where
31 S: Into<String>,
32{
33 fn from(value: S) -> Self {
34 let engine = value.into();
35 match engine.to_uppercase().as_str() {
36 "MERGETREE" => Self::MergeTree,
37 "AGGREGATINGMERGETREE" => Self::AggregatingMergeTree,
38 "COLLAPSINGMERGETREE" => Self::CollapsingMergeTree,
39 "REPLACINGMERGETREE" => Self::ReplacingMergeTree,
40 "SUMMINGMERGETREE" => Self::SummingMergeTree,
41 "MEMORY" => Self::Memory,
42 "LOG" => Self::Log,
43 "STRIPELOG" => Self::StripeLog,
44 "TINYLOG" => Self::TinyLog,
45 _ => Self::Other(engine),
47 }
48 }
49}
50
51impl std::fmt::Display for ClickHouseEngine {
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 match self {
55 Self::MergeTree => write!(f, "MergeTree"),
56 Self::AggregatingMergeTree => write!(f, "AggregatingMergeTree"),
57 Self::CollapsingMergeTree => write!(f, "CollapsingMergeTree"),
58 Self::ReplacingMergeTree => write!(f, "ReplacingMergeTree"),
59 Self::SummingMergeTree => write!(f, "SummingMergeTree"),
60 Self::Memory => write!(f, "Memory"),
61 Self::Log => write!(f, "Log"),
62 Self::StripeLog => write!(f, "StripeLog"),
63 Self::TinyLog => write!(f, "TinyLog"),
64 Self::Other(engine) => write!(f, "{engine}"),
65 }
66 }
67}
68
/// Options for the part of a `CREATE TABLE` statement that follows the column list
/// (`ENGINE`, `ORDER BY`, `PRIMARY KEY`, ...). Also holds the column-level defaults and type
/// overrides applied while generating column definitions.
///
/// Usually built with [`CreateOptions::new`] and the `with_*` builder methods.
#[derive(Debug, Default, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct CreateOptions {
    /// Engine text rendered after `ENGINE = ` (e.g. `MergeTree`); must not be empty.
    pub engine: String,
    /// `ORDER BY` columns.
    ///
    /// When empty, `ORDER BY tuple()` is rendered, except for Log-family engines, which
    /// get only the `ENGINE` clause.
    pub order_by: Vec<String>,
    /// `PRIMARY KEY` columns; these must be a prefix of `order_by`, in the same order.
    pub primary_keys: Vec<String>,
    /// `PARTITION BY` expression.
    pub partition_by: Option<String>,
    /// `SAMPLE BY` expression.
    ///
    /// Requires a non-empty `order_by`, and must textually mention at least one of the
    /// `order_by` columns.
    pub sampling: Option<String>,
    /// Table settings, rendered as a `SETTINGS` clause when non-empty.
    pub settings: Settings,
    /// `TTL` expression.
    pub ttl: Option<String>,
    /// Per-column ClickHouse type overrides keyed by column name, applied when column
    /// definitions are derived.
    pub schema_conversions: Option<SchemaConversions>,
    /// Per-column default expressions keyed by column name.
    ///
    /// `""` or `"NULL"` renders a bare `DEFAULT` with no expression.
    pub defaults: Option<HashMap<String, String>>,
    /// When set, `Nullable` columns without any default get a bare `DEFAULT`.
    pub defaults_for_nullable: bool,
}
101
102impl CreateOptions {
103 #[must_use]
111 pub fn new(engine: impl Into<String>) -> Self {
112 Self { engine: engine.into(), ..Default::default() }
113 }
114
115 #[must_use]
123 pub fn from_engine(engine: impl Into<ClickHouseEngine>) -> Self {
124 Self { engine: engine.into().to_string(), ..Default::default() }
125 }
126
127 #[must_use]
137 pub fn with_order_by(mut self, order_by: &[String]) -> Self {
138 self.order_by =
139 order_by.iter().filter(|k| !k.is_empty()).map(ToString::to_string).collect();
140 self
141 }
142
143 #[must_use]
153 pub fn with_primary_keys(mut self, keys: &[String]) -> Self {
154 self.primary_keys =
155 keys.iter().filter(|k| !k.is_empty()).map(ToString::to_string).collect();
156 self
157 }
158
159 #[must_use]
169 pub fn with_partition_by(mut self, partition_by: impl Into<String>) -> Self {
170 let partition_by = partition_by.into();
171 if !partition_by.is_empty() {
172 self.partition_by = Some(partition_by);
173 }
174 self
175 }
176
177 #[must_use]
187 pub fn with_sample_by(mut self, sampling: impl Into<String>) -> Self {
188 let sampling = sampling.into();
189 if !sampling.is_empty() {
190 self.sampling = Some(sampling);
191 }
192 self
193 }
194
195 #[must_use]
203 pub fn with_settings(mut self, settings: Settings) -> Self {
204 self.settings = settings;
205 self
206 }
207
208 #[must_use]
218 pub fn with_ttl(mut self, ttl: impl Into<String>) -> Self {
219 let ttl = ttl.into();
220 if !ttl.is_empty() {
221 self.ttl = Some(ttl);
222 }
223 self
224 }
225
226 #[must_use]
235 pub fn with_setting<S>(mut self, name: impl Into<String>, setting: S) -> Self
236 where
237 SettingValue: From<S>,
238 {
239 self.settings.add_setting(name.into(), setting);
240 self
241 }
242
243 #[must_use]
251 pub fn with_defaults<I>(mut self, defaults: I) -> Self
252 where
253 I: Iterator<Item = (String, String)>,
254 {
255 self.defaults = Some(defaults.into_iter().collect::<HashMap<_, _>>());
256 self
257 }
258
259 #[must_use]
264 pub fn with_defaults_for_nullable(mut self) -> Self {
265 self.defaults_for_nullable = true;
266 self
267 }
268
269 #[must_use]
278 pub fn with_schema_conversions(mut self, map: SchemaConversions) -> Self {
279 self.schema_conversions = Some(map);
280 self
281 }
282
283 pub fn defaults(&self) -> Option<&HashMap<String, String>> { self.defaults.as_ref() }
288
289 pub fn schema_conversions(&self) -> Option<&SchemaConversions> {
294 self.schema_conversions.as_ref()
295 }
296
297 fn build(&self) -> Result<String> {
311 let engine = self.engine.to_string();
312 if engine.is_empty() {
313 return Err(Error::DDLMalformed("An engine is required, received empty string".into()));
314 }
315
316 let mut options = vec![format!("ENGINE = {engine}")];
317
318 if ["log", "LOG", "Log"].iter().any(|s| engine.contains(s)) {
320 return Ok(options.remove(0));
321 }
322
323 if self.order_by.is_empty() {
325 if !self.primary_keys.is_empty() || !self.sampling.as_ref().is_none_or(String::is_empty)
327 {
328 return Err(Error::DDLMalformed(
329 "Cannot specify primary keys or sampling when order by is empty".into(),
330 ));
331 }
332
333 options.push("ORDER BY tuple()".into());
334 } else {
335 let order_by = self.order_by.clone();
336
337 if !self.primary_keys.is_empty()
339 && !self.primary_keys.iter().enumerate().all(|(i, k)| order_by.get(i) == Some(k))
340 {
341 return Err(Error::DDLMalformed(format!(
342 "Primary keys but be present in order by and the ordering must match: order \
343 by = {order_by:?}, primary keys = {:?}",
344 self.primary_keys
345 )));
346 }
347
348 if let Some(sample) = self.sampling.as_ref() {
350 if !order_by.iter().any(|o| sample.contains(o.as_str())) {
351 return Err(Error::DDLMalformed(format!(
352 "Sampling must refer to a primary key: order by = {order_by:?}, \
353 sampling={:?}",
354 self.sampling
355 )));
356 }
357 }
358
359 options.push(format!("ORDER BY ({})", order_by.join(", ")));
360 }
361
362 if !self.primary_keys.is_empty() {
363 let primary_keys = self.primary_keys.clone();
364 options.push(format!("PRIMARY KEY ({})", primary_keys.join(", ")));
365 }
366
367 if let Some(partition) = self.partition_by.as_ref() {
368 options.push(format!("PARTITION BY {partition}"));
369 }
370
371 if let Some(sample) = self.sampling.as_ref() {
372 options.push(format!("SAMPLE BY {sample}"));
373 }
374
375 if let Some(ttl) = self.ttl.as_ref() {
376 options.push(format!("TTL {ttl}"));
377 }
378
379 if !self.settings.is_empty() {
380 options.push(format!("SETTINGS {}", self.settings.encode_to_strings().join(", ")));
381 }
382
383 Ok(options.join("\n"))
384 }
385}
386
387pub(crate) fn create_db_statement(database: &str) -> Result<String> {
407 if database.is_empty() {
408 return Err(Error::DDLMalformed("Database name cannot be empty".into()));
409 }
410
411 if database.eq_ignore_ascii_case("default") {
412 return Err(Error::DDLMalformed("Cannot create `default` database".into()));
413 }
414
415 Ok(format!("CREATE DATABASE IF NOT EXISTS {database}"))
416}
417
418pub(crate) fn drop_db_statement(database: &str, sync: bool) -> Result<String> {
439 if database.is_empty() {
440 return Err(Error::DDLMalformed("Database name cannot be empty".into()));
441 }
442
443 if database.eq_ignore_ascii_case("default") {
444 return Err(Error::DDLMalformed("Cannot create `default` database".into()));
445 }
446
447 let mut ddl = "DROP DATABASE IF EXISTS ".to_string();
448 ddl.push_str(database);
449 if sync {
450 ddl.push_str(" SYNC");
451 }
452 Ok(ddl)
453}
454
455pub(crate) fn create_table_statement_from_arrow(
490 database: Option<&str>,
491 table: &str,
492 schema: &SchemaRef,
493 options: &CreateOptions,
494 arrow_options: Option<ArrowOptions>,
495) -> Result<String> {
496 if schema.fields().is_empty() {
497 return Err(Error::DDLMalformed("Arrow Schema is empty, cannot create table".into()));
498 }
499 let definition = RecordBatchDefinition {
500 arrow_options,
501 schema: Arc::clone(schema),
502 defaults: options.defaults().cloned(),
503 };
504 create_table_statement(database, table, Some(definition), options)
505}
506
507pub(crate) fn create_table_statement_from_native<T: Row>(
542 database: Option<&str>,
543 table: &str,
544 options: &CreateOptions,
545) -> Result<String> {
546 create_table_statement::<T>(database, table, None, options)
547}
548
549pub(crate) fn create_table_statement<T: ColumnDefine>(
550 database: Option<&str>,
551 table: &str,
552 schema: Option<T>,
553 options: &CreateOptions,
554) -> Result<String> {
555 let column_definitions = schema
556 .map(|s| s.runtime_definitions(options.schema_conversions.as_ref()))
557 .transpose()?
558 .flatten()
559 .or(T::definitions());
560
561 let Some(definitions) = column_definitions.filter(|c| !c.is_empty()) else {
562 return Err(Error::DDLMalformed("Schema is empty, cannot create table".into()));
563 };
564
565 let db_pre = database.map(|c| format!("{c}.")).unwrap_or_default();
566 let table = table.trim_matches('`');
567 let mut sql = String::new();
568 let _ = writeln!(sql, "CREATE TABLE IF NOT EXISTS {db_pre}`{table}` (");
569
570 let total = definitions.len();
571 for (i, (name, type_, default_value)) in definitions.into_iter().enumerate() {
572 let _ = write!(sql, " {name} {type_}");
573 if let Some(d) = options
574 .defaults
575 .as_ref()
576 .and_then(|d| d.get(&name))
577 .or(default_value.map(|d| d.to_string()).as_ref())
578 {
579 let _ = write!(sql, " DEFAULT");
580 if !d.is_empty() && d != "NULL" {
581 let _ = write!(sql, " {d}");
582 }
583 } else if options.defaults_for_nullable && matches!(type_, Type::Nullable(_)) {
584 let _ = write!(sql, " DEFAULT");
585 }
586
587 if i < (total - 1) {
588 let _ = writeln!(sql, ",");
589 }
590 }
591
592 let _ = writeln!(sql, "\n)");
593 let _ = write!(sql, "{}", options.build()?);
594
595 Ok(sql)
596}
597
598pub trait ColumnDefine: Sized {
604 type DefaultValue: std::fmt::Display + std::fmt::Debug;
605
606 fn definitions() -> Option<Vec<ColumnDefinition<Self::DefaultValue>>>;
608
609 fn runtime_definitions(
615 &self,
616 _: Option<&HashMap<String, Type>>,
617 ) -> Result<Option<Vec<ColumnDefinition<Self::DefaultValue>>>> {
618 Ok(Self::definitions())
619 }
620}
621
622impl<T: Row> ColumnDefine for T {
623 type DefaultValue = crate::Value;
624
625 fn definitions() -> Option<Vec<ColumnDefinition>> { Self::to_schema() }
626
627 fn runtime_definitions(
628 &self,
629 conversions: Option<&HashMap<String, Type>>,
630 ) -> Result<Option<Vec<ColumnDefinition<Self::DefaultValue>>>> {
631 let Some(static_definitions) = Self::definitions() else {
632 return Ok(None);
633 };
634
635 if let Some(conversions) = conversions {
636 return Ok(Some(
637 static_definitions
638 .into_iter()
639 .map(|(name, type_, default_value)| {
640 let resolved_type = conversions.get(&name).cloned().unwrap_or(type_);
641 (name, resolved_type, default_value)
642 })
643 .collect::<Vec<_>>(),
644 ));
645 }
646
647 Ok(Some(static_definitions))
648 }
649}
650
/// Column source backed by an Arrow schema, used to build `CREATE TABLE` DDL from Arrow data.
pub(crate) struct RecordBatchDefinition {
    /// Options forwarded to the Arrow-to-ClickHouse type conversion of each field.
    pub(crate) arrow_options: Option<ArrowOptions>,
    /// Arrow schema; each field becomes one column, in schema order.
    pub(crate) schema: SchemaRef,
    /// Column name → default expression. Empty and `"NULL"` entries produce no default here.
    pub(crate) defaults: Option<HashMap<String, String>>,
}
657
658impl ColumnDefine for RecordBatchDefinition {
659 type DefaultValue = String;
660
661 fn definitions() -> Option<Vec<ColumnDefinition<String>>> { None }
662
663 fn runtime_definitions(
664 &self,
665 conversions: Option<&HashMap<String, Type>>,
666 ) -> Result<Option<Vec<ColumnDefinition<String>>>> {
667 let mut fields = Vec::with_capacity(self.schema.fields.len());
668 for field in self.schema.fields() {
669 let type_ =
670 schema_conversion(field, conversions, self.arrow_options).inspect_err(|error| {
671 error!("Arrow conversion failed for field {field:?}: {error}");
672 })?;
673 let default_val =
674 if let Some(d) = self.defaults.as_ref().and_then(|d| d.get(field.name())) {
675 if !d.is_empty() && d != "NULL" { Some(d.clone()) } else { None }
676 } else {
677 None
678 };
679 fields.push((field.name().to_string(), type_, default_val));
680 }
681 Ok(Some(fields))
682 }
683}
684
// Unit tests for the DDL builders in this module. `compare_sql` ignores every space and
// newline, so those tests pin clause content and order but not exact whitespace.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema};

    use super::{ClickHouseEngine, *};
    use crate::Type;

    /// Asserts that two SQL strings are equal once all `'\n'` and `' '` characters are removed.
    // Callers pass owned `String`s and literals interchangeably; only a borrow is needed.
    #[allow(clippy::needless_pass_by_value)]
    fn compare_sql(left: impl AsRef<str> + Into<String>, right: impl AsRef<str> + Into<String>) {
        assert_eq!(left.as_ref().replace(['\n', ' '], ""), right.as_ref().replace(['\n', ' '], ""));
    }

    // `new` sets only the engine; every other option starts empty or unset.
    #[test]
    fn test_create_options_new() {
        let options = CreateOptions::new("MergeTree");
        assert_eq!(options.engine, "MergeTree");
        assert!(options.order_by.is_empty());
        assert!(options.primary_keys.is_empty());
        assert!(options.partition_by.is_none());
        assert!(options.sampling.is_none());
        assert!(options.settings.is_empty());
        assert!(options.ttl.is_none());
        assert!(options.defaults.is_none());
        assert!(!options.defaults_for_nullable);
    }

    // Empty column names are dropped from ORDER BY.
    #[test]
    fn test_create_options_with_order_by() {
        let options = CreateOptions::new("MergeTree").with_order_by(&[
            "id".to_string(),
            String::new(),
            "name".to_string(),
        ]);
        assert_eq!(options.order_by, vec!["id".to_string(), "name".to_string()]);
    }

    // Empty column names are dropped from PRIMARY KEY.
    #[test]
    fn test_create_options_with_primary_keys() {
        let options = CreateOptions::new("MergeTree").with_primary_keys(&[
            "id".to_string(),
            String::new(),
            "name".to_string(),
        ]);
        assert_eq!(options.primary_keys, vec!["id".to_string(), "name".to_string()]);
    }

    // An empty partition expression leaves `partition_by` unset.
    #[test]
    fn test_create_options_with_partition_by() {
        let options = CreateOptions::new("MergeTree").with_partition_by("toYYYYMM(date)");
        assert_eq!(options.partition_by, Some("toYYYYMM(date)".to_string()));

        let options = CreateOptions::new("MergeTree").with_partition_by("");
        assert_eq!(options.partition_by, None);
    }

    // An empty sampling expression leaves `sampling` unset.
    #[test]
    fn test_create_options_with_sample_by() {
        let options = CreateOptions::new("MergeTree").with_sample_by("cityHash64(id)");
        assert_eq!(options.sampling, Some("cityHash64(id)".to_string()));

        let options = CreateOptions::new("MergeTree").with_sample_by("");
        assert_eq!(options.sampling, None);
    }

    #[test]
    fn test_create_options_with_settings() {
        let settings = Settings::default().with_setting("index_granularity", 4096);
        let options = CreateOptions::new("MergeTree").with_settings(settings.clone());
        assert_eq!(options.settings, settings);
    }

    // A single setting is encoded as `name = value`.
    #[test]
    fn test_create_options_with_setting() {
        let options = CreateOptions::new("MergeTree").with_setting("index_granularity", 4096);
        assert_eq!(options.settings.encode_to_strings(), vec![
            "index_granularity = 4096".to_string()
        ]);
    }

    // An empty TTL expression leaves `ttl` unset.
    #[test]
    fn test_create_options_with_ttl() {
        let options = CreateOptions::new("MergeTree").with_ttl("1 DAY");
        assert_eq!(options.ttl, Some("1 DAY".to_string()));

        let options = CreateOptions::new("MergeTree").with_ttl("");
        assert_eq!(options.ttl, None);
    }

    #[test]
    fn test_create_options_with_defaults() {
        let defaults = vec![
            ("id".to_string(), "0".to_string()),
            ("name".to_string(), "'unknown'".to_string()),
        ];
        let options = CreateOptions::new("MergeTree").with_defaults(defaults.into_iter());
        assert_eq!(
            options.defaults,
            Some(HashMap::from([
                ("id".to_string(), "0".to_string()),
                ("name".to_string(), "'unknown'".to_string()),
            ]))
        );
    }

    #[test]
    fn test_create_options_with_defaults_for_nullable() {
        let options = CreateOptions::new("MergeTree").with_defaults_for_nullable();
        assert!(options.defaults_for_nullable);
    }

    // Full MergeTree clause set, pinning the clause order produced by `build`.
    #[test]
    fn test_create_options_build_merge_tree() {
        let options = CreateOptions::new("MergeTree")
            .with_order_by(&["id".to_string(), "date".to_string()])
            .with_primary_keys(&["id".to_string()])
            .with_partition_by("toYYYYMM(date)")
            .with_sample_by("cityHash64(id)")
            .with_ttl("1 DAY")
            .with_setting("index_granularity", 4096);
        let sql = options.build().unwrap();
        compare_sql(
            sql,
            "ENGINE = MergeTree\nORDER BY (id, date)\nPRIMARY KEY (id)\nPARTITION BY \
             toYYYYMM(date)\nSAMPLE BY cityHash64(id)\nTTL 1 DAY\nSETTINGS index_granularity = \
             4096",
        );
    }

    // Log-family engines render only the ENGINE clause (exact match, no ORDER BY).
    #[test]
    fn test_create_options_build_log_engine() {
        let options = CreateOptions::new("TinyLog");
        let sql = options.build().unwrap();
        assert_eq!(sql, "ENGINE = TinyLog");
    }

    // Without ORDER BY columns, `ORDER BY tuple()` is emitted.
    #[test]
    fn test_create_options_build_empty_order_by() {
        let options = CreateOptions::new("MergeTree");
        let sql = options.build().unwrap();
        compare_sql(sql, "ENGINE = MergeTree\nORDER BY tuple()");
    }

    #[test]
    fn test_create_options_build_invalid_engine() {
        let options = CreateOptions::new("");
        let result = options.build();
        assert!(matches!(result, Err(Error::DDLMalformed(_))));
    }

    // Primary keys that are not a prefix of ORDER BY are rejected.
    #[test]
    fn test_create_options_build_invalid_primary_keys() {
        let options = CreateOptions::new("MergeTree")
            .with_order_by(&["id".to_string()])
            .with_primary_keys(&["name".to_string()]);
        let result = options.build();
        assert!(matches!(result, Err(Error::DDLMalformed(_))));
    }

    // A sampling expression that mentions no ORDER BY column is rejected.
    #[test]
    fn test_create_options_build_invalid_sampling() {
        let options = CreateOptions::new("MergeTree")
            .with_order_by(&["id".to_string()])
            .with_sample_by("cityHash64(name)");
        let result = options.build();
        assert!(matches!(result, Err(Error::DDLMalformed(_))));
    }

    #[test]
    fn test_create_db_statement() {
        let sql = create_db_statement("my_db").unwrap();
        assert_eq!(sql, "CREATE DATABASE IF NOT EXISTS my_db");

        let result = create_db_statement("");
        assert!(matches!(result, Err(Error::DDLMalformed(_))));

        let result = create_db_statement("default");
        assert!(matches!(result, Err(Error::DDLMalformed(_))));
    }

    #[test]
    fn test_drop_db_statement() {
        let sql = drop_db_statement("my_db", false).unwrap();
        compare_sql(sql, "DROP DATABASE IF EXISTS my_db");

        let sql = drop_db_statement("my_db", true).unwrap();
        compare_sql(sql, "DROP DATABASE IF EXISTS my_db SYNC");

        let result = drop_db_statement("", false);
        assert!(matches!(result, Err(Error::DDLMalformed(_))));

        let result = drop_db_statement("default", false);
        assert!(matches!(result, Err(Error::DDLMalformed(_))));
    }

    // The explicit 'unknown' default is used for `name`; non-nullable `id` gets no DEFAULT.
    #[test]
    fn test_create_table_statement() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let options = CreateOptions::new("MergeTree")
            .with_order_by(&["id".to_string()])
            .with_defaults(vec![("name".to_string(), "'unknown'".to_string())].into_iter())
            .with_defaults_for_nullable();
        let sql =
            create_table_statement_from_arrow(None, "my_table", &schema, &options, None).unwrap();
        compare_sql(
            sql,
            "CREATE TABLE IF NOT EXISTS `my_table` (\n id Int32,\n name Nullable(String) \
             DEFAULT 'unknown'\n)\nENGINE = MergeTree\nORDER BY (id)",
        );
    }

    // The database prefix is inserted unquoted, and a non-Log engine with no ORDER BY
    // columns still gets `ORDER BY tuple()`.
    #[test]
    fn test_create_table_statement_with_database() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let options = CreateOptions::new("Memory");
        let sql =
            create_table_statement_from_arrow(Some("my_db"), "my_table", &schema, &options, None)
                .unwrap();
        compare_sql(
            sql,
            "CREATE TABLE IF NOT EXISTS my_db.`my_table` (\nid Int32\n)\nENGINE = Memory\nORDER \
             BY tuple()",
        );
    }

    #[test]
    fn test_create_table_statement_empty_schema() {
        let schema = Arc::new(Schema::empty());
        let options = CreateOptions::new("MergeTree");
        let result = create_table_statement_from_arrow(None, "my_table", &schema, &options, None);
        assert!(matches!(result, Err(Error::DDLMalformed(_))));
    }

    // A nullable Int8 dictionary column needs an Enum8 conversion; with one, the column
    // renders as Nullable(Enum8(...)).
    #[test]
    fn test_create_table_with_nullable_dictionary() {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "status",
                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
                true,
            ),
            Field::new("id", DataType::Int32, false),
        ]));

        let enum_i8 = HashMap::from_iter([(
            "status".to_string(),
            Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
        )]);

        let options = CreateOptions::new("MergeTree").with_order_by(&["id".to_string()]);
        let enum_options = options.clone().with_schema_conversions(enum_i8);

        // Without the conversion, generating the statement fails.
        assert!(
            create_table_statement_from_arrow(None, "test_table", &schema, &options, None).is_err()
        );

        let sql =
            create_table_statement_from_arrow(None, "test_table", &schema, &enum_options, None)
                .expect("Should generate valid SQL");

        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
        assert!(sql.contains("status Nullable(Enum8('active' = 1,'inactive' = 2))"));
        assert!(sql.contains("id Int32"));
        assert!(sql.contains("ENGINE = MergeTree"));
        assert!(sql.contains("ORDER BY (id)"));
    }

    // A non-nullable Int8 dictionary with an Enum8 conversion renders as a plain Enum8.
    #[test]
    fn test_create_table_with_enum8() {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "status",
                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
                false,
            ),
            Field::new("id", DataType::Int32, false),
        ]));

        let enum_i8 = HashMap::from_iter([(
            "status".to_string(),
            Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
        )]);

        let options = CreateOptions::new("MergeTree")
            .with_order_by(&["id".to_string()])
            .with_schema_conversions(enum_i8);

        let sql = create_table_statement_from_arrow(None, "test_table", &schema, &options, None)
            .expect("Should generate valid SQL");

        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
        assert!(sql.contains("status Enum8('active' = 1,'inactive' = 2)"));
        assert!(sql.contains("id Int32"));
        assert!(sql.contains("ENGINE = MergeTree"));
        assert!(sql.contains("ORDER BY (id)"));
    }

    // An Int16 dictionary with an Enum16 conversion; the enum column is also the ORDER BY key.
    #[test]
    fn test_create_table_with_enum16() {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "category",
                DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
                false,
            ),
            Field::new("value", DataType::Float32, true),
        ]));

        let enum_i16 = HashMap::from_iter([(
            "category".to_string(),
            Type::Enum16(vec![("x".to_string(), 1), ("y".to_string(), 2), ("z".to_string(), 3)]),
        )]);
        let options = CreateOptions::new("MergeTree")
            .with_order_by(&["category".to_string()])
            .with_schema_conversions(enum_i16);

        let sql = create_table_statement_from_arrow(None, "test_table", &schema, &options, None)
            .expect("Should generate valid SQL");

        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
        assert!(sql.contains("category Enum16('x' = 1,'y' = 2,'z' = 3)"));
        assert!(sql.contains("value Nullable(Float32)"));
        assert!(sql.contains("ENGINE = MergeTree"));
        assert!(sql.contains("ORDER BY (category)"));
    }

    // An enum conversion on an integer field is rejected with a TypeConversion error.
    #[test]
    fn test_create_table_with_invalid_enum_type() {
        let schema = Arc::new(Schema::new(vec![Field::new("status", DataType::Int32, true)]));

        let enum_i8 = HashMap::from_iter([(
            "status".to_string(),
            Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
        )]);

        let options = CreateOptions::new("MergeTree").with_schema_conversions(enum_i8);

        let result = create_table_statement_from_arrow(None, "test_table", &schema, &options, None);

        assert!(matches!(
            result,
            Err(Error::TypeConversion(msg))
            if msg.contains("expected LowCardinality(String) or String/Binary, found Nullable(Int32)")
        ));
    }

    // A plain (non-dictionary) nullable string field also accepts an Enum8 conversion.
    #[test]
    fn test_create_table_with_non_low_cardinality_enum() {
        let schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, true)]));

        let enum_i8 = HashMap::from_iter([(
            "name".to_string(),
            Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
        )]);
        let options = CreateOptions::new("MergeTree").with_schema_conversions(enum_i8);

        let sql =
            create_table_statement_from_arrow(None, "test_table", &schema, &options, None).unwrap();

        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
        assert!(sql.contains("name Nullable(Enum8('active' = 1,'inactive' = 2))"));
        assert!(sql.contains("ENGINE = MergeTree"));
    }

    // Both conversions are Nullable(Enum8), but with these ArrowOptions the non-nullable
    // `status` field is expected to render as a plain Enum8.
    #[test]
    fn test_create_table_with_nullable_field_non_nullable_enum() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, true),
            Field::new("status", DataType::Utf8, false),
        ]));

        let enum_i8 = HashMap::from_iter([
            (
                "name".to_string(),
                Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)])
                    .into_nullable(),
            ),
            (
                "status".to_string(),
                Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)])
                    .into_nullable(),
            ),
        ]);
        let options = CreateOptions::new("MergeTree").with_schema_conversions(enum_i8);
        let arrow_options = ArrowOptions::default()
            .with_strings_as_strings(true)
            .with_use_date32_for_date(true)
            .with_strict_schema(false)
            .with_disable_strict_schema_ddl(true);

        let sql = create_table_statement_from_arrow(
            None,
            "test_table",
            &schema,
            &options,
            Some(arrow_options),
        )
        .unwrap();

        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
        assert!(sql.contains("name Nullable(Enum8('active' = 1,'inactive' = 2))"));
        assert!(sql.contains("status Enum8('active' = 1,'inactive' = 2)"));
        assert!(sql.contains("ENGINE = MergeTree"));
    }

    // Converted enum columns and unconverted columns can be mixed in one schema.
    #[test]
    fn test_create_table_with_mixed_enum_and_non_enum() {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "status",
                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
                true,
            ),
            Field::new("name", DataType::Utf8, true),
            Field::new(
                "category",
                DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
                false,
            ),
        ]));

        let enums = HashMap::from_iter([
            (
                "status".to_string(),
                Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
            ),
            (
                "category".to_string(),
                Type::Enum16(vec![("x".to_string(), 1), ("y".to_string(), 2)]),
            ),
        ]);

        let options = CreateOptions::new("MergeTree")
            .with_order_by(&["category".to_string()])
            .with_schema_conversions(enums);

        let sql = create_table_statement_from_arrow(None, "test_table", &schema, &options, None)
            .expect("Should generate valid SQL");

        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
        assert!(sql.contains("status Nullable(Enum8('active' = 1,'inactive' = 2))"));
        assert!(sql.contains("name Nullable(String)"));
        assert!(sql.contains("category Enum16('x' = 1,'y' = 2)"));
        assert!(sql.contains("ENGINE = MergeTree"));
        assert!(sql.contains("ORDER BY (category)"));
    }

    // Every engine survives a Display -> From round trip, including `Other`.
    #[test]
    fn test_engines() {
        use super::ClickHouseEngine::*;

        let engines = [
            MergeTree,
            AggregatingMergeTree,
            CollapsingMergeTree,
            ReplacingMergeTree,
            SummingMergeTree,
            Memory,
            Log,
            StripeLog,
            TinyLog,
            Other("NonExistentEngine".into()),
        ];

        for engine in engines {
            let engine_str = engine.to_string();
            let engine_from = ClickHouseEngine::from(engine_str);
            assert_eq!(engine, engine_from);
        }
    }
}