clickhouse_arrow/
schema.rs

1use std::collections::HashMap;
2use std::fmt::Write as _;
3use std::sync::Arc;
4
5use arrow::datatypes::SchemaRef;
6use tracing::error;
7
8use super::settings::{SettingValue, Settings};
9use crate::arrow::types::{SchemaConversions, schema_conversion};
10use crate::{ArrowOptions, ColumnDefinition, Error, Result, Row, Type};
11
/// A non-exhaustive catalogue of `ClickHouse` table engines.
///
/// Picking a variant instead of typing the engine name by hand protects against typos when
/// configuring a table. Engines missing from this list can still be expressed with
/// [`Self::Other`], which carries the engine text verbatim.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ClickHouseEngine {
    /// The `MergeTree` engine.
    MergeTree,
    /// The `AggregatingMergeTree` engine.
    AggregatingMergeTree,
    /// The `CollapsingMergeTree` engine.
    CollapsingMergeTree,
    /// The `ReplacingMergeTree` engine.
    ReplacingMergeTree,
    /// The `SummingMergeTree` engine.
    SummingMergeTree,
    /// The `Memory` engine.
    Memory,
    /// The `Log` engine.
    Log,
    /// The `StripeLog` engine.
    StripeLog,
    /// The `TinyLog` engine.
    TinyLog,
    /// Any engine not listed above, kept exactly as provided.
    Other(String),
}
28
29impl<S> From<S> for ClickHouseEngine
30where
31    S: Into<String>,
32{
33    fn from(value: S) -> Self {
34        let engine = value.into();
35        match engine.to_uppercase().as_str() {
36            "MERGETREE" => Self::MergeTree,
37            "AGGREGATINGMERGETREE" => Self::AggregatingMergeTree,
38            "COLLAPSINGMERGETREE" => Self::CollapsingMergeTree,
39            "REPLACINGMERGETREE" => Self::ReplacingMergeTree,
40            "SUMMINGMERGETREE" => Self::SummingMergeTree,
41            "MEMORY" => Self::Memory,
42            "LOG" => Self::Log,
43            "STRIPELOG" => Self::StripeLog,
44            "TINYLOG" => Self::TinyLog,
45            // Be sure to add any new engines here
46            _ => Self::Other(engine),
47        }
48    }
49}
50
51impl std::fmt::Display for ClickHouseEngine {
52    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53        // Don't use wildcard, that way it gets updated as well
54        match self {
55            Self::MergeTree => write!(f, "MergeTree"),
56            Self::AggregatingMergeTree => write!(f, "AggregatingMergeTree"),
57            Self::CollapsingMergeTree => write!(f, "CollapsingMergeTree"),
58            Self::ReplacingMergeTree => write!(f, "ReplacingMergeTree"),
59            Self::SummingMergeTree => write!(f, "SummingMergeTree"),
60            Self::Memory => write!(f, "Memory"),
61            Self::Log => write!(f, "Log"),
62            Self::StripeLog => write!(f, "StripeLog"),
63            Self::TinyLog => write!(f, "TinyLog"),
64            Self::Other(engine) => write!(f, "{engine}"),
65        }
66    }
67}
68
69/// Options for creating a `ClickHouse` table, specifying engine, ordering, partitioning, and other
70/// settings.
71///
72/// This struct is used to configure the creation of a `ClickHouse` table via
73/// [`create_table_statement_from_arrow`]. It supports common table options like `ORDER BY`,
74/// `PRIMARY KEY`, `PARTITION BY`, `SAMPLE BY`, `TTL`, and custom settings. It also allows
75/// specifying default values for columns and enabling defaults for nullable columns.
76///
77/// # Examples
78/// ```rust,ignore
79/// use clickhouse_arrow::sql::CreateOptions;
80/// use clickhouse_arrow::Settings;
81///
82/// let options = CreateOptions::new("MergeTree")
83///     .with_order_by(&["id".to_string()])
84///     .with_setting("index_granularity", 4096)
85///     .with_ttl("1 DAY");
86/// ```
87#[derive(Debug, Default, Clone, PartialEq)]
88#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
89pub struct CreateOptions {
90    pub engine:                String,
91    pub order_by:              Vec<String>,
92    pub primary_keys:          Vec<String>,
93    pub partition_by:          Option<String>,
94    pub sampling:              Option<String>,
95    pub settings:              Settings,
96    pub ttl:                   Option<String>,
97    pub schema_conversions:    Option<SchemaConversions>,
98    pub defaults:              Option<HashMap<String, String>>,
99    pub defaults_for_nullable: bool,
100}
101
102impl CreateOptions {
103    /// Creates a new `CreateOptions` with the specified engine.
104    ///
105    /// # Arguments
106    /// - `engine`: The `ClickHouse` table engine (e.g., `MergeTree`, `Memory`).
107    ///
108    /// # Returns
109    /// A new `CreateOptions` instance with the specified engine.
110    #[must_use]
111    pub fn new(engine: impl Into<String>) -> Self {
112        Self { engine: engine.into(), ..Default::default() }
113    }
114
115    /// Creates a new `CreateOptions` with the specified engine.
116    ///
117    /// # Arguments
118    /// - `engine`: The `ClickHouseEngine` .
119    ///
120    /// # Returns
121    /// A new `CreateOptions` instance with the specified engine.
122    #[must_use]
123    pub fn from_engine(engine: impl Into<ClickHouseEngine>) -> Self {
124        Self { engine: engine.into().to_string(), ..Default::default() }
125    }
126
127    /// Sets the `ORDER BY` clause for the table.
128    ///
129    /// Filters out empty strings from the provided list.
130    ///
131    /// # Arguments
132    /// - `order_by`: A slice of column names to order by.
133    ///
134    /// # Returns
135    /// Self for method chaining.
136    #[must_use]
137    pub fn with_order_by(mut self, order_by: &[String]) -> Self {
138        self.order_by =
139            order_by.iter().filter(|k| !k.is_empty()).map(ToString::to_string).collect();
140        self
141    }
142
143    /// Sets the `PRIMARY KEY` clause for the table.
144    ///
145    /// Filters out empty strings from the provided list.
146    ///
147    /// # Arguments
148    /// - `keys`: A slice of column names to use as primary keys.
149    ///
150    /// # Returns
151    /// Self for method chaining.
152    #[must_use]
153    pub fn with_primary_keys(mut self, keys: &[String]) -> Self {
154        self.primary_keys =
155            keys.iter().filter(|k| !k.is_empty()).map(ToString::to_string).collect();
156        self
157    }
158
159    /// Sets the `PARTITION BY` clause for the table.
160    ///
161    /// Ignores empty strings.
162    ///
163    /// # Arguments
164    /// - `partition_by`: The partitioning expression.
165    ///
166    /// # Returns
167    /// Self for method chaining.
168    #[must_use]
169    pub fn with_partition_by(mut self, partition_by: impl Into<String>) -> Self {
170        let partition_by = partition_by.into();
171        if !partition_by.is_empty() {
172            self.partition_by = Some(partition_by);
173        }
174        self
175    }
176
177    /// Sets the `SAMPLE BY` clause for the table.
178    ///
179    /// Ignores empty strings.
180    ///
181    /// # Arguments
182    /// - `sampling`: The sampling expression.
183    ///
184    /// # Returns
185    /// Self for method chaining.
186    #[must_use]
187    pub fn with_sample_by(mut self, sampling: impl Into<String>) -> Self {
188        let sampling = sampling.into();
189        if !sampling.is_empty() {
190            self.sampling = Some(sampling);
191        }
192        self
193    }
194
195    /// Sets the table settings.
196    ///
197    /// # Arguments
198    /// - `settings`: The `Settings` object containing key-value pairs.
199    ///
200    /// # Returns
201    /// Self for method chaining.
202    #[must_use]
203    pub fn with_settings(mut self, settings: Settings) -> Self {
204        self.settings = settings;
205        self
206    }
207
208    /// Sets the `TTL` clause for the table.
209    ///
210    /// Ignores empty strings.
211    ///
212    /// # Arguments
213    /// - `ttl`: The TTL expression (e.g., `1 DAY`).
214    ///
215    /// # Returns
216    /// Self for method chaining.
217    #[must_use]
218    pub fn with_ttl(mut self, ttl: impl Into<String>) -> Self {
219        let ttl = ttl.into();
220        if !ttl.is_empty() {
221            self.ttl = Some(ttl);
222        }
223        self
224    }
225
226    /// Adds a single setting to the table.
227    ///
228    /// # Arguments
229    /// - `name`: The setting name (e.g., `index_granularity`).
230    /// - `setting`: The setting value (e.g., `4096`).
231    ///
232    /// # Returns
233    /// Self for method chaining.
234    #[must_use]
235    pub fn with_setting<S>(mut self, name: impl Into<String>, setting: S) -> Self
236    where
237        SettingValue: From<S>,
238    {
239        self.settings.add_setting(name.into(), setting);
240        self
241    }
242
243    /// Sets default values for columns.
244    ///
245    /// # Arguments
246    /// - `defaults`: An iterator of (column name, default value) pairs.
247    ///
248    /// # Returns
249    /// Self for method chaining.
250    #[must_use]
251    pub fn with_defaults<I>(mut self, defaults: I) -> Self
252    where
253        I: Iterator<Item = (String, String)>,
254    {
255        self.defaults = Some(defaults.into_iter().collect::<HashMap<_, _>>());
256        self
257    }
258
259    /// Enables default values for nullable columns (e.g., `NULL`).
260    ///
261    /// # Returns
262    /// Self for method chaining.
263    #[must_use]
264    pub fn with_defaults_for_nullable(mut self) -> Self {
265        self.defaults_for_nullable = true;
266        self
267    }
268
269    /// Provide a map of resolved type conversions.
270    ///
271    /// For example, since arrow does not support enum types, providing a map of column name to
272    /// `Type::Enum8` with enum values ensures [`arrow::datatypes::DataType::Dictionary`] is
273    /// serialized as [`crate::Type::Enum16`] instead of the default [`crate::Type::LowCardinality`]
274    ///
275    /// # Returns
276    /// Self for method chaining.
277    #[must_use]
278    pub fn with_schema_conversions(mut self, map: SchemaConversions) -> Self {
279        self.schema_conversions = Some(map);
280        self
281    }
282
283    /// Returns the configured default values, if any.
284    ///
285    /// # Returns
286    /// An optional reference to the `HashMap` of column names to default values.
287    pub fn defaults(&self) -> Option<&HashMap<String, String>> { self.defaults.as_ref() }
288
289    /// Returns the configured default values, if any.
290    ///
291    /// # Returns
292    /// An optional reference to the `HashMap` of column names to default values.
293    pub fn schema_conversions(&self) -> Option<&SchemaConversions> {
294        self.schema_conversions.as_ref()
295    }
296
297    /// Builds the table options part of a `ClickHouse` `CREATE TABLE` statement.
298    ///
299    /// Constructs the SQL for engine, `ORDER BY`, `PRIMARY KEY`, `PARTITION BY`, `SAMPLE BY`,
300    /// `TTL`, and `SETTINGS` clauses. Validates constraints, such as ensuring primary keys are
301    /// a subset of `ORDER BY` columns and sampling references a primary key.
302    ///
303    /// # Returns
304    /// A `Result` containing the SQL string for the table options or a `Error` if
305    /// validation fails (e.g., empty engine, invalid primary keys).
306    ///
307    /// # Errors
308    /// - Returns `DDLMalformed` if the engine is empty, primary keys are invalid, or sampling
309    ///   doesn’t reference a primary key.
310    fn build(&self) -> Result<String> {
311        let engine = self.engine.to_string();
312        if engine.is_empty() {
313            return Err(Error::DDLMalformed("An engine is required, received empty string".into()));
314        }
315
316        let mut options = vec![format!("ENGINE = {engine}")];
317
318        // Log engines don't support options
319        if ["log", "LOG", "Log"].iter().any(|s| engine.contains(s)) {
320            return Ok(options.remove(0));
321        }
322
323        // Make sure order by is set
324        if self.order_by.is_empty() {
325            // Validations
326            if !self.primary_keys.is_empty() || !self.sampling.as_ref().is_none_or(String::is_empty)
327            {
328                return Err(Error::DDLMalformed(
329                    "Cannot specify primary keys or sampling when order by is empty".into(),
330                ));
331            }
332
333            options.push("ORDER BY tuple()".into());
334        } else {
335            let order_by = self.order_by.clone();
336
337            // Validate primary keys
338            if !self.primary_keys.is_empty()
339                && !self.primary_keys.iter().enumerate().all(|(i, k)| order_by.get(i) == Some(k))
340            {
341                return Err(Error::DDLMalformed(format!(
342                    "Primary keys but be present in order by and the ordering must match: order \
343                     by = {order_by:?}, primary keys = {:?}",
344                    self.primary_keys
345                )));
346            }
347
348            // Validate sampling
349            if let Some(sample) = self.sampling.as_ref() {
350                if !order_by.iter().any(|o| sample.contains(o.as_str())) {
351                    return Err(Error::DDLMalformed(format!(
352                        "Sampling must refer to a primary key: order by = {order_by:?}, \
353                         sampling={:?}",
354                        self.sampling
355                    )));
356                }
357            }
358
359            options.push(format!("ORDER BY ({})", order_by.join(", ")));
360        }
361
362        if !self.primary_keys.is_empty() {
363            let primary_keys = self.primary_keys.clone();
364            options.push(format!("PRIMARY KEY ({})", primary_keys.join(", ")));
365        }
366
367        if let Some(partition) = self.partition_by.as_ref() {
368            options.push(format!("PARTITION BY {partition}"));
369        }
370
371        if let Some(sample) = self.sampling.as_ref() {
372            options.push(format!("SAMPLE BY {sample}"));
373        }
374
375        if let Some(ttl) = self.ttl.as_ref() {
376            options.push(format!("TTL {ttl}"));
377        }
378
379        if !self.settings.is_empty() {
380            options.push(format!("SETTINGS {}", self.settings.encode_to_strings().join(", ")));
381        }
382
383        Ok(options.join("\n"))
384    }
385}
386
387/// Generates a `ClickHouse` `CREATE DATABASE` statement.
388///
389/// # Arguments
390/// - `database`: The name of the database to create.
391///
392/// # Returns
393/// A `Result` containing the SQL statement or a `Error` if the database name is
394/// invalid.
395///
396/// # Errors
397/// - Returns `DDLMalformed` if the database name is empty or is `"default"`.
398///
399/// # Example
400/// ```rust,ignore
401/// use clickhouse_arrow::sql::create_db_statement;
402///
403/// let sql = create_db_statement("my_db").unwrap();
404/// assert_eq!(sql, "CREATE DATABASE IF NOT EXISTS my_db");
405/// ```
406pub(crate) fn create_db_statement(database: &str) -> Result<String> {
407    if database.is_empty() {
408        return Err(Error::DDLMalformed("Database name cannot be empty".into()));
409    }
410
411    let db = database.to_lowercase();
412    if &db == "default" {
413        return Err(Error::DDLMalformed("Cannot create `default` database".into()));
414    }
415
416    Ok(format!("CREATE DATABASE IF NOT EXISTS {db}"))
417}
418
419/// Generates a `ClickHouse` `DROP DATABASE` statement.
420///
421/// # Arguments
422/// - `database`: The name of the database to drop.
423/// - `sync`: If `true`, adds the `SYNC` clause for synchronous dropping.
424///
425/// # Returns
426/// A `Result` containing the SQL statement or a `Error` if the database name is
427/// invalid.
428///
429/// # Errors
430/// - Returns `DDLMalformed` if the database name is empty or is `"default"`.
431///
432/// # Example
433/// ```rust,ignore
434/// use clickhouse_arrow::sql::drop_db_statement;
435///
436/// let sql = drop_db_statement("my_db", true).unwrap();
437/// assert_eq!(sql, "DROP DATABASE IF EXISTS my_db SYNC");
438/// ```
439pub(crate) fn drop_db_statement(database: &str, sync: bool) -> Result<String> {
440    if database.is_empty() {
441        return Err(Error::DDLMalformed("Database name cannot be empty".into()));
442    }
443
444    let db = database.to_lowercase();
445    if &db == "default" {
446        return Err(Error::DDLMalformed("Cannot create `default` database".into()));
447    }
448
449    let mut ddl = "DROP DATABASE IF EXISTS ".to_string();
450    ddl.push_str(&db);
451    if sync {
452        ddl.push_str(" SYNC");
453    }
454    Ok(ddl)
455}
456
457/// Generates a `ClickHouse` `CREATE TABLE` statement from an Arrow schema and table options.
458///
459/// # Arguments
460/// - `database`: Optional database name (e.g., `my_db`). If `None`, the table is created in the
461///   default database.
462/// - `table`: The table name.
463/// - `schema`: The Arrow schema defining the table’s columns.
464/// - `options`: The `CreateOptions` specifying engine, ordering, and other settings.
465///
466/// # Returns
467/// A `Result` containing the SQL statement or a `Error` if the schema is invalid or
468/// options fail validation.
469///
470/// # Errors
471/// - Returns `DDLMalformed` if the schema is empty or options validation fails (e.g., invalid
472///   engine).
473/// - Returns `ArrowDeserialize` if the Arrow `DataType` cannot be converted to a `ClickHouse` type.
474/// - Returns `TypeConversion` if the schema is disallowed by `ClickHouse`
475///
476/// # Example
477/// ```rust,ignore
478/// use arrow::datatypes::{DataType, Field, Schema};
479/// use clickhouse_arrow::sql::{CreateOptions, create_table_statement_from_arrow};
480/// use std::sync::Arc;
481///
482/// let schema = Arc::new(Schema::new(vec![
483///     Field::new("id", DataType::Int32, false),
484///     Field::new("name", DataType::Utf8, true),
485/// ]));
486/// let options = CreateOptions::new("MergeTree")
487///     .with_order_by(&["id".to_string()]);
488/// let sql = create_table_statement_from_arrow(None, "my_table", &schema, &options).unwrap();
489/// assert!(sql.contains("CREATE TABLE IF NOT EXISTS `my_table`"));
490/// ```
491pub(crate) fn create_table_statement_from_arrow(
492    database: Option<&str>,
493    table: &str,
494    schema: &SchemaRef,
495    options: &CreateOptions,
496    arrow_options: Option<ArrowOptions>,
497) -> Result<String> {
498    if schema.fields().is_empty() {
499        return Err(Error::DDLMalformed("Arrow Schema is empty, cannot create table".into()));
500    }
501    let definition = RecordBatchDefinition {
502        arrow_options,
503        schema: Arc::clone(schema),
504        defaults: options.defaults().cloned(),
505    };
506    create_table_statement(database, table, Some(definition), options)
507}
508
509/// Generates a `ClickHouse` `CREATE TABLE` statement from a type that implements [`crate::Row`] and
510/// [`CreateOptions`].
511///
512/// # Arguments
513/// - `database`: Optional database name (e.g., `my_db`). If `None`, the table is created in the
514///   default database.
515/// - `table`: The table name.
516/// - `options`: The `CreateOptions` specifying engine, ordering, and other settings.
517///
518/// # Returns
519/// A `Result` containing the SQL statement or a `Error` if the schema is invalid or
520/// options fail validation.
521///
522/// # Errors
523/// - Returns `DDLMalformed` if the schema is empty or options validation fails (e.g., invalid
524///   engine).
525/// - Returns `TypeConversion` if the schema is disallowed by `ClickHouse`
526///
527/// # Example
528/// ```rust,ignore
529/// use clickhouse_arrow::Row;
530/// use clickhouse_arrow::sql::{CreateOptions, create_table_statement_from_native};
531///
532/// #[derive(Row)]
533/// struct MyRow {
534///     id: String,
535///     name: String,
536/// }
537///
538/// let options = CreateOptions::new("MergeTree")
539///     .with_order_by(&["id".to_string()]);
540/// let sql = create_table_statement_from_native::<MyRow>(None, "my_table", &options).unwrap();
541/// assert!(sql.contains("CREATE TABLE IF NOT EXISTS `my_table`"));
542/// ```
543pub(crate) fn create_table_statement_from_native<T: Row>(
544    database: Option<&str>,
545    table: &str,
546    options: &CreateOptions,
547) -> Result<String> {
548    create_table_statement::<T>(database, table, None, options)
549}
550
551pub(crate) fn create_table_statement<T: ColumnDefine>(
552    database: Option<&str>,
553    table: &str,
554    schema: Option<T>,
555    options: &CreateOptions,
556) -> Result<String> {
557    let column_definitions = schema
558        .map(|s| s.runtime_definitions(options.schema_conversions.as_ref()))
559        .transpose()?
560        .flatten()
561        .or(T::definitions());
562
563    let Some(definitions) = column_definitions.filter(|c| !c.is_empty()) else {
564        return Err(Error::DDLMalformed("Schema is empty, cannot create table".into()));
565    };
566
567    let db_pre = database.map(|c| format!("{c}.")).unwrap_or_default();
568    let table = table.trim_matches('`');
569    let mut sql = String::new();
570    let _ = writeln!(sql, "CREATE TABLE IF NOT EXISTS {db_pre}`{table}` (");
571
572    let total = definitions.len();
573    for (i, (name, type_, default_value)) in definitions.into_iter().enumerate() {
574        let _ = write!(sql, "  {name} {type_}");
575        if let Some(d) = options
576            .defaults
577            .as_ref()
578            .and_then(|d| d.get(&name))
579            .or(default_value.map(|d| d.to_string()).as_ref())
580        {
581            let _ = write!(sql, " DEFAULT");
582            if !d.is_empty() && d != "NULL" {
583                let _ = write!(sql, " {d}");
584            }
585        } else if options.defaults_for_nullable && matches!(type_, Type::Nullable(_)) {
586            let _ = write!(sql, " DEFAULT");
587        }
588
589        if i < (total - 1) {
590            let _ = writeln!(sql, ",");
591        }
592    }
593
594    let _ = writeln!(sql, "\n)");
595    let _ = write!(sql, "{}", options.build()?);
596
597    Ok(sql)
598}
599
600/// A type that describe the schema of its fields to be used in a `CREATE TABLE ...` query.
601///
602/// Generally this is not implemented manually, but using `clickhouse_arrow::Row` since it's
603/// implemented on any `T: Row`. But it's helpful to implement manually if additional formats are
604/// created.
605pub trait ColumnDefine: Sized {
606    type DefaultValue: std::fmt::Display + std::fmt::Debug;
607
608    /// Provide the static schema
609    fn definitions() -> Option<Vec<ColumnDefinition<Self::DefaultValue>>>;
610
611    /// Infers the schema and returns it.
612    ///
613    /// # Errors
614    ///
615    /// Returns an error defined by the implementation
616    fn runtime_definitions(
617        &self,
618        _: Option<&HashMap<String, Type>>,
619    ) -> Result<Option<Vec<ColumnDefinition<Self::DefaultValue>>>> {
620        Ok(Self::definitions())
621    }
622}
623
624impl<T: Row> ColumnDefine for T {
625    type DefaultValue = crate::Value;
626
627    fn definitions() -> Option<Vec<ColumnDefinition>> { Self::to_schema() }
628
629    fn runtime_definitions(
630        &self,
631        conversions: Option<&HashMap<String, Type>>,
632    ) -> Result<Option<Vec<ColumnDefinition<Self::DefaultValue>>>> {
633        let Some(static_definitions) = Self::definitions() else {
634            return Ok(None);
635        };
636
637        if let Some(conversions) = conversions {
638            return Ok(Some(
639                static_definitions
640                    .into_iter()
641                    .map(|(name, type_, default_value)| {
642                        let resolved_type = conversions.get(&name).cloned().unwrap_or(type_);
643                        (name, resolved_type, default_value)
644                    })
645                    .collect::<Vec<_>>(),
646            ));
647        }
648
649        Ok(Some(static_definitions))
650    }
651}
652
653/// Helper struct to encapsulate schema creation logic for Arrow schemas.
654pub(crate) struct RecordBatchDefinition {
655    pub(crate) arrow_options: Option<ArrowOptions>,
656    pub(crate) schema:        SchemaRef,
657    pub(crate) defaults:      Option<HashMap<String, String>>,
658}
659
660impl ColumnDefine for RecordBatchDefinition {
661    type DefaultValue = String;
662
663    fn definitions() -> Option<Vec<ColumnDefinition<String>>> { None }
664
665    fn runtime_definitions(
666        &self,
667        conversions: Option<&HashMap<String, Type>>,
668    ) -> Result<Option<Vec<ColumnDefinition<String>>>> {
669        let mut fields = Vec::with_capacity(self.schema.fields.len());
670        for field in self.schema.fields() {
671            let type_ =
672                schema_conversion(field, conversions, self.arrow_options).inspect_err(|error| {
673                    error!("Arrow conversion failed for field {field:?}: {error}");
674                })?;
675            let default_val =
676                if let Some(d) = self.defaults.as_ref().and_then(|d| d.get(field.name())) {
677                    if !d.is_empty() && d != "NULL" { Some(d.clone()) } else { None }
678                } else {
679                    None
680                };
681            fields.push((field.name().to_string(), type_, default_val));
682        }
683        Ok(Some(fields))
684    }
685}
686
687#[cfg(test)]
688mod tests {
689    use std::sync::Arc;
690
691    use arrow::datatypes::{DataType, Field, Schema};
692
693    use super::{ClickHouseEngine, *};
694    use crate::Type;
695
696    #[allow(clippy::needless_pass_by_value)]
697    fn compare_sql(left: impl AsRef<str> + Into<String>, right: impl AsRef<str> + Into<String>) {
698        assert_eq!(left.as_ref().replace(['\n', ' '], ""), right.as_ref().replace(['\n', ' '], ""));
699    }
700
701    #[test]
702    fn test_create_options_new() {
703        let options = CreateOptions::new("MergeTree");
704        assert_eq!(options.engine, "MergeTree");
705        assert!(options.order_by.is_empty());
706        assert!(options.primary_keys.is_empty());
707        assert!(options.partition_by.is_none());
708        assert!(options.sampling.is_none());
709        assert!(options.settings.is_empty());
710        assert!(options.ttl.is_none());
711        assert!(options.defaults.is_none());
712        assert!(!options.defaults_for_nullable);
713    }
714
715    #[test]
716    fn test_create_options_with_order_by() {
717        let options = CreateOptions::new("MergeTree").with_order_by(&[
718            "id".to_string(),
719            String::new(),
720            "name".to_string(),
721        ]);
722        assert_eq!(options.order_by, vec!["id".to_string(), "name".to_string()]);
723    }
724
725    #[test]
726    fn test_create_options_with_primary_keys() {
727        let options = CreateOptions::new("MergeTree").with_primary_keys(&[
728            "id".to_string(),
729            String::new(),
730            "name".to_string(),
731        ]);
732        assert_eq!(options.primary_keys, vec!["id".to_string(), "name".to_string()]);
733    }
734
735    #[test]
736    fn test_create_options_with_partition_by() {
737        let options = CreateOptions::new("MergeTree").with_partition_by("toYYYYMM(date)");
738        assert_eq!(options.partition_by, Some("toYYYYMM(date)".to_string()));
739
740        let options = CreateOptions::new("MergeTree").with_partition_by("");
741        assert_eq!(options.partition_by, None);
742    }
743
744    #[test]
745    fn test_create_options_with_sample_by() {
746        let options = CreateOptions::new("MergeTree").with_sample_by("cityHash64(id)");
747        assert_eq!(options.sampling, Some("cityHash64(id)".to_string()));
748
749        let options = CreateOptions::new("MergeTree").with_sample_by("");
750        assert_eq!(options.sampling, None);
751    }
752
753    #[test]
754    fn test_create_options_with_settings() {
755        let settings = Settings::default().with_setting("index_granularity", 4096);
756        let options = CreateOptions::new("MergeTree").with_settings(settings.clone());
757        assert_eq!(options.settings, settings);
758    }
759
760    #[test]
761    fn test_create_options_with_setting() {
762        let options = CreateOptions::new("MergeTree").with_setting("index_granularity", 4096);
763        assert_eq!(options.settings.encode_to_strings(), vec![
764            "index_granularity = 4096".to_string()
765        ]);
766    }
767
768    #[test]
769    fn test_create_options_with_ttl() {
770        let options = CreateOptions::new("MergeTree").with_ttl("1 DAY");
771        assert_eq!(options.ttl, Some("1 DAY".to_string()));
772
773        let options = CreateOptions::new("MergeTree").with_ttl("");
774        assert_eq!(options.ttl, None);
775    }
776
777    #[test]
778    fn test_create_options_with_defaults() {
779        let defaults = vec![
780            ("id".to_string(), "0".to_string()),
781            ("name".to_string(), "'unknown'".to_string()),
782        ];
783        let options = CreateOptions::new("MergeTree").with_defaults(defaults.into_iter());
784        assert_eq!(
785            options.defaults,
786            Some(HashMap::from([
787                ("id".to_string(), "0".to_string()),
788                ("name".to_string(), "'unknown'".to_string()),
789            ]))
790        );
791    }
792
793    #[test]
794    fn test_create_options_with_defaults_for_nullable() {
795        let options = CreateOptions::new("MergeTree").with_defaults_for_nullable();
796        assert!(options.defaults_for_nullable);
797    }
798
799    #[test]
800    fn test_create_options_build_merge_tree() {
801        let options = CreateOptions::new("MergeTree")
802            .with_order_by(&["id".to_string(), "date".to_string()])
803            .with_primary_keys(&["id".to_string()])
804            .with_partition_by("toYYYYMM(date)")
805            .with_sample_by("cityHash64(id)")
806            .with_ttl("1 DAY")
807            .with_setting("index_granularity", 4096);
808        let sql = options.build().unwrap();
809        compare_sql(
810            sql,
811            "ENGINE = MergeTree\nORDER BY (id, date)\nPRIMARY KEY (id)\nPARTITION BY \
812             toYYYYMM(date)\nSAMPLE BY cityHash64(id)\nTTL 1 DAY\nSETTINGS index_granularity = \
813             4096",
814        );
815    }
816
817    #[test]
818    fn test_create_options_build_log_engine() {
819        let options = CreateOptions::new("TinyLog");
820        let sql = options.build().unwrap();
821        assert_eq!(sql, "ENGINE = TinyLog");
822    }
823
824    #[test]
825    fn test_create_options_build_empty_order_by() {
826        let options = CreateOptions::new("MergeTree");
827        let sql = options.build().unwrap();
828        compare_sql(sql, "ENGINE = MergeTree\nORDER BY tuple()");
829    }
830
831    #[test]
832    fn test_create_options_build_invalid_engine() {
833        let options = CreateOptions::new("");
834        let result = options.build();
835        assert!(matches!(result, Err(Error::DDLMalformed(_))));
836    }
837
838    #[test]
839    fn test_create_options_build_invalid_primary_keys() {
840        let options = CreateOptions::new("MergeTree")
841            .with_order_by(&["id".to_string()])
842            .with_primary_keys(&["name".to_string()]);
843        let result = options.build();
844        assert!(matches!(result, Err(Error::DDLMalformed(_))));
845    }
846
847    #[test]
848    fn test_create_options_build_invalid_sampling() {
849        let options = CreateOptions::new("MergeTree")
850            .with_order_by(&["id".to_string()])
851            .with_sample_by("cityHash64(name)");
852        let result = options.build();
853        assert!(matches!(result, Err(Error::DDLMalformed(_))));
854    }
855
856    #[test]
857    fn test_create_db_statement() {
858        let sql = create_db_statement("my_db").unwrap();
859        assert_eq!(sql, "CREATE DATABASE IF NOT EXISTS my_db");
860
861        let result = create_db_statement("");
862        assert!(matches!(result, Err(Error::DDLMalformed(_))));
863
864        let result = create_db_statement("default");
865        assert!(matches!(result, Err(Error::DDLMalformed(_))));
866    }
867
868    #[test]
869    fn test_drop_db_statement() {
870        let sql = drop_db_statement("my_db", false).unwrap();
871        compare_sql(sql, "DROP DATABASE IF EXISTS my_db");
872
873        let sql = drop_db_statement("my_db", true).unwrap();
874        compare_sql(sql, "DROP DATABASE IF EXISTS my_db SYNC");
875
876        let result = drop_db_statement("", false);
877        assert!(matches!(result, Err(Error::DDLMalformed(_))));
878
879        let result = drop_db_statement("default", false);
880        assert!(matches!(result, Err(Error::DDLMalformed(_))));
881    }
882
883    #[test]
884    fn test_create_table_statement() {
885        let schema = Arc::new(Schema::new(vec![
886            Field::new("id", DataType::Int32, false),
887            Field::new("name", DataType::Utf8, true),
888        ]));
889        let options = CreateOptions::new("MergeTree")
890            .with_order_by(&["id".to_string()])
891            .with_defaults(vec![("name".to_string(), "'unknown'".to_string())].into_iter())
892            .with_defaults_for_nullable();
893        let sql =
894            create_table_statement_from_arrow(None, "my_table", &schema, &options, None).unwrap();
895        compare_sql(
896            sql,
897            "CREATE TABLE IF NOT EXISTS `my_table` (\n  id Int32,\n  name Nullable(String) \
898             DEFAULT 'unknown'\n)\nENGINE = MergeTree\nORDER BY (id)",
899        );
900    }
901
902    #[test]
903    fn test_create_table_statement_with_database() {
904        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
905        let options = CreateOptions::new("Memory");
906        let sql =
907            create_table_statement_from_arrow(Some("my_db"), "my_table", &schema, &options, None)
908                .unwrap();
909        compare_sql(
910            sql,
911            "CREATE TABLE IF NOT EXISTS my_db.`my_table` (\nid Int32\n)\nENGINE = Memory\nORDER \
912             BY tuple()",
913        );
914    }
915
916    #[test]
917    fn test_create_table_statement_empty_schema() {
918        let schema = Arc::new(Schema::empty());
919        let options = CreateOptions::new("MergeTree");
920        let result = create_table_statement_from_arrow(None, "my_table", &schema, &options, None);
921        assert!(matches!(result, Err(Error::DDLMalformed(_))));
922    }
923
924    #[test]
925    fn test_create_table_with_nullable_dictionary() {
926        let schema = Arc::new(Schema::new(vec![
927            Field::new(
928                "status",
929                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
930                true,
931            ),
932            Field::new("id", DataType::Int32, false),
933        ]));
934
935        let enum_i8 = HashMap::from_iter([(
936            "status".to_string(),
937            Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
938        )]);
939
940        let options = CreateOptions::new("MergeTree").with_order_by(&["id".to_string()]);
941        let enum_options = options.clone().with_schema_conversions(enum_i8);
942
943        // If the nullable dictionary will not be converted to enum, this will fail
944        assert!(
945            create_table_statement_from_arrow(None, "test_table", &schema, &options, None).is_err()
946        );
947
948        // Otherwise it will succeed
949        let sql =
950            create_table_statement_from_arrow(None, "test_table", &schema, &enum_options, None)
951                .expect("Should generate valid SQL");
952
953        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
954        assert!(sql.contains("status Nullable(Enum8('active' = 1,'inactive' = 2))"));
955        assert!(sql.contains("id Int32"));
956        assert!(sql.contains("ENGINE = MergeTree"));
957        assert!(sql.contains("ORDER BY (id)"));
958    }
959
960    #[test]
961    fn test_create_table_with_enum8() {
962        let schema = Arc::new(Schema::new(vec![
963            Field::new(
964                "status",
965                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
966                false,
967            ),
968            Field::new("id", DataType::Int32, false),
969        ]));
970
971        let enum_i8 = HashMap::from_iter([(
972            "status".to_string(),
973            Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
974        )]);
975
976        let options = CreateOptions::new("MergeTree")
977            .with_order_by(&["id".to_string()])
978            .with_schema_conversions(enum_i8);
979
980        let sql = create_table_statement_from_arrow(None, "test_table", &schema, &options, None)
981            .expect("Should generate valid SQL");
982
983        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
984        assert!(sql.contains("status Enum8('active' = 1,'inactive' = 2)"));
985        assert!(sql.contains("id Int32"));
986        assert!(sql.contains("ENGINE = MergeTree"));
987        assert!(sql.contains("ORDER BY (id)"));
988    }
989
990    #[test]
991    fn test_create_table_with_enum16() {
992        let schema = Arc::new(Schema::new(vec![
993            Field::new(
994                "category",
995                DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
996                false,
997            ),
998            Field::new("value", DataType::Float32, true),
999        ]));
1000
1001        let enum_i16 = HashMap::from_iter([(
1002            "category".to_string(),
1003            Type::Enum16(vec![("x".to_string(), 1), ("y".to_string(), 2), ("z".to_string(), 3)]),
1004        )]);
1005        let options = CreateOptions::new("MergeTree")
1006            .with_order_by(&["category".to_string()])
1007            .with_schema_conversions(enum_i16);
1008
1009        let sql = create_table_statement_from_arrow(None, "test_table", &schema, &options, None)
1010            .expect("Should generate valid SQL");
1011
1012        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
1013        assert!(sql.contains("category Enum16('x' = 1,'y' = 2,'z' = 3)"));
1014        assert!(sql.contains("value Nullable(Float32)"));
1015        assert!(sql.contains("ENGINE = MergeTree"));
1016        assert!(sql.contains("ORDER BY (category)"));
1017    }
1018
1019    #[test]
1020    fn test_create_table_with_invalid_enum_type() {
1021        let schema = Arc::new(Schema::new(vec![Field::new("status", DataType::Int32, true)]));
1022
1023        let enum_i8 = HashMap::from_iter([(
1024            "status".to_string(),
1025            Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
1026        )]);
1027
1028        let options = CreateOptions::new("MergeTree").with_schema_conversions(enum_i8);
1029
1030        let result = create_table_statement_from_arrow(None, "test_table", &schema, &options, None);
1031
1032        assert!(matches!(
1033            result,
1034            Err(Error::TypeConversion(msg))
1035            if msg.contains("expected LowCardinality(String) or String/Binary, found Nullable(Int32)")
1036        ));
1037    }
1038
1039    #[test]
1040    fn test_create_table_with_non_low_cardinality_enum() {
1041        let schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, true)]));
1042
1043        let enum_i8 = HashMap::from_iter([(
1044            "name".to_string(),
1045            Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
1046        )]);
1047        let options = CreateOptions::new("MergeTree").with_schema_conversions(enum_i8);
1048
1049        let sql =
1050            create_table_statement_from_arrow(None, "test_table", &schema, &options, None).unwrap();
1051
1052        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
1053        assert!(sql.contains("name Nullable(Enum8('active' = 1,'inactive' = 2))"));
1054        assert!(sql.contains("ENGINE = MergeTree"));
1055    }
1056
1057    // The arrow data type drives nullability
1058    #[test]
1059    fn test_create_table_with_nullable_field_non_nullable_enum() {
1060        let schema = Arc::new(Schema::new(vec![
1061            Field::new("name", DataType::Utf8, true),
1062            Field::new("status", DataType::Utf8, false),
1063        ]));
1064
1065        let enum_i8 = HashMap::from_iter([
1066            (
1067                "name".to_string(),
1068                Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)])
1069                    .into_nullable(),
1070            ),
1071            (
1072                "status".to_string(),
1073                Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)])
1074                    .into_nullable(),
1075            ),
1076        ]);
1077        let options = CreateOptions::new("MergeTree").with_schema_conversions(enum_i8);
1078        let arrow_options = ArrowOptions::default()
1079            // Deserialize strings as Utf8, not Binary
1080            .with_strings_as_strings(true)
1081            // Deserialize Date as Date32
1082            .with_use_date32_for_date(true)
1083            // Ignore fields that ClickHouse doesn't support.
1084            .with_strict_schema(false)
1085            .with_disable_strict_schema_ddl(true);
1086
1087        let sql = create_table_statement_from_arrow(
1088            None,
1089            "test_table",
1090            &schema,
1091            &options,
1092            Some(arrow_options),
1093        )
1094        .unwrap();
1095
1096        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
1097        assert!(sql.contains("name Nullable(Enum8('active' = 1,'inactive' = 2))"));
1098        assert!(sql.contains("status Enum8('active' = 1,'inactive' = 2)"));
1099        assert!(sql.contains("ENGINE = MergeTree"));
1100    }
1101
1102    #[test]
1103    fn test_create_table_with_mixed_enum_and_non_enum() {
1104        let schema = Arc::new(Schema::new(vec![
1105            Field::new(
1106                "status",
1107                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
1108                true,
1109            ),
1110            Field::new("name", DataType::Utf8, true),
1111            Field::new(
1112                "category",
1113                DataType::Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
1114                false,
1115            ),
1116        ]));
1117
1118        let enums = HashMap::from_iter([
1119            (
1120                "status".to_string(),
1121                Type::Enum8(vec![("active".to_string(), 1_i8), ("inactive".to_string(), 2)]),
1122            ),
1123            (
1124                "category".to_string(),
1125                Type::Enum16(vec![("x".to_string(), 1), ("y".to_string(), 2)]),
1126            ),
1127        ]);
1128
1129        let options = CreateOptions::new("MergeTree")
1130            .with_order_by(&["category".to_string()])
1131            .with_schema_conversions(enums);
1132
1133        let sql = create_table_statement_from_arrow(None, "test_table", &schema, &options, None)
1134            .expect("Should generate valid SQL");
1135
1136        assert!(sql.contains("CREATE TABLE IF NOT EXISTS `test_table`"));
1137        assert!(sql.contains("status Nullable(Enum8('active' = 1,'inactive' = 2))"));
1138        assert!(sql.contains("name Nullable(String)"));
1139        assert!(sql.contains("category Enum16('x' = 1,'y' = 2)"));
1140        assert!(sql.contains("ENGINE = MergeTree"));
1141        assert!(sql.contains("ORDER BY (category)"));
1142    }
1143
1144    #[test]
1145    fn test_engines() {
1146        use super::ClickHouseEngine::*;
1147
1148        let engines = [
1149            MergeTree,
1150            AggregatingMergeTree,
1151            CollapsingMergeTree,
1152            ReplacingMergeTree,
1153            SummingMergeTree,
1154            Memory,
1155            Log,
1156            StripeLog,
1157            TinyLog,
1158            Other("NonExistentEngine".into()),
1159        ];
1160
1161        for engine in engines {
1162            let engine_str = engine.to_string();
1163            let engine_from = ClickHouseEngine::from(engine_str);
1164            assert_eq!(engine, engine_from);
1165        }
1166    }
1167}