1use std::collections::HashMap;
2use std::fmt::Write as _;
3use std::sync::Arc;
4
5use arrow::datatypes::SchemaRef;
6use tracing::error;
7
8use super::settings::{SettingValue, Settings};
9use crate::arrow::types::{SchemaConversions, schema_conversion};
10use crate::{ArrowOptions, ColumnDefinition, Error, Result, Row, Type};
11
/// ClickHouse table engines that are recognised by name.
///
/// Names are converted via the `From` impl below, which compares them against the known
/// variants after upper-casing. Any other string is kept verbatim in
/// [`ClickHouseEngine::Other`], and `Display` writes it back unchanged.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ClickHouseEngine {
    MergeTree,
    AggregatingMergeTree,
    CollapsingMergeTree,
    ReplacingMergeTree,
    SummingMergeTree,
    Memory,
    Log,
    StripeLog,
    TinyLog,
    /// Any engine string not matching a variant above, stored exactly as provided.
    Other(String),
}
28
29impl<S> From<S> for ClickHouseEngine
30where
31 S: Into<String>,
32{
33 fn from(value: S) -> Self {
34 let engine = value.into();
35 match engine.to_uppercase().as_str() {
36 "MERGETREE" => Self::MergeTree,
37 "AGGREGATINGMERGETREE" => Self::AggregatingMergeTree,
38 "COLLAPSINGMERGETREE" => Self::CollapsingMergeTree,
39 "REPLACINGMERGETREE" => Self::ReplacingMergeTree,
40 "SUMMINGMERGETREE" => Self::SummingMergeTree,
41 "MEMORY" => Self::Memory,
42 "LOG" => Self::Log,
43 "STRIPELOG" => Self::StripeLog,
44 "TINYLOG" => Self::TinyLog,
45 _ => Self::Other(engine),
47 }
48 }
49}
50
51impl std::fmt::Display for ClickHouseEngine {
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 match self {
55 Self::MergeTree => write!(f, "MergeTree"),
56 Self::AggregatingMergeTree => write!(f, "AggregatingMergeTree"),
57 Self::CollapsingMergeTree => write!(f, "CollapsingMergeTree"),
58 Self::ReplacingMergeTree => write!(f, "ReplacingMergeTree"),
59 Self::SummingMergeTree => write!(f, "SummingMergeTree"),
60 Self::Memory => write!(f, "Memory"),
61 Self::Log => write!(f, "Log"),
62 Self::StripeLog => write!(f, "StripeLog"),
63 Self::TinyLog => write!(f, "TinyLog"),
64 Self::Other(engine) => write!(f, "{engine}"),
65 }
66 }
67}
68
/// Table-level options used to render the clauses that follow the column list of a
/// `CREATE TABLE` statement (`ENGINE`, `ORDER BY`, `PRIMARY KEY`, ...).
///
/// Prefer the `with_*` builder methods, which drop empty values. The fields are public (and
/// optionally serde-deserialisable), so values can also be set directly.
#[derive(Debug, Default, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct CreateOptions {
    /// Engine clause text, rendered as `ENGINE = {engine}`; must not be empty.
    pub engine: String,
    /// Sorting-key columns; when empty, `ORDER BY tuple()` is rendered instead.
    pub order_by: Vec<String>,
    /// Primary-key columns; must be a prefix of `order_by`, in the same order.
    pub primary_keys: Vec<String>,
    /// Expression for the `PARTITION BY` clause.
    pub partition_by: Option<String>,
    /// Expression for the `SAMPLE BY` clause; must mention at least one `order_by` column.
    pub sampling: Option<String>,
    /// Settings rendered into the trailing `SETTINGS` clause when non-empty.
    pub settings: Settings,
    /// Expression for the `TTL` clause.
    pub ttl: Option<String>,
    /// Per-column type overrides, keyed by column name, applied when column definitions are
    /// generated.
    pub schema_conversions: Option<SchemaConversions>,
    /// Per-column `DEFAULT` expressions keyed by column name; an empty or `"NULL"` value
    /// renders a bare `DEFAULT`.
    pub defaults: Option<HashMap<String, String>>,
    /// When set, `Nullable` columns without an explicit default get a bare `DEFAULT`.
    pub defaults_for_nullable: bool,
}
101
102impl CreateOptions {
103 #[must_use]
111 pub fn new(engine: impl Into<String>) -> Self {
112 Self { engine: engine.into(), ..Default::default() }
113 }
114
115 #[must_use]
123 pub fn from_engine(engine: impl Into<ClickHouseEngine>) -> Self {
124 Self { engine: engine.into().to_string(), ..Default::default() }
125 }
126
127 #[must_use]
137 pub fn with_order_by(mut self, order_by: &[String]) -> Self {
138 self.order_by =
139 order_by.iter().filter(|k| !k.is_empty()).map(ToString::to_string).collect();
140 self
141 }
142
143 #[must_use]
153 pub fn with_primary_keys(mut self, keys: &[String]) -> Self {
154 self.primary_keys =
155 keys.iter().filter(|k| !k.is_empty()).map(ToString::to_string).collect();
156 self
157 }
158
159 #[must_use]
169 pub fn with_partition_by(mut self, partition_by: impl Into<String>) -> Self {
170 let partition_by = partition_by.into();
171 if !partition_by.is_empty() {
172 self.partition_by = Some(partition_by);
173 }
174 self
175 }
176
177 #[must_use]
187 pub fn with_sample_by(mut self, sampling: impl Into<String>) -> Self {
188 let sampling = sampling.into();
189 if !sampling.is_empty() {
190 self.sampling = Some(sampling);
191 }
192 self
193 }
194
195 #[must_use]
203 pub fn with_settings(mut self, settings: Settings) -> Self {
204 self.settings = settings;
205 self
206 }
207
208 #[must_use]
218 pub fn with_ttl(mut self, ttl: impl Into<String>) -> Self {
219 let ttl = ttl.into();
220 if !ttl.is_empty() {
221 self.ttl = Some(ttl);
222 }
223 self
224 }
225
226 #[must_use]
235 pub fn with_setting<S>(mut self, name: impl Into<String>, setting: S) -> Self
236 where
237 SettingValue: From<S>,
238 {
239 self.settings.add_setting(name.into(), setting);
240 self
241 }
242
243 #[must_use]
251 pub fn with_defaults<I>(mut self, defaults: I) -> Self
252 where
253 I: Iterator<Item = (String, String)>,
254 {
255 self.defaults = Some(defaults.into_iter().collect::<HashMap<_, _>>());
256 self
257 }
258
259 #[must_use]
264 pub fn with_defaults_for_nullable(mut self) -> Self {
265 self.defaults_for_nullable = true;
266 self
267 }
268
269 #[must_use]
278 pub fn with_schema_conversions(mut self, map: SchemaConversions) -> Self {
279 self.schema_conversions = Some(map);
280 self
281 }
282
283 pub fn defaults(&self) -> Option<&HashMap<String, String>> { self.defaults.as_ref() }
288
289 pub fn schema_conversions(&self) -> Option<&SchemaConversions> {
294 self.schema_conversions.as_ref()
295 }
296
297 fn build(&self) -> Result<String> {
311 let engine = self.engine.to_string();
312 if engine.is_empty() {
313 return Err(Error::DDLMalformed("An engine is required, received empty string".into()));
314 }
315
316 let mut options = vec![format!("ENGINE = {engine}")];
317
318 if ["log", "LOG", "Log"].iter().any(|s| engine.contains(s)) {
320 return Ok(options.remove(0));
321 }
322
323 if self.order_by.is_empty() {
325 if !self.primary_keys.is_empty() || !self.sampling.as_ref().is_none_or(String::is_empty)
327 {
328 return Err(Error::DDLMalformed(
329 "Cannot specify primary keys or sampling when order by is empty".into(),
330 ));
331 }
332
333 options.push("ORDER BY tuple()".into());
334 } else {
335 let order_by = self.order_by.clone();
336
337 if !self.primary_keys.is_empty()
339 && !self.primary_keys.iter().enumerate().all(|(i, k)| order_by.get(i) == Some(k))
340 {
341 return Err(Error::DDLMalformed(format!(
342 "Primary keys but be present in order by and the ordering must match: order \
343 by = {order_by:?}, primary keys = {:?}",
344 self.primary_keys
345 )));
346 }
347
348 if let Some(sample) = self.sampling.as_ref() {
350 if !order_by.iter().any(|o| sample.contains(o.as_str())) {
351 return Err(Error::DDLMalformed(format!(
352 "Sampling must refer to a primary key: order by = {order_by:?}, \
353 sampling={:?}",
354 self.sampling
355 )));
356 }
357 }
358
359 options.push(format!("ORDER BY ({})", order_by.join(", ")));
360 }
361
362 if !self.primary_keys.is_empty() {
363 let primary_keys = self.primary_keys.clone();
364 options.push(format!("PRIMARY KEY ({})", primary_keys.join(", ")));
365 }
366
367 if let Some(partition) = self.partition_by.as_ref() {
368 options.push(format!("PARTITION BY {partition}"));
369 }
370
371 if let Some(sample) = self.sampling.as_ref() {
372 options.push(format!("SAMPLE BY {sample}"));
373 }
374
375 if let Some(ttl) = self.ttl.as_ref() {
376 options.push(format!("TTL {ttl}"));
377 }
378
379 if !self.settings.is_empty() {
380 options.push(format!("SETTINGS {}", self.settings.encode_to_strings().join(", ")));
381 }
382
383 Ok(options.join("\n"))
384 }
385}
386
387pub(crate) fn create_db_statement(database: &str) -> Result<String> {
407 if database.is_empty() {
408 return Err(Error::DDLMalformed("Database name cannot be empty".into()));
409 }
410
411 let db = database.to_lowercase();
412 if &db == "default" {
413 return Err(Error::DDLMalformed("Cannot create `default` database".into()));
414 }
415
416 Ok(format!("CREATE DATABASE IF NOT EXISTS {db}"))
417}
418
419pub(crate) fn drop_db_statement(database: &str, sync: bool) -> Result<String> {
440 if database.is_empty() {
441 return Err(Error::DDLMalformed("Database name cannot be empty".into()));
442 }
443
444 let db = database.to_lowercase();
445 if &db == "default" {
446 return Err(Error::DDLMalformed("Cannot create `default` database".into()));
447 }
448
449 let mut ddl = "DROP DATABASE IF EXISTS ".to_string();
450 ddl.push_str(&db);
451 if sync {
452 ddl.push_str(" SYNC");
453 }
454 Ok(ddl)
455}
456
457pub(crate) fn create_table_statement_from_arrow(
492 database: Option<&str>,
493 table: &str,
494 schema: &SchemaRef,
495 options: &CreateOptions,
496 arrow_options: Option<ArrowOptions>,
497) -> Result<String> {
498 if schema.fields().is_empty() {
499 return Err(Error::DDLMalformed("Arrow Schema is empty, cannot create table".into()));
500 }
501 let definition = RecordBatchDefinition {
502 arrow_options,
503 schema: Arc::clone(schema),
504 defaults: options.defaults().cloned(),
505 };
506 create_table_statement(database, table, Some(definition), options)
507}
508
509pub(crate) fn create_table_statement_from_native<T: Row>(
544 database: Option<&str>,
545 table: &str,
546 options: &CreateOptions,
547) -> Result<String> {
548 create_table_statement::<T>(database, table, None, options)
549}
550
551pub(crate) fn create_table_statement<T: ColumnDefine>(
552 database: Option<&str>,
553 table: &str,
554 schema: Option<T>,
555 options: &CreateOptions,
556) -> Result<String> {
557 let column_definitions = schema
558 .map(|s| s.runtime_definitions(options.schema_conversions.as_ref()))
559 .transpose()?
560 .flatten()
561 .or(T::definitions());
562
563 let Some(definitions) = column_definitions.filter(|c| !c.is_empty()) else {
564 return Err(Error::DDLMalformed("Schema is empty, cannot create table".into()));
565 };
566
567 let db_pre = database.map(|c| format!("{c}.")).unwrap_or_default();
568 let table = table.trim_matches('`');
569 let mut sql = String::new();
570 let _ = writeln!(sql, "CREATE TABLE IF NOT EXISTS {db_pre}`{table}` (");
571
572 let total = definitions.len();
573 for (i, (name, type_, default_value)) in definitions.into_iter().enumerate() {
574 let _ = write!(sql, " {name} {type_}");
575 if let Some(d) = options
576 .defaults
577 .as_ref()
578 .and_then(|d| d.get(&name))
579 .or(default_value.map(|d| d.to_string()).as_ref())
580 {
581 let _ = write!(sql, " DEFAULT");
582 if !d.is_empty() && d != "NULL" {
583 let _ = write!(sql, " {d}");
584 }
585 } else if options.defaults_for_nullable && matches!(type_, Type::Nullable(_)) {
586 let _ = write!(sql, " DEFAULT");
587 }
588
589 if i < (total - 1) {
590 let _ = writeln!(sql, ",");
591 }
592 }
593
594 let _ = writeln!(sql, "\n)");
595 let _ = write!(sql, "{}", options.build()?);
596
597 Ok(sql)
598}
599
600pub trait ColumnDefine: Sized {
606 type DefaultValue: std::fmt::Display + std::fmt::Debug;
607
608 fn definitions() -> Option<Vec<ColumnDefinition<Self::DefaultValue>>>;
610
611 fn runtime_definitions(
617 &self,
618 _: Option<&HashMap<String, Type>>,
619 ) -> Result<Option<Vec<ColumnDefinition<Self::DefaultValue>>>> {
620 Ok(Self::definitions())
621 }
622}
623
624impl<T: Row> ColumnDefine for T {
625 type DefaultValue = crate::Value;
626
627 fn definitions() -> Option<Vec<ColumnDefinition>> { Self::to_schema() }
628
629 fn runtime_definitions(
630 &self,
631 conversions: Option<&HashMap<String, Type>>,
632 ) -> Result<Option<Vec<ColumnDefinition<Self::DefaultValue>>>> {
633 let Some(static_definitions) = Self::definitions() else {
634 return Ok(None);
635 };
636
637 if let Some(conversions) = conversions {
638 return Ok(Some(
639 static_definitions
640 .into_iter()
641 .map(|(name, type_, default_value)| {
642 let resolved_type = conversions.get(&name).cloned().unwrap_or(type_);
643 (name, resolved_type, default_value)
644 })
645 .collect::<Vec<_>>(),
646 ));
647 }
648
649 Ok(Some(static_definitions))
650 }
651}
652
/// Arrow-schema-backed source of column definitions for `CREATE TABLE` generation.
pub(crate) struct RecordBatchDefinition {
    /// Options forwarded to `schema_conversion` for every field.
    pub(crate) arrow_options: Option<ArrowOptions>,
    /// Schema whose fields become the table's columns, in field order.
    pub(crate) schema: SchemaRef,
    /// Optional default expressions keyed by field name. Empty strings and `"NULL"` are not
    /// attached as column defaults by `runtime_definitions`.
    pub(crate) defaults: Option<HashMap<String, String>>,
}
659
660impl ColumnDefine for RecordBatchDefinition {
661 type DefaultValue = String;
662
663 fn definitions() -> Option<Vec<ColumnDefinition<String>>> { None }
664
665 fn runtime_definitions(
666 &self,
667 conversions: Option<&HashMap<String, Type>>,
668 ) -> Result<Option<Vec<ColumnDefinition<String>>>> {
669 let mut fields = Vec::with_capacity(self.schema.fields.len());
670 for field in self.schema.fields() {
671 let type_ =
672 schema_conversion(field, conversions, self.arrow_options).inspect_err(|error| {
673 error!("Arrow conversion failed for field {field:?}: {error}");
674 })?;
675 let default_val =
676 if let Some(d) = self.defaults.as_ref().and_then(|d| d.get(field.name())) {
677 if !d.is_empty() && d != "NULL" { Some(d.clone()) } else { None }
678 } else {
679 None
680 };
681 fields.push((field.name().to_string(), type_, default_val));
682 }
683 Ok(Some(fields))
684 }
685}
686
// Unit tests for DDL generation:
// - the `CreateOptions` builders and `build`;
// - database create/drop statements;
// - `CREATE TABLE` generation from Arrow schemas, including enum schema conversions.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema};

    use super::{ClickHouseEngine, *};
    use crate::Type;

    /// Asserts two SQL strings are equal after removing every newline and space, so tests
    /// are insensitive to line breaks and spacing.
    // The arguments are only borrowed; callers pass both owned `String`s and literals.
    #[allow(clippy::needless_pass_by_value)]
    fn compare_sql(left: impl AsRef<str> + Into<String>, right: impl AsRef<str> + Into<String>) {
        assert_eq!(left.as_ref().replace(['\n', ' '], ""), right.as_ref().replace(['\n', ' '], ""));
    }

    #[test]
    fn test_create_options_new() {
        let options = CreateOptions::new("MergeTree");
        assert_eq!(options.engine, "MergeTree");
        assert!(options.order_by.is_empty());
        assert!(options.primary_keys.is_empty());
        assert!(options.partition_by.is_none());
        assert!(options.sampling.is_none());
        assert!(options.settings.is_empty());
        assert!(options.ttl.is_none());
        assert!(options.defaults.is_none());
        assert!(!options.defaults_for_nullable);
    }

    // Empty column names are dropped by the builder.
    #[test]
    fn test_create_options_with_order_by() {
        let options = CreateOptions::new("MergeTree").with_order_by(&[
            "id".to_string(),
            String::new(),
            "name".to_string(),
        ]);
        assert_eq!(options.order_by, vec!["id".to_string(), "name".to_string()]);
    }

    #[test]
    fn test_create_options_with_primary_keys() {
        let options = CreateOptions::new("MergeTree").with_primary_keys(&[
            "id".to_string(),
            String::new(),
            "name".to_string(),
        ]);
        assert_eq!(options.primary_keys, vec!["id".to_string(), "name".to_string()]);
    }

    // An empty expression leaves the option unset.
    #[test]
    fn test_create_options_with_partition_by() {
        let options = CreateOptions::new("MergeTree").with_partition_by("toYYYYMM(date)");
        assert_eq!(options.partition_by, Some("toYYYYMM(date)".to_string()));

        let options = CreateOptions::new("MergeTree").with_partition_by("");
        assert_eq!(options.partition_by, None);
    }

    #[test]
    fn test_create_options_with_sample_by() {
        let options = CreateOptions::new("MergeTree").with_sample_by("cityHash64(id)");
        assert_eq!(options.sampling, Some("cityHash64(id)".to_string()));

        let options = CreateOptions::new("MergeTree").with_sample_by("");
        assert_eq!(options.sampling, None);
    }

    #[test]
    fn test_create_options_with_settings() {
        let settings = Settings::default().with_setting("index_granularity", 4096);
        let options = CreateOptions::new("MergeTree").with_settings(settings.clone());
        assert_eq!(options.settings, settings);
    }

    #[test]
    fn test_create_options_with_setting() {
        let options = CreateOptions::new("MergeTree").with_setting("index_granularity", 4096);
        assert_eq!(options.settings.encode_to_strings(), vec![
            "index_granularity = 4096".to_string()
        ]);
    }

    #[test]
    fn test_create_options_with_ttl() {
        let options = CreateOptions::new("MergeTree").with_ttl("1 DAY");
        assert_eq!(options.ttl, Some("1 DAY".to_string()));

        let options = CreateOptions::new("MergeTree").with_ttl("");
        assert_eq!(options.ttl, None);
    }

    #[test]
    fn test_create_options_with_defaults() {
        let defaults = vec![
            ("id".to_string(), "0".to_string()),
            ("name".to_string(), "'unknown'".to_string()),
        ];
        let options = CreateOptions::new("MergeTree").with_defaults(defaults.into_iter());
        assert_eq!(
            options.defaults,
            Some(HashMap::from([
                ("id".to_string(), "0".to_string()),
                ("name".to_string(), "'unknown'".to_string()),
            ]))
        );
    }

    #[test]
    fn test_create_options_with_defaults_for_nullable() {
        let options = CreateOptions::new("MergeTree").with_defaults_for_nullable();
        assert!(options.defaults_for_nullable);
    }

    // Exercises every clause, and the order in which `build` emits them.
    #[test]
    fn test_create_options_build_merge_tree() {
        let options = CreateOptions::new("MergeTree")
            .with_order_by(&["id".to_string(), "date".to_string()])
            .with_primary_keys(&["id".to_string()])
            .with_partition_by("toYYYYMM(date)")
            .with_sample_by("cityHash64(id)")
            .with_ttl("1 DAY")
            .with_setting("index_granularity", 4096);
        let sql = options.build().unwrap();
        compare_sql(
            sql,
            "ENGINE = MergeTree\nORDER BY (id, date)\nPRIMARY KEY (id)\nPARTITION BY \
             toYYYYMM(date)\nSAMPLE BY cityHash64(id)\nTTL 1 DAY\nSETTINGS index_granularity = \
             4096",
        );
    }

    // Log-family engines render only the ENGINE clause.
    #[test]
    fn test_create_options_build_log_engine() {
        let options = CreateOptions::new("TinyLog");
        let sql = options.build().unwrap();
        assert_eq!(sql, "ENGINE = TinyLog");
    }

    #[test]
    fn test_create_options_build_empty_order_by() {
        let options = CreateOptions::new("MergeTree");
        let sql = options.build().unwrap();
        compare_sql(sql, "ENGINE = MergeTree\nORDER BY tuple()");
    }

    #[test]
    fn test_create_options_build_invalid_engine() {
        let options = CreateOptions::new("");
        let result = options.build();
        assert!(matches!(result, Err(Error::DDLMalformed(_))));
    }

    // The primary key must be a prefix of ORDER BY.
    #[test]
    fn test_create_options_build_invalid_primary_keys() {
        let options = CreateOptions::new("MergeTree")
            .with_order_by(&["id".to_string()])
            .with_primary_keys(&["name".to_string()]);
        let result = options.build();
        assert!(matches!(result, Err(Error::DDLMalformed(_))));
    }

    // The sampling expression must mention at least one ORDER BY column.
    #[test]
    fn test_create_options_build_invalid_sampling() {
        let options = CreateOptions::new("MergeTree")
            .with_order_by(&["id".to_string()])
            .with_sample_by("cityHash64(name)");
        let result = options.build();
        assert!(matches!(result, Err(Error::DDLMalformed(_))));
    }

    #[test]
    fn test_create_db_statement() {
        let sql = create_db_statement("my_db").unwrap();
        assert_eq!(sql, "CREATE DATABASE IF NOT EXISTS my_db");

        let result = create_db_statement("");
        assert!(matches!(result, Err(Error::DDLMalformed(_))));

        let result = create_db_statement("default");
        assert!(matches!(result, Err(Error::DDLMalformed(_))));
    }

    #[test]
    fn test_drop_db_statement() {
        let sql = drop_db_statement("my_db", false).unwrap();
        compare_sql(sql, "DROP DATABASE IF EXISTS my_db");

        let sql = drop_db_statement("my_db", true).unwrap();
        compare_sql(sql, "DROP DATABASE IF EXISTS my_db SYNC");

        let result = drop_db_statement("", false);
        assert!(matches!(result, Err(Error::DDLMalformed(_))));

        let result = drop_db_statement("default", false);
        assert!(matches!(result, Err(Error::DDLMalformed(_))));
    }

    // An explicit default wins over `defaults_for_nullable` on the Nullable `name` column.
    #[test]
    fn test_create_table_statement() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let options = CreateOptions::new("MergeTree")
            .with_order_by(&["id".to_string()])
            .with_defaults(vec![("name".to_string(), "'unknown'".to_string())].into_iter())
            .with_defaults_for_nullable();
        let sql =
            create_table_statement_from_arrow(None, "my_table", &schema, &options, None).unwrap();
        compare_sql(
            sql,
            "CREATE TABLE IF NOT EXISTS `my_table` (\n id Int32,\n name Nullable(String) \
             DEFAULT 'unknown'\n)\nENGINE = MergeTree\nORDER BY (id)",
        );
    }

    // NOTE(review): this pins `ORDER BY tuple()` for the Memory engine; confirm ClickHouse
    // accepts an ORDER BY clause for Memory tables.
    #[test]
    fn test_create_table_statement_with_database() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let options = CreateOptions::new("Memory");
        let sql =
            create_table_statement_from_arrow(Some("my_db"), "my_table", &schema, &options, None)
                .unwrap();
        compare_sql(
            sql,
            "CREATE TABLE IF NOT EXISTS my_db.`my_table` (\nid Int32\n)\nENGINE = Memory\nORDER \
             BY tuple()",
        );
    }

    #[test]
    fn test_create_table_statement_empty_schema() {
        let schema = Arc::new(Schema::empty());
        let options = CreateOptions::new("MergeTree");
        let result = create_table_statement_from_arrow(None, "my_table", &schema, &options, None);
        assert!(matches!(result, Err(Error::DDLMalformed(_))));
    }

    // Without a schema conversion, the nullable dictionary column is expected to be rejected.
    // With an Enum8 conversion it renders as Nullable(Enum8(...)).
    #[test]
    fn test_create_table_with_nullable_dictionary() {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "status",
                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
                true,
            ),
            Field::new("id", DataType::Int32, false),
        ]));

        let enum_i8 = HashMap::from_iter([(
            "status".to_string(),
            Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
        )]);

        let options = CreateOptions::new("MergeTree").with_order_by(&["id".to_string()]);
        let enum_options = options.clone().with_schema_conversions(enum_i8);

        assert!(
            create_table_statement_from_arrow(None, "test_table", &schema, &options, None).is_err()
        );

        let sql =
            create_table_statement_from_arrow(None, "test_table", &schema, &enum_options, None)
                .expect("Should generate valid SQL");

        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
        assert!(sql.contains("status Nullable(Enum8('active' = 1,'inactive' = 2))"));
        assert!(sql.contains("id Int32"));
        assert!(sql.contains("ENGINE = MergeTree"));
        assert!(sql.contains("ORDER BY (id)"));
    }

    #[test]
    fn test_create_table_with_enum8() {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "status",
                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
                false,
            ),
            Field::new("id", DataType::Int32, false),
        ]));

        let enum_i8 = HashMap::from_iter([(
            "status".to_string(),
            Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
        )]);

        let options = CreateOptions::new("MergeTree")
            .with_order_by(&["id".to_string()])
            .with_schema_conversions(enum_i8);

        let sql = create_table_statement_from_arrow(None, "test_table", &schema, &options, None)
            .expect("Should generate valid SQL");

        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
        assert!(sql.contains("status Enum8('active' = 1,'inactive' = 2)"));
        assert!(sql.contains("id Int32"));
        assert!(sql.contains("ENGINE = MergeTree"));
        assert!(sql.contains("ORDER BY (id)"));
    }

    #[test]
    fn test_create_table_with_enum16() {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "category",
                DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
                false,
            ),
            Field::new("value", DataType::Float32, true),
        ]));

        let enum_i16 = HashMap::from_iter([(
            "category".to_string(),
            Type::Enum16(vec![("x".to_string(), 1), ("y".to_string(), 2), ("z".to_string(), 3)]),
        )]);
        let options = CreateOptions::new("MergeTree")
            .with_order_by(&["category".to_string()])
            .with_schema_conversions(enum_i16);

        let sql = create_table_statement_from_arrow(None, "test_table", &schema, &options, None)
            .expect("Should generate valid SQL");

        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
        assert!(sql.contains("category Enum16('x' = 1,'y' = 2,'z' = 3)"));
        assert!(sql.contains("value Nullable(Float32)"));
        assert!(sql.contains("ENGINE = MergeTree"));
        assert!(sql.contains("ORDER BY (category)"));
    }

    // An enum conversion on a non-string Arrow column must fail with a TypeConversion error.
    #[test]
    fn test_create_table_with_invalid_enum_type() {
        let schema = Arc::new(Schema::new(vec![Field::new("status", DataType::Int32, true)]));

        let enum_i8 = HashMap::from_iter([(
            "status".to_string(),
            Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
        )]);

        let options = CreateOptions::new("MergeTree").with_schema_conversions(enum_i8);

        let result = create_table_statement_from_arrow(None, "test_table", &schema, &options, None);

        assert!(matches!(
            result,
            Err(Error::TypeConversion(msg))
            if msg.contains("expected LowCardinality(String) or String/Binary, found Nullable(Int32)")
        ));
    }

    // A plain Utf8 column may also be converted to an enum.
    #[test]
    fn test_create_table_with_non_low_cardinality_enum() {
        let schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, true)]));

        let enum_i8 = HashMap::from_iter([(
            "name".to_string(),
            Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
        )]);
        let options = CreateOptions::new("MergeTree").with_schema_conversions(enum_i8);

        let sql =
            create_table_statement_from_arrow(None, "test_table", &schema, &options, None).unwrap();

        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
        assert!(sql.contains("name Nullable(Enum8('active' = 1,'inactive' = 2))"));
        assert!(sql.contains("ENGINE = MergeTree"));
    }

    // Both conversions are declared Nullable. The non-nullable Arrow field `status` is
    // expected to render without Nullable under these ArrowOptions.
    #[test]
    fn test_create_table_with_nullable_field_non_nullable_enum() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, true),
            Field::new("status", DataType::Utf8, false),
        ]));

        let enum_i8 = HashMap::from_iter([
            (
                "name".to_string(),
                Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)])
                    .into_nullable(),
            ),
            (
                "status".to_string(),
                Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)])
                    .into_nullable(),
            ),
        ]);
        let options = CreateOptions::new("MergeTree").with_schema_conversions(enum_i8);
        let arrow_options = ArrowOptions::default()
            .with_strings_as_strings(true)
            .with_use_date32_for_date(true)
            .with_strict_schema(false)
            .with_disable_strict_schema_ddl(true);

        let sql = create_table_statement_from_arrow(
            None,
            "test_table",
            &schema,
            &options,
            Some(arrow_options),
        )
        .unwrap();

        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
        assert!(sql.contains("name Nullable(Enum8('active' = 1,'inactive' = 2))"));
        assert!(sql.contains("status Enum8('active' = 1,'inactive' = 2)"));
        assert!(sql.contains("ENGINE = MergeTree"));
    }

    #[test]
    fn test_create_table_with_mixed_enum_and_non_enum() {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "status",
                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
                true,
            ),
            Field::new("name", DataType::Utf8, true),
            Field::new(
                "category",
                DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
                false,
            ),
        ]));

        let enums = HashMap::from_iter([
            (
                "status".to_string(),
                Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
            ),
            (
                "category".to_string(),
                Type::Enum16(vec![("x".to_string(), 1), ("y".to_string(), 2)]),
            ),
        ]);

        let options = CreateOptions::new("MergeTree")
            .with_order_by(&["category".to_string()])
            .with_schema_conversions(enums);

        let sql = create_table_statement_from_arrow(None, "test_table", &schema, &options, None)
            .expect("Should generate valid SQL");

        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
        assert!(sql.contains("status Nullable(Enum8('active' = 1,'inactive' = 2))"));
        assert!(sql.contains("name Nullable(String)"));
        assert!(sql.contains("category Enum16('x' = 1,'y' = 2)"));
        assert!(sql.contains("ENGINE = MergeTree"));
        assert!(sql.contains("ORDER BY (category)"));
    }

    // Every variant must survive a Display -> From round trip.
    #[test]
    fn test_engines() {
        use super::ClickHouseEngine::*;

        let engines = [
            MergeTree,
            AggregatingMergeTree,
            CollapsingMergeTree,
            ReplacingMergeTree,
            SummingMergeTree,
            Memory,
            Log,
            StripeLog,
            TinyLog,
            Other("NonExistentEngine".into()),
        ];

        for engine in engines {
            let engine_str = engine.to_string();
            let engine_from = ClickHouseEngine::from(engine_str);
            assert_eq!(engine, engine_from);
        }
    }
}