use crate::client::{ChError, ChExecutor};
use crate::table::TableSpec;
use std::collections::HashMap;
/// A single discrepancy between a declared [`TableSpec`] and the live columns
/// fetched for that table, as reported by [`check_drift`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Drift {
/// No live columns were returned for the table, so it is treated as absent.
MissingTable { table: String },
/// The spec declares a column that is not among the live columns.
MissingColumn {
table: String,
column: String,
/// Type the spec declares for the column, as rendered by `to_ch_type()`.
expected_type: String,
},
/// The live table has a column that the spec does not declare.
ExtraColumn {
table: String,
column: String,
/// Type of the live column.
actual_type: String,
},
/// The column exists on both sides, but the types still differ after
/// whitespace normalization.
TypeMismatch {
table: String,
column: String,
/// Type the spec declares for the column, as rendered by `to_ch_type()`.
expected_type: String,
/// Type of the live column.
actual_type: String,
},
}
/// Outcome of a [`check_drift`] run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DriftResult {
/// Name of every table that was processed, in input order. Tables reported
/// as missing are included.
pub checked: Vec<String>,
/// Every discrepancy that was found; empty when nothing differs.
pub drift: Vec<Drift>,
}
impl DriftResult {
    /// Returns `true` when no drift was recorded, i.e. `drift` is empty.
    ///
    /// `checked` is not consulted, so a result that covers zero tables is
    /// also considered clean.
    #[must_use]
    pub fn is_clean(&self) -> bool {
        self.drift.is_empty()
    }
}
/// Canonicalizes a ClickHouse type string so that formatting-only differences
/// compare equal.
///
/// Whitespace is removed everywhere except inside quoted sections (`'…'`,
/// `"…"` or `` `…` ``). Whitespace inside quotes is part of the value — for
/// example an enum label — so `Enum8('a b' = 1)` and `Enum8('ab' = 1)` must
/// not collapse to the same string.
///
/// Inside a quoted section a backslash keeps the following character
/// verbatim, so an escaped quote does not end the section. A doubled quote
/// (`''`) closes and immediately reopens the section, which preserves it too.
/// An unterminated quote keeps the rest of the input unchanged.
fn normalize_type(ch_type: &str) -> String {
    let mut out = String::with_capacity(ch_type.len());
    // Delimiter of the quoted section we are currently inside, if any.
    let mut open_quote: Option<char> = None;
    let mut chars = ch_type.chars();

    // `while let` rather than `for`: the escape branch consumes an extra char.
    while let Some(c) = chars.next() {
        match open_quote {
            Some(delim) => {
                out.push(c);
                if c == '\\' {
                    if let Some(escaped) = chars.next() {
                        out.push(escaped);
                    }
                } else if c == delim {
                    open_quote = None;
                }
            }
            None => {
                if c.is_whitespace() {
                    continue;
                }
                if matches!(c, '\'' | '"' | '`') {
                    open_quote = Some(c);
                }
                out.push(c);
            }
        }
    }

    out
}
/// Compares each declared table against the columns the live database reports.
///
/// For every entry in `tables`, the live columns are fetched through
/// `exec.fetch_columns` and matched to the spec by column name. Types are
/// compared after both sides have gone through `normalize_type`, so
/// formatting-only differences are not reported.
///
/// Findings are appended in a stable order: tables in input order; within a
/// table, missing or mismatched columns in spec order, followed by extra
/// columns in live order. The type strings carried by the findings are the
/// original, unnormalized ones from the spec and from the live column.
///
/// # Errors
///
/// Returns the first [`ChError`] produced by `exec.fetch_columns`. Findings
/// gathered before the failure are discarded.
pub async fn check_drift(
    exec: &impl ChExecutor,
    tables: &[TableSpec],
) -> Result<DriftResult, ChError> {
    let mut checked = Vec::with_capacity(tables.len());
    let mut drift = Vec::new();

    for table in tables {
        checked.push(table.name.clone());

        let live = exec.fetch_columns(&table.name).await?;
        // An empty column list is the only signal available here that the
        // table does not exist, so report it once and skip column checks.
        if live.is_empty() {
            drift.push(Drift::MissingTable {
                table: table.name.clone(),
            });
            continue;
        }

        // Live column name -> type string exactly as the executor returned it.
        let live_types: HashMap<&str, &str> = live
            .iter()
            .map(|c| (c.name.as_str(), c.type_name.as_str()))
            .collect();
        let expected_names: std::collections::HashSet<&str> =
            table.columns.iter().map(|c| c.name.as_str()).collect();

        for col in &table.columns {
            let expected_type = col.type_spec.to_ch_type();
            match live_types.get(col.name.as_str()) {
                None => drift.push(Drift::MissingColumn {
                    table: table.name.clone(),
                    column: col.name.clone(),
                    expected_type,
                }),
                // Normalize only for the comparison; the report keeps the
                // raw strings so both sides are shown in the same form.
                Some(actual) if normalize_type(actual) != normalize_type(&expected_type) => {
                    drift.push(Drift::TypeMismatch {
                        table: table.name.clone(),
                        column: col.name.clone(),
                        expected_type,
                        actual_type: (*actual).to_owned(),
                    });
                }
                Some(_) => {}
            }
        }

        drift.extend(
            live.iter()
                .filter(|c| !expected_names.contains(c.name.as_str()))
                .map(|c| Drift::ExtraColumn {
                    table: table.name.clone(),
                    column: c.name.clone(),
                    actual_type: c.type_name.clone(),
                }),
        );
    }

    Ok(DriftResult { checked, drift })
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::client::LiveColumn;
    use crate::safety::ScalarType;
    use crate::table::{ColumnSpec, TableSpec};
    use crate::ColumnTypeSpec;
    use std::future::Future;

    /// Executor stub: `fetch_columns` answers every table with the same
    /// canned column list; the other methods succeed with empty results.
    struct FakeExec(Vec<LiveColumn>);

    // The methods spell out `impl Future + Send` return types instead of
    // `async fn` — presumably to mirror the trait's declarations (confirm).
    #[allow(clippy::manual_async_fn)]
    impl ChExecutor for FakeExec {
        fn command(&self, _sql: &str) -> impl Future<Output = Result<(), ChError>> + Send {
            async { Ok(()) }
        }

        fn fetch_strings(
            &self,
            _sql: &str,
        ) -> impl Future<Output = Result<Vec<String>, ChError>> + Send {
            async { Ok(vec![]) }
        }

        fn fetch_columns(
            &self,
            _table: &str,
        ) -> impl Future<Output = Result<Vec<LiveColumn>, ChError>> + Send {
            // Clone up front so the returned future owns its data.
            let canned = self.0.clone();
            async move { Ok(canned) }
        }
    }

    /// Builds a scalar column spec without a default.
    fn col(name: &str, t: ScalarType) -> ColumnSpec {
        let type_spec = ColumnTypeSpec::Scalar(t);
        ColumnSpec {
            name: name.to_owned(),
            type_spec,
            default: None,
        }
    }

    /// Builds a live column as the executor would report it.
    fn lc(name: &str, ty: &str) -> LiveColumn {
        LiveColumn {
            name: String::from(name),
            type_name: String::from(ty),
        }
    }

    /// The baseline `events` table: a UUID `id` and a String `name`.
    fn spec() -> TableSpec {
        let columns = vec![col("id", ScalarType::Uuid), col("name", ScalarType::String)];
        TableSpec {
            name: "events".into(),
            columns,
            engine: "MergeTree()".into(),
            order_by: vec!["id".into()],
            partition_by: None,
            ttl: None,
            indexes: Vec::new(),
            settings: Vec::new(),
        }
    }

    /// Runs `check_drift` for a single `table` against a fake executor that
    /// serves `live`, panicking if the check itself fails.
    async fn drift_against(live: Vec<LiveColumn>, table: TableSpec) -> DriftResult {
        check_drift(&FakeExec(live), &[table]).await.unwrap()
    }

    #[tokio::test]
    async fn no_drift_when_schema_matches() {
        let live = vec![lc("id", "UUID"), lc("name", "String")];
        let outcome = drift_against(live, spec()).await;

        assert_eq!(outcome.checked, ["events"]);
        assert!(
            outcome.is_clean(),
            "expected no drift, got {:?}",
            outcome.drift
        );
    }

    #[tokio::test]
    async fn reports_missing_table() {
        let outcome = drift_against(Vec::new(), spec()).await;

        let missing = Drift::MissingTable {
            table: "events".into(),
        };
        assert_eq!(outcome.drift, [missing]);
    }

    #[tokio::test]
    async fn reports_missing_extra_and_mismatch() {
        let live = vec![lc("id", "String"), lc("extra", "Int32")];
        let outcome = drift_against(live, spec()).await;

        let mismatch = Drift::TypeMismatch {
            table: "events".into(),
            column: "id".into(),
            expected_type: "UUID".into(),
            actual_type: "String".into(),
        };
        let missing = Drift::MissingColumn {
            table: "events".into(),
            column: "name".into(),
            expected_type: "String".into(),
        };
        let extra = Drift::ExtraColumn {
            table: "events".into(),
            column: "extra".into(),
            actual_type: "Int32".into(),
        };

        assert!(outcome.drift.contains(&mismatch));
        assert!(outcome.drift.contains(&missing));
        assert!(outcome.drift.contains(&extra));
    }

    #[tokio::test]
    async fn whitespace_in_types_is_normalized() {
        use crate::safety::StringOnly;

        let attrs = ColumnSpec {
            name: "attrs".into(),
            type_spec: ColumnTypeSpec::Map {
                map: (StringOnly::String, StringOnly::String),
            },
            default: None,
        };
        let mut table = spec();
        table.columns.push(attrs);

        // The live type is written without a space after the comma.
        let live = vec![
            lc("id", "UUID"),
            lc("name", "String"),
            lc("attrs", "Map(String,String)"),
        ];
        let outcome = drift_against(live, table).await;

        assert!(outcome.is_clean(), "drift: {:?}", outcome.drift);
    }
}