1use std::collections::HashMap;
2use std::fmt::Write as _;
3use std::sync::Arc;
4
5use arrow::datatypes::SchemaRef;
6use tracing::error;
7
8use super::settings::{SettingValue, Settings};
9use crate::arrow::types::{SchemaConversions, schema_conversion};
10use crate::{ArrowOptions, ColumnDefinition, Error, Result, Row, Type};
11
/// Options that describe the engine-level clauses of a `CREATE TABLE` statement.
///
/// The fields are rendered by `CreateOptions::build` and consulted while column
/// definitions are written by `create_table_statement`.
#[derive(Debug, Default, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct CreateOptions {
    /// Engine expression emitted verbatim after `ENGINE =`; must not be empty.
    pub engine: String,
    /// Columns of the `ORDER BY` clause; when empty, `ORDER BY tuple()` is emitted.
    pub order_by: Vec<String>,
    /// Columns of the `PRIMARY KEY` clause; must be a leading prefix of `order_by`.
    pub primary_keys: Vec<String>,
    /// Expression emitted after `PARTITION BY`, if any.
    pub partition_by: Option<String>,
    /// Expression emitted after `SAMPLE BY`, if any; it must textually contain one of
    /// the `order_by` columns.
    pub sampling: Option<String>,
    /// Settings rendered into a `SETTINGS` clause when non-empty.
    pub settings: Settings,
    /// Expression emitted after `TTL`, if any.
    pub ttl: Option<String>,
    /// Per-column type overrides handed to the column definition source.
    pub schema_conversions: Option<SchemaConversions>,
    /// Default expressions keyed by column name.
    pub defaults: Option<HashMap<String, String>>,
    /// When `true`, `Nullable` columns that have no explicit default still receive a
    /// `DEFAULT` clause in the generated DDL.
    pub defaults_for_nullable: bool,
}
44
45impl CreateOptions {
46 #[must_use]
54 pub fn new(engine: impl Into<String>) -> Self {
55 Self { engine: engine.into(), ..Default::default() }
56 }
57
58 #[must_use]
68 pub fn with_order_by(mut self, order_by: &[String]) -> Self {
69 self.order_by =
70 order_by.iter().filter(|k| !k.is_empty()).map(ToString::to_string).collect();
71 self
72 }
73
74 #[must_use]
84 pub fn with_primary_keys(mut self, keys: &[String]) -> Self {
85 self.primary_keys =
86 keys.iter().filter(|k| !k.is_empty()).map(ToString::to_string).collect();
87 self
88 }
89
90 #[must_use]
100 pub fn with_partition_by(mut self, partition_by: impl Into<String>) -> Self {
101 let partition_by = partition_by.into();
102 if !partition_by.is_empty() {
103 self.partition_by = Some(partition_by);
104 }
105 self
106 }
107
108 #[must_use]
118 pub fn with_sample_by(mut self, sampling: impl Into<String>) -> Self {
119 let sampling = sampling.into();
120 if !sampling.is_empty() {
121 self.sampling = Some(sampling);
122 }
123 self
124 }
125
126 #[must_use]
134 pub fn with_settings(mut self, settings: Settings) -> Self {
135 self.settings = settings;
136 self
137 }
138
139 #[must_use]
149 pub fn with_ttl(mut self, ttl: impl Into<String>) -> Self {
150 let ttl = ttl.into();
151 if !ttl.is_empty() {
152 self.ttl = Some(ttl);
153 }
154 self
155 }
156
157 #[must_use]
166 pub fn with_setting<S>(mut self, name: impl Into<String>, setting: S) -> Self
167 where
168 SettingValue: From<S>,
169 {
170 self.settings.add_setting(name.into(), setting);
171 self
172 }
173
174 #[must_use]
182 pub fn with_defaults<I>(mut self, defaults: I) -> Self
183 where
184 I: Iterator<Item = (String, String)>,
185 {
186 self.defaults = Some(defaults.into_iter().collect::<HashMap<_, _>>());
187 self
188 }
189
190 #[must_use]
195 pub fn with_defaults_for_nullable(mut self) -> Self {
196 self.defaults_for_nullable = true;
197 self
198 }
199
200 #[must_use]
209 pub fn with_schema_conversions(mut self, map: SchemaConversions) -> Self {
210 self.schema_conversions = Some(map);
211 self
212 }
213
214 pub fn defaults(&self) -> Option<&HashMap<String, String>> { self.defaults.as_ref() }
219
220 pub fn schema_conversions(&self) -> Option<&SchemaConversions> {
225 self.schema_conversions.as_ref()
226 }
227
228 fn build(&self) -> Result<String> {
242 let engine = self.engine.to_string();
243 if engine.is_empty() {
244 return Err(Error::DDLMalformed("An engine is required, received empty string".into()));
245 }
246
247 let mut options = vec![format!("ENGINE = {engine}")];
248
249 if ["log", "LOG", "Log"].iter().any(|s| engine.contains(s)) {
251 return Ok(options.remove(0));
252 }
253
254 if self.order_by.is_empty() {
256 if !self.primary_keys.is_empty() || !self.sampling.as_ref().is_none_or(String::is_empty)
258 {
259 return Err(Error::DDLMalformed(
260 "Cannot specify primary keys or sampling when order by is empty".into(),
261 ));
262 }
263
264 options.push("ORDER BY tuple()".into());
265 } else {
266 let order_by = self.order_by.clone();
267
268 if !self.primary_keys.is_empty()
270 && !self.primary_keys.iter().enumerate().all(|(i, k)| order_by.get(i) == Some(k))
271 {
272 return Err(Error::DDLMalformed(format!(
273 "Primary keys but be present in order by and the ordering must match: order \
274 by = {order_by:?}, primary keys = {:?}",
275 self.primary_keys
276 )));
277 }
278
279 if let Some(sample) = self.sampling.as_ref() {
281 if !order_by.iter().any(|o| sample.contains(o.as_str())) {
282 return Err(Error::DDLMalformed(format!(
283 "Sampling must refer to a primary key: order by = {order_by:?}, \
284 sampling={:?}",
285 self.sampling
286 )));
287 }
288 }
289
290 options.push(format!("ORDER BY ({})", order_by.join(", ")));
291 }
292
293 if !self.primary_keys.is_empty() {
294 let primary_keys = self.primary_keys.clone();
295 options.push(format!("PRIMARY KEY ({})", primary_keys.join(", ")));
296 }
297
298 if let Some(partition) = self.partition_by.as_ref() {
299 options.push(format!("PARTITION BY {partition}"));
300 }
301
302 if let Some(sample) = self.sampling.as_ref() {
303 options.push(format!("SAMPLE BY {sample}"));
304 }
305
306 if let Some(ttl) = self.ttl.as_ref() {
307 options.push(format!("TTL {ttl}"));
308 }
309
310 if !self.settings.is_empty() {
311 options.push(format!("SETTINGS {}", self.settings.encode_to_strings().join(", ")));
312 }
313
314 Ok(options.join("\n"))
315 }
316}
317
318pub(crate) fn create_db_statement(database: &str) -> Result<String> {
338 if database.is_empty() {
339 return Err(Error::DDLMalformed("Database name cannot be empty".into()));
340 }
341
342 let db = database.to_lowercase();
343 if &db == "default" {
344 return Err(Error::DDLMalformed("Cannot create `default` database".into()));
345 }
346
347 Ok(format!("CREATE DATABASE IF NOT EXISTS {db}"))
348}
349
350pub(crate) fn drop_db_statement(database: &str, sync: bool) -> Result<String> {
371 if database.is_empty() {
372 return Err(Error::DDLMalformed("Database name cannot be empty".into()));
373 }
374
375 let db = database.to_lowercase();
376 if &db == "default" {
377 return Err(Error::DDLMalformed("Cannot create `default` database".into()));
378 }
379
380 let mut ddl = "DROP DATABASE IF EXISTS ".to_string();
381 ddl.push_str(&db);
382 if sync {
383 ddl.push_str(" SYNC");
384 }
385 Ok(ddl)
386}
387
388pub(crate) fn create_table_statement_from_arrow(
423 database: Option<&str>,
424 table: &str,
425 schema: &SchemaRef,
426 options: &CreateOptions,
427 arrow_options: Option<ArrowOptions>,
428) -> Result<String> {
429 if schema.fields().is_empty() {
430 return Err(Error::DDLMalformed("Arrow Schema is empty, cannot create table".into()));
431 }
432 let definition = RecordBatchDefinition {
433 arrow_options,
434 schema: Arc::clone(schema),
435 defaults: options.defaults().cloned(),
436 };
437 create_table_statement(database, table, Some(definition), options)
438}
439
440pub(crate) fn create_table_statement_from_native<T: Row>(
475 database: Option<&str>,
476 table: &str,
477 options: &CreateOptions,
478) -> Result<String> {
479 create_table_statement::<T>(database, table, None, options)
480}
481
482pub(crate) fn create_table_statement<T: ColumnDefine>(
483 database: Option<&str>,
484 table: &str,
485 schema: Option<T>,
486 options: &CreateOptions,
487) -> Result<String> {
488 let column_definitions = schema
489 .map(|s| s.runtime_definitions(options.schema_conversions.as_ref()))
490 .transpose()?
491 .flatten()
492 .or(T::definitions());
493
494 let Some(definitions) = column_definitions.filter(|c| !c.is_empty()) else {
495 return Err(Error::DDLMalformed("Schema is empty, cannot create table".into()));
496 };
497
498 let db_pre = database.map(|c| format!("{c}.")).unwrap_or_default();
499 let table = table.trim_matches('`');
500 let mut sql = String::new();
501 let _ = writeln!(sql, "CREATE TABLE IF NOT EXISTS {db_pre}`{table}` (");
502
503 let total = definitions.len();
504 for (i, (name, type_, default_value)) in definitions.into_iter().enumerate() {
505 let _ = write!(sql, " {name} {type_}");
506 if let Some(d) = options
507 .defaults
508 .as_ref()
509 .and_then(|d| d.get(&name))
510 .or(default_value.map(|d| d.to_string()).as_ref())
511 {
512 let _ = write!(sql, " DEFAULT");
513 if !d.is_empty() && d != "NULL" {
514 let _ = write!(sql, " {d}");
515 }
516 } else if options.defaults_for_nullable && matches!(type_, Type::Nullable(_)) {
517 let _ = write!(sql, " DEFAULT");
518 }
519
520 if i < (total - 1) {
521 let _ = writeln!(sql, ",");
522 }
523 }
524
525 let _ = writeln!(sql, "\n)");
526 let _ = write!(sql, "{}", options.build()?);
527
528 Ok(sql)
529}
530
531pub trait ColumnDefine: Sized {
537 type DefaultValue: std::fmt::Display + std::fmt::Debug;
538
539 fn definitions() -> Option<Vec<ColumnDefinition<Self::DefaultValue>>>;
541
542 fn runtime_definitions(
548 &self,
549 _: Option<&HashMap<String, Type>>,
550 ) -> Result<Option<Vec<ColumnDefinition<Self::DefaultValue>>>> {
551 Ok(Self::definitions())
552 }
553}
554
555impl<T: Row> ColumnDefine for T {
556 type DefaultValue = crate::Value;
557
558 fn definitions() -> Option<Vec<ColumnDefinition>> { Self::to_schema() }
559
560 fn runtime_definitions(
561 &self,
562 conversions: Option<&HashMap<String, Type>>,
563 ) -> Result<Option<Vec<ColumnDefinition<Self::DefaultValue>>>> {
564 let Some(static_definitions) = Self::definitions() else {
565 return Ok(None);
566 };
567
568 if let Some(conversions) = conversions {
569 return Ok(Some(
570 static_definitions
571 .into_iter()
572 .map(|(name, type_, default_value)| {
573 let resolved_type = conversions.get(&name).cloned().unwrap_or(type_);
574 (name, resolved_type, default_value)
575 })
576 .collect::<Vec<_>>(),
577 ));
578 }
579
580 Ok(Some(static_definitions))
581 }
582}
583
/// Column definition source backed by an Arrow schema.
pub(crate) struct RecordBatchDefinition {
    /// Optional Arrow options passed to `schema_conversion` for every field.
    pub(crate) arrow_options: Option<ArrowOptions>,
    /// Arrow schema whose fields are converted into column definitions.
    pub(crate) schema: SchemaRef,
    /// Default expressions keyed by field name.
    pub(crate) defaults: Option<HashMap<String, String>>,
}
590
591impl ColumnDefine for RecordBatchDefinition {
592 type DefaultValue = String;
593
594 fn definitions() -> Option<Vec<ColumnDefinition<String>>> { None }
595
596 fn runtime_definitions(
597 &self,
598 conversions: Option<&HashMap<String, Type>>,
599 ) -> Result<Option<Vec<ColumnDefinition<String>>>> {
600 let mut fields = Vec::with_capacity(self.schema.fields.len());
601 for field in self.schema.fields() {
602 let type_ =
603 schema_conversion(field, conversions, self.arrow_options).inspect_err(|error| {
604 error!("Arrow conversion failed for field {field:?}: {error}");
605 })?;
606 let default_val =
607 if let Some(d) = self.defaults.as_ref().and_then(|d| d.get(field.name())) {
608 if !d.is_empty() && d != "NULL" { Some(d.clone()) } else { None }
609 } else {
610 None
611 };
612 fields.push((field.name().to_string(), type_, default_val));
613 }
614 Ok(Some(fields))
615 }
616}
617
// Unit tests for the option builder, the database statements and the Arrow-driven
// `CREATE TABLE` generation.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema};

    use super::*;
    use crate::Type;

    // Asserts that two SQL strings are equal once every newline and space is removed.
    #[allow(clippy::needless_pass_by_value)]
    fn compare_sql(left: impl AsRef<str> + Into<String>, right: impl AsRef<str> + Into<String>) {
        assert_eq!(left.as_ref().replace(['\n', ' '], ""), right.as_ref().replace(['\n', ' '], ""));
    }

    // `new` sets only the engine and leaves everything else at its default.
    #[test]
    fn test_create_options_new() {
        let options = CreateOptions::new("MergeTree");
        assert_eq!(options.engine, "MergeTree");
        assert!(options.order_by.is_empty());
        assert!(options.primary_keys.is_empty());
        assert!(options.partition_by.is_none());
        assert!(options.sampling.is_none());
        assert!(options.settings.is_empty());
        assert!(options.ttl.is_none());
        assert!(options.defaults.is_none());
        assert!(!options.defaults_for_nullable);
    }

    // Empty entries are dropped from the ORDER BY list.
    #[test]
    fn test_create_options_with_order_by() {
        let options = CreateOptions::new("MergeTree").with_order_by(&[
            "id".to_string(),
            String::new(),
            "name".to_string(),
        ]);
        assert_eq!(options.order_by, vec!["id".to_string(), "name".to_string()]);
    }

    // Empty entries are dropped from the PRIMARY KEY list.
    #[test]
    fn test_create_options_with_primary_keys() {
        let options = CreateOptions::new("MergeTree").with_primary_keys(&[
            "id".to_string(),
            String::new(),
            "name".to_string(),
        ]);
        assert_eq!(options.primary_keys, vec!["id".to_string(), "name".to_string()]);
    }

    // A non-empty partition expression is stored; an empty one is ignored.
    #[test]
    fn test_create_options_with_partition_by() {
        let options = CreateOptions::new("MergeTree").with_partition_by("toYYYYMM(date)");
        assert_eq!(options.partition_by, Some("toYYYYMM(date)".to_string()));

        let options = CreateOptions::new("MergeTree").with_partition_by("");
        assert_eq!(options.partition_by, None);
    }

    // A non-empty sampling expression is stored; an empty one is ignored.
    #[test]
    fn test_create_options_with_sample_by() {
        let options = CreateOptions::new("MergeTree").with_sample_by("cityHash64(id)");
        assert_eq!(options.sampling, Some("cityHash64(id)".to_string()));

        let options = CreateOptions::new("MergeTree").with_sample_by("");
        assert_eq!(options.sampling, None);
    }

    // `with_settings` replaces the settings wholesale.
    #[test]
    fn test_create_options_with_settings() {
        let settings = Settings::default().with_setting("index_granularity", 4096);
        let options = CreateOptions::new("MergeTree").with_settings(settings.clone());
        assert_eq!(options.settings, settings);
    }

    // `with_setting` adds a single named setting.
    #[test]
    fn test_create_options_with_setting() {
        let options = CreateOptions::new("MergeTree").with_setting("index_granularity", 4096);
        assert_eq!(options.settings.encode_to_strings(), vec![
            "index_granularity = 4096".to_string()
        ]);
    }

    // A non-empty TTL expression is stored; an empty one is ignored.
    #[test]
    fn test_create_options_with_ttl() {
        let options = CreateOptions::new("MergeTree").with_ttl("1 DAY");
        assert_eq!(options.ttl, Some("1 DAY".to_string()));

        let options = CreateOptions::new("MergeTree").with_ttl("");
        assert_eq!(options.ttl, None);
    }

    // Defaults are collected into a map keyed by column name.
    #[test]
    fn test_create_options_with_defaults() {
        let defaults = vec![
            ("id".to_string(), "0".to_string()),
            ("name".to_string(), "'unknown'".to_string()),
        ];
        let options = CreateOptions::new("MergeTree").with_defaults(defaults.into_iter());
        assert_eq!(
            options.defaults,
            Some(HashMap::from([
                ("id".to_string(), "0".to_string()),
                ("name".to_string(), "'unknown'".to_string()),
            ]))
        );
    }

    // The nullable-defaults flag is switched on by its builder method.
    #[test]
    fn test_create_options_with_defaults_for_nullable() {
        let options = CreateOptions::new("MergeTree").with_defaults_for_nullable();
        assert!(options.defaults_for_nullable);
    }

    // All clauses are rendered, in the expected order, for a fully configured MergeTree.
    #[test]
    fn test_create_options_build_merge_tree() {
        let options = CreateOptions::new("MergeTree")
            .with_order_by(&["id".to_string(), "date".to_string()])
            .with_primary_keys(&["id".to_string()])
            .with_partition_by("toYYYYMM(date)")
            .with_sample_by("cityHash64(id)")
            .with_ttl("1 DAY")
            .with_setting("index_granularity", 4096);
        let sql = options.build().unwrap();
        compare_sql(
            sql,
            "ENGINE = MergeTree\nORDER BY (id, date)\nPRIMARY KEY (id)\nPARTITION BY \
             toYYYYMM(date)\nSAMPLE BY cityHash64(id)\nTTL 1 DAY\nSETTINGS index_granularity = \
             4096",
        );
    }

    // Log-style engines yield the ENGINE clause only.
    #[test]
    fn test_create_options_build_log_engine() {
        let options = CreateOptions::new("TinyLog");
        let sql = options.build().unwrap();
        assert_eq!(sql, "ENGINE = TinyLog");
    }

    // Without ORDER BY columns, `ORDER BY tuple()` is emitted.
    #[test]
    fn test_create_options_build_empty_order_by() {
        let options = CreateOptions::new("MergeTree");
        let sql = options.build().unwrap();
        compare_sql(sql, "ENGINE = MergeTree\nORDER BY tuple()");
    }

    // An empty engine is rejected.
    #[test]
    fn test_create_options_build_invalid_engine() {
        let options = CreateOptions::new("");
        let result = options.build();
        assert!(matches!(result, Err(Error::DDLMalformed(_))));
    }

    // Primary keys that are not part of ORDER BY are rejected.
    #[test]
    fn test_create_options_build_invalid_primary_keys() {
        let options = CreateOptions::new("MergeTree")
            .with_order_by(&["id".to_string()])
            .with_primary_keys(&["name".to_string()]);
        let result = options.build();
        assert!(matches!(result, Err(Error::DDLMalformed(_))));
    }

    // A sampling expression that mentions no ORDER BY column is rejected.
    #[test]
    fn test_create_options_build_invalid_sampling() {
        let options = CreateOptions::new("MergeTree")
            .with_order_by(&["id".to_string()])
            .with_sample_by("cityHash64(name)");
        let result = options.build();
        assert!(matches!(result, Err(Error::DDLMalformed(_))));
    }

    // CREATE DATABASE: happy path plus the empty and `default` name rejections.
    #[test]
    fn test_create_db_statement() {
        let sql = create_db_statement("my_db").unwrap();
        assert_eq!(sql, "CREATE DATABASE IF NOT EXISTS my_db");

        let result = create_db_statement("");
        assert!(matches!(result, Err(Error::DDLMalformed(_))));

        let result = create_db_statement("default");
        assert!(matches!(result, Err(Error::DDLMalformed(_))));
    }

    // DROP DATABASE: with and without SYNC, plus the empty and `default` name rejections.
    #[test]
    fn test_drop_db_statement() {
        let sql = drop_db_statement("my_db", false).unwrap();
        compare_sql(sql, "DROP DATABASE IF EXISTS my_db");

        let sql = drop_db_statement("my_db", true).unwrap();
        compare_sql(sql, "DROP DATABASE IF EXISTS my_db SYNC");

        let result = drop_db_statement("", false);
        assert!(matches!(result, Err(Error::DDLMalformed(_))));

        let result = drop_db_statement("default", false);
        assert!(matches!(result, Err(Error::DDLMalformed(_))));
    }

    // An explicit default is written for the nullable column of an Arrow schema.
    #[test]
    fn test_create_table_statement() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let options = CreateOptions::new("MergeTree")
            .with_order_by(&["id".to_string()])
            .with_defaults(vec![("name".to_string(), "'unknown'".to_string())].into_iter())
            .with_defaults_for_nullable();
        let sql =
            create_table_statement_from_arrow(None, "my_table", &schema, &options, None).unwrap();
        compare_sql(
            sql,
            "CREATE TABLE IF NOT EXISTS `my_table` (\n id Int32,\n name Nullable(String) \
             DEFAULT 'unknown'\n)\nENGINE = MergeTree\nORDER BY (id)",
        );
    }

    // The database name is written as a prefix of the table name.
    #[test]
    fn test_create_table_statement_with_database() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let options = CreateOptions::new("Memory");
        let sql =
            create_table_statement_from_arrow(Some("my_db"), "my_table", &schema, &options, None)
                .unwrap();
        compare_sql(
            sql,
            "CREATE TABLE IF NOT EXISTS my_db.`my_table` (\nid Int32\n)\nENGINE = Memory\nORDER \
             BY tuple()",
        );
    }

    // A schema without fields is rejected.
    #[test]
    fn test_create_table_statement_empty_schema() {
        let schema = Arc::new(Schema::empty());
        let options = CreateOptions::new("MergeTree");
        let result = create_table_statement_from_arrow(None, "my_table", &schema, &options, None);
        assert!(matches!(result, Err(Error::DDLMalformed(_))));
    }

    // A nullable dictionary column fails without a conversion and becomes a nullable enum
    // with one.
    #[test]
    fn test_create_table_with_nullable_dictionary() {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "status",
                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
                true,
            ),
            Field::new("id", DataType::Int32, false),
        ]));

        let enum_i8 = HashMap::from_iter([(
            "status".to_string(),
            Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
        )]);

        let options = CreateOptions::new("MergeTree").with_order_by(&["id".to_string()]);
        let enum_options = options.clone().with_schema_conversions(enum_i8);

        // Without the enum conversion the statement is expected to fail.
        assert!(
            create_table_statement_from_arrow(None, "test_table", &schema, &options, None).is_err()
        );

        // With the enum conversion the statement is generated.
        let sql =
            create_table_statement_from_arrow(None, "test_table", &schema, &enum_options, None)
                .expect("Should generate valid SQL");

        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
        assert!(sql.contains("status Nullable(Enum8('active' = 1,'inactive' = 2))"));
        assert!(sql.contains("id Int32"));
        assert!(sql.contains("ENGINE = MergeTree"));
        assert!(sql.contains("ORDER BY (id)"));
    }

    // A non-nullable Int8 dictionary column is rendered as an Enum8.
    #[test]
    fn test_create_table_with_enum8() {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "status",
                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
                false,
            ),
            Field::new("id", DataType::Int32, false),
        ]));

        let enum_i8 = HashMap::from_iter([(
            "status".to_string(),
            Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
        )]);

        let options = CreateOptions::new("MergeTree")
            .with_order_by(&["id".to_string()])
            .with_schema_conversions(enum_i8);

        let sql = create_table_statement_from_arrow(None, "test_table", &schema, &options, None)
            .expect("Should generate valid SQL");

        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
        assert!(sql.contains("status Enum8('active' = 1,'inactive' = 2)"));
        assert!(sql.contains("id Int32"));
        assert!(sql.contains("ENGINE = MergeTree"));
        assert!(sql.contains("ORDER BY (id)"));
    }

    // A non-nullable Int16 dictionary column is rendered as an Enum16.
    #[test]
    fn test_create_table_with_enum16() {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "category",
                DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
                false,
            ),
            Field::new("value", DataType::Float32, true),
        ]));

        let enum_i16 = HashMap::from_iter([(
            "category".to_string(),
            Type::Enum16(vec![("x".to_string(), 1), ("y".to_string(), 2), ("z".to_string(), 3)]),
        )]);
        let options = CreateOptions::new("MergeTree")
            .with_order_by(&["category".to_string()])
            .with_schema_conversions(enum_i16);

        let sql = create_table_statement_from_arrow(None, "test_table", &schema, &options, None)
            .expect("Should generate valid SQL");

        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
        assert!(sql.contains("category Enum16('x' = 1,'y' = 2,'z' = 3)"));
        assert!(sql.contains("value Nullable(Float32)"));
        assert!(sql.contains("ENGINE = MergeTree"));
        assert!(sql.contains("ORDER BY (category)"));
    }

    // An enum conversion on an Int32 column is reported as a type conversion error.
    #[test]
    fn test_create_table_with_invalid_enum_type() {
        let schema = Arc::new(Schema::new(vec![Field::new("status", DataType::Int32, true)]));

        let enum_i8 = HashMap::from_iter([(
            "status".to_string(),
            Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
        )]);

        let options = CreateOptions::new("MergeTree").with_schema_conversions(enum_i8);

        let result = create_table_statement_from_arrow(None, "test_table", &schema, &options, None);

        assert!(matches!(
            result,
            Err(Error::TypeConversion(msg))
            if msg.contains("expected LowCardinality(String) or String/Binary, found Nullable(Int32)")
        ));
    }

    // An enum conversion on a plain (non-dictionary) string column is accepted.
    #[test]
    fn test_create_table_with_non_low_cardinality_enum() {
        let schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, true)]));

        let enum_i8 = HashMap::from_iter([(
            "name".to_string(),
            Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
        )]);
        let options = CreateOptions::new("MergeTree").with_schema_conversions(enum_i8);

        let sql =
            create_table_statement_from_arrow(None, "test_table", &schema, &options, None).unwrap();

        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
        assert!(sql.contains("name Nullable(Enum8('active' = 1,'inactive' = 2))"));
        assert!(sql.contains("ENGINE = MergeTree"));
    }

    // Nullable enum conversions are applied to one nullable and one non-nullable field; the
    // assertions expect the generated column nullability to follow the Arrow field.
    #[test]
    fn test_create_table_with_nullable_field_non_nullable_enum() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, true),
            Field::new("status", DataType::Utf8, false),
        ]));

        let enum_i8 = HashMap::from_iter([
            (
                "name".to_string(),
                Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)])
                    .into_nullable(),
            ),
            (
                "status".to_string(),
                Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)])
                    .into_nullable(),
            ),
        ]);
        let options = CreateOptions::new("MergeTree").with_schema_conversions(enum_i8);
        let arrow_options = ArrowOptions::default()
            .with_strings_as_strings(true)
            .with_use_date32_for_date(true)
            .with_strict_schema(false)
            .with_disable_strict_schema_ddl(true);

        let sql = create_table_statement_from_arrow(
            None,
            "test_table",
            &schema,
            &options,
            Some(arrow_options),
        )
        .unwrap();

        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
        assert!(sql.contains("name Nullable(Enum8('active' = 1,'inactive' = 2))"));
        assert!(sql.contains("status Enum8('active' = 1,'inactive' = 2)"));
        assert!(sql.contains("ENGINE = MergeTree"));
    }

    // Enum conversions apply only to the columns they name; other columns keep their type.
    #[test]
    fn test_create_table_with_mixed_enum_and_non_enum() {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "status",
                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
                true,
            ),
            Field::new("name", DataType::Utf8, true),
            Field::new(
                "category",
                DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
                false,
            ),
        ]));

        let enums = HashMap::from_iter([
            (
                "status".to_string(),
                Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
            ),
            (
                "category".to_string(),
                Type::Enum16(vec![("x".to_string(), 1), ("y".to_string(), 2)]),
            ),
        ]);

        let options = CreateOptions::new("MergeTree")
            .with_order_by(&["category".to_string()])
            .with_schema_conversions(enums);

        let sql = create_table_statement_from_arrow(None, "test_table", &schema, &options, None)
            .expect("Should generate valid SQL");

        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
        assert!(sql.contains("status Nullable(Enum8('active' = 1,'inactive' = 2))"));
        assert!(sql.contains("name Nullable(String)"));
        assert!(sql.contains("category Enum16('x' = 1,'y' = 2)"));
        assert!(sql.contains("ENGINE = MergeTree"));
        assert!(sql.contains("ORDER BY (category)"));
    }
}