clickhouse-qol 0.1.2

Quality of Life tools for ClickHouse
Documentation
use std::{cmp::max, collections::HashMap, path::PathBuf};

use clickhouse_operations::{create_migrations_table, get_migrations_in_database};

use crate::{error::ClickhouseToolError, init_client};

mod clickhouse_operations;
mod migration;

#[derive(Debug, Clone)]
pub struct ClickhouseMigration {
    pub version: u64,
    pub name: String,
    pub path: PathBuf,
}

impl ClickhouseMigration {
    pub fn from_file(path: String) -> Result<Self, ClickhouseToolError> {
        let path = PathBuf::from(path);
        let name = path
            .clone()
            .file_name()
            .ok_or_else(|| ClickhouseToolError::InvaidArgs("Invalid migration file name".to_string()))?
            .to_os_string()
            .into_string()
            .map_err(|_| ClickhouseToolError::InvaidArgs("Invalid migration file name".to_string()))?;

        let (version_str, name) = name
            .split_once('_')
            .ok_or_else(|| ClickhouseToolError::InvaidArgs("Invalid migration file name".to_string()))?;

        let version = version_str
            .parse::<u64>()
            .map_err(|_| ClickhouseToolError::InvaidArgs("Invalid migration file name".to_string()))?;

        Ok(Self {
            version,
            name: name.to_string(),
            path,
        })
    }

    pub async fn get_up_query(&self) -> Result<String, ClickhouseToolError> {
        tokio::fs::read_to_string(&self.path.join("up.sql")).await.map_err(|_| {
            ClickhouseToolError::IoError(std::io::Error::new(
                std::io::ErrorKind::Other,
                "Failed to read migration file",
            ))
        })
    }

    pub async fn get_down_query(&self) -> Result<String, ClickhouseToolError> {
        tokio::fs::read_to_string(&self.path.join("down.sql"))
            .await
            .map_err(|_| {
                ClickhouseToolError::IoError(std::io::Error::new(
                    std::io::ErrorKind::Other,
                    "Failed to read migration file",
                ))
            })
    }
}

#[cfg(test)]
mod test {
    use super::ClickhouseMigration;

    #[test]
    fn test_clickhouse_migration() {
        let migration = ClickhouseMigration::from_file("20230101_001_create_users".to_string());

        assert!(migration.is_ok());
        let migration = migration.unwrap();
        assert_eq!(migration.version, 20230101);
        assert_eq!(migration.name, "001_create_users");
    }

    #[test]
    fn test_clickhouse_migration_in_directory() {
        let migration = ClickhouseMigration::from_file("./clickhouse/migrations/20230101_001_create_users".to_string());

        assert!(migration.is_ok());
        let migration = migration.unwrap();
        assert_eq!(migration.version, 20230101);
        assert_eq!(migration.name, "001_create_users");
    }
}

pub struct ClickhouseMigrator {
    client: clickhouse::Client,
    directory: String,
}

impl ClickhouseMigrator {
    pub async fn new(uri: &str, directory: &str) -> Result<Self, ClickhouseToolError> {
        let client = init_client(uri).await?;
        Ok(Self {
            client,
            directory: directory.to_string(),
        })
    }

    pub async fn init(&self) -> Result<(), ClickhouseToolError> {
        create_migrations_table(&self.client).await
    }

    pub async fn create_migration(&self, name: &str) -> Result<(), ClickhouseToolError> {
        let now = chrono::Local::now().format("%Y%m%d%H%M%S");
        let migration_name = format!("{}_{}", now, name);
        let migration_path = format!("{}/{}", self.directory, migration_name);
        let up_file_path = format!("{}/up.sql", migration_path);
        let down_file_path = format!("{}/down.sql", migration_path);
        tokio::fs::create_dir_all(&migration_path).await?;
        tokio::fs::write(&up_file_path, "-- up migration").await?;
        tokio::fs::write(&down_file_path, "-- down migration").await?;
        Ok(())
    }

    pub async fn up(&self, step: Option<usize>) -> Result<(), ClickhouseToolError> {
        let migration = self.get_migrations().await?;
        let migration_exist = get_migrations_in_database(&self.client).await?;
        let migration = if migration_exist.is_empty() {
            migration
        } else {
            let last_migration = migration_exist
                .first()
                .ok_or(ClickhouseToolError::InternalError("No migration found".to_string()))?;
            migration
                .into_iter()
                .filter(|m| m.version > last_migration.version)
                .map(|m| m.to_owned())
                .collect::<Vec<ClickhouseMigration>>()
        };

        let step = max(migration.len(), step.unwrap_or(migration.len()));
        let migration = migration[..step].to_vec();
        for m in migration {
            clickhouse_operations::apply_migration(&self.client, &m).await?;
        }

        Ok(())
    }

    pub async fn down(&self, step: Option<usize>) -> Result<(), ClickhouseToolError> {
        let migration = self.get_migrations().await?;
        let migration_map: HashMap<u64, ClickhouseMigration> =
            migration.iter().map(|m| (m.version, m.clone())).collect();
        let migration_exist = get_migrations_in_database(&self.client).await?;
        if migration_exist.is_empty() {
            return Ok(());
        } else {
            let mut exist_step = step.unwrap_or(migration_exist.len());
            for m in migration_exist.iter() {
                if exist_step == 0 {
                    break;
                }
                if let Some(migration) = migration_map.get(&m.version) {
                    clickhouse_operations::rollback_migration(&self.client, migration).await?;
                }
                exist_step -= 1;
            }
            Ok(())
        }
    }

    pub async fn get_migrations(&self) -> Result<Vec<ClickhouseMigration>, ClickhouseToolError> {
        let mut migrations = Vec::new();
        let mut entries = tokio::fs::read_dir(&self.directory).await?;

        while let Some(entry) = entries
            .next_entry()
            .await
            .map_err(|e| ClickhouseToolError::IoError(e))?
        {
            if entry.file_type().await?.is_dir() {
                let migration = ClickhouseMigration::from_file(entry.path().to_string_lossy().to_string())?;
                migrations.push(migration);
            }
        }

        migrations.sort_by(|a, b| a.version.cmp(&b.version));
        Ok(migrations)
    }
}