clickhouse_arrow/
schema.rs

1use std::collections::HashMap;
2use std::fmt::Write as _;
3use std::sync::Arc;
4
5use arrow::datatypes::SchemaRef;
6use tracing::error;
7
8use super::settings::{SettingValue, Settings};
9use crate::arrow::types::{SchemaConversions, schema_conversion};
10use crate::{ArrowOptions, ColumnDefinition, Error, Result, Row, Type};
11
/// Non-exhaustive list of `ClickHouse` engines. Helps prevent typos when configuring the engine.
///
/// Any engine missing from this list can still be expressed through [`Self::Other`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ClickHouseEngine {
    MergeTree,
    AggregatingMergeTree,
    CollapsingMergeTree,
    ReplacingMergeTree,
    SummingMergeTree,
    Memory,
    Log,
    StripeLog,
    TinyLog,
    /// Any engine not listed above; the string is kept exactly as provided.
    Other(String),
}

/// Parses an engine name case-insensitively. Unrecognized names are preserved verbatim in
/// [`ClickHouseEngine::Other`].
impl<S> From<S> for ClickHouseEngine
where
    S: Into<String>,
{
    fn from(value: S) -> Self {
        let raw: String = value.into();
        // Compare against an upper-cased copy so the original spelling survives for `Other`.
        let normalized = raw.to_uppercase();
        match normalized.as_str() {
            "MERGETREE" => Self::MergeTree,
            "AGGREGATINGMERGETREE" => Self::AggregatingMergeTree,
            "COLLAPSINGMERGETREE" => Self::CollapsingMergeTree,
            "REPLACINGMERGETREE" => Self::ReplacingMergeTree,
            "SUMMINGMERGETREE" => Self::SummingMergeTree,
            "MEMORY" => Self::Memory,
            "LOG" => Self::Log,
            "STRIPELOG" => Self::StripeLog,
            "TINYLOG" => Self::TinyLog,
            // Be sure to add any new engines here
            _ => Self::Other(raw),
        }
    }
}

