cquill 0.0.9

Versioned CQL migrations for Cassandra and ScyllaDB
Documentation
use std::collections::VecDeque;
use std::path::PathBuf;

use anyhow::Result;
use scylla::Session;

use crate::cql_file::{CqlFile, CqlStatement};
use crate::queries;
use crate::queries::QueryError;

#[derive(thiserror::Error, Debug)]
pub enum MigrateError {
    #[error("error reading {filename} from disk: {error}")]
    CqlFileReadError { filename: String, error: String },
    #[error("cql query error: {source}")]
    CqlQueryError {
        #[from]
        source: QueryError,
    },
    #[error("previously migrated file '{filename}' has been modified (its current contents do not match the migrated cql file's content hash recorded in {cquill_keyspace}.{cquill_table})")]
    HashConflict {
        filename: String,
        cquill_keyspace: String,
        cquill_table: String,
    },
    #[error("errored saving migrate status of '{0}' to {cquill_keyspace}.{cquill_table}: {1}", error_state.failed_file.filename, error_state.error)]
    HistoryUpdateFailed {
        cquill_keyspace: String,
        cquill_table: String,
        error_state: Box<MigrateErrorState>,
    },
    #[error("errored during migrate of '{0}': {1}", error_state.failed_file.filename, error_state.error)]
    PartialMigration { error_state: Box<MigrateErrorState> },
    #[error("{source}")]
    Other {
        #[from]
        source: anyhow::Error,
    },
}

#[derive(Debug)]
pub struct MigrateErrorState {
    pub error: String,
    pub failed_cql: Option<CqlStatement>,
    pub failed_file: CqlFile,
    pub migrated: Vec<CqlFile>,
}

pub(crate) struct MigrateArgs {
    pub cql_dir: PathBuf,
    pub history_keyspace: String,
    pub history_table: String,
}

pub(crate) async fn perform(
    session: &Session,
    cql_files: &[CqlFile],
    args: MigrateArgs,
) -> Result<Vec<CqlFile>, MigrateError> {
    let mut previously_migrated = VecDeque::from(
        queries::migrated::files::select_all(
            session,
            &args.history_keyspace,
            &args.history_table,
            &args.cql_dir,
        )
        .await?,
    );
    let mut not_migrated: Vec<(CqlFile, Vec<CqlStatement>)> = Vec::new();
    for cql_file in cql_files {
        if let Some(migrated_cql_file) = previously_migrated.pop_front() {
            if cql_file.hash == migrated_cql_file.hash {
                continue;
            } else {
                return Err(MigrateError::HashConflict {
                    filename: cql_file.filename.clone(),
                    cquill_keyspace: args.history_keyspace.clone(),
                    cquill_table: args.history_table.clone(),
                });
            }
        }
        let cql = cql_file.read_statements()?;
        not_migrated.push((cql_file.clone(), cql));
    }
    let mut migrated: Vec<CqlFile> = Vec::new();
    for cql in not_migrated {
        for cql_statement in cql.1 {
            if let Err(err) = queries::exec(session, cql_statement.cql.clone()).await {
                return Err(MigrateError::PartialMigration {
                    error_state: Box::from(MigrateErrorState {
                        error: err.to_string(),
                        failed_cql: Some(cql_statement),
                        failed_file: cql.0,
                        migrated,
                    }),
                });
            }
        }
        migrated.push(cql.0.clone());
        if let Err(err) = queries::migrated::files::insert(
            session,
            &args.history_keyspace,
            &args.history_table,
            &cql.0,
        )
        .await
        {
            return Err(MigrateError::HistoryUpdateFailed {
                error_state: Box::from(MigrateErrorState {
                    error: err.to_string(),
                    failed_file: cql.0,
                    failed_cql: None,
                    migrated,
                }),
                cquill_keyspace: args.history_keyspace,
                cquill_table: args.history_table,
            });
        };
    }
    Ok(migrated)
}

#[cfg(test)]
mod tests {
    use crate::test_utils;

    use super::*;

