clickhouse_utils/
migration.rs1use std::{fs, path::Path, time};
2
3use serde::{Deserialize, Serialize};
4
5struct MigrationFile {
6 name: String,
7 hash: String,
8 content: String,
9}
10
11#[derive(Debug, Serialize, Deserialize, clickhouse::Row)]
12struct MigrationEntry {
13 name: String,
14 hash: String,
15 created_at: u64,
16}
17
18pub async fn migrate(client: &clickhouse::Client) -> crate::error::Result<()> {
27 let migration_folder = Path::new("migrations");
28 if !migration_folder.exists() {
29 return Err(crate::error::ClickhouseUtilsError::new(
30 "migrations folder does not exist".to_string(),
31 ));
32 }
33
34 let mut migrations = Vec::new();
35 for entry in fs::read_dir(migration_folder)? {
36 let entry = entry?;
37 let path = entry.path();
38 let file_content = fs::read_to_string(&path)?;
39 let Some(file_name) = path.file_name().map(|f| f.to_string_lossy().to_string()) else {
40 continue;
41 };
42
43 if !file_name.ends_with(".sql") {
44 tracing::warn!("skipping non-sql file: {}", file_name);
45 continue;
46 }
47 let file_hash = md5::compute(file_content.as_bytes());
48 migrations.push(MigrationFile {
49 name: file_name,
50 hash: format!("{:x}", file_hash),
51 content: file_content,
52 });
53 }
54 if migrations.is_empty() {
55 tracing::warn!("no migrations to run");
56 return Ok(());
57 }
58
59 migrations.sort_by_key(|m| m.name.clone());
60
61 client
62 .query(
63 "CREATE TABLE IF NOT EXISTS __migrations (
64 name String,
65 hash String,
66 created_at DateTime64(0),
67 PRIMARY KEY (name)
68 )",
69 )
70 .execute()
71 .await?;
72 let existing_migrations: Vec<MigrationEntry> = client
73 .query("SELECT name, hash, created_at FROM __migrations")
74 .fetch_all()
75 .await?;
76
77 for migration in migrations {
78 if let Some(existing_migration) = existing_migrations
79 .iter()
80 .find(|m| m.name == migration.name)
81 {
82 if existing_migration.hash == migration.hash {
83 continue;
84 }
85 return Err(crate::error::ClickhouseUtilsError::new(format!(
86 "Migration {} has changed, originally created_at: {}",
87 migration.name, existing_migration.created_at
88 )));
89 }
90
91 let created_at = time::SystemTime::now()
92 .duration_since(time::UNIX_EPOCH)?
93 .as_secs();
94 let mut migration_insert = client.insert("__migrations")?;
95 migration_insert
96 .write(&MigrationEntry {
97 name: migration.name.clone(),
98 hash: migration.hash,
99 created_at,
100 })
101 .await?;
102
103 client.query(&migration.content).execute().await?;
105 tracing::info!("{} - migrated", migration.name);
106 migration_insert.end().await?;
108 }
109
110 Ok(())
111}