1use crate::safety::{
6 assert_column_count, validate_identifier, ColumnTypeSpec, SchemaError, SchemaLimits,
7};
8use std::collections::HashSet;
9
/// One column of a table definition, as consumed by [`to_create_table_sql`].
#[derive(Debug, Clone)]
pub struct ColumnSpec {
    /// Column name. Validated as a `"column"` identifier when the DDL is
    /// rendered; a name that repeats an earlier column is rejected.
    pub name: String,
    /// Column type. Checked with `validate()` and rendered with
    /// `to_ch_type()` when the DDL is rendered.
    pub type_spec: ColumnTypeSpec,
    /// Optional default expression. When present it is appended verbatim
    /// after ` DEFAULT `; the renderer does not validate it.
    pub default: Option<String>,
}
18
/// One `INDEX` entry of a table definition.
///
/// [`to_create_table_sql`] renders it inside the column list as
/// `INDEX {name} {expression} TYPE {type_def} GRANULARITY {granularity}`.
#[derive(Debug, Clone)]
pub struct IndexSpec {
    /// Index name. Validated as an `"index"` identifier when rendered.
    pub name: String,
    /// Indexed expression (e.g. a column name). Inserted verbatim.
    pub expression: String,
    /// Index type including its arguments, e.g. `bloom_filter(0.01)`.
    /// Inserted verbatim.
    pub type_def: String,
    /// Value rendered after `GRANULARITY`.
    pub granularity: u32,
}
34
/// The "move to volume" part of a TTL rule.
///
/// Rendered by [`to_create_table_sql`] as
/// `{ttl expression} + INTERVAL {interval} TO VOLUME '{volume}'`.
#[derive(Debug, Clone)]
pub struct TtlMove {
    /// Text placed after `INTERVAL`, e.g. `14 DAY`. Inserted verbatim.
    pub interval: String,
    /// Destination volume name, emitted as a single-quoted string literal.
    pub volume: String,
}
45
/// TTL rules for a table, all based on a single column.
///
/// When both `move_to_volume_after` and `delete_after` are `None`,
/// [`to_create_table_sql`] still validates `column` but emits no `TTL` clause.
#[derive(Debug, Clone)]
pub struct TtlSpec {
    /// Name of the column the TTL is computed from. Must be one of the
    /// table's declared columns. It is wrapped in `toDateTime(...)` when that
    /// column's type reports `is_datetime64()`.
    pub column: String,
    /// Optional rule moving data to another volume after an interval.
    pub move_to_volume_after: Option<TtlMove>,
    /// Optional interval text (e.g. `180 DAY`) after which rows are deleted;
    /// rendered as `{ttl expression} + INTERVAL {delete_after} DELETE`.
    pub delete_after: Option<String>,
}
58
/// Full description of a table, rendered to DDL by [`to_create_table_sql`].
#[derive(Debug, Clone)]
pub struct TableSpec {
    /// Table name. Validated as a `"table"` identifier when rendered.
    pub name: String,
    /// Column definitions, rendered in this order.
    pub columns: Vec<ColumnSpec>,
    /// Engine expression, e.g. `MergeTree()`. Inserted verbatim after
    /// `ENGINE = `.
    pub engine: String,
    /// Sorting key columns. Each entry is validated as an identifier and must
    /// name a declared column; rendered as `ORDER BY (a, b, ...)`.
    pub order_by: Vec<String>,
    /// Optional partitioning expression. Inserted verbatim after
    /// `PARTITION BY `.
    pub partition_by: Option<String>,
    /// Optional TTL rules.
    pub ttl: Option<TtlSpec>,
    /// Secondary indexes, rendered after the columns inside the parentheses.
    pub indexes: Vec<IndexSpec>,
    /// `(key, value)` pairs rendered as `key = value` in the `SETTINGS`
    /// clause. Both parts are inserted verbatim, so string values must
    /// already carry their quotes (e.g. `'hot_cold'`).
    pub settings: Vec<(String, String)>,
}
84
85pub fn to_create_table_sql(
88 table: &TableSpec,
89 limits: &SchemaLimits,
90) -> Result<String, SchemaError> {
91 validate_identifier(&table.name, "table", limits)?;
92 assert_column_count(table.columns.len(), limits)?;
93
94 let mut seen = HashSet::new();
95 let mut col_lines = Vec::with_capacity(table.columns.len());
96 for c in &table.columns {
97 validate_identifier(&c.name, "column", limits)?;
98 if !seen.insert(c.name.as_str()) {
99 return Err(SchemaError::DuplicateColumn(c.name.clone()));
100 }
101 c.type_spec.validate()?;
104 let default = c
105 .default
106 .as_deref()
107 .map(|d| format!(" DEFAULT {d}"))
108 .unwrap_or_default();
109 col_lines.push(format!(
110 " {} {}{}",
111 c.name,
112 c.type_spec.to_ch_type(),
113 default
114 ));
115 }
116
117 let known: HashSet<&str> = table.columns.iter().map(|c| c.name.as_str()).collect();
120 for ob in &table.order_by {
121 validate_identifier(ob, "column", limits)?;
122 if !known.contains(ob.as_str()) {
123 return Err(SchemaError::InvalidIdentifier {
124 kind: "order_by column",
125 name: ob.clone(),
126 });
127 }
128 }
129
130 let mut paren_lines = col_lines;
133 for idx in &table.indexes {
134 validate_identifier(&idx.name, "index", limits)?;
135 paren_lines.push(format!(
136 " INDEX {} {} TYPE {} GRANULARITY {}",
137 idx.name, idx.expression, idx.type_def, idx.granularity
138 ));
139 }
140
141 let mut sql = format!(
142 "CREATE TABLE IF NOT EXISTS {} (\n{}\n)\nENGINE = {}",
143 table.name,
144 paren_lines.join(",\n"),
145 table.engine,
146 );
147
148 if let Some(partition_by) = &table.partition_by {
150 sql.push_str(&format!("\nPARTITION BY {partition_by}"));
151 }
152
153 sql.push_str(&format!("\nORDER BY ({})", table.order_by.join(", ")));
154
155 if let Some(ttl) = &table.ttl {
158 validate_identifier(&ttl.column, "column", limits)?;
159 if !known.contains(ttl.column.as_str()) {
160 return Err(SchemaError::InvalidIdentifier {
161 kind: "ttl column",
162 name: ttl.column.clone(),
163 });
164 }
165 let type_spec = table
166 .columns
167 .iter()
168 .find(|c| c.name == ttl.column)
169 .map(|c| &c.type_spec);
170 let base = match type_spec {
171 Some(ts) if ts.is_datetime64() => format!("toDateTime({})", ttl.column),
172 _ => ttl.column.clone(),
173 };
174 let mut parts = Vec::new();
175 if let Some(mv) = &ttl.move_to_volume_after {
176 parts.push(format!(
177 "{base} + INTERVAL {} TO VOLUME '{}'",
178 mv.interval, mv.volume
179 ));
180 }
181 if let Some(after) = &ttl.delete_after {
182 parts.push(format!("{base} + INTERVAL {after} DELETE"));
183 }
184 if !parts.is_empty() {
185 sql.push_str(&format!("\nTTL {}", parts.join(", ")));
186 }
187 }
188
189 if !table.settings.is_empty() {
191 let rendered: Vec<String> = table
192 .settings
193 .iter()
194 .map(|(k, v)| format!("{k} = {v}"))
195 .collect();
196 sql.push_str(&format!("\nSETTINGS {}", rendered.join(", ")));
197 }
198
199 Ok(sql)
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use crate::safety::{ScalarType, StringOnly};

    /// Builds a column without a `DEFAULT` expression.
    fn column(name: &str, type_spec: ColumnTypeSpec) -> ColumnSpec {
        ColumnSpec {
            name: name.to_owned(),
            type_spec,
            default: None,
        }
    }

    /// Builds a default-less column of a scalar type.
    fn scalar(name: &str, ty: ScalarType) -> ColumnSpec {
        column(name, ColumnTypeSpec::Scalar(ty))
    }

    /// Builds a default-less `LowCardinality` column wrapping a scalar string.
    fn low_cardinality_string(name: &str) -> ColumnSpec {
        column(
            name,
            ColumnTypeSpec::LowCardinality {
                low_cardinality: Box::new(ColumnTypeSpec::Scalar(ScalarType::String)),
            },
        )
    }

    /// Deserialises a column type from its JSON form, panicking on bad JSON.
    fn type_from_json(json: &'static str) -> ColumnTypeSpec {
        serde_json::from_str(json).unwrap()
    }

    /// TTL spec that only deletes rows `after` the given interval.
    fn delete_only_ttl(on: &str, after: &str) -> TtlSpec {
        TtlSpec {
            column: on.to_owned(),
            move_to_volume_after: None,
            delete_after: Some(after.to_owned()),
        }
    }

    /// Renders `spec` under the default limits.
    fn render(spec: &TableSpec) -> Result<String, SchemaError> {
        to_create_table_sql(spec, &SchemaLimits::default())
    }

    /// Renders `spec` under the default limits, panicking on any error.
    fn render_ok(spec: &TableSpec) -> String {
        render(spec).unwrap()
    }

    /// Minimal table with no optional clauses.
    fn events_table() -> TableSpec {
        TableSpec {
            name: "events".to_owned(),
            columns: vec![
                scalar("id", ScalarType::Uuid),
                scalar("ts", ScalarType::DateTime64),
                scalar("name", ScalarType::String),
                scalar("value", ScalarType::Float64),
                column(
                    "tags",
                    ColumnTypeSpec::Array {
                        array: StringOnly::String,
                    },
                ),
            ],
            engine: "MergeTree()".to_owned(),
            order_by: vec!["id".to_owned()],
            partition_by: None,
            ttl: None,
            indexes: Vec::new(),
            settings: Vec::new(),
        }
    }

    /// Table exercising every optional clause: partitioning, TTL (move and
    /// delete), indexes and settings.
    fn traces_table() -> TableSpec {
        let columns = vec![
            scalar("started_at", ScalarType::DateTime64),
            low_cardinality_string("organization_id"),
            scalar("trace_id", ScalarType::String),
            scalar("name", ScalarType::String),
            low_cardinality_string("service_name"),
            scalar("has_error", ScalarType::UInt8),
            column(
                "attributes",
                ColumnTypeSpec::Map {
                    map: (StringOnly::String, StringOnly::String),
                },
            ),
            ColumnSpec {
                name: "ingested_at".to_owned(),
                type_spec: ColumnTypeSpec::Scalar(ScalarType::DateTime),
                default: Some("now()".to_owned()),
            },
        ];

        let order_by = ["organization_id", "service_name", "started_at", "trace_id"]
            .iter()
            .map(|name| name.to_string())
            .collect();

        let ttl = TtlSpec {
            column: "started_at".to_owned(),
            move_to_volume_after: Some(TtlMove {
                interval: "14 DAY".to_owned(),
                volume: "cold".to_owned(),
            }),
            delete_after: Some("180 DAY".to_owned()),
        };

        let indexes = vec![
            IndexSpec {
                name: "idx_trace_id".to_owned(),
                expression: "trace_id".to_owned(),
                type_def: "bloom_filter(0.01)".to_owned(),
                granularity: 1,
            },
            IndexSpec {
                name: "idx_name".to_owned(),
                expression: "name".to_owned(),
                type_def: "tokenbf_v1(8192, 3, 0)".to_owned(),
                granularity: 1,
            },
        ];

        let settings = vec![
            ("storage_policy".to_owned(), "'hot_cold'".to_owned()),
            ("index_granularity".to_owned(), "8192".to_owned()),
        ];

        TableSpec {
            name: "observability_traces".to_owned(),
            columns,
            engine: "MergeTree()".to_owned(),
            order_by,
            partition_by: Some("(organization_id, toDate(started_at))".to_owned()),
            ttl: Some(ttl),
            indexes,
            settings,
        }
    }

    #[test]
    fn renders_create_table() {
        let ddl = render_ok(&events_table());
        assert!(ddl.contains("CREATE TABLE IF NOT EXISTS events ("));
        assert!(ddl.contains("id UUID"));
        assert!(ddl.contains("ts DateTime64(3)"));
        assert!(ddl.contains("tags Array(String)"));
        assert!(ddl.contains("ENGINE = MergeTree()"));
        assert!(ddl.contains("ORDER BY (id)"));
    }

    #[test]
    fn rejects_duplicate_and_bad_identifiers() {
        // A second column named `id` must be reported as a duplicate.
        let mut duplicated = events_table();
        duplicated.columns.push(scalar("id", ScalarType::String));
        assert!(matches!(
            render(&duplicated),
            Err(SchemaError::DuplicateColumn(_))
        ));

        // A column name carrying SQL must not get through.
        let mut hostile = events_table();
        hostile.columns[0].name = "id; DROP TABLE x".to_owned();
        assert!(render(&hostile).is_err());
    }

    #[test]
    fn rejects_order_by_unknown_column() {
        let mut spec = events_table();
        spec.order_by = vec!["nope".to_owned()];
        assert!(render(&spec).is_err());
    }

    #[test]
    fn reproduces_observability_traces_production_ddl() {
        let ddl = render_ok(&traces_table());

        // Every optional clause must appear exactly as written here.
        let expected_fragments = [
            "PARTITION BY (organization_id, toDate(started_at))",
            " INDEX idx_trace_id trace_id TYPE bloom_filter(0.01) GRANULARITY 1",
            " INDEX idx_name name TYPE tokenbf_v1(8192, 3, 0) GRANULARITY 1",
            "TTL toDateTime(started_at) + INTERVAL 14 DAY TO VOLUME 'cold', toDateTime(started_at) + INTERVAL 180 DAY DELETE",
            "SETTINGS storage_policy = 'hot_cold', index_granularity = 8192",
        ];
        for fragment in expected_fragments.iter() {
            assert!(ddl.contains(*fragment), "{ddl}");
        }

        // Clause order: ENGINE < PARTITION BY < ORDER BY < TTL < SETTINGS.
        let offsets: Vec<usize> = [
            "ENGINE = MergeTree()",
            "PARTITION BY",
            "ORDER BY (",
            "TTL ",
            "SETTINGS ",
        ]
        .iter()
        .map(|needle| ddl.find(*needle).unwrap())
        .collect();
        assert!(offsets.windows(2).all(|pair| pair[0] < pair[1]));
    }

    #[test]
    fn ttl_on_plain_datetime_is_not_wrapped() {
        let mut spec = events_table();
        spec.columns.push(scalar("created", ScalarType::DateTime));
        spec.ttl = Some(delete_only_ttl("created", "30 DAY"));

        let ddl = render_ok(&spec);
        assert!(
            ddl.contains("TTL created + INTERVAL 30 DAY DELETE"),
            "{ddl}"
        );
        assert!(!ddl.contains("toDateTime(created)"), "{ddl}");
    }

    #[test]
    fn ttl_delete_only_renders_just_delete() {
        let mut spec = events_table();
        spec.ttl = Some(delete_only_ttl("ts", "90 DAY"));

        let ddl = render_ok(&spec);
        assert!(
            ddl.contains("TTL toDateTime(ts) + INTERVAL 90 DAY DELETE"),
            "{ddl}"
        );
        assert!(!ddl.contains("TO VOLUME"), "{ddl}");
    }

    #[test]
    fn ttl_unknown_column_is_rejected() {
        let mut spec = events_table();
        spec.ttl = Some(delete_only_ttl("nope", "1 DAY"));

        let outcome = render(&spec);
        assert!(matches!(
            outcome,
            Err(SchemaError::InvalidIdentifier {
                kind: "ttl column",
                ..
            })
        ));
    }

    #[test]
    fn index_with_invalid_name_is_rejected() {
        let mut spec = events_table();
        spec.indexes.push(IndexSpec {
            name: "bad name".to_owned(),
            expression: "name".to_owned(),
            type_def: "bloom_filter(0.01)".to_owned(),
            granularity: 1,
        });

        let outcome = render(&spec);
        assert!(matches!(
            outcome,
            Err(SchemaError::InvalidIdentifier { kind: "index", .. })
        ));
    }

    #[test]
    fn backward_compat_no_extra_clauses() {
        let expected = "CREATE TABLE IF NOT EXISTS events (\n id UUID,\n ts DateTime64(3),\n name String,\n value Float64,\n tags Array(String)\n)\nENGINE = MergeTree()\nORDER BY (id)";
        assert_eq!(render_ok(&events_table()), expected);
    }

    #[test]
    fn parametrised_datetime64_column_renders_with_timezone() {
        let mut spec = events_table();
        spec.columns.push(column(
            "occurred_at",
            type_from_json(r#"{"datetime64":{"precision":3,"timezone":"UTC"}}"#),
        ));

        let ddl = render_ok(&spec);
        assert!(ddl.contains("occurred_at DateTime64(3, 'UTC')"), "{ddl}");
    }

    #[test]
    fn parametrised_datetime64_bad_params_rejected_at_ddl_boundary() {
        // Timezone that tries to break out of its quoted literal.
        let mut bad_timezone = events_table();
        bad_timezone.columns.push(column(
            "occurred_at",
            type_from_json(r#"{"datetime64":{"precision":3,"timezone":"UTC'; DROP"}}"#),
        ));
        assert!(matches!(
            render(&bad_timezone),
            Err(SchemaError::InvalidIdentifier {
                kind: "timezone",
                ..
            })
        ));

        // Precision that the type spec refuses.
        let mut bad_precision = events_table();
        bad_precision.columns.push(column(
            "occurred_at",
            type_from_json(r#"{"datetime64":{"precision":12}}"#),
        ));
        assert!(matches!(
            render(&bad_precision),
            Err(SchemaError::InvalidDateTime64Precision { precision: 12 })
        ));
    }

    #[test]
    fn ttl_wraps_parametrised_datetime64_column() {
        let mut spec = events_table();
        spec.columns.push(column(
            "occurred_at",
            type_from_json(r#"{"datetime64":{"precision":3,"timezone":"UTC"}}"#),
        ));
        spec.ttl = Some(delete_only_ttl("occurred_at", "30 DAY"));

        let ddl = render_ok(&spec);
        assert!(
            ddl.contains("TTL toDateTime(occurred_at) + INTERVAL 30 DAY DELETE"),
            "{ddl}"
        );
    }
}