    #[tokio::test]
    async fn test_migrate_fresh_state() {
        let harness = test_utils::TestHarness::builder()
            .cql_file("v001.cql", "")
            .cql_file("v002.cql", "")
            .cql_file("v003.cql", "")
            .initialize()
            .await;

        let migrate_result =
            perform(&harness.session, &harness.cql_files, harness.migrate_args()).await;
        match migrate_result {
            Err(err) => test_utils::error_panic(&err),
            Ok(migrated_files) => {
                assert_eq!(migrated_files.len(), 3);
            }
        }

        harness.drop_keyspace().await;
    }

    #[tokio::test]
    async fn test_migrate_skip_migrated() {
        let harness = test_utils::TestHarness::builder()
            .cql_file("v001.cql", "")
            .cql_file("v002.cql", "")
            .cql_file("v003.cql", "")
            .initialize()
            .await;
        queries::migrated::files::insert(
            &harness.session,
            &harness.cquill_keyspace,
            &harness.cquill_table.clone(),
            &CqlFile::from_path(harness.cql_file_path("v001.cql")).unwrap(),
        )
        .await
        .expect("save migrated file");

        let migrate_result =
            perform(&harness.session, &harness.cql_files, harness.migrate_args()).await;
        match migrate_result {
            Err(err) => test_utils::error_panic(&err),
            Ok(migrated_files) => {
                assert_eq!(migrated_files.len(), 2);
                let migrated_file_names: Vec<&str> =
                    migrated_files.iter().map(|f| f.filename.as_str()).collect();
                assert!(!migrated_file_names.contains(&"v001.cql"));
            }
        }

        harness.drop_keyspace().await;
    }

    #[tokio::test]
    async fn test_migrate_errors_when_executed_cql_content_changed() {
        let harness = test_utils::TestHarness::builder()
            .cql_file("v001.cql", "")
            .cql_file("v002.cql", "")
            .cql_file("v003.cql", "")
            .initialize()
            .await;
        queries::migrated::files::insert(
            &harness.session,
            &harness.cquill_keyspace,
            &harness.cquill_table.clone(),
            &CqlFile {
                version: 1,
                hash: "abc".to_string(),
                path: harness.cql_file_path("v001.cql"),
                filename: "v001.cql".to_string(),
            },
        )
        .await
        .expect("save migrated file");

        let migrate_result =
            perform(&harness.session, &harness.cql_files, harness.migrate_args()).await;
        match migrate_result {
            Ok(_) => panic!(),
            Err(err) => {
                assert_eq!(
                    err.to_string(),
                    format!("previously migrated file 'v001.cql' has been modified (its current contents do not match the migrated cql file's content hash recorded in {}.{})",
                            harness.cquill_keyspace, harness.cquill_table)
                );
            }
        }

        harness.drop_keyspace().await;
    }

    #[tokio::test]
    async fn test_partial_migration_error_state() {
        let keyspace = test_utils::keyspace_name();
        let harness = test_utils::TestHarness::builder()
            .cql_file("v001.cql", "")
            .cql_file(
                "v002.cql",
                format!(
                    "CREATE TABLE {}.asdf (id UUID PRIMARY KEY, data TEXT); CREATE TABLE;",
                    keyspace
                )
                .as_str(),
            )
            .cquill_history(keyspace.as_str(), "cquill")
            .initialize()
            .await;

        let migrate_result =
            perform(&harness.session, &harness.cql_files, harness.migrate_args()).await;
        match migrate_result {
            Ok(_) => panic!(),
            Err(err) => match err {
                MigrateError::PartialMigration { error_state } => {
                    assert_eq!(error_state.migrated.len(), 1);
                    assert_eq!(error_state.migrated.get(0).unwrap().filename, "v001.cql");
                    assert!(error_state.failed_cql.is_some());
                    assert_eq!(error_state.failed_cql.unwrap().cql, "CREATE TABLE");
                    assert_eq!(error_state.failed_file.filename, "v002.cql");
                    assert_eq!(
                        error_state.error,
                        "cql query error: Database returned an error: The submitted query has a syntax error, Error message: line 1:0 no viable alternative at input '<EOF>'"
                    );
                }
                _ => panic!("error was not a MigrateError::PartialMigration"),
            },
        }

        harness.drop_keyspace().await;
    }
}