Skip to main content

clickhouse_kit/
migrate.rs

1//! Forward-only migration runner. Applies `*.sql` files from a directory in
2//! lexical order, recording each in a `_ch_migrations` bookkeeping table so
3//! re-runs are idempotent. There is no auto-diff and no down-migration — schema
4//! change is expressed as ordered, append-only SQL files.
5
6use crate::client::{ChError, ChExecutor};
7use std::path::Path;
8
/// Name of the bookkeeping table in which the runner records every migration
/// filename it has successfully applied.
const MIGRATIONS_TABLE: &str = "_ch_migrations";
11
/// Summary of a single [`run_migrations`] invocation.
///
/// When the pass completes successfully, every filename in `discovered` appears
/// in exactly one of `skipped` or `applied`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationRunResult {
    /// All `*.sql` filenames found in the migrations directory, in lexical order.
    pub discovered: Vec<String>,
    /// Filenames the bookkeeping table already listed, so they were not re-run.
    pub skipped: Vec<String>,
    /// Filenames executed and recorded by this pass, in execution order.
    pub applied: Vec<String>,
}
22
23/// Ensure the bookkeeping table exists, then apply every pending `*.sql` file in
24/// `dir` (lexical order), recording each as it succeeds. Idempotent: files
25/// already present in `_ch_migrations` are skipped.
26pub async fn run_migrations(
27    exec: &impl ChExecutor,
28    dir: &Path,
29) -> Result<MigrationRunResult, ChError> {
30    ensure_migrations_table(exec).await?;
31
32    let discovered = discover_migration_files(dir)?;
33    let already_applied = fetch_applied(exec).await?;
34
35    let mut skipped = Vec::new();
36    let mut applied = Vec::new();
37
38    for filename in &discovered {
39        if already_applied.contains(filename) {
40            skipped.push(filename.clone());
41            continue;
42        }
43
44        let path = dir.join(filename);
45        let sql = std::fs::read_to_string(&path)?;
46        for statement in split_sql_statements(&sql) {
47            exec.command(&statement).await?;
48        }
49        exec.command(&record_statement(filename)).await?;
50        applied.push(filename.clone());
51    }
52
53    Ok(MigrationRunResult {
54        discovered,
55        skipped,
56        applied,
57    })
58}
59
60/// Create the `_ch_migrations` table if it does not already exist.
61async fn ensure_migrations_table(exec: &impl ChExecutor) -> Result<(), ChError> {
62    let ddl = format!(
63        "CREATE TABLE IF NOT EXISTS {MIGRATIONS_TABLE} (\n\
64         \x20   filename String,\n\
65         \x20   applied_at DateTime DEFAULT now()\n\
66         )\nENGINE = MergeTree\nORDER BY filename"
67    );
68    exec.command(&ddl).await
69}
70
71/// Read the already-applied migration filenames from the bookkeeping table.
72async fn fetch_applied(exec: &impl ChExecutor) -> Result<Vec<String>, ChError> {
73    exec.fetch_strings(&format!(
74        "SELECT filename FROM {MIGRATIONS_TABLE} ORDER BY filename"
75    ))
76    .await
77}
78
79/// Record a single applied migration filename.
80fn record_statement(filename: &str) -> String {
81    // Filenames come from a trusted directory listing, but escape single quotes
82    // defensively so an odd filename can't break the INSERT.
83    let escaped = filename.replace('\'', "''");
84    format!("INSERT INTO {MIGRATIONS_TABLE} (filename) VALUES ('{escaped}')")
85}
86
87/// List `*.sql` files in `dir`, returning their filenames in lexical order.
88fn discover_migration_files(dir: &Path) -> Result<Vec<String>, ChError> {
89    let mut files = Vec::new();
90    for entry in std::fs::read_dir(dir)? {
91        let entry = entry?;
92        let path = entry.path();
93        if path.extension().and_then(|e| e.to_str()) == Some("sql") {
94            if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
95                files.push(name.to_string());
96            }
97        }
98    }
99    files.sort();
100    Ok(files)
101}
102
/// Split a migration file into individual statements.
///
/// The text is scanned once. The scanner tracks whether it is inside a quoted
/// span (`'string'`, `"identifier"` or `` `identifier` ``), so `;`, `--` and
/// `/*` inside quotes are left untouched. Outside quotes:
/// - `--` starts a line comment, which is dropped up to (not including) the newline;
/// - `/* ... */` is a block comment and is replaced by a single space;
/// - `;` ends the current statement.
///
/// Inside quotes a backslash escapes the next character (e.g. `\'`). A doubled
/// delimiter (`''`) needs no special case, because it closes and immediately
/// reopens the span. Each statement is trimmed and empty fragments are dropped.
/// An unterminated quote or block comment extends to the end of the input.
pub fn split_sql_statements(sql: &str) -> Vec<String> {
    let mut statements = Vec::new();
    let mut current = String::new();
    // Delimiter of the quoted span currently open, if any.
    let mut quote: Option<char> = None;
    let mut chars = sql.chars().peekable();

    while let Some(c) = chars.next() {
        if let Some(delim) = quote {
            current.push(c);
            if c == '\\' {
                // The escaped character can never close the span.
                if let Some(escaped) = chars.next() {
                    current.push(escaped);
                }
            } else if c == delim {
                quote = None;
            }
            continue;
        }

        match c {
            '\'' | '"' | '`' => {
                quote = Some(c);
                current.push(c);
            }
            '-' if chars.peek() == Some(&'-') => {
                // Drop the comment but keep its newline so the next line stays separate.
                while let Some(&next) = chars.peek() {
                    if next == '\n' {
                        break;
                    }
                    chars.next();
                }
            }
            '/' if chars.peek() == Some(&'*') => {
                chars.next();
                let mut prev = '\0';
                for next in chars.by_ref() {
                    if prev == '*' && next == '/' {
                        break;
                    }
                    prev = next;
                }
                // A space keeps the tokens on either side of the comment apart.
                current.push(' ');
            }
            ';' => flush_statement(&mut statements, &mut current),
            _ => current.push(c),
        }
    }
    flush_statement(&mut statements, &mut current);
    statements
}

