1use std::collections::HashMap;
2use std::fmt::Write as _;
3use std::sync::Arc;
4
5use arrow::datatypes::SchemaRef;
6use tracing::error;
7
8use super::settings::{SettingValue, Settings};
9use crate::arrow::types::{SchemaConversions, schema_conversion};
10use crate::{ArrowOptions, ColumnDefinition, Error, Result, Row, Type};
11
/// Table engines that can be named when generating `CREATE TABLE` statements.
///
/// Engines without a dedicated variant are represented by [`ClickHouseEngine::Other`], which
/// carries the engine name as a string.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ClickHouseEngine {
    /// The `MergeTree` engine.
    MergeTree,
    /// The `AggregatingMergeTree` engine.
    AggregatingMergeTree,
    /// The `CollapsingMergeTree` engine.
    CollapsingMergeTree,
    /// The `ReplacingMergeTree` engine.
    ReplacingMergeTree,
    /// The `SummingMergeTree` engine.
    SummingMergeTree,
    /// The `Memory` engine.
    Memory,
    /// The `Log` engine.
    Log,
    /// The `StripeLog` engine.
    StripeLog,
    /// The `TinyLog` engine.
    TinyLog,
    /// Any other engine, identified by name.
    Other(String),
}
28
29impl<S> From<S> for ClickHouseEngine
30where
31 S: Into<String>,
32{
33 fn from(value: S) -> Self {
34 let engine = value.into();
35 match engine.to_uppercase().as_str() {
36 "MERGETREE" => Self::MergeTree,
37 "AGGREGATINGMERGETREE" => Self::AggregatingMergeTree,
38 "COLLAPSINGMERGETREE" => Self::CollapsingMergeTree,
39 "REPLACINGMERGETREE" => Self::ReplacingMergeTree,
40 "SUMMINGMERGETREE" => Self::SummingMergeTree,
41 "MEMORY" => Self::Memory,
42 "LOG" => Self::Log,
43 "STRIPELOG" => Self::StripeLog,
44 "TINYLOG" => Self::TinyLog,
45 _ => Self::Other(engine),
47 }
48 }
49}
50
51impl std::fmt::Display for ClickHouseEngine {
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 match self {
55 Self::MergeTree => write!(f, "MergeTree"),
56 Self::AggregatingMergeTree => write!(f, "AggregatingMergeTree"),
57 Self::CollapsingMergeTree => write!(f, "CollapsingMergeTree"),
58 Self::ReplacingMergeTree => write!(f, "ReplacingMergeTree"),
59 Self::SummingMergeTree => write!(f, "SummingMergeTree"),
60 Self::Memory => write!(f, "Memory"),
61 Self::Log => write!(f, "Log"),
62 Self::StripeLog => write!(f, "StripeLog"),
63 Self::TinyLog => write!(f, "TinyLog"),
64 Self::Other(engine) => write!(f, "{engine}"),
65 }
66 }
67}
68
69#[derive(Debug, Default, Clone, PartialEq)]
88#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
89pub struct CreateOptions {
90 pub engine: String,
91 pub order_by: Vec<String>,
92 pub primary_keys: Vec<String>,
93 pub partition_by: Option<String>,
94 pub sampling: Option<String>,
95 pub settings: Settings,
96 pub ttl: Option<String>,
97 pub schema_conversions: Option<SchemaConversions>,
98 pub defaults: Option<HashMap<String, String>>,
99 pub defaults_for_nullable: bool,
100}
101
102impl CreateOptions {
103 #[must_use]
111 pub fn new(engine: impl Into<String>) -> Self {
112 Self { engine: engine.into(), ..Default::default() }
113 }
114
115 #[must_use]
123 pub fn from_engine(engine: impl Into<ClickHouseEngine>) -> Self {
124 Self { engine: engine.into().to_string(), ..Default::default() }
125 }
126
127 #[must_use]
137 pub fn with_order_by(mut self, order_by: &[String]) -> Self {
138 self.order_by =
139 order_by.iter().filter(|k| !k.is_empty()).map(ToString::to_string).collect();
140 self
141 }
142
143 #[must_use]
153 pub fn with_primary_keys(mut self, keys: &[String]) -> Self {
154 self.primary_keys =
155 keys.iter().filter(|k| !k.is_empty()).map(ToString::to_string).collect();
156 self
157 }
158
159 #[must_use]
169 pub fn with_partition_by(mut self, partition_by: impl Into<String>) -> Self {
170 let partition_by = partition_by.into();
171 if !partition_by.is_empty() {
172 self.partition_by = Some(partition_by);
173 }
174 self
175 }
176
177 #[must_use]
187 pub fn with_sample_by(mut self, sampling: impl Into<String>) -> Self {
188 let sampling = sampling.into();
189 if !sampling.is_empty() {
190 self.sampling = Some(sampling);
191 }
192 self
193 }
194
195 #[must_use]
203 pub fn with_settings(mut self, settings: Settings) -> Self {
204 self.settings = settings;
205 self
206 }
207
208 #[must_use]
218 pub fn with_ttl(mut self, ttl: impl Into<String>) -> Self {
219 let ttl = ttl.into();
220 if !ttl.is_empty() {
221 self.ttl = Some(ttl);
222 }
223 self
224 }
225
226 #[must_use]
235 pub fn with_setting<S>(mut self, name: impl Into<String>, setting: S) -> Self
236 where
237 SettingValue: From<S>,
238 {
239 self.settings.add_setting(name.into(), setting);
240 self
241 }
242
243 #[must_use]
251 pub fn with_defaults<I>(mut self, defaults: I) -> Self
252 where
253 I: Iterator<Item = (String, String)>,
254 {
255 self.defaults = Some(defaults.into_iter().collect::<HashMap<_, _>>());
256 self
257 }
258
259 #[must_use]
264 pub fn with_defaults_for_nullable(mut self) -> Self {
265 self.defaults_for_nullable = true;
266 self
267 }
268
269 #[must_use]
278 pub fn with_schema_conversions(mut self, map: SchemaConversions) -> Self {
279 self.schema_conversions = Some(map);
280 self
281 }
282
283 pub fn defaults(&self) -> Option<&HashMap<String, String>> { self.defaults.as_ref() }
288
289 pub fn schema_conversions(&self) -> Option<&SchemaConversions> {
294 self.schema_conversions.as_ref()
295 }
296
297 fn build(&self) -> Result<String> {
311 let engine = self.engine.clone();
312 if engine.is_empty() {
313 return Err(Error::DDLMalformed("An engine is required, received empty string".into()));
314 }
315
316 let mut options = vec![format!("ENGINE = {engine}")];
317
318 if ["log", "LOG", "Log"].iter().any(|s| engine.contains(s)) {
320 return Ok(options.remove(0));
321 }
322
323 if self.order_by.is_empty() {
325 if !self.primary_keys.is_empty() || !self.sampling.as_ref().is_none_or(String::is_empty)
327 {
328 return Err(Error::DDLMalformed(
329 "Cannot specify primary keys or sampling when order by is empty".into(),
330 ));
331 }
332
333 options.push("ORDER BY tuple()".into());
334 } else {
335 let order_by = self.order_by.clone();
336
337 if !self.primary_keys.is_empty()
339 && !self.primary_keys.iter().enumerate().all(|(i, k)| order_by.get(i) == Some(k))
340 {
341 return Err(Error::DDLMalformed(format!(
342 "Primary keys but be present in order by and the ordering must match: order \
343 by = {order_by:?}, primary keys = {:?}",
344 self.primary_keys
345 )));
346 }
347
348 if let Some(sample) = self.sampling.as_ref()
350 && !order_by.iter().any(|o| sample.contains(o.as_str()))
351 {
352 return Err(Error::DDLMalformed(format!(
353 "Sampling must refer to a primary key: order by = {order_by:?}, sampling={:?}",
354 self.sampling
355 )));
356 }
357
358 options.push(format!("ORDER BY ({})", order_by.join(", ")));
359 }
360
361 if !self.primary_keys.is_empty() {
362 let primary_keys = self.primary_keys.clone();
363 options.push(format!("PRIMARY KEY ({})", primary_keys.join(", ")));
364 }
365
366 if let Some(partition) = self.partition_by.as_ref() {
367 options.push(format!("PARTITION BY {partition}"));
368 }
369
370 if let Some(sample) = self.sampling.as_ref() {
371 options.push(format!("SAMPLE BY {sample}"));
372 }
373
374 if let Some(ttl) = self.ttl.as_ref() {
375 options.push(format!("TTL {ttl}"));
376 }
377
378 if !self.settings.is_empty() {
379 options.push(format!("SETTINGS {}", self.settings.encode_to_strings().join(", ")));
380 }
381
382 Ok(options.join("\n"))
383 }
384}
385
386pub(crate) fn create_db_statement(database: &str) -> Result<String> {
406 if database.is_empty() {
407 return Err(Error::DDLMalformed("Database name cannot be empty".into()));
408 }
409
410 if database.eq_ignore_ascii_case("default") {
411 return Err(Error::DDLMalformed("Cannot create `default` database".into()));
412 }
413
414 Ok(format!("CREATE DATABASE IF NOT EXISTS {database}"))
415}
416
417pub(crate) fn drop_db_statement(database: &str, sync: bool) -> Result<String> {
438 if database.is_empty() {
439 return Err(Error::DDLMalformed("Database name cannot be empty".into()));
440 }
441
442 if database.eq_ignore_ascii_case("default") {
443 return Err(Error::DDLMalformed("Cannot create `default` database".into()));
444 }
445
446 let mut ddl = "DROP DATABASE IF EXISTS ".to_string();
447 ddl.push_str(database);
448 if sync {
449 ddl.push_str(" SYNC");
450 }
451 Ok(ddl)
452}
453
454pub(crate) fn create_table_statement_from_arrow(
489 database: Option<&str>,
490 table: &str,
491 schema: &SchemaRef,
492 options: &CreateOptions,
493 arrow_options: Option<ArrowOptions>,
494) -> Result<String> {
495 if schema.fields().is_empty() {
496 return Err(Error::DDLMalformed("Arrow Schema is empty, cannot create table".into()));
497 }
498 let definition = RecordBatchDefinition {
499 arrow_options,
500 schema: Arc::clone(schema),
501 defaults: options.defaults().cloned(),
502 };
503 create_table_statement(database, table, Some(definition), options)
504}
505
506pub(crate) fn create_table_statement_from_native<T: Row>(
541 database: Option<&str>,
542 table: &str,
543 options: &CreateOptions,
544) -> Result<String> {
545 create_table_statement::<T>(database, table, None, options)
546}
547
548pub(crate) fn create_table_statement<T: ColumnDefine>(
549 database: Option<&str>,
550 table: &str,
551 schema: Option<T>,
552 options: &CreateOptions,
553) -> Result<String> {
554 let column_definitions = schema
555 .map(|s| s.runtime_definitions(options.schema_conversions.as_ref()))
556 .transpose()?
557 .flatten()
558 .or(T::definitions());
559
560 let Some(definitions) = column_definitions.filter(|c| !c.is_empty()) else {
561 return Err(Error::DDLMalformed("Schema is empty, cannot create table".into()));
562 };
563
564 let db_pre = database.map(|c| format!("{c}.")).unwrap_or_default();
565 let table = table.trim_matches('`');
566 let mut sql = String::new();
567 let _ = writeln!(sql, "CREATE TABLE IF NOT EXISTS {db_pre}`{table}` (");
568
569 let total = definitions.len();
570 for (i, (name, type_, default_value)) in definitions.into_iter().enumerate() {
571 let _ = write!(sql, " {name} {type_}");
572 if let Some(d) = options
573 .defaults
574 .as_ref()
575 .and_then(|d| d.get(&name))
576 .or(default_value.map(|d| d.to_string()).as_ref())
577 {
578 let _ = write!(sql, " DEFAULT");
579 if !d.is_empty() && d != "NULL" {
580 let _ = write!(sql, " {d}");
581 }
582 } else if options.defaults_for_nullable && matches!(type_, Type::Nullable(_)) {
583 let _ = write!(sql, " DEFAULT");
584 }
585
586 if i < (total - 1) {
587 let _ = writeln!(sql, ",");
588 }
589 }
590
591 let _ = writeln!(sql, "\n)");
592 let _ = write!(sql, "{}", options.build()?);
593
594 Ok(sql)
595}
596
597pub trait ColumnDefine: Sized {
603 type DefaultValue: std::fmt::Display + std::fmt::Debug;
604
605 fn definitions() -> Option<Vec<ColumnDefinition<Self::DefaultValue>>>;
607
608 fn runtime_definitions(
614 &self,
615 _: Option<&HashMap<String, Type>>,
616 ) -> Result<Option<Vec<ColumnDefinition<Self::DefaultValue>>>> {
617 Ok(Self::definitions())
618 }
619}
620
621impl<T: Row> ColumnDefine for T {
622 type DefaultValue = crate::Value;
623
624 fn definitions() -> Option<Vec<ColumnDefinition>> { Self::to_schema() }
625
626 fn runtime_definitions(
627 &self,
628 conversions: Option<&HashMap<String, Type>>,
629 ) -> Result<Option<Vec<ColumnDefinition<Self::DefaultValue>>>> {
630 let Some(static_definitions) = Self::definitions() else {
631 return Ok(None);
632 };
633
634 if let Some(conversions) = conversions {
635 return Ok(Some(
636 static_definitions
637 .into_iter()
638 .map(|(name, type_, default_value)| {
639 let resolved_type = conversions.get(&name).cloned().unwrap_or(type_);
640 (name, resolved_type, default_value)
641 })
642 .collect::<Vec<_>>(),
643 ));
644 }
645
646 Ok(Some(static_definitions))
647 }
648}
649
650pub(crate) struct RecordBatchDefinition {
652 pub(crate) arrow_options: Option<ArrowOptions>,
653 pub(crate) schema: SchemaRef,
654 pub(crate) defaults: Option<HashMap<String, String>>,
655}
656
657impl ColumnDefine for RecordBatchDefinition {
658 type DefaultValue = String;
659
660 fn definitions() -> Option<Vec<ColumnDefinition<String>>> { None }
661
662 fn runtime_definitions(
663 &self,
664 conversions: Option<&HashMap<String, Type>>,
665 ) -> Result<Option<Vec<ColumnDefinition<String>>>> {
666 let mut fields = Vec::with_capacity(self.schema.fields.len());
667 for field in self.schema.fields() {
668 let type_ =
669 schema_conversion(field, conversions, self.arrow_options).inspect_err(|error| {
670 error!("Arrow conversion failed for field {field:?}: {error}");
671 })?;
672 let default_val =
673 if let Some(d) = self.defaults.as_ref().and_then(|d| d.get(field.name())) {
674 if !d.is_empty() && d != "NULL" { Some(d.clone()) } else { None }
675 } else {
676 None
677 };
678 fields.push((field.name().clone(), type_, default_val));
679 }
680 Ok(Some(fields))
681 }
682}
683
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema};

    use super::{ClickHouseEngine, *};
    use crate::Type;

    /// Asserts two SQL strings are equal once spaces and newlines are removed.
    #[allow(clippy::needless_pass_by_value)]
    fn compare_sql(left: impl AsRef<str> + Into<String>, right: impl AsRef<str> + Into<String>) {
        let squash = |sql: &str| sql.replace(['\n', ' '], "");
        assert_eq!(squash(left.as_ref()), squash(right.as_ref()));
    }

    /// Turns string literals into the owned column list the builder methods expect.
    fn cols(names: &[&str]) -> Vec<String> { names.iter().map(ToString::to_string).collect() }

    /// The `active`/`inactive` `Enum8` type shared by several tests.
    fn status_enum8() -> Type {
        Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)])
    }

    /// A dictionary-encoded string field with the given key type.
    fn dict_field(name: &str, key: DataType, nullable: bool) -> Field {
        Field::new(name, DataType::Dictionary(Box::new(key), Box::new(DataType::Utf8)), nullable)
    }

    /// Asserts that every fragment occurs somewhere in `sql`.
    fn assert_fragments(sql: &str, fragments: &[&str]) {
        for fragment in fragments {
            assert!(sql.contains(fragment));
        }
    }

    #[test]
    fn test_create_options_new() {
        let options = CreateOptions::new("MergeTree");
        assert_eq!(options.engine, "MergeTree");
        assert!(options.order_by.is_empty());
        assert!(options.primary_keys.is_empty());
        assert!(options.partition_by.is_none());
        assert!(options.sampling.is_none());
        assert!(options.settings.is_empty());
        assert!(options.ttl.is_none());
        assert!(options.defaults.is_none());
        assert!(!options.defaults_for_nullable);
    }

    #[test]
    fn test_create_options_with_order_by() {
        let options = CreateOptions::new("MergeTree").with_order_by(&cols(&["id", "", "name"]));
        assert_eq!(options.order_by, cols(&["id", "name"]));
    }

    #[test]
    fn test_create_options_with_primary_keys() {
        let options =
            CreateOptions::new("MergeTree").with_primary_keys(&cols(&["id", "", "name"]));
        assert_eq!(options.primary_keys, cols(&["id", "name"]));
    }

    #[test]
    fn test_create_options_with_partition_by() {
        let set = CreateOptions::new("MergeTree").with_partition_by("toYYYYMM(date)");
        assert_eq!(set.partition_by, Some("toYYYYMM(date)".to_string()));

        let unset = CreateOptions::new("MergeTree").with_partition_by("");
        assert_eq!(unset.partition_by, None);
    }

    #[test]
    fn test_create_options_with_sample_by() {
        let set = CreateOptions::new("MergeTree").with_sample_by("cityHash64(id)");
        assert_eq!(set.sampling, Some("cityHash64(id)".to_string()));

        let unset = CreateOptions::new("MergeTree").with_sample_by("");
        assert_eq!(unset.sampling, None);
    }

    #[test]
    fn test_create_options_with_settings() {
        let settings = Settings::default().with_setting("index_granularity", 4096);
        let options = CreateOptions::new("MergeTree").with_settings(settings.clone());
        assert_eq!(options.settings, settings);
    }

    #[test]
    fn test_create_options_with_setting() {
        let options = CreateOptions::new("MergeTree").with_setting("index_granularity", 4096);
        let encoded = options.settings.encode_to_strings();
        assert_eq!(encoded, vec!["index_granularity = 4096".to_string()]);
    }

    #[test]
    fn test_create_options_with_ttl() {
        let set = CreateOptions::new("MergeTree").with_ttl("1 DAY");
        assert_eq!(set.ttl, Some("1 DAY".to_string()));

        let unset = CreateOptions::new("MergeTree").with_ttl("");
        assert_eq!(unset.ttl, None);
    }

    #[test]
    fn test_create_options_with_defaults() {
        let pairs = vec![
            ("id".to_string(), "0".to_string()),
            ("name".to_string(), "'unknown'".to_string()),
        ];
        let expected: HashMap<String, String> = pairs.iter().cloned().collect();

        let options = CreateOptions::new("MergeTree").with_defaults(pairs.into_iter());
        assert_eq!(options.defaults, Some(expected));
    }

    #[test]
    fn test_create_options_with_defaults_for_nullable() {
        let options = CreateOptions::new("MergeTree").with_defaults_for_nullable();
        assert!(options.defaults_for_nullable);
    }

    #[test]
    fn test_create_options_build_merge_tree() {
        let options = CreateOptions::new("MergeTree")
            .with_order_by(&cols(&["id", "date"]))
            .with_primary_keys(&cols(&["id"]))
            .with_partition_by("toYYYYMM(date)")
            .with_sample_by("cityHash64(id)")
            .with_ttl("1 DAY")
            .with_setting("index_granularity", 4096);

        let expected = [
            "ENGINE = MergeTree",
            "ORDER BY (id, date)",
            "PRIMARY KEY (id)",
            "PARTITION BY toYYYYMM(date)",
            "SAMPLE BY cityHash64(id)",
            "TTL 1 DAY",
            "SETTINGS index_granularity = 4096",
        ]
        .join("\n");

        compare_sql(options.build().unwrap(), expected);
    }

    #[test]
    fn test_create_options_build_log_engine() {
        let sql = CreateOptions::new("TinyLog").build().unwrap();
        assert_eq!(sql, "ENGINE = TinyLog");
    }

    #[test]
    fn test_create_options_build_empty_order_by() {
        let sql = CreateOptions::new("MergeTree").build().unwrap();
        compare_sql(sql, "ENGINE = MergeTree\nORDER BY tuple()");
    }

    #[test]
    fn test_create_options_build_invalid_engine() {
        let result = CreateOptions::new("").build();
        assert!(matches!(result, Err(Error::DDLMalformed(_))));
    }

    #[test]
    fn test_create_options_build_invalid_primary_keys() {
        let result = CreateOptions::new("MergeTree")
            .with_order_by(&cols(&["id"]))
            .with_primary_keys(&cols(&["name"]))
            .build();
        assert!(matches!(result, Err(Error::DDLMalformed(_))));
    }

    #[test]
    fn test_create_options_build_invalid_sampling() {
        let result = CreateOptions::new("MergeTree")
            .with_order_by(&cols(&["id"]))
            .with_sample_by("cityHash64(name)")
            .build();
        assert!(matches!(result, Err(Error::DDLMalformed(_))));
    }

    #[test]
    fn test_create_db_statement() {
        let sql = create_db_statement("my_db").unwrap();
        assert_eq!(sql, "CREATE DATABASE IF NOT EXISTS my_db");

        for rejected in ["", "default"] {
            let result = create_db_statement(rejected);
            assert!(matches!(result, Err(Error::DDLMalformed(_))));
        }
    }

    #[test]
    fn test_drop_db_statement() {
        let plain = drop_db_statement("my_db", false).unwrap();
        compare_sql(plain, "DROP DATABASE IF EXISTS my_db");

        let synced = drop_db_statement("my_db", true).unwrap();
        compare_sql(synced, "DROP DATABASE IF EXISTS my_db SYNC");

        for rejected in ["", "default"] {
            let result = drop_db_statement(rejected, false);
            assert!(matches!(result, Err(Error::DDLMalformed(_))));
        }
    }

    #[test]
    fn test_create_table_statement() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let name_default = ("name".to_string(), "'unknown'".to_string());
        let options = CreateOptions::new("MergeTree")
            .with_order_by(&cols(&["id"]))
            .with_defaults(vec![name_default].into_iter())
            .with_defaults_for_nullable();

        let sql =
            create_table_statement_from_arrow(None, "my_table", &schema, &options, None).unwrap();

        compare_sql(
            sql,
            "CREATE TABLE IF NOT EXISTS `my_table` (\n id Int32,\n name Nullable(String) \
             DEFAULT 'unknown'\n)\nENGINE = MergeTree\nORDER BY (id)",
        );
    }

    #[test]
    fn test_create_table_statement_with_database() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let options = CreateOptions::new("Memory");

        let sql =
            create_table_statement_from_arrow(Some("my_db"), "my_table", &schema, &options, None)
                .unwrap();

        compare_sql(
            sql,
            "CREATE TABLE IF NOT EXISTS my_db.`my_table` (\nid Int32\n)\nENGINE = Memory\nORDER \
             BY tuple()",
        );
    }

    #[test]
    fn test_create_table_statement_empty_schema() {
        let schema = Arc::new(Schema::empty());
        let options = CreateOptions::new("MergeTree");
        let result = create_table_statement_from_arrow(None, "my_table", &schema, &options, None);
        assert!(matches!(result, Err(Error::DDLMalformed(_))));
    }

    #[test]
    fn test_create_table_with_nullable_dictionary() {
        let schema = Arc::new(Schema::new(vec![
            dict_field("status", DataType::Int8, true),
            Field::new("id", DataType::Int32, false),
        ]));
        let conversions = HashMap::from([("status".to_string(), status_enum8())]);

        let plain_options = CreateOptions::new("MergeTree").with_order_by(&cols(&["id"]));
        let enum_options = plain_options.clone().with_schema_conversions(conversions);

        // Without the enum conversion the statement cannot be generated.
        let without_conversion =
            create_table_statement_from_arrow(None, "test_table", &schema, &plain_options, None);
        assert!(without_conversion.is_err());

        let sql =
            create_table_statement_from_arrow(None, "test_table", &schema, &enum_options, None)
                .expect("Should generate valid SQL");

        assert_fragments(&sql, &[
            "CREATE TABLE IF NOT EXISTS `test_table`",
            "status Nullable(Enum8('active' = 1,'inactive' = 2))",
            "id Int32",
            "ENGINE = MergeTree",
            "ORDER BY (id)",
        ]);
    }

    #[test]
    fn test_create_table_with_enum8() {
        let schema = Arc::new(Schema::new(vec![
            dict_field("status", DataType::Int8, false),
            Field::new("id", DataType::Int32, false),
        ]));
        let conversions = HashMap::from([("status".to_string(), status_enum8())]);
        let options = CreateOptions::new("MergeTree")
            .with_order_by(&cols(&["id"]))
            .with_schema_conversions(conversions);

        let sql = create_table_statement_from_arrow(None, "test_table", &schema, &options, None)
            .expect("Should generate valid SQL");

        assert_fragments(&sql, &[
            "CREATE TABLE IF NOT EXISTS `test_table`",
            "status Enum8('active' = 1,'inactive' = 2)",
            "id Int32",
            "ENGINE = MergeTree",
            "ORDER BY (id)",
        ]);
    }

    #[test]
    fn test_create_table_with_enum16() {
        let schema = Arc::new(Schema::new(vec![
            dict_field("category", DataType::Int16, false),
            Field::new("value", DataType::Float32, true),
        ]));
        let category_enum =
            Type::Enum16(vec![("x".to_string(), 1), ("y".to_string(), 2), ("z".to_string(), 3)]);
        let conversions = HashMap::from([("category".to_string(), category_enum)]);
        let options = CreateOptions::new("MergeTree")
            .with_order_by(&cols(&["category"]))
            .with_schema_conversions(conversions);

        let sql = create_table_statement_from_arrow(None, "test_table", &schema, &options, None)
            .expect("Should generate valid SQL");

        assert_fragments(&sql, &[
            "CREATE TABLE IF NOT EXISTS `test_table`",
            "category Enum16('x' = 1,'y' = 2,'z' = 3)",
            "value Nullable(Float32)",
            "ENGINE = MergeTree",
            "ORDER BY (category)",
        ]);
    }

    #[test]
    fn test_create_table_with_invalid_enum_type() {
        let schema = Arc::new(Schema::new(vec![Field::new("status", DataType::Int32, true)]));
        let conversions = HashMap::from([("status".to_string(), status_enum8())]);
        let options = CreateOptions::new("MergeTree").with_schema_conversions(conversions);

        let result = create_table_statement_from_arrow(None, "test_table", &schema, &options, None);

        assert!(matches!(
            result,
            Err(Error::TypeConversion(msg))
                if msg.contains("expected LowCardinality(String) or String/Binary, found Nullable(Int32)")
        ));
    }

    #[test]
    fn test_create_table_with_non_low_cardinality_enum() {
        let schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, true)]));
        let conversions = HashMap::from([("name".to_string(), status_enum8())]);
        let options = CreateOptions::new("MergeTree").with_schema_conversions(conversions);

        let sql =
            create_table_statement_from_arrow(None, "test_table", &schema, &options, None).unwrap();

        assert_fragments(&sql, &[
            "CREATE TABLE IF NOT EXISTS `test_table`",
            "name Nullable(Enum8('active' = 1,'inactive' = 2))",
            "ENGINE = MergeTree",
        ]);
    }

    #[test]
    fn test_create_table_with_nullable_field_non_nullable_enum() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, true),
            Field::new("status", DataType::Utf8, false),
        ]));

        // Both overrides are nullable even though only `name` is nullable in the schema.
        let conversions = HashMap::from([
            ("name".to_string(), status_enum8().into_nullable()),
            ("status".to_string(), status_enum8().into_nullable()),
        ]);
        let options = CreateOptions::new("MergeTree").with_schema_conversions(conversions);
        let arrow_options = ArrowOptions::default()
            .with_strings_as_strings(true)
            .with_use_date32_for_date(true)
            .with_strict_schema(false)
            .with_disable_strict_schema_ddl(true);

        let sql = create_table_statement_from_arrow(
            None,
            "test_table",
            &schema,
            &options,
            Some(arrow_options),
        )
        .unwrap();

        assert_fragments(&sql, &[
            "CREATE TABLE IF NOT EXISTS `test_table`",
            "name Nullable(Enum8('active' = 1,'inactive' = 2))",
            "status Enum8('active' = 1,'inactive' = 2)",
            "ENGINE = MergeTree",
        ]);
    }

    #[test]
    fn test_create_table_with_mixed_enum_and_non_enum() {
        let schema = Arc::new(Schema::new(vec![
            dict_field("status", DataType::Int8, true),
            Field::new("name", DataType::Utf8, true),
            dict_field("category", DataType::Int16, false),
        ]));

        let category_enum = Type::Enum16(vec![("x".to_string(), 1), ("y".to_string(), 2)]);
        let conversions = HashMap::from([
            ("status".to_string(), status_enum8()),
            ("category".to_string(), category_enum),
        ]);

        let options = CreateOptions::new("MergeTree")
            .with_order_by(&cols(&["category"]))
            .with_schema_conversions(conversions);

        let sql = create_table_statement_from_arrow(None, "test_table", &schema, &options, None)
            .expect("Should generate valid SQL");

        assert_fragments(&sql, &[
            "CREATE TABLE IF NOT EXISTS `test_table`",
            "status Nullable(Enum8('active' = 1,'inactive' = 2))",
            "name Nullable(String)",
            "category Enum16('x' = 1,'y' = 2)",
            "ENGINE = MergeTree",
            "ORDER BY (category)",
        ]);
    }

    #[test]
    fn test_engines() {
        use super::ClickHouseEngine::*;

        let engines = [
            MergeTree,
            AggregatingMergeTree,
            CollapsingMergeTree,
            ReplacingMergeTree,
            SummingMergeTree,
            Memory,
            Log,
            StripeLog,
            TinyLog,
            Other("NonExistentEngine".into()),
        ];

        // Every engine must survive a Display -> From round trip.
        for engine in engines {
            let round_tripped = ClickHouseEngine::from(engine.to_string());
            assert_eq!(engine, round_tripped);
        }
    }
}