/// Renders the engine name as it is written into DDL.
impl std::fmt::Display for ClickHouseEngine {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // No wildcard arm on purpose: adding a variant must force an update here as well.
        let label: &str = match self {
            Self::MergeTree => "MergeTree",
            Self::AggregatingMergeTree => "AggregatingMergeTree",
            Self::CollapsingMergeTree => "CollapsingMergeTree",
            Self::ReplacingMergeTree => "ReplacingMergeTree",
            Self::SummingMergeTree => "SummingMergeTree",
            Self::Memory => "Memory",
            Self::Log => "Log",
            Self::StripeLog => "StripeLog",
            Self::TinyLog => "TinyLog",
            Self::Other(custom) => custom.as_str(),
        };
        f.write_str(label)
    }
}
68
69/// Options for creating a `ClickHouse` table, specifying engine, ordering, partitioning, and other
70/// settings.
71///
72/// This struct is used to configure the creation of a `ClickHouse` table via
73/// `create_table_statement_from_arrow`. It supports common table options like `ORDER BY`,
74/// `PRIMARY KEY`, `PARTITION BY`, `SAMPLE BY`, `TTL`, and custom settings. It also allows
75/// specifying default values for columns and enabling defaults for nullable columns.
76///
77/// # Examples
78/// ```rust,ignore
79/// use clickhouse_arrow::sql::CreateOptions;
80/// use clickhouse_arrow::Settings;
81///
82/// let options = CreateOptions::new("MergeTree")
83///     .with_order_by(&["id".to_string()])
84///     .with_setting("index_granularity", 4096)
85///     .with_ttl("1 DAY");
86/// ```
87#[derive(Debug, Default, Clone, PartialEq)]
88#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
89pub struct CreateOptions {
90    pub engine:                String,
91    pub order_by:              Vec<String>,
92    pub primary_keys:          Vec<String>,
93    pub partition_by:          Option<String>,
94    pub sampling:              Option<String>,
95    pub settings:              Settings,
96    pub ttl:                   Option<String>,
97    pub schema_conversions:    Option<SchemaConversions>,
98    pub defaults:              Option<HashMap<String, String>>,
99    pub defaults_for_nullable: bool,
100}
101
102impl CreateOptions {
103    /// Creates a new `CreateOptions` with the specified engine.
104    ///
105    /// # Arguments
106    /// - `engine`: The `ClickHouse` table engine (e.g., `MergeTree`, `Memory`).
107    ///
108    /// # Returns
109    /// A new `CreateOptions` instance with the specified engine.
110    #[must_use]
111    pub fn new(engine: impl Into<String>) -> Self {
112        Self { engine: engine.into(), ..Default::default() }
113    }
114
115    /// Creates a new `CreateOptions` with the specified engine.
116    ///
117    /// # Arguments
118    /// - `engine`: The `ClickHouseEngine` .
119    ///
120    /// # Returns
121    /// A new `CreateOptions` instance with the specified engine.
122    #[must_use]
123    pub fn from_engine(engine: impl Into<ClickHouseEngine>) -> Self {
124        Self { engine: engine.into().to_string(), ..Default::default() }
125    }
126
127    /// Sets the `ORDER BY` clause for the table.
128    ///
129    /// Filters out empty strings from the provided list.
130    ///
131    /// # Arguments
132    /// - `order_by`: A slice of column names to order by.
133    ///
134    /// # Returns
135    /// Self for method chaining.
136    #[must_use]
137    pub fn with_order_by(mut self, order_by: &[String]) -> Self {
138        self.order_by =
139            order_by.iter().filter(|k| !k.is_empty()).map(ToString::to_string).collect();
140        self
141    }
142
143    /// Sets the `PRIMARY KEY` clause for the table.
144    ///
145    /// Filters out empty strings from the provided list.
146    ///
147    /// # Arguments
148    /// - `keys`: A slice of column names to use as primary keys.
149    ///
150    /// # Returns
151    /// Self for method chaining.
152    #[must_use]
153    pub fn with_primary_keys(mut self, keys: &[String]) -> Self {
154        self.primary_keys =
155            keys.iter().filter(|k| !k.is_empty()).map(ToString::to_string).collect();
156        self
157    }
158
159    /// Sets the `PARTITION BY` clause for the table.
160    ///
161    /// Ignores empty strings.
162    ///
163    /// # Arguments
164    /// - `partition_by`: The partitioning expression.
165    ///
166    /// # Returns
167    /// Self for method chaining.
168    #[must_use]
169    pub fn with_partition_by(mut self, partition_by: impl Into<String>) -> Self {
170        let partition_by = partition_by.into();
171        if !partition_by.is_empty() {
172            self.partition_by = Some(partition_by);
173        }
174        self
175    }
176
177    /// Sets the `SAMPLE BY` clause for the table.
178    ///
179    /// Ignores empty strings.
180    ///
181    /// # Arguments
182    /// - `sampling`: The sampling expression.
183    ///
184    /// # Returns
185    /// Self for method chaining.
186    #[must_use]
187    pub fn with_sample_by(mut self, sampling: impl Into<String>) -> Self {
188        let sampling = sampling.into();
189        if !sampling.is_empty() {
190            self.sampling = Some(sampling);
191        }
192        self
193    }
194
195    /// Sets the table settings.
196    ///
197    /// # Arguments
198    /// - `settings`: The `Settings` object containing key-value pairs.
199    ///
200    /// # Returns
201    /// Self for method chaining.
202    #[must_use]
203    pub fn with_settings(mut self, settings: Settings) -> Self {
204        self.settings = settings;
205        self
206    }
207
208    /// Sets the `TTL` clause for the table.
209    ///
210    /// Ignores empty strings.
211    ///
212    /// # Arguments
213    /// - `ttl`: The TTL expression (e.g., `1 DAY`).
214    ///
215    /// # Returns
216    /// Self for method chaining.
217    #[must_use]
218    pub fn with_ttl(mut self, ttl: impl Into<String>) -> Self {
219        let ttl = ttl.into();
220        if !ttl.is_empty() {
221            self.ttl = Some(ttl);
222        }
223        self
224    }
225
226    /// Adds a single setting to the table.
227    ///
228    /// # Arguments
229    /// - `name`: The setting name (e.g., `index_granularity`).
230    /// - `setting`: The setting value (e.g., `4096`).
231    ///
232    /// # Returns
233    /// Self for method chaining.
234    #[must_use]
235    pub fn with_setting<S>(mut self, name: impl Into<String>, setting: S) -> Self
236    where
237        SettingValue: From<S>,
238    {
239        self.settings.add_setting(name.into(), setting);
240        self
241    }
242
243    /// Sets default values for columns.
244    ///
245    /// # Arguments
246    /// - `defaults`: An iterator of (column name, default value) pairs.
247    ///
248    /// # Returns
249    /// Self for method chaining.
250    #[must_use]
251    pub fn with_defaults<I>(mut self, defaults: I) -> Self
252    where
253        I: Iterator<Item = (String, String)>,
254    {
255        self.defaults = Some(defaults.into_iter().collect::<HashMap<_, _>>());
256        self
257    }
258
259    /// Enables default values for nullable columns (e.g., `NULL`).
260    ///
261    /// # Returns
262    /// Self for method chaining.
263    #[must_use]
264    pub fn with_defaults_for_nullable(mut self) -> Self {
265        self.defaults_for_nullable = true;
266        self
267    }
268
269    /// Provide a map of resolved type conversions.
270    ///
271    /// For example, since arrow does not support enum types, providing a map of column name to
272    /// `Type::Enum8` with enum values ensures [`arrow::datatypes::DataType::Dictionary`] is
273    /// serialized as [`crate::Type::Enum16`] instead of the default [`crate::Type::LowCardinality`]
274    ///
275    /// # Returns
276    /// Self for method chaining.
277    #[must_use]
278    pub fn with_schema_conversions(mut self, map: SchemaConversions) -> Self {
279        self.schema_conversions = Some(map);
280        self
281    }
282
283    /// Returns the configured default values, if any.
284    ///
285    /// # Returns
286    /// An optional reference to the `HashMap` of column names to default values.
287    pub fn defaults(&self) -> Option<&HashMap<String, String>> { self.defaults.as_ref() }
288
289    /// Returns the configured default values, if any.
290    ///
291    /// # Returns
292    /// An optional reference to the `HashMap` of column names to default values.
293    pub fn schema_conversions(&self) -> Option<&SchemaConversions> {
294        self.schema_conversions.as_ref()
295    }
296
297    /// Builds the table options part of a `ClickHouse` `CREATE TABLE` statement.
298    ///
299    /// Constructs the SQL for engine, `ORDER BY`, `PRIMARY KEY`, `PARTITION BY`, `SAMPLE BY`,
300    /// `TTL`, and `SETTINGS` clauses. Validates constraints, such as ensuring primary keys are
301    /// a subset of `ORDER BY` columns and sampling references a primary key.
302    ///
303    /// # Returns
304    /// A `Result` containing the SQL string for the table options or a `Error` if
305    /// validation fails (e.g., empty engine, invalid primary keys).
306    ///
307    /// # Errors
308    /// - Returns `DDLMalformed` if the engine is empty, primary keys are invalid, or sampling
309    ///   doesn’t reference a primary key.
310    fn build(&self) -> Result<String> {
311        let engine = self.engine.clone();
312        if engine.is_empty() {
313            return Err(Error::DDLMalformed("An engine is required, received empty string".into()));
314        }
315
316        let mut options = vec![format!("ENGINE = {engine}")];
317
318        // Log engines don't support options
319        if ["log", "LOG", "Log"].iter().any(|s| engine.contains(s)) {
320            return Ok(options.remove(0));
321        }
322
323        // Make sure order by is set
324        if self.order_by.is_empty() {
325            // Validations
326            if !self.primary_keys.is_empty() || !self.sampling.as_ref().is_none_or(String::is_empty)
327            {
328                return Err(Error::DDLMalformed(
329                    "Cannot specify primary keys or sampling when order by is empty".into(),
330                ));
331            }
332
333            options.push("ORDER BY tuple()".into());
334        } else {
335            let order_by = self.order_by.clone();
336
337            // Validate primary keys
338            if !self.primary_keys.is_empty()
339                && !self.primary_keys.iter().enumerate().all(|(i, k)| order_by.get(i) == Some(k))
340            {
341                return Err(Error::DDLMalformed(format!(
342                    "Primary keys but be present in order by and the ordering must match: order \
343                     by = {order_by:?}, primary keys = {:?}",
344                    self.primary_keys
345                )));
346            }
347
348            // Validate sampling
349            if let Some(sample) = self.sampling.as_ref()
350                && !order_by.iter().any(|o| sample.contains(o.as_str()))
351            {
352                return Err(Error::DDLMalformed(format!(
353                    "Sampling must refer to a primary key: order by = {order_by:?}, sampling={:?}",
354                    self.sampling
355                )));
356            }
357
358            options.push(format!("ORDER BY ({})", order_by.join(", ")));
359        }
360
361        if !self.primary_keys.is_empty() {
362            let primary_keys = self.primary_keys.clone();
363            options.push(format!("PRIMARY KEY ({})", primary_keys.join(", ")));
364        }
365
366        if let Some(partition) = self.partition_by.as_ref() {
367            options.push(format!("PARTITION BY {partition}"));
368        }
369
370        if let Some(sample) = self.sampling.as_ref() {
371            options.push(format!("SAMPLE BY {sample}"));
372        }
373
374        if let Some(ttl) = self.ttl.as_ref() {
375            options.push(format!("TTL {ttl}"));
376        }
377
378        if !self.settings.is_empty() {
379            options.push(format!("SETTINGS {}", self.settings.encode_to_strings().join(", ")));
380        }
381
382        Ok(options.join("\n"))
383    }
384}
385
386/// Generates a `ClickHouse` `CREATE DATABASE` statement.
387///
388/// # Arguments
389/// - `database`: The name of the database to create.
390///
391/// # Returns
392/// A `Result` containing the SQL statement or a `Error` if the database name is
393/// invalid.
394///
395/// # Errors
396/// - Returns `DDLMalformed` if the database name is empty or is `"default"`.
397///
398/// # Example
399/// ```rust,ignore
400/// use clickhouse_arrow::sql::create_db_statement;
401///
402/// let sql = create_db_statement("my_db").unwrap();
403/// assert_eq!(sql, "CREATE DATABASE IF NOT EXISTS my_db");
404/// ```
405pub(crate) fn create_db_statement(database: &str) -> Result<String> {
406    if database.is_empty() {
407        return Err(Error::DDLMalformed("Database name cannot be empty".into()));
408    }
409
410    if database.eq_ignore_ascii_case("default") {
411        return Err(Error::DDLMalformed("Cannot create `default` database".into()));
412    }
413
414    Ok(format!("CREATE DATABASE IF NOT EXISTS {database}"))
415}
416
417/// Generates a `ClickHouse` `DROP DATABASE` statement.
418///
419/// # Arguments
420/// - `database`: The name of the database to drop.
421/// - `sync`: If `true`, adds the `SYNC` clause for synchronous dropping.
422///
423/// # Returns
424/// A `Result` containing the SQL statement or a `Error` if the database name is
425/// invalid.
426///
427/// # Errors
428/// - Returns `DDLMalformed` if the database name is empty or is `"default"`.
429///
430/// # Example
431/// ```rust,ignore
432/// use clickhouse_arrow::sql::drop_db_statement;
433///
434/// let sql = drop_db_statement("my_db", true).unwrap();
435/// assert_eq!(sql, "DROP DATABASE IF EXISTS my_db SYNC");
436/// ```
437pub(crate) fn drop_db_statement(database: &str, sync: bool) -> Result<String> {
438    if database.is_empty() {
439        return Err(Error::DDLMalformed("Database name cannot be empty".into()));
440    }
441
442    if database.eq_ignore_ascii_case("default") {
443        return Err(Error::DDLMalformed("Cannot create `default` database".into()));
444    }
445
446    let mut ddl = "DROP DATABASE IF EXISTS ".to_string();
447    ddl.push_str(database);
448    if sync {
449        ddl.push_str(" SYNC");
450    }
451    Ok(ddl)
452}
453
454/// Generates a `ClickHouse` `CREATE TABLE` statement from an Arrow schema and table options.
455///
456/// # Arguments
457/// - `database`: Optional database name (e.g., `my_db`). If `None`, the table is created in the
458///   default database.
459/// - `table`: The table name.
460/// - `schema`: The Arrow schema defining the table’s columns.
461/// - `options`: The `CreateOptions` specifying engine, ordering, and other settings.
462///
463/// # Returns
464/// A `Result` containing the SQL statement or a `Error` if the schema is invalid or
465/// options fail validation.
466///
467/// # Errors
468/// - Returns `DDLMalformed` if the schema is empty or options validation fails (e.g., invalid
469///   engine).
470/// - Returns `ArrowDeserialize` if the Arrow `DataType` cannot be converted to a `ClickHouse` type.
471/// - Returns `TypeConversion` if the schema is disallowed by `ClickHouse`
472///
473/// # Example
474/// ```rust,ignore
475/// use arrow::datatypes::{DataType, Field, Schema};
476/// use crate::sql::{CreateOptions, create_table_statement_from_arrow};
477/// use std::sync::Arc;
478///
479/// let schema = Arc::new(Schema::new(vec![
480///     Field::new("id", DataType::Int32, false),
481///     Field::new("name", DataType::Utf8, true),
482/// ]));
483/// let options = CreateOptions::new("MergeTree")
484///     .with_order_by(&["id".to_string()]);
485/// let sql = create_table_statement_from_arrow(None, "my_table", &schema, &options).unwrap();
486/// assert!(sql.contains("CREATE TABLE IF NOT EXISTS `my_table`"));
487/// ```
488pub(crate) fn create_table_statement_from_arrow(
489    database: Option<&str>,
490    table: &str,
491    schema: &SchemaRef,
492    options: &CreateOptions,
493    arrow_options: Option<ArrowOptions>,
494) -> Result<String> {
495    if schema.fields().is_empty() {
496        return Err(Error::DDLMalformed("Arrow Schema is empty, cannot create table".into()));
497    }
498    let definition = RecordBatchDefinition {
499        arrow_options,
500        schema: Arc::clone(schema),
501        defaults: options.defaults().cloned(),
502    };
503    create_table_statement(database, table, Some(definition), options)
504}
505
506/// Generates a `ClickHouse` `CREATE TABLE` statement from a type that implements [`crate::Row`] and
507/// [`CreateOptions`].
508///
509/// # Arguments
510/// - `database`: Optional database name (e.g., `my_db`). If `None`, the table is created in the
511///   default database.
512/// - `table`: The table name.
513/// - `options`: The `CreateOptions` specifying engine, ordering, and other settings.
514///
515/// # Returns
516/// A `Result` containing the SQL statement or a `Error` if the schema is invalid or
517/// options fail validation.
518///
519/// # Errors
520/// - Returns `DDLMalformed` if the schema is empty or options validation fails (e.g., invalid
521///   engine).
522/// - Returns `TypeConversion` if the schema is disallowed by `ClickHouse`
523///
524/// # Example
525/// ```rust,ignore
526/// use clickhouse_arrow::Row;
527/// use clickhouse_arrow::sql::{CreateOptions, create_table_statement_from_native};
528///
529/// #[derive(Row)]
530/// struct MyRow {
531///     id: String,
532///     name: String,
533/// }
534///
535/// let options = CreateOptions::new("MergeTree")
536///     .with_order_by(&["id".to_string()]);
537/// let sql = create_table_statement_from_native::<MyRow>(None, "my_table", &options).unwrap();
538/// assert!(sql.contains("CREATE TABLE IF NOT EXISTS `my_table`"));
539/// ```
540pub(crate) fn create_table_statement_from_native<T: Row>(
541    database: Option<&str>,
542    table: &str,
543    options: &CreateOptions,
544) -> Result<String> {
545    create_table_statement::<T>(database, table, None, options)
546}
547
548pub(crate) fn create_table_statement<T: ColumnDefine>(
549    database: Option<&str>,
550    table: &str,
551    schema: Option<T>,
552    options: &CreateOptions,
553) -> Result<String> {
554    let column_definitions = schema
555        .map(|s| s.runtime_definitions(options.schema_conversions.as_ref()))
556        .transpose()?
557        .flatten()
558        .or(T::definitions());
559
560    let Some(definitions) = column_definitions.filter(|c| !c.is_empty()) else {
561        return Err(Error::DDLMalformed("Schema is empty, cannot create table".into()));
562    };
563
564    let db_pre = database.map(|c| format!("{c}.")).unwrap_or_default();
565    let table = table.trim_matches('`');
566    let mut sql = String::new();
567    let _ = writeln!(sql, "CREATE TABLE IF NOT EXISTS {db_pre}`{table}` (");
568
569    let total = definitions.len();
570    for (i, (name, type_, default_value)) in definitions.into_iter().enumerate() {
571        let _ = write!(sql, "  {name} {type_}");
572        if let Some(d) = options
573            .defaults
574            .as_ref()
575            .and_then(|d| d.get(&name))
576            .or(default_value.map(|d| d.to_string()).as_ref())
577        {
578            let _ = write!(sql, " DEFAULT");
579            if !d.is_empty() && d != "NULL" {
580                let _ = write!(sql, " {d}");
581            }
582        } else if options.defaults_for_nullable && matches!(type_, Type::Nullable(_)) {
583            let _ = write!(sql, " DEFAULT");
584        }
585
586        if i < (total - 1) {
587            let _ = writeln!(sql, ",");
588        }
589    }
590
591    let _ = writeln!(sql, "\n)");
592    let _ = write!(sql, "{}", options.build()?);
593
594    Ok(sql)
595}
596
597/// A type that describe the schema of its fields to be used in a `CREATE TABLE ...` query.
598///
599/// Generally this is not implemented manually, but using `clickhouse_arrow::Row` since it's
600/// implemented on any `T: Row`. But it's helpful to implement manually if additional formats are
601/// created.
602pub trait ColumnDefine: Sized {
603    type DefaultValue: std::fmt::Display + std::fmt::Debug;
604
605    /// Provide the static schema
606    fn definitions() -> Option<Vec<ColumnDefinition<Self::DefaultValue>>>;
607
608    /// Infers the schema and returns it.
609    ///
610    /// # Errors
611    ///
612    /// Returns an error defined by the implementation
613    fn runtime_definitions(
614        &self,
615        _: Option<&HashMap<String, Type>>,
616    ) -> Result<Option<Vec<ColumnDefinition<Self::DefaultValue>>>> {
617        Ok(Self::definitions())
618    }
619}
620
621impl<T: Row> ColumnDefine for T {
622    type DefaultValue = crate::Value;
623
624    fn definitions() -> Option<Vec<ColumnDefinition>> { Self::to_schema() }
625
626    fn runtime_definitions(
627        &self,
628        conversions: Option<&HashMap<String, Type>>,
629    ) -> Result<Option<Vec<ColumnDefinition<Self::DefaultValue>>>> {
630        let Some(static_definitions) = Self::definitions() else {
631            return Ok(None);
632        };
633
634        if let Some(conversions) = conversions {
635            return Ok(Some(
636                static_definitions
637                    .into_iter()
638                    .map(|(name, type_, default_value)| {
639                        let resolved_type = conversions.get(&name).cloned().unwrap_or(type_);
640                        (name, resolved_type, default_value)
641                    })
642                    .collect::<Vec<_>>(),
643            ));
644        }
645
646        Ok(Some(static_definitions))
647    }
648}
649
650/// Helper struct to encapsulate schema creation logic for Arrow schemas.
651pub(crate) struct RecordBatchDefinition {
652    pub(crate) arrow_options: Option<ArrowOptions>,
653    pub(crate) schema:        SchemaRef,
654    pub(crate) defaults:      Option<HashMap<String, String>>,
655}
656
657impl ColumnDefine for RecordBatchDefinition {
658    type DefaultValue = String;
659
660    fn definitions() -> Option<Vec<ColumnDefinition<String>>> { None }
661
662    fn runtime_definitions(
663        &self,
664        conversions: Option<&HashMap<String, Type>>,
665    ) -> Result<Option<Vec<ColumnDefinition<String>>>> {
666        let mut fields = Vec::with_capacity(self.schema.fields.len());
667        for field in self.schema.fields() {
668            let type_ =
669                schema_conversion(field, conversions, self.arrow_options).inspect_err(|error| {
670                    error!("Arrow conversion failed for field {field:?}: {error}");
671                })?;
672            let default_val =
673                if let Some(d) = self.defaults.as_ref().and_then(|d| d.get(field.name())) {
674                    if !d.is_empty() && d != "NULL" { Some(d.clone()) } else { None }
675                } else {
676                    None
677                };
678            fields.push((field.name().clone(), type_, default_val));
679        }
680        Ok(Some(fields))
681    }
682}
683
684#[cfg(test)]
685mod tests {
686    use std::sync::Arc;
687
688    use arrow::datatypes::{DataType, Field, Schema};
689
690    use super::{ClickHouseEngine, *};
691    use crate::Type;
692
693    #[allow(clippy::needless_pass_by_value)]
694    fn compare_sql(left: impl AsRef<str> + Into<String>, right: impl AsRef<str> + Into<String>) {
695        assert_eq!(left.as_ref().replace(['\n', ' '], ""), right.as_ref().replace(['\n', ' '], ""));
696    }
697
698    #[test]
699    fn test_create_options_new() {
700        let options = CreateOptions::new("MergeTree");
701        assert_eq!(options.engine, "MergeTree");
702        assert!(options.order_by.is_empty());
703        assert!(options.primary_keys.is_empty());
704        assert!(options.partition_by.is_none());
705        assert!(options.sampling.is_none());
706        assert!(options.settings.is_empty());
707        assert!(options.ttl.is_none());
708        assert!(options.defaults.is_none());
709        assert!(!options.defaults_for_nullable);
710    }
711
712    #[test]
713    fn test_create_options_with_order_by() {
714        let options = CreateOptions::new("MergeTree").with_order_by(&[
715            "id".to_string(),
716            String::new(),
717            "name".to_string(),
718        ]);
719        assert_eq!(options.order_by, vec!["id".to_string(), "name".to_string()]);
720    }
721
722    #[test]
723    fn test_create_options_with_primary_keys() {
724        let options = CreateOptions::new("MergeTree").with_primary_keys(&[
725            "id".to_string(),
726            String::new(),
727            "name".to_string(),
728        ]);
729        assert_eq!(options.primary_keys, vec!["id".to_string(), "name".to_string()]);
730    }
731
732    #[test]
733    fn test_create_options_with_partition_by() {
734        let options = CreateOptions::new("MergeTree").with_partition_by("toYYYYMM(date)");
735        assert_eq!(options.partition_by, Some("toYYYYMM(date)".to_string()));
736
737        let options = CreateOptions::new("MergeTree").with_partition_by("");
738        assert_eq!(options.partition_by, None);
739    }
740
741    #[test]
742    fn test_create_options_with_sample_by() {
743        let options = CreateOptions::new("MergeTree").with_sample_by("cityHash64(id)");
744        assert_eq!(options.sampling, Some("cityHash64(id)".to_string()));
745
746        let options = CreateOptions::new("MergeTree").with_sample_by("");
747        assert_eq!(options.sampling, None);
748    }
749
750    #[test]
751    fn test_create_options_with_settings() {
752        let settings = Settings::default().with_setting("index_granularity", 4096);
753        let options = CreateOptions::new("MergeTree").with_settings(settings.clone());
754        assert_eq!(options.settings, settings);
755    }
756
757    #[test]
758    fn test_create_options_with_setting() {
759        let options = CreateOptions::new("MergeTree").with_setting("index_granularity", 4096);
760        assert_eq!(options.settings.encode_to_strings(), vec![
761            "index_granularity = 4096".to_string()
762        ]);
763    }
764
765    #[test]
766    fn test_create_options_with_ttl() {
767        let options = CreateOptions::new("MergeTree").with_ttl("1 DAY");
768        assert_eq!(options.ttl, Some("1 DAY".to_string()));
769
770        let options = CreateOptions::new("MergeTree").with_ttl("");
771        assert_eq!(options.ttl, None);
772    }
773
774    #[test]
775    fn test_create_options_with_defaults() {
776        let defaults = vec![
777            ("id".to_string(), "0".to_string()),
778            ("name".to_string(), "'unknown'".to_string()),
779        ];
780        let options = CreateOptions::new("MergeTree").with_defaults(defaults.into_iter());
781        assert_eq!(
782            options.defaults,
783            Some(HashMap::from([
784                ("id".to_string(), "0".to_string()),
785                ("name".to_string(), "'unknown'".to_string()),
786            ]))
787        );
788    }
789
790    #[test]
791    fn test_create_options_with_defaults_for_nullable() {
792        let options = CreateOptions::new("MergeTree").with_defaults_for_nullable();
793        assert!(options.defaults_for_nullable);
794    }
795
796    #[test]
797    fn test_create_options_build_merge_tree() {
798        let options = CreateOptions::new("MergeTree")
799            .with_order_by(&["id".to_string(), "date".to_string()])
800            .with_primary_keys(&["id".to_string()])
801            .with_partition_by("toYYYYMM(date)")
802            .with_sample_by("cityHash64(id)")
803            .with_ttl("1 DAY")
804            .with_setting("index_granularity", 4096);
805        let sql = options.build().unwrap();
806        compare_sql(
807            sql,
808            "ENGINE = MergeTree\nORDER BY (id, date)\nPRIMARY KEY (id)\nPARTITION BY \
809             toYYYYMM(date)\nSAMPLE BY cityHash64(id)\nTTL 1 DAY\nSETTINGS index_granularity = \
810             4096",
811        );
812    }
813
814    #[test]
815    fn test_create_options_build_log_engine() {
816        let options = CreateOptions::new("TinyLog");
817        let sql = options.build().unwrap();
818        assert_eq!(sql, "ENGINE = TinyLog");
819    }
820
821    #[test]
822    fn test_create_options_build_empty_order_by() {
823        let options = CreateOptions::new("MergeTree");
824        let sql = options.build().unwrap();
825        compare_sql(sql, "ENGINE = MergeTree\nORDER BY tuple()");
826    }
827
828    #[test]
829    fn test_create_options_build_invalid_engine() {
830        let options = CreateOptions::new("");
831        let result = options.build();
832        assert!(matches!(result, Err(Error::DDLMalformed(_))));
833    }
834
835    #[test]
836    fn test_create_options_build_invalid_primary_keys() {
837        let options = CreateOptions::new("MergeTree")
838            .with_order_by(&["id".to_string()])
839            .with_primary_keys(&["name".to_string()]);
840        let result = options.build();
841        assert!(matches!(result, Err(Error::DDLMalformed(_))));
842    }
843
844    #[test]
845    fn test_create_options_build_invalid_sampling() {
846        let options = CreateOptions::new("MergeTree")
847            .with_order_by(&["id".to_string()])
848            .with_sample_by("cityHash64(name)");
849        let result = options.build();
850        assert!(matches!(result, Err(Error::DDLMalformed(_))));
851    }
852
853    #[test]
854    fn test_create_db_statement() {
855        let sql = create_db_statement("my_db").unwrap();
856        assert_eq!(sql, "CREATE DATABASE IF NOT EXISTS my_db");
857
858        let result = create_db_statement("");
859        assert!(matches!(result, Err(Error::DDLMalformed(_))));
860
861        let result = create_db_statement("default");
862        assert!(matches!(result, Err(Error::DDLMalformed(_))));
863    }
864
865    #[test]
866    fn test_drop_db_statement() {
867        let sql = drop_db_statement("my_db", false).unwrap();
868        compare_sql(sql, "DROP DATABASE IF EXISTS my_db");
869
870        let sql = drop_db_statement("my_db", true).unwrap();
871        compare_sql(sql, "DROP DATABASE IF EXISTS my_db SYNC");
872
873        let result = drop_db_statement("", false);
874        assert!(matches!(result, Err(Error::DDLMalformed(_))));
875
876        let result = drop_db_statement("default", false);
877        assert!(matches!(result, Err(Error::DDLMalformed(_))));
878    }
879
880    #[test]
881    fn test_create_table_statement() {
882        let schema = Arc::new(Schema::new(vec![
883            Field::new("id", DataType::Int32, false),
884            Field::new("name", DataType::Utf8, true),
885        ]));
886        let options = CreateOptions::new("MergeTree")
887            .with_order_by(&["id".to_string()])
888            .with_defaults(vec![("name".to_string(), "'unknown'".to_string())].into_iter())
889            .with_defaults_for_nullable();
890        let sql =
891            create_table_statement_from_arrow(None, "my_table", &schema, &options, None).unwrap();
892        compare_sql(
893            sql,
894            "CREATE TABLE IF NOT EXISTS `my_table` (\n  id Int32,\n  name Nullable(String) \
895             DEFAULT 'unknown'\n)\nENGINE = MergeTree\nORDER BY (id)",
896        );
897    }
898
899    #[test]
900    fn test_create_table_statement_with_database() {
901        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
902        let options = CreateOptions::new("Memory");
903        let sql =
904            create_table_statement_from_arrow(Some("my_db"), "my_table", &schema, &options, None)
905                .unwrap();
906        compare_sql(
907            sql,
908            "CREATE TABLE IF NOT EXISTS my_db.`my_table` (\nid Int32\n)\nENGINE = Memory\nORDER \
909             BY tuple()",
910        );
911    }
912
913    #[test]
914    fn test_create_table_statement_empty_schema() {
915        let schema = Arc::new(Schema::empty());
916        let options = CreateOptions::new("MergeTree");
917        let result = create_table_statement_from_arrow(None, "my_table", &schema, &options, None);
918        assert!(matches!(result, Err(Error::DDLMalformed(_))));
919    }
920
921    #[test]
922    fn test_create_table_with_nullable_dictionary() {
923        let schema = Arc::new(Schema::new(vec![
924            Field::new(
925                "status",
926                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
927                true,
928            ),
929            Field::new("id", DataType::Int32, false),
930        ]));
931
932        let enum_i8 = HashMap::from_iter([(
933            "status".to_string(),
934            Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
935        )]);
936
937        let options = CreateOptions::new("MergeTree").with_order_by(&["id".to_string()]);
938        let enum_options = options.clone().with_schema_conversions(enum_i8);
939
940        // If the nullable dictionary will not be converted to enum, this will fail
941        assert!(
942            create_table_statement_from_arrow(None, "test_table", &schema, &options, None).is_err()
943        );
944
945        // Otherwise it will succeed
946        let sql =
947            create_table_statement_from_arrow(None, "test_table", &schema, &enum_options, None)
948                .expect("Should generate valid SQL");
949
950        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
951        assert!(sql.contains("status Nullable(Enum8('active' = 1,'inactive' = 2))"));
952        assert!(sql.contains("id Int32"));
953        assert!(sql.contains("ENGINE = MergeTree"));
954        assert!(sql.contains("ORDER BY (id)"));
955    }
956
957    #[test]
958    fn test_create_table_with_enum8() {
959        let schema = Arc::new(Schema::new(vec![
960            Field::new(
961                "status",
962                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
963                false,
964            ),
965            Field::new("id", DataType::Int32, false),
966        ]));
967
968        let enum_i8 = HashMap::from_iter([(
969            "status".to_string(),
970            Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
971        )]);
972
973        let options = CreateOptions::new("MergeTree")
974            .with_order_by(&["id".to_string()])
975            .with_schema_conversions(enum_i8);
976
977        let sql = create_table_statement_from_arrow(None, "test_table", &schema, &options, None)
978            .expect("Should generate valid SQL");
979
980        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
981        assert!(sql.contains("status Enum8('active' = 1,'inactive' = 2)"));
982        assert!(sql.contains("id Int32"));
983        assert!(sql.contains("ENGINE = MergeTree"));
984        assert!(sql.contains("ORDER BY (id)"));
985    }
986
987    #[test]
988    fn test_create_table_with_enum16() {
989        let schema = Arc::new(Schema::new(vec![
990            Field::new(
991                "category",
992                DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
993                false,
994            ),
995            Field::new("value", DataType::Float32, true),
996        ]));
997
998        let enum_i16 = HashMap::from_iter([(
999            "category".to_string(),
1000            Type::Enum16(vec![("x".to_string(), 1), ("y".to_string(), 2), ("z".to_string(), 3)]),
1001        )]);
1002        let options = CreateOptions::new("MergeTree")
1003            .with_order_by(&["category".to_string()])
1004            .with_schema_conversions(enum_i16);
1005
1006        let sql = create_table_statement_from_arrow(None, "test_table", &schema, &options, None)
1007            .expect("Should generate valid SQL");
1008
1009        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
1010        assert!(sql.contains("category Enum16('x' = 1,'y' = 2,'z' = 3)"));
1011        assert!(sql.contains("value Nullable(Float32)"));
1012        assert!(sql.contains("ENGINE = MergeTree"));
1013        assert!(sql.contains("ORDER BY (category)"));
1014    }
1015
1016    #[test]
1017    fn test_create_table_with_invalid_enum_type() {
1018        let schema = Arc::new(Schema::new(vec![Field::new("status", DataType::Int32, true)]));
1019
1020        let enum_i8 = HashMap::from_iter([(
1021            "status".to_string(),
1022            Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
1023        )]);
1024
1025        let options = CreateOptions::new("MergeTree").with_schema_conversions(enum_i8);
1026
1027        let result = create_table_statement_from_arrow(None, "test_table", &schema, &options, None);
1028
1029        assert!(matches!(
1030            result,
1031            Err(Error::TypeConversion(msg))
1032            if msg.contains("expected LowCardinality(String) or String/Binary, found Nullable(Int32)")
1033        ));
1034    }
1035
1036    #[test]
1037    fn test_create_table_with_non_low_cardinality_enum() {
1038        let schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, true)]));
1039
1040        let enum_i8 = HashMap::from_iter([(
1041            "name".to_string(),
1042            Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
1043        )]);
1044        let options = CreateOptions::new("MergeTree").with_schema_conversions(enum_i8);
1045
1046        let sql =
1047            create_table_statement_from_arrow(None, "test_table", &schema, &options, None).unwrap();
1048
1049        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
1050        assert!(sql.contains("name Nullable(Enum8('active' = 1,'inactive' = 2))"));
1051        assert!(sql.contains("ENGINE = MergeTree"));
1052    }
1053
1054    // The arrow data type drives nullability
1055    #[test]
1056    fn test_create_table_with_nullable_field_non_nullable_enum() {
1057        let schema = Arc::new(Schema::new(vec![
1058            Field::new("name", DataType::Utf8, true),
1059            Field::new("status", DataType::Utf8, false),
1060        ]));
1061
1062        let enum_i8 = HashMap::from_iter([
1063            (
1064                "name".to_string(),
1065                Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)])
1066                    .into_nullable(),
1067            ),
1068            (
1069                "status".to_string(),
1070                Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)])
1071                    .into_nullable(),
1072            ),
1073        ]);
1074        let options = CreateOptions::new("MergeTree").with_schema_conversions(enum_i8);
1075        let arrow_options = ArrowOptions::default()
1076            // Deserialize strings as Utf8, not Binary
1077            .with_strings_as_strings(true)
1078            // Deserialize Date as Date32
1079            .with_use_date32_for_date(true)
1080            // Ignore fields that ClickHouse doesn't support.
1081            .with_strict_schema(false)
1082            .with_disable_strict_schema_ddl(true);
1083
1084        let sql = create_table_statement_from_arrow(
1085            None,
1086            "test_table",
1087            &schema,
1088            &options,
1089            Some(arrow_options),
1090        )
1091        .unwrap();
1092
1093        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
1094        assert!(sql.contains("name Nullable(Enum8('active' = 1,'inactive' = 2))"));
1095        assert!(sql.contains("status Enum8('active' = 1,'inactive' = 2)"));
1096        assert!(sql.contains("ENGINE = MergeTree"));
1097    }
1098
1099    #[test]
1100    fn test_create_table_with_mixed_enum_and_non_enum() {
1101        let schema = Arc::new(Schema::new(vec![
1102            Field::new(
1103                "status",
1104                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
1105                true,
1106            ),
1107            Field::new("name", DataType::Utf8, true),
1108            Field::new(
1109                "category",
1110                DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
1111                false,
1112            ),
1113        ]));
1114
1115        let enums = HashMap::from_iter([
1116            (
1117                "status".to_string(),
1118                Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
1119            ),
1120            (
1121                "category".to_string(),
1122                Type::Enum16(vec![("x".to_string(), 1), ("y".to_string(), 2)]),
1123            ),
1124        ]);
1125
1126        let options = CreateOptions::new("MergeTree")
1127            .with_order_by(&["category".to_string()])
1128            .with_schema_conversions(enums);
1129
1130        let sql = create_table_statement_from_arrow(None, "test_table", &schema, &options, None)
1131            .expect("Should generate valid SQL");
1132
1133        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
1134        assert!(sql.contains("status Nullable(Enum8('active' = 1,'inactive' = 2))"));
1135        assert!(sql.contains("name Nullable(String)"));
1136        assert!(sql.contains("category Enum16('x' = 1,'y' = 2)"));
1137        assert!(sql.contains("ENGINE = MergeTree"));
1138        assert!(sql.contains("ORDER BY (category)"));
1139    }
1140
1141    #[test]
1142    fn test_engines() {
1143        use super::ClickHouseEngine::*;
1144
1145        let engines = [
1146            MergeTree,
1147            AggregatingMergeTree,
1148            CollapsingMergeTree,
1149            ReplacingMergeTree,
1150            SummingMergeTree,
1151            Memory,
1152            Log,
1153            StripeLog,
1154            TinyLog,
1155            Other("NonExistentEngine".into()),
1156        ];
1157
1158        for engine in engines {
1159            let engine_str = engine.to_string();
1160            let engine_from = ClickHouseEngine::from(engine_str);
1161            assert_eq!(engine, engine_from);
1162        }
1163    }
1164}