/// Push the trimmed contents of `buf` onto `out` (unless blank), then clear `buf`.
fn flush_statement(out: &mut Vec<String>, buf: &mut String) {
    let statement = buf.trim();
    if !statement.is_empty() {
        out.push(statement.to_owned());
    }
    buf.clear();
}
122
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn splits_and_strips_comments() {
        // Leading, trailing and whole-line `--` comments plus a blank line.
        let sql = concat!(
            "-- create the table\n",
            "CREATE TABLE x (a Int32) ENGINE = Memory; -- trailing\n",
            "INSERT INTO x VALUES (1);\n",
            "\n",
            "-- a whole-line comment\n",
        );
        assert_eq!(
            split_sql_statements(sql),
            [
                "CREATE TABLE x (a Int32) ENGINE = Memory",
                "INSERT INTO x VALUES (1)",
            ]
        );
    }

    #[test]
    fn empty_input_yields_no_statements() {
        for input in ["", "   \n  ;; \n -- only a comment"] {
            assert!(
                split_sql_statements(input).is_empty(),
                "expected no statements for {input:?}"
            );
        }
    }

    #[test]
    fn record_statement_escapes_quotes() {
        let cases = [
            (
                "001_init.sql",
                "INSERT INTO _ch_migrations (filename) VALUES ('001_init.sql')",
            ),
            (
                "o'brien.sql",
                "INSERT INTO _ch_migrations (filename) VALUES ('o''brien.sql')",
            ),
        ];
        for (filename, expected) in cases {
            assert_eq!(record_statement(filename), expected);
        }
    }
}