use crate::safety::{
assert_not_reserved, validate_identifier, ColumnTypeSpec, ScalarType, SchemaError,
SchemaLimits, StringOnly, DEFAULT_RESERVED_COLUMNS,
};
use crate::table::{ColumnSpec, IndexSpec, TableSpec, TtlSpec};
/// Caller-supplied layout for [`flexible_table`].
///
/// `flexible_table` emits the `mandatory` columns, then the `promoted` columns, then
/// two columns it adds itself (`attrs` and `raw`). The remaining fields are moved into
/// the resulting [`TableSpec`] unchanged.
#[derive(Debug, Clone)]
pub struct FlexibleConfig {
/// Columns emitted first. Each name is validated as an identifier and checked
/// against the reserved set.
pub mandatory: Vec<ColumnSpec>,
/// Columns emitted after `mandatory`; validated the same way.
pub promoted: Vec<ColumnSpec>,
/// Engine clause copied verbatim into `TableSpec::engine`, e.g. `"MergeTree()"`.
pub engine: String,
/// Sorting-key expressions copied into `TableSpec::order_by`; `flexible_table`
/// does not validate them.
pub order_by: Vec<String>,
/// Reserved column names. `None` uses `DEFAULT_RESERVED_COLUMNS`; `Some` replaces
/// that default set rather than extending it.
pub reserved: Option<Vec<String>>,
/// Optional `PARTITION BY` expression, copied into the spec unchanged.
pub partition_by: Option<String>,
/// Optional TTL rule, copied into the spec unchanged.
pub ttl: Option<TtlSpec>,
/// Index definitions, copied into the spec unchanged.
pub indexes: Vec<IndexSpec>,
/// `(name, value)` table settings, copied into the spec unchanged.
pub settings: Vec<(String, String)>,
}
/// Builds a [`TableSpec`] for a "flexible" table.
///
/// The resulting columns are, in order:
/// 1. `config.mandatory`
/// 2. `config.promoted`
/// 3. an `attrs` map column (`String` keys and values)
/// 4. a `raw` `String` column
///
/// All other `config` fields (`engine`, `order_by`, `partition_by`, `ttl`, `indexes`,
/// `settings`) are moved into the spec unchanged and are not validated here.
///
/// # Errors
///
/// - Propagates the error from `validate_identifier` when `name` or any caller-supplied
///   column name is rejected.
/// - Propagates the error from `assert_not_reserved` (`SchemaError::ReservedColumn`)
///   when a caller-supplied column name is reserved.
///
/// The reserved set is `config.reserved`, or `DEFAULT_RESERVED_COLUMNS` when that is
/// `None`. `attrs` and `raw` are always added to it because this function appends
/// those columns itself; a caller column with either name would duplicate them.
///
/// NOTE(review): duplicate names between `mandatory` and `promoted` are not detected
/// here. Confirm whether `to_create_table_sql` or the database rejects them.
pub fn flexible_table(
    name: &str,
    config: FlexibleConfig,
    limits: &SchemaLimits,
) -> Result<TableSpec, SchemaError> {
    // Names of the structural columns appended below. Used both for the reserved
    // check and for the emitted `ColumnSpec`s, so the two can never drift apart.
    const ATTRS_COLUMN: &str = "attrs";
    const RAW_COLUMN: &str = "raw";

    validate_identifier(name, "table", limits)?;

    // Borrow the caller's list instead of cloning it; the borrow ends after the
    // validation loop, before any `config` fields are moved out.
    let mut reserved: Vec<&str> = match &config.reserved {
        Some(r) => r.iter().map(String::as_str).collect(),
        None => DEFAULT_RESERVED_COLUMNS.to_vec(),
    };
    // A custom reserved list replaces the default one, so the structural names
    // must be re-added explicitly or a caller column could shadow them.
    for structural in [ATTRS_COLUMN, RAW_COLUMN] {
        if !reserved.contains(&structural) {
            reserved.push(structural);
        }
    }

    for c in config.mandatory.iter().chain(config.promoted.iter()) {
        validate_identifier(&c.name, "column", limits)?;
        assert_not_reserved(&c.name, &reserved)?;
    }

    // +2 for the two structural columns appended below.
    let mut columns: Vec<ColumnSpec> =
        Vec::with_capacity(config.mandatory.len() + config.promoted.len() + 2);
    columns.extend(config.mandatory);
    columns.extend(config.promoted);
    columns.push(ColumnSpec {
        name: ATTRS_COLUMN.into(),
        type_spec: ColumnTypeSpec::Map {
            map: (StringOnly::String, StringOnly::String),
        },
        default: None,
    });
    columns.push(ColumnSpec {
        name: RAW_COLUMN.into(),
        type_spec: ColumnTypeSpec::Scalar(ScalarType::String),
        default: None,
    });

    Ok(TableSpec {
        name: name.to_string(),
        columns,
        engine: config.engine,
        order_by: config.order_by,
        partition_by: config.partition_by,
        ttl: config.ttl,
        indexes: config.indexes,
        settings: config.settings,
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::table::to_create_table_sql;

    fn col(name: &str, t: ColumnTypeSpec) -> ColumnSpec {
        ColumnSpec {
            name: name.into(),
            type_spec: t,
            default: None,
        }
    }

    fn config() -> FlexibleConfig {
        FlexibleConfig {
            mandatory: vec![
                col("org_id", ColumnTypeSpec::Scalar(ScalarType::String)),
                col("ts", ColumnTypeSpec::Scalar(ScalarType::DateTime64)),
            ],
            promoted: vec![col("status", ColumnTypeSpec::Scalar(ScalarType::String))],
            engine: "MergeTree()".into(),
            order_by: vec!["org_id".into(), "ts".into()],
            reserved: None,
            partition_by: None,
            ttl: None,
            indexes: vec![],
            settings: vec![],
        }
    }

    #[test]
    fn renders_flexible_table() {
        let spec = flexible_table("events", config(), &SchemaLimits::default()).unwrap();
        let ddl = to_create_table_sql(&spec, &SchemaLimits::default()).unwrap();
        assert!(ddl.contains("CREATE TABLE IF NOT EXISTS events ("));
        assert!(ddl.contains("org_id String"));
        assert!(ddl.contains("ts DateTime64(3)"));
        assert!(ddl.contains("status String"));
        assert!(ddl.contains("attrs Map(String, String)"));
        assert!(ddl.contains("raw String"));
        assert!(ddl.contains("ENGINE = MergeTree()"));
        assert!(ddl.contains("ORDER BY (org_id, ts)"));
    }

    #[test]
    fn carries_partition_ttl_indexes_settings_into_ddl() {
        use crate::table::{IndexSpec, TtlMove, TtlSpec};
        let mut cfg = config();
        cfg.partition_by = Some("(org_id, toDate(ts))".into());
        cfg.ttl = Some(TtlSpec {
            column: "ts".into(),
            move_to_volume_after: Some(TtlMove {
                interval: "7 DAY".into(),
                volume: "cold".into(),
            }),
            delete_after: Some("90 DAY".into()),
        });
        cfg.indexes = vec![IndexSpec {
            name: "idx_status".into(),
            expression: "status".into(),
            type_def: "bloom_filter(0.01)".into(),
            granularity: 1,
        }];
        cfg.settings = vec![("index_granularity".into(), "8192".into())];
        let spec = flexible_table("events", cfg, &SchemaLimits::default()).unwrap();
        let ddl = to_create_table_sql(&spec, &SchemaLimits::default()).unwrap();
        assert!(ddl.contains("PARTITION BY (org_id, toDate(ts))"), "{ddl}");
        assert!(
            ddl.contains(" INDEX idx_status status TYPE bloom_filter(0.01) GRANULARITY 1"),
            "{ddl}"
        );
        assert!(
            ddl.contains("TTL toDateTime(ts) + INTERVAL 7 DAY TO VOLUME 'cold', toDateTime(ts) + INTERVAL 90 DAY DELETE"),
            "{ddl}"
        );
        assert!(ddl.contains("SETTINGS index_granularity = 8192"), "{ddl}");
    }

    #[test]
    fn rejects_promoted_column_colliding_with_reserved() {
        let mut cfg = config();
        cfg.promoted
            .push(col("attrs", ColumnTypeSpec::Scalar(ScalarType::String)));
        assert!(matches!(
            flexible_table("events", cfg, &SchemaLimits::default()),
            Err(SchemaError::ReservedColumn(_))
        ));
    }

    #[test]
    fn rejects_mandatory_column_colliding_with_reserved() {
        let mut cfg = config();
        cfg.mandatory
            .push(col("raw", ColumnTypeSpec::Scalar(ScalarType::String)));
        assert!(matches!(
            flexible_table("events", cfg, &SchemaLimits::default()),
            Err(SchemaError::ReservedColumn(_))
        ));
    }

    #[test]
    fn rejects_bad_table_name() {
        assert!(
            flexible_table("events; DROP TABLE x", config(), &SchemaLimits::default()).is_err()
        );
    }

    #[test]
    fn rejects_bad_column_name() {
        let mut cfg = config();
        cfg.mandatory[0].name = "org id".into();
        assert!(matches!(
            flexible_table("events", cfg, &SchemaLimits::default()),
            Err(SchemaError::InvalidIdentifier { .. })
        ));
    }

    #[test]
    fn custom_reserved_set_overrides_default() {
        let mut cfg = config();
        cfg.reserved = Some(vec!["secret".into()]);
        cfg.promoted
            .push(col("secret", ColumnTypeSpec::Scalar(ScalarType::String)));
        assert!(matches!(
            flexible_table("events", cfg, &SchemaLimits::default()),
            Err(SchemaError::ReservedColumn(_))
        ));
    }

    // Regression: a custom reserved list used to drop `attrs`/`raw` from the check,
    // letting a caller column duplicate the structural columns.
    #[test]
    fn structural_attrs_stays_reserved_with_custom_set() {
        let mut cfg = config();
        cfg.reserved = Some(vec!["secret".into()]);
        cfg.promoted
            .push(col("attrs", ColumnTypeSpec::Scalar(ScalarType::String)));
        assert!(matches!(
            flexible_table("events", cfg, &SchemaLimits::default()),
            Err(SchemaError::ReservedColumn(_))
        ));
    }

    #[test]
    fn structural_raw_stays_reserved_with_custom_set() {
        let mut cfg = config();
        cfg.reserved = Some(vec![]);
        cfg.mandatory
            .push(col("raw", ColumnTypeSpec::Scalar(ScalarType::String)));
        assert!(matches!(
            flexible_table("events", cfg, &SchemaLimits::default()),
            Err(SchemaError::ReservedColumn(_))
        ));
    }
}