Skip to main content

clickhouse_kit/
table.rs

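//! Runtime table construction from specs → `CREATE TABLE` DDL.
//! Every identifier is validated and every column type goes through the allowlist
//! ([`ColumnTypeSpec`]), so the identifier and type parts of a spec may be built
//! from customer input without enabling SQL injection. The raw-fragment fields
//! (`engine`, column `default`, `partition_by`, TTL intervals/volumes, index
//! expressions/types, and `settings`) are emitted into the DDL as-is and must stay
//! app-controlled.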
4
5use crate::safety::{
6    assert_column_count, validate_identifier, ColumnTypeSpec, SchemaError, SchemaLimits,
7};
8use std::collections::HashSet;
9
10/// A single column in a runtime-built table.
11#[derive(Debug, Clone)]
12pub struct ColumnSpec {
13    pub name: String,
14    pub type_spec: ColumnTypeSpec,
15    /// Optional ClickHouse DEFAULT expression (e.g. `now()`).
16    pub default: Option<String>,
17}
18
/// A secondary data-skipping index, rendered inside the column parentheses as
/// `INDEX <name> <expression> TYPE <type_def> GRANULARITY <granularity>`.
///
/// **Safety posture:** only `name` is validated (as an identifier). `expression`
/// and `type_def` are **app-controlled raw SQL**, emitted verbatim just like
/// [`TableSpec::engine`]; never derive them from untrusted input.
#[derive(Clone, Debug)]
pub struct IndexSpec {
    /// Index name; identifier-validated.
    pub name: String,
    /// Raw, app-controlled index expression, e.g. `"trace_id"` or a real expression.
    pub expression: String,
    /// Raw, app-controlled index type, e.g. `"bloom_filter(0.01)"` or
    /// `"tokenbf_v1(8192, 3, 0)"`.
    pub type_def: String,
    /// Number rendered after `GRANULARITY`.
    pub granularity: u32,
}
34
/// A move-to-volume TTL tier, rendered as
/// `<column expr> + INTERVAL <interval> TO VOLUME '<volume>'`.
///
/// **Safety posture:** both fields are app-controlled — never build them from
/// untrusted input. `interval` is emitted verbatim; `volume` is placed inside a
/// single-quoted string literal by the renderer, so pass the bare volume name
/// without surrounding quotes.
#[derive(Debug, Clone)]
pub struct TtlMove {
    /// Raw INTERVAL fragment, e.g. `"14 DAY"`.
    pub interval: String,
    /// Bare volume name (no quotes), e.g. `"cold"`.
    pub volume: String,
}
45
46/// Table TTL policy.
47///
48/// **Safety posture:** `column` is identifier-validated **and** must be a real column
49/// in the table. `interval`/`volume`/`delete_after` are app-controlled raw fragments
50/// emitted verbatim — never build them from untrusted input.
51#[derive(Debug, Clone)]
52pub struct TtlSpec {
53    pub column: String,
54    pub move_to_volume_after: Option<TtlMove>,
55    /// Raw INTERVAL fragment for the DELETE tier, e.g. `"180 DAY"`.
56    pub delete_after: Option<String>,
57}
58
59/// A table built from a runtime spec. `engine` is app-controlled (not user input);
60/// `order_by` entries are validated as column identifiers.
61///
62/// **Safety posture for the production-DDL knobs** (`partition_by`, `ttl`, `indexes`,
63/// `settings`): these are **app-controlled raw fragments** emitted verbatim, with the
64/// sole exception that identifiers (`ttl.column`, `indexes[].name`) are validated and
65/// `ttl.column` must be a real column. Never build the raw fragments from untrusted
66/// input.
67#[derive(Debug, Clone)]
68pub struct TableSpec {
69    pub name: String,
70    pub columns: Vec<ColumnSpec>,
71    pub engine: String,
72    pub order_by: Vec<String>,
73    /// App-controlled raw `PARTITION BY` expression, e.g.
74    /// `"(organization_id, toDate(started_at))"`.
75    pub partition_by: Option<String>,
76    /// Optional table TTL policy.
77    pub ttl: Option<TtlSpec>,
78    /// Secondary data-skipping indexes rendered inside the column parens.
79    pub indexes: Vec<IndexSpec>,
80    /// App-controlled `SETTINGS` pairs (key, raw-value RHS), e.g.
81    /// `("storage_policy", "'hot_cold'")`, `("index_granularity", "8192")`.
82    pub settings: Vec<(String, String)>,
83}
84
85/// Render the `CREATE TABLE IF NOT EXISTS` DDL for a runtime spec, enforcing
86/// identifier safety, the type allowlist, column bounds, and no duplicate columns.
87pub fn to_create_table_sql(
88    table: &TableSpec,
89    limits: &SchemaLimits,
90) -> Result<String, SchemaError> {
91    validate_identifier(&table.name, "table", limits)?;
92    assert_column_count(table.columns.len(), limits)?;
93
94    let mut seen = HashSet::new();
95    let mut col_lines = Vec::with_capacity(table.columns.len());
96    for c in &table.columns {
97        validate_identifier(&c.name, "column", limits)?;
98        if !seen.insert(c.name.as_str()) {
99            return Err(SchemaError::DuplicateColumn(c.name.clone()));
100        }
101        // Validate any untrusted type parameters (e.g. parametrised DateTime64
102        // precision/timezone) before they reach the rendered SQL.
103        c.type_spec.validate()?;
104        let default = c
105            .default
106            .as_deref()
107            .map(|d| format!(" DEFAULT {d}"))
108            .unwrap_or_default();
109        col_lines.push(format!(
110            "    {} {}{}",
111            c.name,
112            c.type_spec.to_ch_type(),
113            default
114        ));
115    }
116
117    // ORDER BY entries must be real columns (validated identifiers) — no expressions
118    // from untrusted input.
119    let known: HashSet<&str> = table.columns.iter().map(|c| c.name.as_str()).collect();
120    for ob in &table.order_by {
121        validate_identifier(ob, "column", limits)?;
122        if !known.contains(ob.as_str()) {
123            return Err(SchemaError::InvalidIdentifier {
124                kind: "order_by column",
125                name: ob.clone(),
126            });
127        }
128    }
129
130    // Secondary indexes render inside the column parens. `name` is identifier-validated;
131    // `expression`/`type_def` are app-controlled raw SQL emitted verbatim.
132    let mut paren_lines = col_lines;
133    for idx in &table.indexes {
134        validate_identifier(&idx.name, "index", limits)?;
135        paren_lines.push(format!(
136            "    INDEX {} {} TYPE {} GRANULARITY {}",
137            idx.name, idx.expression, idx.type_def, idx.granularity
138        ));
139    }
140
141    let mut sql = format!(
142        "CREATE TABLE IF NOT EXISTS {} (\n{}\n)\nENGINE = {}",
143        table.name,
144        paren_lines.join(",\n"),
145        table.engine,
146    );
147
148    // PARTITION BY sits between ENGINE and ORDER BY.
149    if let Some(partition_by) = &table.partition_by {
150        sql.push_str(&format!("\nPARTITION BY {partition_by}"));
151    }
152
153    sql.push_str(&format!("\nORDER BY ({})", table.order_by.join(", ")));
154
155    // TTL: the column must be a real, validated column. DateTime64 columns are wrapped
156    // in `toDateTime(...)` for the TTL expression; everything else uses the bare column.
157    if let Some(ttl) = &table.ttl {
158        validate_identifier(&ttl.column, "column", limits)?;
159        if !known.contains(ttl.column.as_str()) {
160            return Err(SchemaError::InvalidIdentifier {
161                kind: "ttl column",
162                name: ttl.column.clone(),
163            });
164        }
165        let type_spec = table
166            .columns
167            .iter()
168            .find(|c| c.name == ttl.column)
169            .map(|c| &c.type_spec);
170        let base = match type_spec {
171            Some(ts) if ts.is_datetime64() => format!("toDateTime({})", ttl.column),
172            _ => ttl.column.clone(),
173        };
174        let mut parts = Vec::new();
175        if let Some(mv) = &ttl.move_to_volume_after {
176            parts.push(format!(
177                "{base} + INTERVAL {} TO VOLUME '{}'",
178                mv.interval, mv.volume
179            ));
180        }
181        if let Some(after) = &ttl.delete_after {
182            parts.push(format!("{base} + INTERVAL {after} DELETE"));
183        }
184        if !parts.is_empty() {
185            sql.push_str(&format!("\nTTL {}", parts.join(", ")));
186        }
187    }
188
189    // SETTINGS render last. Values are app-controlled raw RHS fragments.
190    if !table.settings.is_empty() {
191        let rendered: Vec<String> = table
192            .settings
193            .iter()
194            .map(|(k, v)| format!("{k} = {v}"))
195            .collect();
196        sql.push_str(&format!("\nSETTINGS {}", rendered.join(", ")));
197    }
198
199    Ok(sql)
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use crate::safety::ScalarType;

    fn col(name: &str, t: ColumnTypeSpec) -> ColumnSpec {
        ColumnSpec {
            name: name.into(),
            type_spec: t,
            default: None,
        }
    }

    fn sample() -> TableSpec {
        TableSpec {
            name: "events".into(),
            columns: vec![
                col("id", ColumnTypeSpec::Scalar(ScalarType::Uuid)),
                col("ts", ColumnTypeSpec::Scalar(ScalarType::DateTime64)),
                col("name", ColumnTypeSpec::Scalar(ScalarType::String)),
                col("value", ColumnTypeSpec::Scalar(ScalarType::Float64)),
                col(
                    "tags",
                    ColumnTypeSpec::Array {
                        array: crate::safety::StringOnly::String,
                    },
                ),
            ],
            engine: "MergeTree()".into(),
            order_by: vec!["id".into()],
            partition_by: None,
            ttl: None,
            indexes: vec![],
            settings: vec![],
        }
    }

    #[test]
    fn renders_create_table() {
        let ddl = to_create_table_sql(&sample(), &SchemaLimits::default()).unwrap();
        assert!(ddl.contains("CREATE TABLE IF NOT EXISTS events ("));
        assert!(ddl.contains("id UUID"));
        assert!(ddl.contains("ts DateTime64(3)"));
        assert!(ddl.contains("tags Array(String)"));
        assert!(ddl.contains("ENGINE = MergeTree()"));
        assert!(ddl.contains("ORDER BY (id)"));
    }

    #[test]
    fn rejects_duplicate_and_bad_identifiers() {
        let mut t = sample();
        t.columns
            .push(col("id", ColumnTypeSpec::Scalar(ScalarType::String)));
        assert!(matches!(
            to_create_table_sql(&t, &SchemaLimits::default()),
            Err(SchemaError::DuplicateColumn(_))
        ));

        let mut t2 = sample();
        t2.columns[0].name = "id; DROP TABLE x".into();
        assert!(to_create_table_sql(&t2, &SchemaLimits::default()).is_err());
    }

    #[test]
    fn rejects_order_by_unknown_column() {
        let mut t = sample();
        t.order_by = vec!["nope".into()];
        assert!(matches!(
            to_create_table_sql(&t, &SchemaLimits::default()),
            Err(SchemaError::InvalidIdentifier {
                kind: "order_by column",
                ..
            })
        ));
    }

    /// The live `observability_traces` table — a real production DDL with partitioning,
    /// two data-skipping indexes, a two-tier TTL, and settings.
    fn observability_traces() -> TableSpec {
        TableSpec {
            name: "observability_traces".into(),
            columns: vec![
                col("started_at", ColumnTypeSpec::Scalar(ScalarType::DateTime64)),
                col(
                    "organization_id",
                    ColumnTypeSpec::LowCardinality {
                        low_cardinality: Box::new(ColumnTypeSpec::Scalar(ScalarType::String)),
                    },
                ),
                col("trace_id", ColumnTypeSpec::Scalar(ScalarType::String)),
                col("name", ColumnTypeSpec::Scalar(ScalarType::String)),
                col(
                    "service_name",
                    ColumnTypeSpec::LowCardinality {
                        low_cardinality: Box::new(ColumnTypeSpec::Scalar(ScalarType::String)),
                    },
                ),
                col("has_error", ColumnTypeSpec::Scalar(ScalarType::UInt8)),
                col(
                    "attributes",
                    ColumnTypeSpec::Map {
                        map: (
                            crate::safety::StringOnly::String,
                            crate::safety::StringOnly::String,
                        ),
                    },
                ),
                ColumnSpec {
                    name: "ingested_at".into(),
                    type_spec: ColumnTypeSpec::Scalar(ScalarType::DateTime),
                    default: Some("now()".into()),
                },
            ],
            engine: "MergeTree()".into(),
            order_by: vec![
                "organization_id".into(),
                "service_name".into(),
                "started_at".into(),
                "trace_id".into(),
            ],
            partition_by: Some("(organization_id, toDate(started_at))".into()),
            ttl: Some(TtlSpec {
                column: "started_at".into(),
                move_to_volume_after: Some(TtlMove {
                    interval: "14 DAY".into(),
                    volume: "cold".into(),
                }),
                delete_after: Some("180 DAY".into()),
            }),
            indexes: vec![
                IndexSpec {
                    name: "idx_trace_id".into(),
                    expression: "trace_id".into(),
                    type_def: "bloom_filter(0.01)".into(),
                    granularity: 1,
                },
                IndexSpec {
                    name: "idx_name".into(),
                    expression: "name".into(),
                    type_def: "tokenbf_v1(8192, 3, 0)".into(),
                    granularity: 1,
                },
            ],
            settings: vec![
                ("storage_policy".into(), "'hot_cold'".into()),
                ("index_granularity".into(), "8192".into()),
            ],
        }
    }

    #[test]
    fn reproduces_observability_traces_production_ddl() {
        let ddl = to_create_table_sql(&observability_traces(), &SchemaLimits::default()).unwrap();

        // Partitioning between ENGINE and ORDER BY.
        assert!(
            ddl.contains("PARTITION BY (organization_id, toDate(started_at))"),
            "{ddl}"
        );
        // Both INDEX lines, verbatim, inside the column parens.
        assert!(
            ddl.contains("    INDEX idx_trace_id trace_id TYPE bloom_filter(0.01) GRANULARITY 1"),
            "{ddl}"
        );
        assert!(
            ddl.contains("    INDEX idx_name name TYPE tokenbf_v1(8192, 3, 0) GRANULARITY 1"),
            "{ddl}"
        );
        // The column DEFAULT expression is rendered verbatim on its column line.
        assert!(ddl.contains(" DEFAULT now()"), "{ddl}");
        // TTL line, verbatim — started_at is DateTime64 → wrapped in toDateTime(...).
        assert!(
            ddl.contains("TTL toDateTime(started_at) + INTERVAL 14 DAY TO VOLUME 'cold', toDateTime(started_at) + INTERVAL 180 DAY DELETE"),
            "{ddl}"
        );
        // SETTINGS line, verbatim, last.
        assert!(
            ddl.contains("SETTINGS storage_policy = 'hot_cold', index_granularity = 8192"),
            "{ddl}"
        );

        // Clause ordering sanity: ENGINE < PARTITION BY < ORDER BY < TTL < SETTINGS.
        let pos = |needle: &str| ddl.find(needle).unwrap();
        assert!(pos("ENGINE = MergeTree()") < pos("PARTITION BY"));
        assert!(pos("PARTITION BY") < pos("ORDER BY ("));
        assert!(pos("ORDER BY (") < pos("TTL "));
        assert!(pos("TTL ") < pos("SETTINGS "));

        // INDEX lines follow the last column and precede the closing paren.
        assert!(pos(" DEFAULT now()") < pos("    INDEX idx_trace_id"));
        assert!(pos("    INDEX idx_name") < pos("\n)\nENGINE"));
    }

    #[test]
    fn ttl_on_plain_datetime_is_not_wrapped() {
        let mut t = sample();
        t.columns
            .push(col("created", ColumnTypeSpec::Scalar(ScalarType::DateTime)));
        t.ttl = Some(TtlSpec {
            column: "created".into(),
            move_to_volume_after: None,
            delete_after: Some("30 DAY".into()),
        });
        let ddl = to_create_table_sql(&t, &SchemaLimits::default()).unwrap();
        assert!(
            ddl.contains("TTL created + INTERVAL 30 DAY DELETE"),
            "{ddl}"
        );
        assert!(!ddl.contains("toDateTime(created)"), "{ddl}");
    }

    #[test]
    fn ttl_delete_only_renders_just_delete() {
        let mut t = sample();
        // `ts` is DateTime64 → wrapped.
        t.ttl = Some(TtlSpec {
            column: "ts".into(),
            move_to_volume_after: None,
            delete_after: Some("90 DAY".into()),
        });
        let ddl = to_create_table_sql(&t, &SchemaLimits::default()).unwrap();
        assert!(
            ddl.contains("TTL toDateTime(ts) + INTERVAL 90 DAY DELETE"),
            "{ddl}"
        );
        assert!(!ddl.contains("TO VOLUME"), "{ddl}");
    }

    #[test]
    fn ttl_unknown_column_is_rejected() {
        let mut t = sample();
        t.ttl = Some(TtlSpec {
            column: "nope".into(),
            move_to_volume_after: None,
            delete_after: Some("1 DAY".into()),
        });
        assert!(matches!(
            to_create_table_sql(&t, &SchemaLimits::default()),
            Err(SchemaError::InvalidIdentifier {
                kind: "ttl column",
                ..
            })
        ));
    }

    #[test]
    fn index_with_invalid_name_is_rejected() {
        let mut t = sample();
        t.indexes = vec![IndexSpec {
            name: "bad name".into(),
            expression: "name".into(),
            type_def: "bloom_filter(0.01)".into(),
            granularity: 1,
        }];
        assert!(matches!(
            to_create_table_sql(&t, &SchemaLimits::default()),
            Err(SchemaError::InvalidIdentifier { kind: "index", .. })
        ));
    }

    #[test]
    fn backward_compat_no_extra_clauses() {
        // With all the new knobs absent, the output is exactly the legacy shape:
        // no PARTITION BY / TTL / SETTINGS lines, no trailing INDEX lines.
        let ddl = to_create_table_sql(&sample(), &SchemaLimits::default()).unwrap();
        let expected = "CREATE TABLE IF NOT EXISTS events (\n    id UUID,\n    ts DateTime64(3),\n    name String,\n    value Float64,\n    tags Array(String)\n)\nENGINE = MergeTree()\nORDER BY (id)";
        assert_eq!(ddl, expected);
    }

    #[test]
    fn parametrised_datetime64_column_renders_with_timezone() {
        let mut t = sample();
        let dt: ColumnTypeSpec =
            serde_json::from_str(r#"{"datetime64":{"precision":3,"timezone":"UTC"}}"#).unwrap();
        t.columns.push(col("occurred_at", dt));
        let ddl = to_create_table_sql(&t, &SchemaLimits::default()).unwrap();
        assert!(ddl.contains("occurred_at DateTime64(3, 'UTC')"), "{ddl}");
    }

    #[test]
    fn parametrised_datetime64_bad_params_rejected_at_ddl_boundary() {
        // Bad timezone is caught in the per-column loop, before reaching SQL.
        let mut t = sample();
        let bad_tz: ColumnTypeSpec =
            serde_json::from_str(r#"{"datetime64":{"precision":3,"timezone":"UTC'; DROP"}}"#)
                .unwrap();
        t.columns.push(col("occurred_at", bad_tz));
        assert!(matches!(
            to_create_table_sql(&t, &SchemaLimits::default()),
            Err(SchemaError::InvalidIdentifier {
                kind: "timezone",
                ..
            })
        ));

        // Out-of-range precision is also caught.
        let mut t2 = sample();
        let bad_p: ColumnTypeSpec =
            serde_json::from_str(r#"{"datetime64":{"precision":12}}"#).unwrap();
        t2.columns.push(col("occurred_at", bad_p));
        assert!(matches!(
            to_create_table_sql(&t2, &SchemaLimits::default()),
            Err(SchemaError::InvalidDateTime64Precision { precision: 12 })
        ));
    }

    #[test]
    fn ttl_wraps_parametrised_datetime64_column() {
        let mut t = sample();
        let dt: ColumnTypeSpec =
            serde_json::from_str(r#"{"datetime64":{"precision":3,"timezone":"UTC"}}"#).unwrap();
        t.columns.push(col("occurred_at", dt));
        t.ttl = Some(TtlSpec {
            column: "occurred_at".into(),
            move_to_volume_after: None,
            delete_after: Some("30 DAY".into()),
        });
        let ddl = to_create_table_sql(&t, &SchemaLimits::default()).unwrap();
        // The parametrised DateTime64 column is still wrapped in toDateTime(...).
        assert!(
            ddl.contains("TTL toDateTime(occurred_at) + INTERVAL 30 DAY DELETE"),
            "{ddl}"
        );
    }

    #[test]
    fn renders_column_default_expression() {
        let mut t = sample();
        // columns[2] is `name String`, followed by another column → trailing comma.
        t.columns[2].default = Some("'unknown'".into());
        let ddl = to_create_table_sql(&t, &SchemaLimits::default()).unwrap();
        assert!(ddl.contains("    name String DEFAULT 'unknown',"), "{ddl}");
    }

    #[test]
    fn ttl_move_only_renders_just_move() {
        let mut t = sample();
        t.ttl = Some(TtlSpec {
            column: "ts".into(),
            move_to_volume_after: Some(TtlMove {
                interval: "7 DAY".into(),
                volume: "cold".into(),
            }),
            delete_after: None,
        });
        let ddl = to_create_table_sql(&t, &SchemaLimits::default()).unwrap();
        assert!(
            ddl.contains("TTL toDateTime(ts) + INTERVAL 7 DAY TO VOLUME 'cold'"),
            "{ddl}"
        );
        assert!(!ddl.contains("DELETE"), "{ddl}");
    }

    #[test]
    fn ttl_without_tiers_renders_no_ttl_clause() {
        let mut t = sample();
        t.ttl = Some(TtlSpec {
            column: "ts".into(),
            move_to_volume_after: None,
            delete_after: None,
        });
        let ddl = to_create_table_sql(&t, &SchemaLimits::default()).unwrap();
        assert!(!ddl.contains("TTL"), "{ddl}");
    }

    #[test]
    fn ttl_column_with_invalid_identifier_is_rejected() {
        let mut t = sample();
        t.ttl = Some(TtlSpec {
            column: "ts; DROP TABLE x".into(),
            move_to_volume_after: None,
            delete_after: Some("1 DAY".into()),
        });
        assert!(matches!(
            to_create_table_sql(&t, &SchemaLimits::default()),
            Err(SchemaError::InvalidIdentifier { kind: "column", .. })
        ));
    }
}