clickhouse_arrow/
schema.rs

1use std::collections::HashMap;
2use std::fmt::Write as _;
3use std::sync::Arc;
4
5use arrow::datatypes::SchemaRef;
6use tracing::error;
7
8use super::settings::{SettingValue, Settings};
9use crate::arrow::types::{SchemaConversions, schema_conversion};
10use crate::{ArrowOptions, ColumnDefinition, Error, Result, Row, Type};
11
/// Options for creating a `ClickHouse` table, specifying engine, ordering, partitioning, and other
/// settings.
///
/// This struct is used to configure the creation of a `ClickHouse` table via
/// [`create_table_statement_from_arrow`]. It supports common table options like `ORDER BY`,
/// `PRIMARY KEY`, `PARTITION BY`, `SAMPLE BY`, `TTL`, and custom settings. It also allows
/// specifying default values for columns and enabling defaults for nullable columns.
///
/// All fields are public, so values may also be set directly; the `with_*` builder methods
/// additionally drop empty strings before storing them.
///
/// # Examples
/// ```rust,ignore
/// use clickhouse_arrow::sql::CreateOptions;
/// use clickhouse_arrow::Settings;
///
/// let options = CreateOptions::new("MergeTree")
///     .with_order_by(&["id".to_string()])
///     .with_setting("index_granularity", 4096)
///     .with_ttl("1 DAY");
/// ```
#[derive(Debug, Default, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct CreateOptions {
    /// Table engine expression, emitted verbatim as `ENGINE = {engine}`. Must not be empty.
    pub engine:                String,
    /// Entries of the `ORDER BY (...)` clause; when empty, `ORDER BY tuple()` is emitted.
    pub order_by:              Vec<String>,
    /// Entries of the `PRIMARY KEY (...)` clause; validated to be a leading prefix of `order_by`.
    pub primary_keys:          Vec<String>,
    /// Expression emitted after `PARTITION BY`, if any.
    pub partition_by:          Option<String>,
    /// Expression emitted after `SAMPLE BY`, if any; validated against the `order_by` entries.
    pub sampling:              Option<String>,
    /// Table-level settings, emitted as a comma-separated `SETTINGS ...` clause when non-empty.
    pub settings:              Settings,
    /// Expression emitted after `TTL`, if any.
    pub ttl:                   Option<String>,
    /// Per-column type overrides handed to the schema's `runtime_definitions`.
    pub schema_conversions:    Option<SchemaConversions>,
    /// Column name -> default expression, emitted as `DEFAULT {expr}` on the matching column.
    pub defaults:              Option<HashMap<String, String>>,
    /// When `true`, `Nullable` columns without an explicit default still get a `DEFAULT` keyword.
    pub defaults_for_nullable: bool,
}
44
45impl CreateOptions {
46    /// Creates a new `CreateOptions` with the specified engine.
47    ///
48    /// # Arguments
49    /// - `engine`: The `ClickHouse` table engine (e.g., `MergeTree`, `Memory`).
50    ///
51    /// # Returns
52    /// A new `CreateOptions` instance with the specified engine.
53    #[must_use]
54    pub fn new(engine: impl Into<String>) -> Self {
55        Self { engine: engine.into(), ..Default::default() }
56    }
57
58    /// Sets the `ORDER BY` clause for the table.
59    ///
60    /// Filters out empty strings from the provided list.
61    ///
62    /// # Arguments
63    /// - `order_by`: A slice of column names to order by.
64    ///
65    /// # Returns
66    /// Self for method chaining.
67    #[must_use]
68    pub fn with_order_by(mut self, order_by: &[String]) -> Self {
69        self.order_by =
70            order_by.iter().filter(|k| !k.is_empty()).map(ToString::to_string).collect();
71        self
72    }
73
74    /// Sets the `PRIMARY KEY` clause for the table.
75    ///
76    /// Filters out empty strings from the provided list.
77    ///
78    /// # Arguments
79    /// - `keys`: A slice of column names to use as primary keys.
80    ///
81    /// # Returns
82    /// Self for method chaining.
83    #[must_use]
84    pub fn with_primary_keys(mut self, keys: &[String]) -> Self {
85        self.primary_keys =
86            keys.iter().filter(|k| !k.is_empty()).map(ToString::to_string).collect();
87        self
88    }
89
90    /// Sets the `PARTITION BY` clause for the table.
91    ///
92    /// Ignores empty strings.
93    ///
94    /// # Arguments
95    /// - `partition_by`: The partitioning expression.
96    ///
97    /// # Returns
98    /// Self for method chaining.
99    #[must_use]
100    pub fn with_partition_by(mut self, partition_by: impl Into<String>) -> Self {
101        let partition_by = partition_by.into();
102        if !partition_by.is_empty() {
103            self.partition_by = Some(partition_by);
104        }
105        self
106    }
107
108    /// Sets the `SAMPLE BY` clause for the table.
109    ///
110    /// Ignores empty strings.
111    ///
112    /// # Arguments
113    /// - `sampling`: The sampling expression.
114    ///
115    /// # Returns
116    /// Self for method chaining.
117    #[must_use]
118    pub fn with_sample_by(mut self, sampling: impl Into<String>) -> Self {
119        let sampling = sampling.into();
120        if !sampling.is_empty() {
121            self.sampling = Some(sampling);
122        }
123        self
124    }
125
126    /// Sets the table settings.
127    ///
128    /// # Arguments
129    /// - `settings`: The `Settings` object containing key-value pairs.
130    ///
131    /// # Returns
132    /// Self for method chaining.
133    #[must_use]
134    pub fn with_settings(mut self, settings: Settings) -> Self {
135        self.settings = settings;
136        self
137    }
138
139    /// Sets the `TTL` clause for the table.
140    ///
141    /// Ignores empty strings.
142    ///
143    /// # Arguments
144    /// - `ttl`: The TTL expression (e.g., `1 DAY`).
145    ///
146    /// # Returns
147    /// Self for method chaining.
148    #[must_use]
149    pub fn with_ttl(mut self, ttl: impl Into<String>) -> Self {
150        let ttl = ttl.into();
151        if !ttl.is_empty() {
152            self.ttl = Some(ttl);
153        }
154        self
155    }
156
157    /// Adds a single setting to the table.
158    ///
159    /// # Arguments
160    /// - `name`: The setting name (e.g., `index_granularity`).
161    /// - `setting`: The setting value (e.g., `4096`).
162    ///
163    /// # Returns
164    /// Self for method chaining.
165    #[must_use]
166    pub fn with_setting<S>(mut self, name: impl Into<String>, setting: S) -> Self
167    where
168        SettingValue: From<S>,
169    {
170        self.settings.add_setting(name.into(), setting);
171        self
172    }
173
174    /// Sets default values for columns.
175    ///
176    /// # Arguments
177    /// - `defaults`: An iterator of (column name, default value) pairs.
178    ///
179    /// # Returns
180    /// Self for method chaining.
181    #[must_use]
182    pub fn with_defaults<I>(mut self, defaults: I) -> Self
183    where
184        I: Iterator<Item = (String, String)>,
185    {
186        self.defaults = Some(defaults.into_iter().collect::<HashMap<_, _>>());
187        self
188    }
189
190    /// Enables default values for nullable columns (e.g., `NULL`).
191    ///
192    /// # Returns
193    /// Self for method chaining.
194    #[must_use]
195    pub fn with_defaults_for_nullable(mut self) -> Self {
196        self.defaults_for_nullable = true;
197        self
198    }
199
200    /// Provide a map of resolved type conversions.
201    ///
202    /// For example, since arrow does not support enum types, providing a map of column name to
203    /// `Type::Enum8` with enum values ensures [`arrow::datatypes::DataType::Dictionary`] is
204    /// serialized as [`crate::Type::Enum16`] instead of the default [`crate::Type::LowCardinality`]
205    ///
206    /// # Returns
207    /// Self for method chaining.
208    #[must_use]
209    pub fn with_schema_conversions(mut self, map: SchemaConversions) -> Self {
210        self.schema_conversions = Some(map);
211        self
212    }
213
214    /// Returns the configured default values, if any.
215    ///
216    /// # Returns
217    /// An optional reference to the `HashMap` of column names to default values.
218    pub fn defaults(&self) -> Option<&HashMap<String, String>> { self.defaults.as_ref() }
219
220    /// Returns the configured default values, if any.
221    ///
222    /// # Returns
223    /// An optional reference to the `HashMap` of column names to default values.
224    pub fn schema_conversions(&self) -> Option<&SchemaConversions> {
225        self.schema_conversions.as_ref()
226    }
227
228    /// Builds the table options part of a `ClickHouse` `CREATE TABLE` statement.
229    ///
230    /// Constructs the SQL for engine, `ORDER BY`, `PRIMARY KEY`, `PARTITION BY`, `SAMPLE BY`,
231    /// `TTL`, and `SETTINGS` clauses. Validates constraints, such as ensuring primary keys are
232    /// a subset of `ORDER BY` columns and sampling references a primary key.
233    ///
234    /// # Returns
235    /// A `Result` containing the SQL string for the table options or a `Error` if
236    /// validation fails (e.g., empty engine, invalid primary keys).
237    ///
238    /// # Errors
239    /// - Returns `DDLMalformed` if the engine is empty, primary keys are invalid, or sampling
240    ///   doesn’t reference a primary key.
241    fn build(&self) -> Result<String> {
242        let engine = self.engine.to_string();
243        if engine.is_empty() {
244            return Err(Error::DDLMalformed("An engine is required, received empty string".into()));
245        }
246
247        let mut options = vec![format!("ENGINE = {engine}")];
248
249        // Log engines don't support options
250        if ["log", "LOG", "Log"].iter().any(|s| engine.contains(s)) {
251            return Ok(options.remove(0));
252        }
253
254        // Make sure order by is set
255        if self.order_by.is_empty() {
256            // Validations
257            if !self.primary_keys.is_empty() || !self.sampling.as_ref().is_none_or(String::is_empty)
258            {
259                return Err(Error::DDLMalformed(
260                    "Cannot specify primary keys or sampling when order by is empty".into(),
261                ));
262            }
263
264            options.push("ORDER BY tuple()".into());
265        } else {
266            let order_by = self.order_by.clone();
267
268            // Validate primary keys
269            if !self.primary_keys.is_empty()
270                && !self.primary_keys.iter().enumerate().all(|(i, k)| order_by.get(i) == Some(k))
271            {
272                return Err(Error::DDLMalformed(format!(
273                    "Primary keys but be present in order by and the ordering must match: order \
274                     by = {order_by:?}, primary keys = {:?}",
275                    self.primary_keys
276                )));
277            }
278
279            // Validate sampling
280            if let Some(sample) = self.sampling.as_ref() {
281                if !order_by.iter().any(|o| sample.contains(o.as_str())) {
282                    return Err(Error::DDLMalformed(format!(
283                        "Sampling must refer to a primary key: order by = {order_by:?}, \
284                         sampling={:?}",
285                        self.sampling
286                    )));
287                }
288            }
289
290            options.push(format!("ORDER BY ({})", order_by.join(", ")));
291        }
292
293        if !self.primary_keys.is_empty() {
294            let primary_keys = self.primary_keys.clone();
295            options.push(format!("PRIMARY KEY ({})", primary_keys.join(", ")));
296        }
297
298        if let Some(partition) = self.partition_by.as_ref() {
299            options.push(format!("PARTITION BY {partition}"));
300        }
301
302        if let Some(sample) = self.sampling.as_ref() {
303            options.push(format!("SAMPLE BY {sample}"));
304        }
305
306        if let Some(ttl) = self.ttl.as_ref() {
307            options.push(format!("TTL {ttl}"));
308        }
309
310        if !self.settings.is_empty() {
311            options.push(format!("SETTINGS {}", self.settings.encode_to_strings().join(", ")));
312        }
313
314        Ok(options.join("\n"))
315    }
316}
317
318/// Generates a `ClickHouse` `CREATE DATABASE` statement.
319///
320/// # Arguments
321/// - `database`: The name of the database to create.
322///
323/// # Returns
324/// A `Result` containing the SQL statement or a `Error` if the database name is
325/// invalid.
326///
327/// # Errors
328/// - Returns `DDLMalformed` if the database name is empty or is `"default"`.
329///
330/// # Example
331/// ```rust,ignore
332/// use clickhouse_arrow::sql::create_db_statement;
333///
334/// let sql = create_db_statement("my_db").unwrap();
335/// assert_eq!(sql, "CREATE DATABASE IF NOT EXISTS my_db");
336/// ```
337pub(crate) fn create_db_statement(database: &str) -> Result<String> {
338    if database.is_empty() {
339        return Err(Error::DDLMalformed("Database name cannot be empty".into()));
340    }
341
342    let db = database.to_lowercase();
343    if &db == "default" {
344        return Err(Error::DDLMalformed("Cannot create `default` database".into()));
345    }
346
347    Ok(format!("CREATE DATABASE IF NOT EXISTS {db}"))
348}
349
350/// Generates a `ClickHouse` `DROP DATABASE` statement.
351///
352/// # Arguments
353/// - `database`: The name of the database to drop.
354/// - `sync`: If `true`, adds the `SYNC` clause for synchronous dropping.
355///
356/// # Returns
357/// A `Result` containing the SQL statement or a `Error` if the database name is
358/// invalid.
359///
360/// # Errors
361/// - Returns `DDLMalformed` if the database name is empty or is `"default"`.
362///
363/// # Example
364/// ```rust,ignore
365/// use clickhouse_arrow::sql::drop_db_statement;
366///
367/// let sql = drop_db_statement("my_db", true).unwrap();
368/// assert_eq!(sql, "DROP DATABASE IF EXISTS my_db SYNC");
369/// ```
370pub(crate) fn drop_db_statement(database: &str, sync: bool) -> Result<String> {
371    if database.is_empty() {
372        return Err(Error::DDLMalformed("Database name cannot be empty".into()));
373    }
374
375    let db = database.to_lowercase();
376    if &db == "default" {
377        return Err(Error::DDLMalformed("Cannot create `default` database".into()));
378    }
379
380    let mut ddl = "DROP DATABASE IF EXISTS ".to_string();
381    ddl.push_str(&db);
382    if sync {
383        ddl.push_str(" SYNC");
384    }
385    Ok(ddl)
386}
387
388/// Generates a `ClickHouse` `CREATE TABLE` statement from an Arrow schema and table options.
389///
390/// # Arguments
391/// - `database`: Optional database name (e.g., `my_db`). If `None`, the table is created in the
392///   default database.
393/// - `table`: The table name.
394/// - `schema`: The Arrow schema defining the table’s columns.
395/// - `options`: The `CreateOptions` specifying engine, ordering, and other settings.
396///
397/// # Returns
398/// A `Result` containing the SQL statement or a `Error` if the schema is invalid or
399/// options fail validation.
400///
401/// # Errors
402/// - Returns `DDLMalformed` if the schema is empty or options validation fails (e.g., invalid
403///   engine).
404/// - Returns `ArrowDeserialize` if the Arrow `DataType` cannot be converted to a `ClickHouse` type.
405/// - Returns `TypeConversion` if the schema is disallowed by `ClickHouse`
406///
407/// # Example
408/// ```rust,ignore
409/// use arrow::datatypes::{DataType, Field, Schema};
410/// use clickhouse_arrow::sql::{CreateOptions, create_table_statement_from_arrow};
411/// use std::sync::Arc;
412///
413/// let schema = Arc::new(Schema::new(vec![
414///     Field::new("id", DataType::Int32, false),
415///     Field::new("name", DataType::Utf8, true),
416/// ]));
417/// let options = CreateOptions::new("MergeTree")
418///     .with_order_by(&["id".to_string()]);
419/// let sql = create_table_statement_from_arrow(None, "my_table", &schema, &options).unwrap();
420/// assert!(sql.contains("CREATE TABLE IF NOT EXISTS `my_table`"));
421/// ```
422pub(crate) fn create_table_statement_from_arrow(
423    database: Option<&str>,
424    table: &str,
425    schema: &SchemaRef,
426    options: &CreateOptions,
427    arrow_options: Option<ArrowOptions>,
428) -> Result<String> {
429    if schema.fields().is_empty() {
430        return Err(Error::DDLMalformed("Arrow Schema is empty, cannot create table".into()));
431    }
432    let definition = RecordBatchDefinition {
433        arrow_options,
434        schema: Arc::clone(schema),
435        defaults: options.defaults().cloned(),
436    };
437    create_table_statement(database, table, Some(definition), options)
438}
439
440/// Generates a `ClickHouse` `CREATE TABLE` statement from a type that implements [`crate::Row`] and
441/// [`CreateOptions`].
442///
443/// # Arguments
444/// - `database`: Optional database name (e.g., `my_db`). If `None`, the table is created in the
445///   default database.
446/// - `table`: The table name.
447/// - `options`: The `CreateOptions` specifying engine, ordering, and other settings.
448///
449/// # Returns
450/// A `Result` containing the SQL statement or a `Error` if the schema is invalid or
451/// options fail validation.
452///
453/// # Errors
454/// - Returns `DDLMalformed` if the schema is empty or options validation fails (e.g., invalid
455///   engine).
456/// - Returns `TypeConversion` if the schema is disallowed by `ClickHouse`
457///
458/// # Example
459/// ```rust,ignore
460/// use clickhouse_arrow::Row;
461/// use clickhouse_arrow::sql::{CreateOptions, create_table_statement_from_native};
462///
463/// #[derive(Row)]
464/// struct MyRow {
465///     id: String,
466///     name: String,
467/// }
468///
469/// let options = CreateOptions::new("MergeTree")
470///     .with_order_by(&["id".to_string()]);
471/// let sql = create_table_statement_from_native::<MyRow>(None, "my_table", &options).unwrap();
472/// assert!(sql.contains("CREATE TABLE IF NOT EXISTS `my_table`"));
473/// ```
474pub(crate) fn create_table_statement_from_native<T: Row>(
475    database: Option<&str>,
476    table: &str,
477    options: &CreateOptions,
478) -> Result<String> {
479    create_table_statement::<T>(database, table, None, options)
480}
481
482pub(crate) fn create_table_statement<T: ColumnDefine>(
483    database: Option<&str>,
484    table: &str,
485    schema: Option<T>,
486    options: &CreateOptions,
487) -> Result<String> {
488    let column_definitions = schema
489        .map(|s| s.runtime_definitions(options.schema_conversions.as_ref()))
490        .transpose()?
491        .flatten()
492        .or(T::definitions());
493
494    let Some(definitions) = column_definitions.filter(|c| !c.is_empty()) else {
495        return Err(Error::DDLMalformed("Schema is empty, cannot create table".into()));
496    };
497
498    let db_pre = database.map(|c| format!("{c}.")).unwrap_or_default();
499    let table = table.trim_matches('`');
500    let mut sql = String::new();
501    let _ = writeln!(sql, "CREATE TABLE IF NOT EXISTS {db_pre}`{table}` (");
502
503    let total = definitions.len();
504    for (i, (name, type_, default_value)) in definitions.into_iter().enumerate() {
505        let _ = write!(sql, "  {name} {type_}");
506        if let Some(d) = options
507            .defaults
508            .as_ref()
509            .and_then(|d| d.get(&name))
510            .or(default_value.map(|d| d.to_string()).as_ref())
511        {
512            let _ = write!(sql, " DEFAULT");
513            if !d.is_empty() && d != "NULL" {
514                let _ = write!(sql, " {d}");
515            }
516        } else if options.defaults_for_nullable && matches!(type_, Type::Nullable(_)) {
517            let _ = write!(sql, " DEFAULT");
518        }
519
520        if i < (total - 1) {
521            let _ = writeln!(sql, ",");
522        }
523    }
524
525    let _ = writeln!(sql, "\n)");
526    let _ = write!(sql, "{}", options.build()?);
527
528    Ok(sql)
529}
530
531/// A type that describe the schema of its fields to be used in a `CREATE TABLE ...` query.
532///
533/// Generally this is not implemented manually, but using `clickhouse_arrow::Row` since it's
534/// implemented on any `T: Row`. But it's helpful to implement manually if additional formats are
535/// created.
536pub trait ColumnDefine: Sized {
537    type DefaultValue: std::fmt::Display + std::fmt::Debug;
538
539    /// Provide the static schema
540    fn definitions() -> Option<Vec<ColumnDefinition<Self::DefaultValue>>>;
541
542    /// Infers the schema and returns it.
543    ///
544    /// # Errors
545    ///
546    /// Returns an error defined by the implementation
547    fn runtime_definitions(
548        &self,
549        _: Option<&HashMap<String, Type>>,
550    ) -> Result<Option<Vec<ColumnDefinition<Self::DefaultValue>>>> {
551        Ok(Self::definitions())
552    }
553}
554
555impl<T: Row> ColumnDefine for T {
556    type DefaultValue = crate::Value;
557
558    fn definitions() -> Option<Vec<ColumnDefinition>> { Self::to_schema() }
559
560    fn runtime_definitions(
561        &self,
562        conversions: Option<&HashMap<String, Type>>,
563    ) -> Result<Option<Vec<ColumnDefinition<Self::DefaultValue>>>> {
564        let Some(static_definitions) = Self::definitions() else {
565            return Ok(None);
566        };
567
568        if let Some(conversions) = conversions {
569            return Ok(Some(
570                static_definitions
571                    .into_iter()
572                    .map(|(name, type_, default_value)| {
573                        let resolved_type = conversions.get(&name).cloned().unwrap_or(type_);
574                        (name, resolved_type, default_value)
575                    })
576                    .collect::<Vec<_>>(),
577            ));
578        }
579
580        Ok(Some(static_definitions))
581    }
582}
583
/// Helper struct to encapsulate schema creation logic for Arrow schemas.
///
/// Bundles everything needed to turn an Arrow schema into column definitions.
pub(crate) struct RecordBatchDefinition {
    /// Optional Arrow options passed to `schema_conversion` for every field.
    pub(crate) arrow_options: Option<ArrowOptions>,
    /// The Arrow schema whose fields become the table's columns.
    pub(crate) schema:        SchemaRef,
    /// Column name -> default expression; empty and `NULL` entries yield no default value.
    pub(crate) defaults:      Option<HashMap<String, String>>,
}
590
591impl ColumnDefine for RecordBatchDefinition {
592    type DefaultValue = String;
593
594    fn definitions() -> Option<Vec<ColumnDefinition<String>>> { None }
595
596    fn runtime_definitions(
597        &self,
598        conversions: Option<&HashMap<String, Type>>,
599    ) -> Result<Option<Vec<ColumnDefinition<String>>>> {
600        let mut fields = Vec::with_capacity(self.schema.fields.len());
601        for field in self.schema.fields() {
602            let type_ =
603                schema_conversion(field, conversions, self.arrow_options).inspect_err(|error| {
604                    error!("Arrow conversion failed for field {field:?}: {error}");
605                })?;
606            let default_val =
607                if let Some(d) = self.defaults.as_ref().and_then(|d| d.get(field.name())) {
608                    if !d.is_empty() && d != "NULL" { Some(d.clone()) } else { None }
609                } else {
610                    None
611                };
612            fields.push((field.name().to_string(), type_, default_val));
613        }
614        Ok(Some(fields))
615    }
616}
617
618#[cfg(test)]
619mod tests {
620    use std::sync::Arc;
621
622    use arrow::datatypes::{DataType, Field, Schema};
623
624    use super::*;
625    use crate::Type;
626
627    #[allow(clippy::needless_pass_by_value)]
628    fn compare_sql(left: impl AsRef<str> + Into<String>, right: impl AsRef<str> + Into<String>) {
629        assert_eq!(left.as_ref().replace(['\n', ' '], ""), right.as_ref().replace(['\n', ' '], ""));
630    }
631
632    #[test]
633    fn test_create_options_new() {
634        let options = CreateOptions::new("MergeTree");
635        assert_eq!(options.engine, "MergeTree");
636        assert!(options.order_by.is_empty());
637        assert!(options.primary_keys.is_empty());
638        assert!(options.partition_by.is_none());
639        assert!(options.sampling.is_none());
640        assert!(options.settings.is_empty());
641        assert!(options.ttl.is_none());
642        assert!(options.defaults.is_none());
643        assert!(!options.defaults_for_nullable);
644    }
645
646    #[test]
647    fn test_create_options_with_order_by() {
648        let options = CreateOptions::new("MergeTree").with_order_by(&[
649            "id".to_string(),
650            String::new(),
651            "name".to_string(),
652        ]);
653        assert_eq!(options.order_by, vec!["id".to_string(), "name".to_string()]);
654    }
655
656    #[test]
657    fn test_create_options_with_primary_keys() {
658        let options = CreateOptions::new("MergeTree").with_primary_keys(&[
659            "id".to_string(),
660            String::new(),
661            "name".to_string(),
662        ]);
663        assert_eq!(options.primary_keys, vec!["id".to_string(), "name".to_string()]);
664    }
665
666    #[test]
667    fn test_create_options_with_partition_by() {
668        let options = CreateOptions::new("MergeTree").with_partition_by("toYYYYMM(date)");
669        assert_eq!(options.partition_by, Some("toYYYYMM(date)".to_string()));
670
671        let options = CreateOptions::new("MergeTree").with_partition_by("");
672        assert_eq!(options.partition_by, None);
673    }
674
675    #[test]
676    fn test_create_options_with_sample_by() {
677        let options = CreateOptions::new("MergeTree").with_sample_by("cityHash64(id)");
678        assert_eq!(options.sampling, Some("cityHash64(id)".to_string()));
679
680        let options = CreateOptions::new("MergeTree").with_sample_by("");
681        assert_eq!(options.sampling, None);
682    }
683
684    #[test]
685    fn test_create_options_with_settings() {
686        let settings = Settings::default().with_setting("index_granularity", 4096);
687        let options = CreateOptions::new("MergeTree").with_settings(settings.clone());
688        assert_eq!(options.settings, settings);
689    }
690
691    #[test]
692    fn test_create_options_with_setting() {
693        let options = CreateOptions::new("MergeTree").with_setting("index_granularity", 4096);
694        assert_eq!(options.settings.encode_to_strings(), vec![
695            "index_granularity = 4096".to_string()
696        ]);
697    }
698
699    #[test]
700    fn test_create_options_with_ttl() {
701        let options = CreateOptions::new("MergeTree").with_ttl("1 DAY");
702        assert_eq!(options.ttl, Some("1 DAY".to_string()));
703
704        let options = CreateOptions::new("MergeTree").with_ttl("");
705        assert_eq!(options.ttl, None);
706    }
707
708    #[test]
709    fn test_create_options_with_defaults() {
710        let defaults = vec![
711            ("id".to_string(), "0".to_string()),
712            ("name".to_string(), "'unknown'".to_string()),
713        ];
714        let options = CreateOptions::new("MergeTree").with_defaults(defaults.into_iter());
715        assert_eq!(
716            options.defaults,
717            Some(HashMap::from([
718                ("id".to_string(), "0".to_string()),
719                ("name".to_string(), "'unknown'".to_string()),
720            ]))
721        );
722    }
723
724    #[test]
725    fn test_create_options_with_defaults_for_nullable() {
726        let options = CreateOptions::new("MergeTree").with_defaults_for_nullable();
727        assert!(options.defaults_for_nullable);
728    }
729
730    #[test]
731    fn test_create_options_build_merge_tree() {
732        let options = CreateOptions::new("MergeTree")
733            .with_order_by(&["id".to_string(), "date".to_string()])
734            .with_primary_keys(&["id".to_string()])
735            .with_partition_by("toYYYYMM(date)")
736            .with_sample_by("cityHash64(id)")
737            .with_ttl("1 DAY")
738            .with_setting("index_granularity", 4096);
739        let sql = options.build().unwrap();
740        compare_sql(
741            sql,
742            "ENGINE = MergeTree\nORDER BY (id, date)\nPRIMARY KEY (id)\nPARTITION BY \
743             toYYYYMM(date)\nSAMPLE BY cityHash64(id)\nTTL 1 DAY\nSETTINGS index_granularity = \
744             4096",
745        );
746    }
747
748    #[test]
749    fn test_create_options_build_log_engine() {
750        let options = CreateOptions::new("TinyLog");
751        let sql = options.build().unwrap();
752        assert_eq!(sql, "ENGINE = TinyLog");
753    }
754
755    #[test]
756    fn test_create_options_build_empty_order_by() {
757        let options = CreateOptions::new("MergeTree");
758        let sql = options.build().unwrap();
759        compare_sql(sql, "ENGINE = MergeTree\nORDER BY tuple()");
760    }
761
762    #[test]
763    fn test_create_options_build_invalid_engine() {
764        let options = CreateOptions::new("");
765        let result = options.build();
766        assert!(matches!(result, Err(Error::DDLMalformed(_))));
767    }
768
769    #[test]
770    fn test_create_options_build_invalid_primary_keys() {
771        let options = CreateOptions::new("MergeTree")
772            .with_order_by(&["id".to_string()])
773            .with_primary_keys(&["name".to_string()]);
774        let result = options.build();
775        assert!(matches!(result, Err(Error::DDLMalformed(_))));
776    }
777
778    #[test]
779    fn test_create_options_build_invalid_sampling() {
780        let options = CreateOptions::new("MergeTree")
781            .with_order_by(&["id".to_string()])
782            .with_sample_by("cityHash64(name)");
783        let result = options.build();
784        assert!(matches!(result, Err(Error::DDLMalformed(_))));
785    }
786
787    #[test]
788    fn test_create_db_statement() {
789        let sql = create_db_statement("my_db").unwrap();
790        assert_eq!(sql, "CREATE DATABASE IF NOT EXISTS my_db");
791
792        let result = create_db_statement("");
793        assert!(matches!(result, Err(Error::DDLMalformed(_))));
794
795        let result = create_db_statement("default");
796        assert!(matches!(result, Err(Error::DDLMalformed(_))));
797    }
798
799    #[test]
800    fn test_drop_db_statement() {
801        let sql = drop_db_statement("my_db", false).unwrap();
802        compare_sql(sql, "DROP DATABASE IF EXISTS my_db");
803
804        let sql = drop_db_statement("my_db", true).unwrap();
805        compare_sql(sql, "DROP DATABASE IF EXISTS my_db SYNC");
806
807        let result = drop_db_statement("", false);
808        assert!(matches!(result, Err(Error::DDLMalformed(_))));
809
810        let result = drop_db_statement("default", false);
811        assert!(matches!(result, Err(Error::DDLMalformed(_))));
812    }
813
814    #[test]
815    fn test_create_table_statement() {
816        let schema = Arc::new(Schema::new(vec![
817            Field::new("id", DataType::Int32, false),
818            Field::new("name", DataType::Utf8, true),
819        ]));
820        let options = CreateOptions::new("MergeTree")
821            .with_order_by(&["id".to_string()])
822            .with_defaults(vec![("name".to_string(), "'unknown'".to_string())].into_iter())
823            .with_defaults_for_nullable();
824        let sql =
825            create_table_statement_from_arrow(None, "my_table", &schema, &options, None).unwrap();
826        compare_sql(
827            sql,
828            "CREATE TABLE IF NOT EXISTS `my_table` (\n  id Int32,\n  name Nullable(String) \
829             DEFAULT 'unknown'\n)\nENGINE = MergeTree\nORDER BY (id)",
830        );
831    }
832
833    #[test]
834    fn test_create_table_statement_with_database() {
835        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
836        let options = CreateOptions::new("Memory");
837        let sql =
838            create_table_statement_from_arrow(Some("my_db"), "my_table", &schema, &options, None)
839                .unwrap();
840        compare_sql(
841            sql,
842            "CREATE TABLE IF NOT EXISTS my_db.`my_table` (\nid Int32\n)\nENGINE = Memory\nORDER \
843             BY tuple()",
844        );
845    }
846
847    #[test]
848    fn test_create_table_statement_empty_schema() {
849        let schema = Arc::new(Schema::empty());
850        let options = CreateOptions::new("MergeTree");
851        let result = create_table_statement_from_arrow(None, "my_table", &schema, &options, None);
852        assert!(matches!(result, Err(Error::DDLMalformed(_))));
853    }
854
/// A nullable dictionary column is rejected on its own, but is accepted once a
/// schema conversion maps that column to an `Enum8`.
///
/// In the accepted case the column is expected to be rendered as
/// `Nullable(Enum8(...))`.
#[test]
fn test_create_table_with_nullable_dictionary() {
    let dictionary = DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8));
    let schema = Arc::new(Schema::new(vec![
        Field::new("status", dictionary, true),
        Field::new("id", DataType::Int32, false),
    ]));

    let status_variants = vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)];
    let conversions = HashMap::from_iter([("status".to_string(), Type::Enum8(status_variants))]);

    let plain_options = CreateOptions::new("MergeTree").with_order_by(&["id".to_string()]);
    let enum_options = plain_options.clone().with_schema_conversions(conversions);

    // Without the enum conversion the nullable dictionary cannot be expressed.
    let rejected =
        create_table_statement_from_arrow(None, "test_table", &schema, &plain_options, None);
    assert!(rejected.is_err());

    // With the conversion registered the statement is generated.
    let ddl = create_table_statement_from_arrow(None, "test_table", &schema, &enum_options, None)
        .expect("Should generate valid SQL");

    assert!(ddl.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
    assert!(ddl.contains("status Nullable(Enum8('active' = 1,'inactive' = 2))"));
    assert!(ddl.contains("id Int32"));
    assert!(ddl.contains("ENGINE = MergeTree"));
    assert!(ddl.contains("ORDER BY (id)"));
}
890
/// A non-nullable dictionary column mapped to `Enum8` via the schema
/// conversions is expected to appear as a bare `Enum8(...)` column.
#[test]
fn test_create_table_with_enum8() {
    let status_type = DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8));
    let columns = vec![
        Field::new("status", status_type, false),
        Field::new("id", DataType::Int32, false),
    ];
    let schema = Arc::new(Schema::new(columns));

    let variants = vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)];
    let conversions = HashMap::from_iter([("status".to_string(), Type::Enum8(variants))]);

    let order_columns = ["id".to_string()];
    let options = CreateOptions::new("MergeTree")
        .with_order_by(&order_columns)
        .with_schema_conversions(conversions);

    let ddl = create_table_statement_from_arrow(None, "test_table", &schema, &options, None)
        .expect("Should generate valid SQL");

    assert!(ddl.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
    assert!(ddl.contains("status Enum8('active' = 1,'inactive' = 2)"));
    assert!(ddl.contains("id Int32"));
    assert!(ddl.contains("ENGINE = MergeTree"));
    assert!(ddl.contains("ORDER BY (id)"));
}
920
/// An `Int16`-keyed dictionary column mapped to `Enum16` is expected to be
/// rendered as `Enum16(...)`, alongside a nullable `Float32` column rendered as
/// `Nullable(Float32)`.
#[test]
fn test_create_table_with_enum16() {
    let category_type = DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8));
    let columns = vec![
        Field::new("category", category_type, false),
        Field::new("value", DataType::Float32, true),
    ];
    let schema = Arc::new(Schema::new(columns));

    let variants = vec![("x".to_string(), 1), ("y".to_string(), 2), ("z".to_string(), 3)];
    let conversions = HashMap::from_iter([("category".to_string(), Type::Enum16(variants))]);

    let order_columns = ["category".to_string()];
    let options = CreateOptions::new("MergeTree")
        .with_order_by(&order_columns)
        .with_schema_conversions(conversions);

    let ddl = create_table_statement_from_arrow(None, "test_table", &schema, &options, None)
        .expect("Should generate valid SQL");

    assert!(ddl.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
    assert!(ddl.contains("category Enum16('x' = 1,'y' = 2,'z' = 3)"));
    assert!(ddl.contains("value Nullable(Float32)"));
    assert!(ddl.contains("ENGINE = MergeTree"));
    assert!(ddl.contains("ORDER BY (category)"));
}
949
/// Requesting an `Enum8` conversion for a nullable `Int32` column must fail
/// with `Error::TypeConversion`, and the message must name the accepted source
/// types together with the type that was found.
#[test]
fn test_create_table_with_invalid_enum_type() {
    let variants = vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)];
    let conversions = HashMap::from_iter([("status".to_string(), Type::Enum8(variants))]);
    let options = CreateOptions::new("MergeTree").with_schema_conversions(conversions);

    let columns = vec![Field::new("status", DataType::Int32, true)];
    let schema = Arc::new(Schema::new(columns));

    let outcome = create_table_statement_from_arrow(None, "test_table", &schema, &options, None);

    let expected = "expected LowCardinality(String) or String/Binary, found Nullable(Int32)";
    assert!(matches!(
        outcome,
        Err(Error::TypeConversion(reason)) if reason.contains(expected)
    ));
}
969
/// A plain nullable `Utf8` column (not a dictionary) may also be mapped to an
/// enum; the column is expected to be rendered as `Nullable(Enum8(...))`.
#[test]
fn test_create_table_with_non_low_cardinality_enum() {
    let columns = vec![Field::new("name", DataType::Utf8, true)];
    let schema = Arc::new(Schema::new(columns));

    let variants = vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)];
    let conversions = HashMap::from_iter([("name".to_string(), Type::Enum8(variants))]);
    let options = CreateOptions::new("MergeTree").with_schema_conversions(conversions);

    let ddl =
        create_table_statement_from_arrow(None, "test_table", &schema, &options, None).unwrap();

    assert!(ddl.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
    assert!(ddl.contains("name Nullable(Enum8('active' = 1,'inactive' = 2))"));
    assert!(ddl.contains("ENGINE = MergeTree"));
}
987
/// The Arrow field, not the conversion type, drives nullability.
///
/// Both columns are given a nullable `Enum8` conversion, yet only `name`
/// (nullable in the Arrow schema) is expected to be wrapped in `Nullable(...)`;
/// `status` (non-nullable in the Arrow schema) is expected as a bare `Enum8`.
#[test]
fn test_create_table_with_nullable_field_non_nullable_enum() {
    let columns = vec![
        Field::new("name", DataType::Utf8, true),
        Field::new("status", DataType::Utf8, false),
    ];
    let schema = Arc::new(Schema::new(columns));

    // Each column receives its own, identically built, nullable enum type.
    let nullable_enum = || {
        Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]).into_nullable()
    };
    let conversions = HashMap::from_iter([
        ("name".to_string(), nullable_enum()),
        ("status".to_string(), nullable_enum()),
    ]);
    let options = CreateOptions::new("MergeTree").with_schema_conversions(conversions);

    let arrow_options = ArrowOptions::default()
        // Strings are deserialized as Utf8 rather than Binary.
        .with_strings_as_strings(true)
        // Date is deserialized as Date32.
        .with_use_date32_for_date(true)
        // Fields that ClickHouse doesn't support are ignored.
        .with_strict_schema(false)
        .with_disable_strict_schema_ddl(true);

    let ddl = create_table_statement_from_arrow(
        None,
        "test_table",
        &schema,
        &options,
        Some(arrow_options),
    )
    .unwrap();

    assert!(ddl.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
    assert!(ddl.contains("name Nullable(Enum8('active' = 1,'inactive' = 2))"));
    assert!(ddl.contains("status Enum8('active' = 1,'inactive' = 2)"));
    assert!(ddl.contains("ENGINE = MergeTree"));
}
1032
/// Enum conversions apply only to the columns they name: `status` and
/// `category` are expected as enums (nullable and non-nullable respectively),
/// while the unmapped `name` column is expected as `Nullable(String)`.
#[test]
fn test_create_table_with_mixed_enum_and_non_enum() {
    let status_type = DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8));
    let category_type = DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8));
    let columns = vec![
        Field::new("status", status_type, true),
        Field::new("name", DataType::Utf8, true),
        Field::new("category", category_type, false),
    ];
    let schema = Arc::new(Schema::new(columns));

    let status_variants = vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)];
    let category_variants = vec![("x".to_string(), 1), ("y".to_string(), 2)];
    let conversions = HashMap::from_iter([
        ("status".to_string(), Type::Enum8(status_variants)),
        ("category".to_string(), Type::Enum16(category_variants)),
    ]);

    let order_columns = ["category".to_string()];
    let options = CreateOptions::new("MergeTree")
        .with_order_by(&order_columns)
        .with_schema_conversions(conversions);

    let ddl = create_table_statement_from_arrow(None, "test_table", &schema, &options, None)
        .expect("Should generate valid SQL");

    assert!(ddl.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
    assert!(ddl.contains("status Nullable(Enum8('active' = 1,'inactive' = 2))"));
    assert!(ddl.contains("name Nullable(String)"));
    assert!(ddl.contains("category Enum16('x' = 1,'y' = 2)"));
    assert!(ddl.contains("ENGINE = MergeTree"));
    assert!(ddl.contains("ORDER BY (category)"));
}
1074}