clickhouse_kit/
migrate.rs1use crate::client::{ChError, ChExecutor};
7use std::path::Path;
8
9const MIGRATIONS_TABLE: &str = "_ch_migrations";
11
12#[derive(Debug, Clone, PartialEq, Eq)]
14pub struct MigrationRunResult {
15 pub discovered: Vec<String>,
17 pub skipped: Vec<String>,
19 pub applied: Vec<String>,
21}
22
23pub async fn run_migrations(
27 exec: &impl ChExecutor,
28 dir: &Path,
29) -> Result<MigrationRunResult, ChError> {
30 ensure_migrations_table(exec).await?;
31
32 let discovered = discover_migration_files(dir)?;
33 let already_applied = fetch_applied(exec).await?;
34
35 let mut skipped = Vec::new();
36 let mut applied = Vec::new();
37
38 for filename in &discovered {
39 if already_applied.contains(filename) {
40 skipped.push(filename.clone());
41 continue;
42 }
43
44 let path = dir.join(filename);
45 let sql = std::fs::read_to_string(&path)?;
46 for statement in split_sql_statements(&sql) {
47 exec.command(&statement).await?;
48 }
49 exec.command(&record_statement(filename)).await?;
50 applied.push(filename.clone());
51 }
52
53 Ok(MigrationRunResult {
54 discovered,
55 skipped,
56 applied,
57 })
58}
59
60async fn ensure_migrations_table(exec: &impl ChExecutor) -> Result<(), ChError> {
62 let ddl = format!(
63 "CREATE TABLE IF NOT EXISTS {MIGRATIONS_TABLE} (\n\
64 \x20 filename String,\n\
65 \x20 applied_at DateTime DEFAULT now()\n\
66 )\nENGINE = MergeTree\nORDER BY filename"
67 );
68 exec.command(&ddl).await
69}
70
71async fn fetch_applied(exec: &impl ChExecutor) -> Result<Vec<String>, ChError> {
73 exec.fetch_strings(&format!(
74 "SELECT filename FROM {MIGRATIONS_TABLE} ORDER BY filename"
75 ))
76 .await
77}
78
79fn record_statement(filename: &str) -> String {
81 let escaped = filename.replace('\'', "''");
84 format!("INSERT INTO {MIGRATIONS_TABLE} (filename) VALUES ('{escaped}')")
85}
86
87fn discover_migration_files(dir: &Path) -> Result<Vec<String>, ChError> {
89 let mut files = Vec::new();
90 for entry in std::fs::read_dir(dir)? {
91 let entry = entry?;
92 let path = entry.path();
93 if path.extension().and_then(|e| e.to_str()) == Some("sql") {
94 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
95 files.push(name.to_string());
96 }
97 }
98 }
99 files.sort();
100 Ok(files)
101}
102
103pub fn split_sql_statements(sql: &str) -> Vec<String> {
106 let stripped: String = sql
107 .lines()
108 .map(|line| match line.find("--") {
109 Some(idx) => &line[..idx],
110 None => line,
111 })
112 .collect::<Vec<_>>()
113 .join("\n");
114
115 stripped
116 .split(';')
117 .map(str::trim)
118 .filter(|s| !s.is_empty())
119 .map(str::to_string)
120 .collect()
121}
122
123#[cfg(test)]
124mod tests {
125 use super::*;
126
127 #[test]
128 fn splits_and_strips_comments() {
129 let sql = "-- create the table\n\
130 CREATE TABLE x (a Int32) ENGINE = Memory; -- trailing\n\
131 INSERT INTO x VALUES (1);\n\
132 \n\
133 -- a whole-line comment\n";
134 let stmts = split_sql_statements(sql);
135 assert_eq!(stmts.len(), 2);
136 assert_eq!(stmts[0], "CREATE TABLE x (a Int32) ENGINE = Memory");
137 assert_eq!(stmts[1], "INSERT INTO x VALUES (1)");
138 }
139
140 #[test]
141 fn empty_input_yields_no_statements() {
142 assert!(split_sql_statements("").is_empty());
143 assert!(split_sql_statements(" \n ;; \n -- only a comment").is_empty());
144 }
145
146 #[test]
147 fn record_statement_escapes_quotes() {
148 assert_eq!(
149 record_statement("001_init.sql"),
150 "INSERT INTO _ch_migrations (filename) VALUES ('001_init.sql')"
151 );
152 assert_eq!(
153 record_statement("o'brien.sql"),
154 "INSERT INTO _ch_migrations (filename) VALUES ('o''brien.sql')"
155 );
156 }
157}