chm 0.1.17

Migrations CLI tool for ClickHouse
Documentation
use chrono::Utc;
use clickhouse::{Client, Row};
use serde::{Deserialize, Serialize};

use crate::{errors::CLIError, tools::migrations::SetupArgs};

use super::migrations_operators::MigrationOnDisk;

#[derive(Row, Serialize, Deserialize, Debug, Clone)]
pub struct MigrationRow {
    pub version: String,
    pub ran_at: u32,
}

pub async fn create_migrations_table(client: clickhouse::Client) -> Result<(), CLIError> {
    client
        .query(
            "
            CREATE TABLE IF NOT EXISTS ch_migrations (
                version String,
                ran_at DateTime,
                ) ENGINE = MergeTree()
            ORDER BY(ran_at, version)
            ",
        )
        .execute()
        .await
        .map_err(|e| e.into())
}

pub async fn check_if_migrations_table_exists(
    client: clickhouse::Client,
) -> Result<bool, CLIError> {
    let table_exists = client
        .query(
            "
            SELECT name FROM system.tables WHERE name = 'ch_migrations'
            ",
        )
        .fetch_all::<String>()
        .await?;

    Ok(!table_exists.is_empty())
}

pub async fn get_clickhouse_client_and_ping(args: SetupArgs) -> Result<Client, CLIError> {
    let mut client = Client::default().with_url(
        args.url
            .ok_or(CLIError::BadArgs("Missing URL".to_string()))?,
    );

    if let Some(user) = args.user {
        client = client.with_user(user);
    }

    if let Some(password) = args.password {
        client = client.with_password(password);
    }

    if let Some(database) = args.database {
        client = client.with_database(database);
    }

    client.query("SELECT 1").execute().await?;

    Ok(client)
}

pub async fn get_migrations_from_clickhouse(
    client: clickhouse::Client,
) -> Result<Vec<MigrationRow>, CLIError> {
    let migrations = client
        .query(
            "
        SELECT ?fields FROM ch_migrations ORDER BY ran_at
        ",
        )
        .fetch_all::<MigrationRow>()
        .await?;
    Ok(migrations)
}

pub async fn apply_migrations(
    client: clickhouse::Client,
    migrations: Vec<MigrationOnDisk>,
) -> Result<(), CLIError> {
    for migration in &migrations {
        let mut insert = client.insert::<MigrationRow>("ch_migrations")?;

        insert
            .write(&MigrationRow {
                ran_at: Utc::now().timestamp() as u32,
                version: migration.version.clone(),
            })
            .await?;

        let up_query = migration.get_up_query().await?;
        let queries = up_query
            .split(';')
            .filter(|s| {
                !s.is_empty()
                    && !s.contains("--")
                    && !s.chars().all(|c| c.is_whitespace() || c == '\n')
            })
            .collect::<Vec<&str>>();

        println!("Running migration {}", migration.name);

        for query in queries {
            client.query(query).execute().await?;
        }

        insert.end().await?;
    }
    Ok(())
}

pub async fn undo_migration(
    client: clickhouse::Client,
    migration: MigrationOnDisk,
) -> Result<(), CLIError> {
    let down_query = migration.get_down_query().await?;
    let mut queries = down_query
        .split(';')
        .filter(|s| !s.is_empty())
        .collect::<Vec<&str>>();

    queries.truncate(queries.len().saturating_sub(1));

    println!("Reverting migration {}", migration.name);

    for query in queries {
        client.query(query).execute().await?;
    }

    client
        .query(
            format!(
                "DELETE FROM ch_migrations WHERE version = '{}'",
                migration.version
            )
            .as_str(),
        )
        .execute()
        .await?;

    Ok(())
}