argentum_db_infrastructure 0.3.1

The infrastructure layer of database component
Documentation
use crate::adapter::DbAdapterError;
use crate::slqx_postgres::SqlxPostgresAdapter;
use crate::slqx_postgres::migration::MigrationDto;
use crate::slqx_postgres::migration::collection::MigrationCollection;
use argentum_log_business::LoggerTrait;
use sqlx::Transaction;
use sqlx::types::chrono::Utc;
use sqlx_postgres::Postgres;
use std::sync::Arc;

pub struct Migrator<'a, L>
where
    L: LoggerTrait,
{
    adapter: Arc<SqlxPostgresAdapter<L>>,
    migrations: MigrationCollection<'a>,
    migration_table_name: &'a str,
    logger: Arc<L>,
}

impl<'a, L> Migrator<'a, L>
where
    L: LoggerTrait,
{
    pub fn new(
        adapter: Arc<SqlxPostgresAdapter<L>>,
        migrations: MigrationCollection<'a>,
        migration_table_name: &'a str,
        logger: Arc<L>,
    ) -> Self {
        Self {
            adapter,
            migrations,
            migration_table_name,
            logger,
        }
    }

    async fn create_migration_table(&self) -> Result<(), String> {
        self.logger.info("Ensuring that migration table exists...");
        let sql = format!(
            "CREATE TABLE IF NOT EXISTS {} (\
                id INT PRIMARY KEY generated always as identity, \
                executed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_DATE, \
                version VARCHAR NOT NULL UNIQUE\
            );",
            self.migration_table_name
        );

        let query = sqlx::query(&sql);
        let res = self.adapter.exec(query).await;
        match res {
            Ok(_) => {
                self.logger.info("Migration table is ensured.");
                Ok(())
            }
            Err(e) => {
                self.logger
                    .critical(format!("Ensuring was failed with error: {e}"));
                Err(e.to_string())
            }
        }
    }

    async fn rollback(&self, tx: Transaction<'a, Postgres>) -> Result<(), String> {
        if let Err(rollback_error) = self.adapter.rollback(tx).await {
            self.logger
                .critical(format!("Can't rollback transaction: {rollback_error}"));

            return Err(rollback_error.to_string());
        }

        Ok(())
    }

    async fn rollback_with_error(
        &self,
        tx: Transaction<'a, Postgres>,
        e: DbAdapterError,
    ) -> Result<(), String> {
        self.logger.critical(e.to_string());

        self.rollback(tx).await?;

        Err(e.to_string())
    }

    pub async fn migrate_one(&self, version: &str, migration: &Vec<String>) -> Result<(), String> {
        self.logger.info(format!("Migrate version {version}"));

        let tx_res: Result<Transaction<'static, Postgres>, DbAdapterError> =
            self.adapter.begin_transaction().await;
        if let Err(e) = tx_res {
            self.logger
                .critical(format!("Can't start transaction: {e}"));

            return Err(e.to_string());
        }

        let mut tx = tx_res.map_err(|e| e.to_string())?;

        let sql = format!(
            "SELECT * FROM {} WHERE version = $1 LIMIT 1;",
            self.migration_table_name
        );

        let query = sqlx::query_as(&sql).bind(version);

        let result: Result<Option<MigrationDto>, DbAdapterError> =
            self.adapter.fetch_one(query).await;

        if let Err(e) = result {
            return self.rollback_with_error(tx, e).await;
        }

        if let Ok(Some(_)) = result {
            self.logger
                .info(format!("Migration {version} already migrated"));

            self.rollback(tx).await?;

            return Ok(());
        }

        let dto = MigrationDto {
            version: version.to_string(),
            executed_at: Utc::now(),
        };

        let sql = format!(
            "INSERT INTO {} (version, executed_at) values($1, $2);",
            self.migration_table_name
        );

        let query = sqlx::query(&sql).bind(&dto.version).bind(dto.executed_at);

        let result = self.adapter.exec_with_executor(query, &mut *tx).await;

        if let Err(e) = result {
            return self.rollback_with_error(tx, e).await;
        }

        for sql in migration {
            let query = sqlx::query(sql);
            let result = self.adapter.exec_with_executor(query, &mut *tx).await;

            if let Err(e) = result {
                return self.rollback_with_error(tx, e).await;
            }
        }

        let res = self.adapter.commit(tx).await;

        match res {
            Ok(_) => Ok(()),
            Err(e) => {
                self.logger
                    .critical(format!("Can't commit transaction: {e}"));

                Err(e.to_string())
            }
        }
    }

    pub async fn migrate(&self) -> Result<(), String> {
        self.create_migration_table().await?;

        for (key, migration) in &self.migrations {
            self.migrate_one(key, migration).await?;
        }

        Ok(())
    }
}