// clickhouse_utils/migration.rs

use std::{collections::HashMap, fs, path::Path, time};

use serde::{Deserialize, Serialize};

/// A migration script read from the `migrations` folder, to be compared against the
/// rows recorded in `__migrations`.
#[derive(Debug)]
struct MigrationFile {
    /// File name, including the `.sql` extension; it identifies the migration.
    name: String,
    /// Lowercase hex MD5 digest of `content`, used to detect edits to applied migrations.
    hash: String,
    /// Full SQL text of the file.
    content: String,
}

11#[derive(Debug, Serialize, Deserialize, clickhouse::Row)]
12struct MigrationEntry {
13    name: String,
14    hash: String,
15    created_at: u64,
16}
17
18/// Run the migrations in the migrations folder.
19///
20/// - Migration are run in alphabetical order.
21/// - If the migration has changed, will halt with an error.
22/// - If the migration has been run before, it will be skipped.
23///
24///  We MD5 the content of the migration file to detect changes.
25///  A table __migrations is created and used to track the migrations that have been run.
26pub async fn migrate(client: &clickhouse::Client) -> crate::error::Result<()> {
27    let migration_folder = Path::new("migrations");
28    if !migration_folder.exists() {
29        return Err(crate::error::ClickhouseUtilsError::new(
30            "migrations folder does not exist".to_string(),
31        ));
32    }
33
34    let mut migrations = Vec::new();
35    for entry in fs::read_dir(migration_folder)? {
36        let entry = entry?;
37        let path = entry.path();
38        let file_content = fs::read_to_string(&path)?;
39        let Some(file_name) = path.file_name().map(|f| f.to_string_lossy().to_string()) else {
40            continue;
41        };
42
43        if !file_name.ends_with(".sql") {
44            tracing::warn!("skipping non-sql file: {}", file_name);
45            continue;
46        }
47        let file_hash = md5::compute(file_content.as_bytes());
48        migrations.push(MigrationFile {
49            name: file_name,
50            hash: format!("{:x}", file_hash),
51            content: file_content,
52        });
53    }
54    if migrations.is_empty() {
55        tracing::warn!("no migrations to run");
56        return Ok(());
57    }
58
59    migrations.sort_by_key(|m| m.name.clone());
60
61    client
62        .query(
63            "CREATE TABLE IF NOT EXISTS __migrations (
64                name String,
65                hash String,
66                created_at DateTime64(0),
67                PRIMARY KEY (name)
68            )",
69        )
70        .execute()
71        .await?;
72    let existing_migrations: Vec<MigrationEntry> = client
73        .query("SELECT name, hash, created_at FROM __migrations")
74        .fetch_all()
75        .await?;
76
77    for migration in migrations {
78        if let Some(existing_migration) = existing_migrations
79            .iter()
80            .find(|m| m.name == migration.name)
81        {
82            if existing_migration.hash == migration.hash {
83                continue;
84            }
85            return Err(crate::error::ClickhouseUtilsError::new(format!(
86                "Migration {} has changed, originally created_at: {}",
87                migration.name, existing_migration.created_at
88            )));
89        }
90
91        let created_at = time::SystemTime::now()
92            .duration_since(time::UNIX_EPOCH)?
93            .as_secs();
94        let mut migration_insert = client.insert("__migrations")?;
95        migration_insert
96            .write(&MigrationEntry {
97                name: migration.name.clone(),
98                hash: migration.hash,
99                created_at,
100            })
101            .await?;
102
103        // Run the migration
104        client.query(&migration.content).execute().await?;
105        tracing::info!("{} - migrated", migration.name);
106        // Force the insert to be executed, because the migration was executed
107        migration_insert.end().await?;
108    }
109
110    Ok(())
111}