evento-sql-migrator 2.0.0-alpha.20

SQL migrations for evento event sourcing library.
Documentation
use sea_query::{ColumnDef, Expr, Table, TableCreateStatement, TableDropStatement};

use evento_sql::Subscriber;

pub struct Operation;

fn up_statement() -> TableCreateStatement {
    Table::create()
        .table(Subscriber::Table)
        .if_not_exists()
        .col(
            ColumnDef::new(Subscriber::Key)
                .string()
                .string_len(50)
                .not_null()
                .primary_key(),
        )
        .col(
            ColumnDef::new(Subscriber::WorkerId)
                .string()
                .not_null()
                .string_len(26),
        )
        .col(ColumnDef::new(Subscriber::Cursor).string())
        .col(ColumnDef::new(Subscriber::Lag).integer().not_null())
        .col(
            ColumnDef::new(Subscriber::Enabled)
                .boolean()
                .not_null()
                .default(true),
        )
        .col(
            ColumnDef::new(Subscriber::CreatedAt)
                .timestamp_with_time_zone()
                .not_null()
                .default(Expr::current_timestamp()),
        )
        .col(
            ColumnDef::new(Subscriber::UpdatedAt)
                .timestamp_with_time_zone()
                .null(),
        )
        .to_owned()
}

fn down_statement() -> TableDropStatement {
    Table::drop().table(Subscriber::Table).to_owned()
}

#[cfg(feature = "sqlite")]
#[async_trait::async_trait]
impl sqlx_migrator::Operation<sqlx::Sqlite> for Operation {
    async fn up(
        &self,
        connection: &mut sqlx::SqliteConnection,
    ) -> Result<(), sqlx_migrator::Error> {
        let statment = up_statement().to_string(sea_query::SqliteQueryBuilder);
        sqlx::query(sqlx::AssertSqlSafe(statment.as_str()))
            .execute(connection)
            .await?;

        Ok(())
    }

    async fn down(
        &self,
        connection: &mut sqlx::SqliteConnection,
    ) -> Result<(), sqlx_migrator::Error> {
        let statment = down_statement().to_string(sea_query::SqliteQueryBuilder);
        sqlx::query(sqlx::AssertSqlSafe(statment.as_str()))
            .execute(connection)
            .await?;

        Ok(())
    }
}

#[cfg(feature = "mysql")]
#[async_trait::async_trait]
impl sqlx_migrator::Operation<sqlx::MySql> for Operation {
    async fn up(&self, connection: &mut sqlx::MySqlConnection) -> Result<(), sqlx_migrator::Error> {
        let statment = up_statement().to_string(sea_query::MysqlQueryBuilder);
        sqlx::query(sqlx::AssertSqlSafe(statment.as_str()))
            .execute(connection)
            .await?;

        Ok(())
    }

    async fn down(
        &self,
        connection: &mut sqlx::MySqlConnection,
    ) -> Result<(), sqlx_migrator::Error> {
        let statment = down_statement().to_string(sea_query::MysqlQueryBuilder);
        sqlx::query(sqlx::AssertSqlSafe(statment.as_str()))
            .execute(connection)
            .await?;

        Ok(())
    }
}

#[cfg(feature = "postgres")]
#[async_trait::async_trait]
impl sqlx_migrator::Operation<sqlx::Postgres> for Operation {
    async fn up(&self, connection: &mut sqlx::PgConnection) -> Result<(), sqlx_migrator::Error> {
        let statment = up_statement().to_string(sea_query::PostgresQueryBuilder);
        sqlx::query(sqlx::AssertSqlSafe(statment.as_str()))
            .execute(connection)
            .await?;

        Ok(())
    }

    async fn down(&self, connection: &mut sqlx::PgConnection) -> Result<(), sqlx_migrator::Error> {
        let statment = down_statement().to_string(sea_query::PostgresQueryBuilder);
        sqlx::query(sqlx::AssertSqlSafe(statment.as_str()))
            .execute(connection)
            .await?;

        Ok(())
    }
}