clickhouse_arrow/
schema.rs

1use std::collections::HashMap;
2use std::fmt::Write as _;
3use std::sync::Arc;
4
5use arrow::datatypes::SchemaRef;
6use tracing::error;
7
8use super::settings::{SettingValue, Settings};
9use crate::arrow::types::{SchemaConversions, schema_conversion};
10use crate::{ArrowOptions, ColumnDefinition, Error, Result, Row, Type};
11
/// Non-exhaustive list of `ClickHouse` table engines, used to avoid typos when configuring an
/// engine.
///
/// Any engine missing from this list can still be expressed with [`Self::Other`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ClickHouseEngine {
    /// The `MergeTree` engine.
    MergeTree,
    /// The `AggregatingMergeTree` engine.
    AggregatingMergeTree,
    /// The `CollapsingMergeTree` engine.
    CollapsingMergeTree,
    /// The `ReplacingMergeTree` engine.
    ReplacingMergeTree,
    /// The `SummingMergeTree` engine.
    SummingMergeTree,
    /// The `Memory` engine.
    Memory,
    /// The `Log` engine.
    Log,
    /// The `StripeLog` engine.
    StripeLog,
    /// The `TinyLog` engine.
    TinyLog,
    /// Any other engine, kept exactly as provided.
    Other(String),
}

impl<S> From<S> for ClickHouseEngine
where
    S: Into<String>,
{
    /// Resolves an engine name case-insensitively.
    ///
    /// Names that are not recognised become [`ClickHouseEngine::Other`] holding the input string
    /// unchanged (original casing preserved).
    fn from(value: S) -> Self {
        let raw: String = value.into();
        // Compare against an uppercased copy so `raw` itself stays untouched for `Other`.
        let known = match raw.to_uppercase().as_str() {
            "MERGETREE" => Some(Self::MergeTree),
            "AGGREGATINGMERGETREE" => Some(Self::AggregatingMergeTree),
            "COLLAPSINGMERGETREE" => Some(Self::CollapsingMergeTree),
            "REPLACINGMERGETREE" => Some(Self::ReplacingMergeTree),
            "SUMMINGMERGETREE" => Some(Self::SummingMergeTree),
            "MEMORY" => Some(Self::Memory),
            "LOG" => Some(Self::Log),
            "STRIPELOG" => Some(Self::StripeLog),
            "TINYLOG" => Some(Self::TinyLog),
            // New engines must be listed here as well as in the enum.
            _ => None,
        };
        known.unwrap_or_else(|| Self::Other(raw))
    }
}

