swellow 0.2.0

SQL-first migration CLI
mod arrow_utils;
mod catalog;
mod error;
mod postgres;
mod spark;
mod sql_common;
pub use catalog::Catalog;
pub use error::{EngineError, EngineErrorKind};
pub use postgres::PostgresEngine;
pub use spark::SparkEngine;

use crate::{cli, migration::MigrationDirection};

use sqlparser;


pub enum EngineBackend {
    Postgres(PostgresEngine),
    SparkDelta(SparkEngine),
    SparkIceberg(SparkEngine),
}

impl EngineBackend {
    pub fn engine(&self) -> cli::Engine {
        match self {
            EngineBackend::Postgres(_) => cli::Engine::Postgres,
            EngineBackend::SparkDelta(_) => cli::Engine::SparkDelta,
            EngineBackend::SparkIceberg(_) => cli::Engine::SparkIceberg,
        }
    }

    pub fn disable_transactions(&mut self) -> () {
        match self {
            EngineBackend::Postgres(engine) => engine.disable_transactions(),
            _ => (),
        }
    }

    pub async fn ensure_table(&mut self) -> Result<(), EngineError> {
        match self {
            EngineBackend::Postgres(engine) => engine.ensure_table().await,
            EngineBackend::SparkDelta(engine) => engine.ensure_table().await,
            EngineBackend::SparkIceberg(engine) => engine.ensure_table().await,
        }
    }

    pub async fn begin(&mut self) -> Result<(), EngineError> {
        match self {
            EngineBackend::Postgres(engine) => engine.begin().await,
            _ => Ok(()),
        }
    }

    pub async fn fetch_latest_applied_version(&mut self) -> Result<Option<i64>, EngineError> {
        match self {
            EngineBackend::Postgres(engine) => engine.fetch_latest_applied_version().await,
            EngineBackend::SparkDelta(engine) => engine.fetch_latest_applied_version().await,
            EngineBackend::SparkIceberg(engine) => engine.fetch_latest_applied_version().await,
        }
    }

    pub async fn acquire_lock(&mut self) -> Result<(), EngineError> {
        match self {
            EngineBackend::Postgres(engine) => engine.acquire_lock().await,
            EngineBackend::SparkDelta(engine) => engine.acquire_lock().await,
            EngineBackend::SparkIceberg(engine) => engine.acquire_lock().await,
        }
    }

    pub async fn release_lock(&mut self) -> Result<(), EngineError> {
        match self {
            EngineBackend::Postgres(engine) => engine.release_lock().await,
            EngineBackend::SparkDelta(engine) => engine.release_lock().await,
            EngineBackend::SparkIceberg(engine) => engine.release_lock().await,
        }
    }

    pub async fn disable_records(&mut self, current_version_id: i64) -> Result<(), EngineError> {
        match self {
            EngineBackend::Postgres(engine) => engine.disable_records(current_version_id).await,
            EngineBackend::SparkDelta(engine) => engine.disable_records(current_version_id).await,
            EngineBackend::SparkIceberg(engine) => engine.disable_records(current_version_id).await,
        }
    }

    pub async fn upsert_record(
        &mut self,
        object_type: &sqlparser::ast::ObjectType,
        object_name_before: &str,
        object_name_after: &str,
        version_id: i64,
        checksum: &str
    ) -> Result<(), EngineError> {
        match self {
            EngineBackend::Postgres(engine) => engine.upsert_record(
                object_type,
                object_name_before,
                object_name_after,
                version_id,
                checksum,
            ).await,
            EngineBackend::SparkDelta(engine) => engine.upsert_record(
                object_type,
                object_name_before,
                object_name_after,
                version_id,
                checksum,
            ).await,
            EngineBackend::SparkIceberg(engine) => engine.upsert_record(
                object_type,
                object_name_before,
                object_name_after,
                version_id,
                checksum,
            ).await,
        }
    }

    pub async fn execute(&mut self, sql: &str) -> Result<(), EngineError> {
        match self {
            EngineBackend::Postgres(engine) => engine.execute(sql).await,
            EngineBackend::SparkDelta(engine) => engine.execute(sql).await,
            EngineBackend::SparkIceberg(engine) => engine.execute(sql).await,
        }
    }

    pub async fn update_record(
        &mut self, 
        direction: &MigrationDirection,
        version_id: i64
    ) -> Result<(), EngineError> {
        let status = match direction {
            MigrationDirection::Up => "APPLIED",
            MigrationDirection::Down => "ROLLED_BACK"
        };

        match self {
            EngineBackend::Postgres(engine) => engine.update_record(status, version_id).await,
            EngineBackend::SparkDelta(engine) => engine.update_record(status, version_id).await,
            EngineBackend::SparkIceberg(engine) => engine.update_record(status, version_id).await,
        }
    }

    pub async fn rollback(&mut self) -> Result<(), EngineError> {
        match self {
            EngineBackend::Postgres(engine) => engine.rollback().await,
            _ => Ok(()),
        }
    }

    pub async fn commit(&mut self) -> Result<(), EngineError> {
        match self {
            EngineBackend::Postgres(engine) => engine.commit().await,
            _ => Ok(()),
        }
    }

    pub async fn snapshot(&mut self) -> Result<String, EngineError> {
        match self {
            EngineBackend::Postgres(engine) => engine.snapshot().await,
            EngineBackend::SparkDelta(engine) => engine.snapshot().await,
            EngineBackend::SparkIceberg(engine) => engine.snapshot().await,
        }
    }
}


/// A trait defining the required business operations for Swellow.
/// This abstracts over different database engines.
pub trait DbEngine {
    async fn ensure_table(&mut self) -> Result<(), EngineError>;
    async fn execute(&mut self, sql: &str) -> Result<(), EngineError>;
    async fn fetch_latest_applied_version(&mut self) -> Result<Option<i64>, EngineError>;
    async fn acquire_lock(&mut self) -> Result<(), EngineError>;
    async fn release_lock(&mut self) -> Result<(), EngineError>;
    async fn disable_records(&mut self, current_version_id: i64) -> Result<(), EngineError>;
    async fn upsert_record(
        &mut self,
        object_type: &sqlparser::ast::ObjectType,
        object_name_before: &str,
        object_name_after: &str,
        version_id: i64,
        checksum: &str
    ) -> Result<(), EngineError>;
    async fn update_record(&mut self, status: &str, version_id: i64) -> Result<(), EngineError>;
    async fn snapshot(&mut self) -> Result<String, EngineError>;
}