impl std::fmt::Display for ClickHouseEngine {
    /// Writes the canonical engine name; `Other` is written verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive on purpose (no wildcard) so a new variant forces an update here.
        let name = match self {
            Self::MergeTree => "MergeTree",
            Self::AggregatingMergeTree => "AggregatingMergeTree",
            Self::CollapsingMergeTree => "CollapsingMergeTree",
            Self::ReplacingMergeTree => "ReplacingMergeTree",
            Self::SummingMergeTree => "SummingMergeTree",
            Self::Memory => "Memory",
            Self::Log => "Log",
            Self::StripeLog => "StripeLog",
            Self::TinyLog => "TinyLog",
            Self::Other(engine) => engine.as_str(),
        };
        f.write_str(name)
    }
}
68
69/// Options for creating a `ClickHouse` table, specifying engine, ordering, partitioning, and other
70/// settings.
71///
72/// This struct is used to configure the creation of a `ClickHouse` table via
73/// [`create_table_statement_from_arrow`]. It supports common table options like `ORDER BY`,
74/// `PRIMARY KEY`, `PARTITION BY`, `SAMPLE BY`, `TTL`, and custom settings. It also allows
75/// specifying default values for columns and enabling defaults for nullable columns.
76///
77/// # Examples
78/// ```rust,ignore
79/// use clickhouse_arrow::sql::CreateOptions;
80/// use clickhouse_arrow::Settings;
81///
82/// let options = CreateOptions::new("MergeTree")
83///     .with_order_by(&["id".to_string()])
84///     .with_setting("index_granularity", 4096)
85///     .with_ttl("1 DAY");
86/// ```
87#[derive(Debug, Default, Clone, PartialEq)]
88#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
89pub struct CreateOptions {
90    pub engine:                String,
91    pub order_by:              Vec<String>,
92    pub primary_keys:          Vec<String>,
93    pub partition_by:          Option<String>,
94    pub sampling:              Option<String>,
95    pub settings:              Settings,
96    pub ttl:                   Option<String>,
97    pub schema_conversions:    Option<SchemaConversions>,
98    pub defaults:              Option<HashMap<String, String>>,
99    pub defaults_for_nullable: bool,
100}
101
102impl CreateOptions {
103    /// Creates a new `CreateOptions` with the specified engine.
104    ///
105    /// # Arguments
106    /// - `engine`: The `ClickHouse` table engine (e.g., `MergeTree`, `Memory`).
107    ///
108    /// # Returns
109    /// A new `CreateOptions` instance with the specified engine.
110    #[must_use]
111    pub fn new(engine: impl Into<String>) -> Self {
112        Self { engine: engine.into(), ..Default::default() }
113    }
114
115    /// Creates a new `CreateOptions` with the specified engine.
116    ///
117    /// # Arguments
118    /// - `engine`: The `ClickHouseEngine` .
119    ///
120    /// # Returns
121    /// A new `CreateOptions` instance with the specified engine.
122    #[must_use]
123    pub fn from_engine(engine: impl Into<ClickHouseEngine>) -> Self {
124        Self { engine: engine.into().to_string(), ..Default::default() }
125    }
126
127    /// Sets the `ORDER BY` clause for the table.
128    ///
129    /// Filters out empty strings from the provided list.
130    ///
131    /// # Arguments
132    /// - `order_by`: A slice of column names to order by.
133    ///
134    /// # Returns
135    /// Self for method chaining.
136    #[must_use]
137    pub fn with_order_by(mut self, order_by: &[String]) -> Self {
138        self.order_by =
139            order_by.iter().filter(|k| !k.is_empty()).map(ToString::to_string).collect();
140        self
141    }
142
143    /// Sets the `PRIMARY KEY` clause for the table.
144    ///
145    /// Filters out empty strings from the provided list.
146    ///
147    /// # Arguments
148    /// - `keys`: A slice of column names to use as primary keys.
149    ///
150    /// # Returns
151    /// Self for method chaining.
152    #[must_use]
153    pub fn with_primary_keys(mut self, keys: &[String]) -> Self {
154        self.primary_keys =
155            keys.iter().filter(|k| !k.is_empty()).map(ToString::to_string).collect();
156        self
157    }
158
159    /// Sets the `PARTITION BY` clause for the table.
160    ///
161    /// Ignores empty strings.
162    ///
163    /// # Arguments
164    /// - `partition_by`: The partitioning expression.
165    ///
166    /// # Returns
167    /// Self for method chaining.
168    #[must_use]
169    pub fn with_partition_by(mut self, partition_by: impl Into<String>) -> Self {
170        let partition_by = partition_by.into();
171        if !partition_by.is_empty() {
172            self.partition_by = Some(partition_by);
173        }
174        self
175    }
176
177    /// Sets the `SAMPLE BY` clause for the table.
178    ///
179    /// Ignores empty strings.
180    ///
181    /// # Arguments
182    /// - `sampling`: The sampling expression.
183    ///
184    /// # Returns
185    /// Self for method chaining.
186    #[must_use]
187    pub fn with_sample_by(mut self, sampling: impl Into<String>) -> Self {
188        let sampling = sampling.into();
189        if !sampling.is_empty() {
190            self.sampling = Some(sampling);
191        }
192        self
193    }
194
195    /// Sets the table settings.
196    ///
197    /// # Arguments
198    /// - `settings`: The `Settings` object containing key-value pairs.
199    ///
200    /// # Returns
201    /// Self for method chaining.
202    #[must_use]
203    pub fn with_settings(mut self, settings: Settings) -> Self {
204        self.settings = settings;
205        self
206    }
207
208    /// Sets the `TTL` clause for the table.
209    ///
210    /// Ignores empty strings.
211    ///
212    /// # Arguments
213    /// - `ttl`: The TTL expression (e.g., `1 DAY`).
214    ///
215    /// # Returns
216    /// Self for method chaining.
217    #[must_use]
218    pub fn with_ttl(mut self, ttl: impl Into<String>) -> Self {
219        let ttl = ttl.into();
220        if !ttl.is_empty() {
221            self.ttl = Some(ttl);
222        }
223        self
224    }
225
226    /// Adds a single setting to the table.
227    ///
228    /// # Arguments
229    /// - `name`: The setting name (e.g., `index_granularity`).
230    /// - `setting`: The setting value (e.g., `4096`).
231    ///
232    /// # Returns
233    /// Self for method chaining.
234    #[must_use]
235    pub fn with_setting<S>(mut self, name: impl Into<String>, setting: S) -> Self
236    where
237        SettingValue: From<S>,
238    {
239        self.settings.add_setting(name.into(), setting);
240        self
241    }
242
243    /// Sets default values for columns.
244    ///
245    /// # Arguments
246    /// - `defaults`: An iterator of (column name, default value) pairs.
247    ///
248    /// # Returns
249    /// Self for method chaining.
250    #[must_use]
251    pub fn with_defaults<I>(mut self, defaults: I) -> Self
252    where
253        I: Iterator<Item = (String, String)>,
254    {
255        self.defaults = Some(defaults.into_iter().collect::<HashMap<_, _>>());
256        self
257    }
258
259    /// Enables default values for nullable columns (e.g., `NULL`).
260    ///
261    /// # Returns
262    /// Self for method chaining.
263    #[must_use]
264    pub fn with_defaults_for_nullable(mut self) -> Self {
265        self.defaults_for_nullable = true;
266        self
267    }
268
269    /// Provide a map of resolved type conversions.
270    ///
271    /// For example, since arrow does not support enum types, providing a map of column name to
272    /// `Type::Enum8` with enum values ensures [`arrow::datatypes::DataType::Dictionary`] is
273    /// serialized as [`crate::Type::Enum16`] instead of the default [`crate::Type::LowCardinality`]
274    ///
275    /// # Returns
276    /// Self for method chaining.
277    #[must_use]
278    pub fn with_schema_conversions(mut self, map: SchemaConversions) -> Self {
279        self.schema_conversions = Some(map);
280        self
281    }
282
283    /// Returns the configured default values, if any.
284    ///
285    /// # Returns
286    /// An optional reference to the `HashMap` of column names to default values.
287    pub fn defaults(&self) -> Option<&HashMap<String, String>> { self.defaults.as_ref() }
288
289    /// Returns the configured default values, if any.
290    ///
291    /// # Returns
292    /// An optional reference to the `HashMap` of column names to default values.
293    pub fn schema_conversions(&self) -> Option<&SchemaConversions> {
294        self.schema_conversions.as_ref()
295    }
296
297    /// Builds the table options part of a `ClickHouse` `CREATE TABLE` statement.
298    ///
299    /// Constructs the SQL for engine, `ORDER BY`, `PRIMARY KEY`, `PARTITION BY`, `SAMPLE BY`,
300    /// `TTL`, and `SETTINGS` clauses. Validates constraints, such as ensuring primary keys are
301    /// a subset of `ORDER BY` columns and sampling references a primary key.
302    ///
303    /// # Returns
304    /// A `Result` containing the SQL string for the table options or a `Error` if
305    /// validation fails (e.g., empty engine, invalid primary keys).
306    ///
307    /// # Errors
308    /// - Returns `DDLMalformed` if the engine is empty, primary keys are invalid, or sampling
309    ///   doesn’t reference a primary key.
310    fn build(&self) -> Result<String> {
311        let engine = self.engine.to_string();
312        if engine.is_empty() {
313            return Err(Error::DDLMalformed("An engine is required, received empty string".into()));
314        }
315
316        let mut options = vec![format!("ENGINE = {engine}")];
317
318        // Log engines don't support options
319        if ["log", "LOG", "Log"].iter().any(|s| engine.contains(s)) {
320            return Ok(options.remove(0));
321        }
322
323        // Make sure order by is set
324        if self.order_by.is_empty() {
325            // Validations
326            if !self.primary_keys.is_empty() || !self.sampling.as_ref().is_none_or(String::is_empty)
327            {
328                return Err(Error::DDLMalformed(
329                    "Cannot specify primary keys or sampling when order by is empty".into(),
330                ));
331            }
332
333            options.push("ORDER BY tuple()".into());
334        } else {
335            let order_by = self.order_by.clone();
336
337            // Validate primary keys
338            if !self.primary_keys.is_empty()
339                && !self.primary_keys.iter().enumerate().all(|(i, k)| order_by.get(i) == Some(k))
340            {
341                return Err(Error::DDLMalformed(format!(
342                    "Primary keys but be present in order by and the ordering must match: order \
343                     by = {order_by:?}, primary keys = {:?}",
344                    self.primary_keys
345                )));
346            }
347
348            // Validate sampling
349            if let Some(sample) = self.sampling.as_ref() {
350                if !order_by.iter().any(|o| sample.contains(o.as_str())) {
351                    return Err(Error::DDLMalformed(format!(
352                        "Sampling must refer to a primary key: order by = {order_by:?}, \
353                         sampling={:?}",
354                        self.sampling
355                    )));
356                }
357            }
358
359            options.push(format!("ORDER BY ({})", order_by.join(", ")));
360        }
361
362        if !self.primary_keys.is_empty() {
363            let primary_keys = self.primary_keys.clone();
364            options.push(format!("PRIMARY KEY ({})", primary_keys.join(", ")));
365        }
366
367        if let Some(partition) = self.partition_by.as_ref() {
368            options.push(format!("PARTITION BY {partition}"));
369        }
370
371        if let Some(sample) = self.sampling.as_ref() {
372            options.push(format!("SAMPLE BY {sample}"));
373        }
374
375        if let Some(ttl) = self.ttl.as_ref() {
376            options.push(format!("TTL {ttl}"));
377        }
378
379        if !self.settings.is_empty() {
380            options.push(format!("SETTINGS {}", self.settings.encode_to_strings().join(", ")));
381        }
382
383        Ok(options.join("\n"))
384    }
385}
386
387/// Generates a `ClickHouse` `CREATE DATABASE` statement.
388///
389/// # Arguments
390/// - `database`: The name of the database to create.
391///
392/// # Returns
393/// A `Result` containing the SQL statement or a `Error` if the database name is
394/// invalid.
395///
396/// # Errors
397/// - Returns `DDLMalformed` if the database name is empty or is `"default"`.
398///
399/// # Example
400/// ```rust,ignore
401/// use clickhouse_arrow::sql::create_db_statement;
402///
403/// let sql = create_db_statement("my_db").unwrap();
404/// assert_eq!(sql, "CREATE DATABASE IF NOT EXISTS my_db");
405/// ```
406pub(crate) fn create_db_statement(database: &str) -> Result<String> {
407    if database.is_empty() {
408        return Err(Error::DDLMalformed("Database name cannot be empty".into()));
409    }
410
411    if database.eq_ignore_ascii_case("default") {
412        return Err(Error::DDLMalformed("Cannot create `default` database".into()));
413    }
414
415    Ok(format!("CREATE DATABASE IF NOT EXISTS {database}"))
416}
417
418/// Generates a `ClickHouse` `DROP DATABASE` statement.
419///
420/// # Arguments
421/// - `database`: The name of the database to drop.
422/// - `sync`: If `true`, adds the `SYNC` clause for synchronous dropping.
423///
424/// # Returns
425/// A `Result` containing the SQL statement or a `Error` if the database name is
426/// invalid.
427///
428/// # Errors
429/// - Returns `DDLMalformed` if the database name is empty or is `"default"`.
430///
431/// # Example
432/// ```rust,ignore
433/// use clickhouse_arrow::sql::drop_db_statement;
434///
435/// let sql = drop_db_statement("my_db", true).unwrap();
436/// assert_eq!(sql, "DROP DATABASE IF EXISTS my_db SYNC");
437/// ```
438pub(crate) fn drop_db_statement(database: &str, sync: bool) -> Result<String> {
439    if database.is_empty() {
440        return Err(Error::DDLMalformed("Database name cannot be empty".into()));
441    }
442
443    if database.eq_ignore_ascii_case("default") {
444        return Err(Error::DDLMalformed("Cannot create `default` database".into()));
445    }
446
447    let mut ddl = "DROP DATABASE IF EXISTS ".to_string();
448    ddl.push_str(database);
449    if sync {
450        ddl.push_str(" SYNC");
451    }
452    Ok(ddl)
453}
454
455/// Generates a `ClickHouse` `CREATE TABLE` statement from an Arrow schema and table options.
456///
457/// # Arguments
458/// - `database`: Optional database name (e.g., `my_db`). If `None`, the table is created in the
459///   default database.
460/// - `table`: The table name.
461/// - `schema`: The Arrow schema defining the table’s columns.
462/// - `options`: The `CreateOptions` specifying engine, ordering, and other settings.
463///
464/// # Returns
465/// A `Result` containing the SQL statement or a `Error` if the schema is invalid or
466/// options fail validation.
467///
468/// # Errors
469/// - Returns `DDLMalformed` if the schema is empty or options validation fails (e.g., invalid
470///   engine).
471/// - Returns `ArrowDeserialize` if the Arrow `DataType` cannot be converted to a `ClickHouse` type.
472/// - Returns `TypeConversion` if the schema is disallowed by `ClickHouse`
473///
474/// # Example
475/// ```rust,ignore
476/// use arrow::datatypes::{DataType, Field, Schema};
477/// use clickhouse_arrow::sql::{CreateOptions, create_table_statement_from_arrow};
478/// use std::sync::Arc;
479///
480/// let schema = Arc::new(Schema::new(vec![
481///     Field::new("id", DataType::Int32, false),
482///     Field::new("name", DataType::Utf8, true),
483/// ]));
484/// let options = CreateOptions::new("MergeTree")
485///     .with_order_by(&["id".to_string()]);
486/// let sql = create_table_statement_from_arrow(None, "my_table", &schema, &options).unwrap();
487/// assert!(sql.contains("CREATE TABLE IF NOT EXISTS `my_table`"));
488/// ```
489pub(crate) fn create_table_statement_from_arrow(
490    database: Option<&str>,
491    table: &str,
492    schema: &SchemaRef,
493    options: &CreateOptions,
494    arrow_options: Option<ArrowOptions>,
495) -> Result<String> {
496    if schema.fields().is_empty() {
497        return Err(Error::DDLMalformed("Arrow Schema is empty, cannot create table".into()));
498    }
499    let definition = RecordBatchDefinition {
500        arrow_options,
501        schema: Arc::clone(schema),
502        defaults: options.defaults().cloned(),
503    };
504    create_table_statement(database, table, Some(definition), options)
505}
506
507/// Generates a `ClickHouse` `CREATE TABLE` statement from a type that implements [`crate::Row`] and
508/// [`CreateOptions`].
509///
510/// # Arguments
511/// - `database`: Optional database name (e.g., `my_db`). If `None`, the table is created in the
512///   default database.
513/// - `table`: The table name.
514/// - `options`: The `CreateOptions` specifying engine, ordering, and other settings.
515///
516/// # Returns
517/// A `Result` containing the SQL statement or a `Error` if the schema is invalid or
518/// options fail validation.
519///
520/// # Errors
521/// - Returns `DDLMalformed` if the schema is empty or options validation fails (e.g., invalid
522///   engine).
523/// - Returns `TypeConversion` if the schema is disallowed by `ClickHouse`
524///
525/// # Example
526/// ```rust,ignore
527/// use clickhouse_arrow::Row;
528/// use clickhouse_arrow::sql::{CreateOptions, create_table_statement_from_native};
529///
530/// #[derive(Row)]
531/// struct MyRow {
532///     id: String,
533///     name: String,
534/// }
535///
536/// let options = CreateOptions::new("MergeTree")
537///     .with_order_by(&["id".to_string()]);
538/// let sql = create_table_statement_from_native::<MyRow>(None, "my_table", &options).unwrap();
539/// assert!(sql.contains("CREATE TABLE IF NOT EXISTS `my_table`"));
540/// ```
541pub(crate) fn create_table_statement_from_native<T: Row>(
542    database: Option<&str>,
543    table: &str,
544    options: &CreateOptions,
545) -> Result<String> {
546    create_table_statement::<T>(database, table, None, options)
547}
548
549pub(crate) fn create_table_statement<T: ColumnDefine>(
550    database: Option<&str>,
551    table: &str,
552    schema: Option<T>,
553    options: &CreateOptions,
554) -> Result<String> {
555    let column_definitions = schema
556        .map(|s| s.runtime_definitions(options.schema_conversions.as_ref()))
557        .transpose()?
558        .flatten()
559        .or(T::definitions());
560
561    let Some(definitions) = column_definitions.filter(|c| !c.is_empty()) else {
562        return Err(Error::DDLMalformed("Schema is empty, cannot create table".into()));
563    };
564
565    let db_pre = database.map(|c| format!("{c}.")).unwrap_or_default();
566    let table = table.trim_matches('`');
567    let mut sql = String::new();
568    let _ = writeln!(sql, "CREATE TABLE IF NOT EXISTS {db_pre}`{table}` (");
569
570    let total = definitions.len();
571    for (i, (name, type_, default_value)) in definitions.into_iter().enumerate() {
572        let _ = write!(sql, "  {name} {type_}");
573        if let Some(d) = options
574            .defaults
575            .as_ref()
576            .and_then(|d| d.get(&name))
577            .or(default_value.map(|d| d.to_string()).as_ref())
578        {
579            let _ = write!(sql, " DEFAULT");
580            if !d.is_empty() && d != "NULL" {
581                let _ = write!(sql, " {d}");
582            }
583        } else if options.defaults_for_nullable && matches!(type_, Type::Nullable(_)) {
584            let _ = write!(sql, " DEFAULT");
585        }
586
587        if i < (total - 1) {
588            let _ = writeln!(sql, ",");
589        }
590    }
591
592    let _ = writeln!(sql, "\n)");
593    let _ = write!(sql, "{}", options.build()?);
594
595    Ok(sql)
596}
597
598/// A type that describe the schema of its fields to be used in a `CREATE TABLE ...` query.
599///
600/// Generally this is not implemented manually, but using `clickhouse_arrow::Row` since it's
601/// implemented on any `T: Row`. But it's helpful to implement manually if additional formats are
602/// created.
603pub trait ColumnDefine: Sized {
604    type DefaultValue: std::fmt::Display + std::fmt::Debug;
605
606    /// Provide the static schema
607    fn definitions() -> Option<Vec<ColumnDefinition<Self::DefaultValue>>>;
608
609    /// Infers the schema and returns it.
610    ///
611    /// # Errors
612    ///
613    /// Returns an error defined by the implementation
614    fn runtime_definitions(
615        &self,
616        _: Option<&HashMap<String, Type>>,
617    ) -> Result<Option<Vec<ColumnDefinition<Self::DefaultValue>>>> {
618        Ok(Self::definitions())
619    }
620}
621
622impl<T: Row> ColumnDefine for T {
623    type DefaultValue = crate::Value;
624
625    fn definitions() -> Option<Vec<ColumnDefinition>> { Self::to_schema() }
626
627    fn runtime_definitions(
628        &self,
629        conversions: Option<&HashMap<String, Type>>,
630    ) -> Result<Option<Vec<ColumnDefinition<Self::DefaultValue>>>> {
631        let Some(static_definitions) = Self::definitions() else {
632            return Ok(None);
633        };
634
635        if let Some(conversions) = conversions {
636            return Ok(Some(
637                static_definitions
638                    .into_iter()
639                    .map(|(name, type_, default_value)| {
640                        let resolved_type = conversions.get(&name).cloned().unwrap_or(type_);
641                        (name, resolved_type, default_value)
642                    })
643                    .collect::<Vec<_>>(),
644            ));
645        }
646
647        Ok(Some(static_definitions))
648    }
649}
650
651/// Helper struct to encapsulate schema creation logic for Arrow schemas.
652pub(crate) struct RecordBatchDefinition {
653    pub(crate) arrow_options: Option<ArrowOptions>,
654    pub(crate) schema:        SchemaRef,
655    pub(crate) defaults:      Option<HashMap<String, String>>,
656}
657
658impl ColumnDefine for RecordBatchDefinition {
659    type DefaultValue = String;
660
661    fn definitions() -> Option<Vec<ColumnDefinition<String>>> { None }
662
663    fn runtime_definitions(
664        &self,
665        conversions: Option<&HashMap<String, Type>>,
666    ) -> Result<Option<Vec<ColumnDefinition<String>>>> {
667        let mut fields = Vec::with_capacity(self.schema.fields.len());
668        for field in self.schema.fields() {
669            let type_ =
670                schema_conversion(field, conversions, self.arrow_options).inspect_err(|error| {
671                    error!("Arrow conversion failed for field {field:?}: {error}");
672                })?;
673            let default_val =
674                if let Some(d) = self.defaults.as_ref().and_then(|d| d.get(field.name())) {
675                    if !d.is_empty() && d != "NULL" { Some(d.clone()) } else { None }
676                } else {
677                    None
678                };
679            fields.push((field.name().to_string(), type_, default_val));
680        }
681        Ok(Some(fields))
682    }
683}
684
685#[cfg(test)]
686mod tests {
687    use std::sync::Arc;
688
689    use arrow::datatypes::{DataType, Field, Schema};
690
691    use super::{ClickHouseEngine, *};
692    use crate::Type;
693
694    #[allow(clippy::needless_pass_by_value)]
695    fn compare_sql(left: impl AsRef<str> + Into<String>, right: impl AsRef<str> + Into<String>) {
696        assert_eq!(left.as_ref().replace(['\n', ' '], ""), right.as_ref().replace(['\n', ' '], ""));
697    }
698
699    #[test]
700    fn test_create_options_new() {
701        let options = CreateOptions::new("MergeTree");
702        assert_eq!(options.engine, "MergeTree");
703        assert!(options.order_by.is_empty());
704        assert!(options.primary_keys.is_empty());
705        assert!(options.partition_by.is_none());
706        assert!(options.sampling.is_none());
707        assert!(options.settings.is_empty());
708        assert!(options.ttl.is_none());
709        assert!(options.defaults.is_none());
710        assert!(!options.defaults_for_nullable);
711    }
712
713    #[test]
714    fn test_create_options_with_order_by() {
715        let options = CreateOptions::new("MergeTree").with_order_by(&[
716            "id".to_string(),
717            String::new(),
718            "name".to_string(),
719        ]);
720        assert_eq!(options.order_by, vec!["id".to_string(), "name".to_string()]);
721    }
722
723    #[test]
724    fn test_create_options_with_primary_keys() {
725        let options = CreateOptions::new("MergeTree").with_primary_keys(&[
726            "id".to_string(),
727            String::new(),
728            "name".to_string(),
729        ]);
730        assert_eq!(options.primary_keys, vec!["id".to_string(), "name".to_string()]);
731    }
732
733    #[test]
734    fn test_create_options_with_partition_by() {
735        let options = CreateOptions::new("MergeTree").with_partition_by("toYYYYMM(date)");
736        assert_eq!(options.partition_by, Some("toYYYYMM(date)".to_string()));
737
738        let options = CreateOptions::new("MergeTree").with_partition_by("");
739        assert_eq!(options.partition_by, None);
740    }
741
742    #[test]
743    fn test_create_options_with_sample_by() {
744        let options = CreateOptions::new("MergeTree").with_sample_by("cityHash64(id)");
745        assert_eq!(options.sampling, Some("cityHash64(id)".to_string()));
746
747        let options = CreateOptions::new("MergeTree").with_sample_by("");
748        assert_eq!(options.sampling, None);
749    }
750
751    #[test]
752    fn test_create_options_with_settings() {
753        let settings = Settings::default().with_setting("index_granularity", 4096);
754        let options = CreateOptions::new("MergeTree").with_settings(settings.clone());
755        assert_eq!(options.settings, settings);
756    }
757
758    #[test]
759    fn test_create_options_with_setting() {
760        let options = CreateOptions::new("MergeTree").with_setting("index_granularity", 4096);
761        assert_eq!(options.settings.encode_to_strings(), vec![
762            "index_granularity = 4096".to_string()
763        ]);
764    }
765
766    #[test]
767    fn test_create_options_with_ttl() {
768        let options = CreateOptions::new("MergeTree").with_ttl("1 DAY");
769        assert_eq!(options.ttl, Some("1 DAY".to_string()));
770
771        let options = CreateOptions::new("MergeTree").with_ttl("");
772        assert_eq!(options.ttl, None);
773    }
774
775    #[test]
776    fn test_create_options_with_defaults() {
777        let defaults = vec![
778            ("id".to_string(), "0".to_string()),
779            ("name".to_string(), "'unknown'".to_string()),
780        ];
781        let options = CreateOptions::new("MergeTree").with_defaults(defaults.into_iter());
782        assert_eq!(
783            options.defaults,
784            Some(HashMap::from([
785                ("id".to_string(), "0".to_string()),
786                ("name".to_string(), "'unknown'".to_string()),
787            ]))
788        );
789    }
790
791    #[test]
792    fn test_create_options_with_defaults_for_nullable() {
793        let options = CreateOptions::new("MergeTree").with_defaults_for_nullable();
794        assert!(options.defaults_for_nullable);
795    }
796
797    #[test]
798    fn test_create_options_build_merge_tree() {
799        let options = CreateOptions::new("MergeTree")
800            .with_order_by(&["id".to_string(), "date".to_string()])
801            .with_primary_keys(&["id".to_string()])
802            .with_partition_by("toYYYYMM(date)")
803            .with_sample_by("cityHash64(id)")
804            .with_ttl("1 DAY")
805            .with_setting("index_granularity", 4096);
806        let sql = options.build().unwrap();
807        compare_sql(
808            sql,
809            "ENGINE = MergeTree\nORDER BY (id, date)\nPRIMARY KEY (id)\nPARTITION BY \
810             toYYYYMM(date)\nSAMPLE BY cityHash64(id)\nTTL 1 DAY\nSETTINGS index_granularity = \
811             4096",
812        );
813    }
814
815    #[test]
816    fn test_create_options_build_log_engine() {
817        let options = CreateOptions::new("TinyLog");
818        let sql = options.build().unwrap();
819        assert_eq!(sql, "ENGINE = TinyLog");
820    }
821
822    #[test]
823    fn test_create_options_build_empty_order_by() {
824        let options = CreateOptions::new("MergeTree");
825        let sql = options.build().unwrap();
826        compare_sql(sql, "ENGINE = MergeTree\nORDER BY tuple()");
827    }
828
829    #[test]
830    fn test_create_options_build_invalid_engine() {
831        let options = CreateOptions::new("");
832        let result = options.build();
833        assert!(matches!(result, Err(Error::DDLMalformed(_))));
834    }
835
836    #[test]
837    fn test_create_options_build_invalid_primary_keys() {
838        let options = CreateOptions::new("MergeTree")
839            .with_order_by(&["id".to_string()])
840            .with_primary_keys(&["name".to_string()]);
841        let result = options.build();
842        assert!(matches!(result, Err(Error::DDLMalformed(_))));
843    }
844
845    #[test]
846    fn test_create_options_build_invalid_sampling() {
847        let options = CreateOptions::new("MergeTree")
848            .with_order_by(&["id".to_string()])
849            .with_sample_by("cityHash64(name)");
850        let result = options.build();
851        assert!(matches!(result, Err(Error::DDLMalformed(_))));
852    }
853
854    #[test]
855    fn test_create_db_statement() {
856        let sql = create_db_statement("my_db").unwrap();
857        assert_eq!(sql, "CREATE DATABASE IF NOT EXISTS my_db");
858
859        let result = create_db_statement("");
860        assert!(matches!(result, Err(Error::DDLMalformed(_))));
861
862        let result = create_db_statement("default");
863        assert!(matches!(result, Err(Error::DDLMalformed(_))));
864    }
865
866    #[test]
867    fn test_drop_db_statement() {
868        let sql = drop_db_statement("my_db", false).unwrap();
869        compare_sql(sql, "DROP DATABASE IF EXISTS my_db");
870
871        let sql = drop_db_statement("my_db", true).unwrap();
872        compare_sql(sql, "DROP DATABASE IF EXISTS my_db SYNC");
873
874        let result = drop_db_statement("", false);
875        assert!(matches!(result, Err(Error::DDLMalformed(_))));
876
877        let result = drop_db_statement("default", false);
878        assert!(matches!(result, Err(Error::DDLMalformed(_))));
879    }
880
881    #[test]
882    fn test_create_table_statement() {
883        let schema = Arc::new(Schema::new(vec![
884            Field::new("id", DataType::Int32, false),
885            Field::new("name", DataType::Utf8, true),
886        ]));
887        let options = CreateOptions::new("MergeTree")
888            .with_order_by(&["id".to_string()])
889            .with_defaults(vec![("name".to_string(), "'unknown'".to_string())].into_iter())
890            .with_defaults_for_nullable();
891        let sql =
892            create_table_statement_from_arrow(None, "my_table", &schema, &options, None).unwrap();
893        compare_sql(
894            sql,
895            "CREATE TABLE IF NOT EXISTS `my_table` (\n  id Int32,\n  name Nullable(String) \
896             DEFAULT 'unknown'\n)\nENGINE = MergeTree\nORDER BY (id)",
897        );
898    }
899
900    #[test]
901    fn test_create_table_statement_with_database() {
902        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
903        let options = CreateOptions::new("Memory");
904        let sql =
905            create_table_statement_from_arrow(Some("my_db"), "my_table", &schema, &options, None)
906                .unwrap();
907        compare_sql(
908            sql,
909            "CREATE TABLE IF NOT EXISTS my_db.`my_table` (\nid Int32\n)\nENGINE = Memory\nORDER \
910             BY tuple()",
911        );
912    }
913
914    #[test]
915    fn test_create_table_statement_empty_schema() {
916        let schema = Arc::new(Schema::empty());
917        let options = CreateOptions::new("MergeTree");
918        let result = create_table_statement_from_arrow(None, "my_table", &schema, &options, None);
919        assert!(matches!(result, Err(Error::DDLMalformed(_))));
920    }
921
922    #[test]
923    fn test_create_table_with_nullable_dictionary() {
924        let schema = Arc::new(Schema::new(vec![
925            Field::new(
926                "status",
927                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
928                true,
929            ),
930            Field::new("id", DataType::Int32, false),
931        ]));
932
933        let enum_i8 = HashMap::from_iter([(
934            "status".to_string(),
935            Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
936        )]);
937
938        let options = CreateOptions::new("MergeTree").with_order_by(&["id".to_string()]);
939        let enum_options = options.clone().with_schema_conversions(enum_i8);
940
941        // If the nullable dictionary will not be converted to enum, this will fail
942        assert!(
943            create_table_statement_from_arrow(None, "test_table", &schema, &options, None).is_err()
944        );
945
946        // Otherwise it will succeed
947        let sql =
948            create_table_statement_from_arrow(None, "test_table", &schema, &enum_options, None)
949                .expect("Should generate valid SQL");
950
951        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
952        assert!(sql.contains("status Nullable(Enum8('active' = 1,'inactive' = 2))"));
953        assert!(sql.contains("id Int32"));
954        assert!(sql.contains("ENGINE = MergeTree"));
955        assert!(sql.contains("ORDER BY (id)"));
956    }
957
958    #[test]
959    fn test_create_table_with_enum8() {
960        let schema = Arc::new(Schema::new(vec![
961            Field::new(
962                "status",
963                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
964                false,
965            ),
966            Field::new("id", DataType::Int32, false),
967        ]));
968
969        let enum_i8 = HashMap::from_iter([(
970            "status".to_string(),
971            Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
972        )]);
973
974        let options = CreateOptions::new("MergeTree")
975            .with_order_by(&["id".to_string()])
976            .with_schema_conversions(enum_i8);
977
978        let sql = create_table_statement_from_arrow(None, "test_table", &schema, &options, None)
979            .expect("Should generate valid SQL");
980
981        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
982        assert!(sql.contains("status Enum8('active' = 1,'inactive' = 2)"));
983        assert!(sql.contains("id Int32"));
984        assert!(sql.contains("ENGINE = MergeTree"));
985        assert!(sql.contains("ORDER BY (id)"));
986    }
987
988    #[test]
989    fn test_create_table_with_enum16() {
990        let schema = Arc::new(Schema::new(vec![
991            Field::new(
992                "category",
993                DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
994                false,
995            ),
996            Field::new("value", DataType::Float32, true),
997        ]));
998
999        let enum_i16 = HashMap::from_iter([(
1000            "category".to_string(),
1001            Type::Enum16(vec![("x".to_string(), 1), ("y".to_string(), 2), ("z".to_string(), 3)]),
1002        )]);
1003        let options = CreateOptions::new("MergeTree")
1004            .with_order_by(&["category".to_string()])
1005            .with_schema_conversions(enum_i16);
1006
1007        let sql = create_table_statement_from_arrow(None, "test_table", &schema, &options, None)
1008            .expect("Should generate valid SQL");
1009
1010        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
1011        assert!(sql.contains("category Enum16('x' = 1,'y' = 2,'z' = 3)"));
1012        assert!(sql.contains("value Nullable(Float32)"));
1013        assert!(sql.contains("ENGINE = MergeTree"));
1014        assert!(sql.contains("ORDER BY (category)"));
1015    }
1016
1017    #[test]
1018    fn test_create_table_with_invalid_enum_type() {
1019        let schema = Arc::new(Schema::new(vec![Field::new("status", DataType::Int32, true)]));
1020
1021        let enum_i8 = HashMap::from_iter([(
1022            "status".to_string(),
1023            Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
1024        )]);
1025
1026        let options = CreateOptions::new("MergeTree").with_schema_conversions(enum_i8);
1027
1028        let result = create_table_statement_from_arrow(None, "test_table", &schema, &options, None);
1029
1030        assert!(matches!(
1031            result,
1032            Err(Error::TypeConversion(msg))
1033            if msg.contains("expected LowCardinality(String) or String/Binary, found Nullable(Int32)")
1034        ));
1035    }
1036
1037    #[test]
1038    fn test_create_table_with_non_low_cardinality_enum() {
1039        let schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, true)]));
1040
1041        let enum_i8 = HashMap::from_iter([(
1042            "name".to_string(),
1043            Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
1044        )]);
1045        let options = CreateOptions::new("MergeTree").with_schema_conversions(enum_i8);
1046
1047        let sql =
1048            create_table_statement_from_arrow(None, "test_table", &schema, &options, None).unwrap();
1049
1050        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
1051        assert!(sql.contains("name Nullable(Enum8('active' = 1,'inactive' = 2))"));
1052        assert!(sql.contains("ENGINE = MergeTree"));
1053    }
1054
1055    // The arrow data type drives nullability
1056    #[test]
1057    fn test_create_table_with_nullable_field_non_nullable_enum() {
1058        let schema = Arc::new(Schema::new(vec![
1059            Field::new("name", DataType::Utf8, true),
1060            Field::new("status", DataType::Utf8, false),
1061        ]));
1062
1063        let enum_i8 = HashMap::from_iter([
1064            (
1065                "name".to_string(),
1066                Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)])
1067                    .into_nullable(),
1068            ),
1069            (
1070                "status".to_string(),
1071                Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)])
1072                    .into_nullable(),
1073            ),
1074        ]);
1075        let options = CreateOptions::new("MergeTree").with_schema_conversions(enum_i8);
1076        let arrow_options = ArrowOptions::default()
1077            // Deserialize strings as Utf8, not Binary
1078            .with_strings_as_strings(true)
1079            // Deserialize Date as Date32
1080            .with_use_date32_for_date(true)
1081            // Ignore fields that ClickHouse doesn't support.
1082            .with_strict_schema(false)
1083            .with_disable_strict_schema_ddl(true);
1084
1085        let sql = create_table_statement_from_arrow(
1086            None,
1087            "test_table",
1088            &schema,
1089            &options,
1090            Some(arrow_options),
1091        )
1092        .unwrap();
1093
1094        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
1095        assert!(sql.contains("name Nullable(Enum8('active' = 1,'inactive' = 2))"));
1096        assert!(sql.contains("status Enum8('active' = 1,'inactive' = 2)"));
1097        assert!(sql.contains("ENGINE = MergeTree"));
1098    }
1099
1100    #[test]
1101    fn test_create_table_with_mixed_enum_and_non_enum() {
1102        let schema = Arc::new(Schema::new(vec![
1103            Field::new(
1104                "status",
1105                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
1106                true,
1107            ),
1108            Field::new("name", DataType::Utf8, true),
1109            Field::new(
1110                "category",
1111                DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
1112                false,
1113            ),
1114        ]));
1115
1116        let enums = HashMap::from_iter([
1117            (
1118                "status".to_string(),
1119                Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
1120            ),
1121            (
1122                "category".to_string(),
1123                Type::Enum16(vec![("x".to_string(), 1), ("y".to_string(), 2)]),
1124            ),
1125        ]);
1126
1127        let options = CreateOptions::new("MergeTree")
1128            .with_order_by(&["category".to_string()])
1129            .with_schema_conversions(enums);
1130
1131        let sql = create_table_statement_from_arrow(None, "test_table", &schema, &options, None)
1132            .expect("Should generate valid SQL");
1133
1134        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
1135        assert!(sql.contains("status Nullable(Enum8('active' = 1,'inactive' = 2))"));
1136        assert!(sql.contains("name Nullable(String)"));
1137        assert!(sql.contains("category Enum16('x' = 1,'y' = 2)"));
1138        assert!(sql.contains("ENGINE = MergeTree"));
1139        assert!(sql.contains("ORDER BY (category)"));
1140    }
1141
1142    #[test]
1143    fn test_engines() {
1144        use super::ClickHouseEngine::*;
1145
1146        let engines = [
1147            MergeTree,
1148            AggregatingMergeTree,
1149            CollapsingMergeTree,
1150            ReplacingMergeTree,
1151            SummingMergeTree,
1152            Memory,
1153            Log,
1154            StripeLog,
1155            TinyLog,
1156            Other("NonExistentEngine".into()),
1157        ];
1158
1159        for engine in engines {
1160            let engine_str = engine.to_string();
1161            let engine_from = ClickHouseEngine::from(engine_str);
1162            assert_eq!(engine, engine_from);
1163        }
1164    }
1165}