sqlx-firebirdsql 0.1.0

Firebird SQL driver for SQLx
use std::time::Duration;
use std::time::Instant;

use futures_core::future::BoxFuture;
use sqlx_core::migrate::*;
use sqlx_core::sql_str::AssertSqlSafe;

use sqlx_core::connection::Connection;
use sqlx_core::error::Error;
use sqlx_core::executor::Executor;
use sqlx_core::query::query;
use sqlx_core::query_as::query_as;
use crate::connection::AssertSend;
use crate::{Firebird, FirebirdConnection};

impl MigrateDatabase for Firebird {
    fn create_database(url: &str) -> impl std::future::Future<Output = Result<(), Error>> + Send + '_ {
        AssertSend(async move {
            let _ = firebirust::ConnectionAsync::create_database_url(url)
                .await
                .map_err(|e| Error::Protocol(format!("{:?}", e)))?;
            Ok(())
        })
    }

    fn database_exists(url: &str) -> impl std::future::Future<Output = Result<bool, Error>> + Send + '_ {
        AssertSend(async move {
            match firebirust::ConnectionAsync::connect_url(url).await {
                Ok(_) => Ok(true),
                Err(_) => Ok(false),
            }
        })
    }

    fn drop_database(url: &str) -> impl std::future::Future<Output = Result<(), Error>> + Send + '_ {
        AssertSend(async move {
            // Connect to the database and execute DROP DATABASE.
            // In Firebird, DROP DATABASE drops the currently connected database.
            let mut conn = firebirust::ConnectionAsync::connect_url(url)
                .await
                .map_err(|e| Error::Protocol(format!("{:?}", e)))?;
            conn.execute_batch("DROP DATABASE")
                .await
                .map_err(|e| Error::Protocol(format!("{:?}", e)))?;
            Ok(())
        })
    }
}

impl Migrate for FirebirdConnection {
    fn create_schema_if_not_exists<'e>(
        &'e mut self,
        _schema_name: &'e str,
    ) -> BoxFuture<'e, Result<(), MigrateError>> {
        // Firebird does not support schemas; this is a no-op
        Box::pin(async move { Ok(()) })
    }

    fn ensure_migrations_table<'e>(
        &'e mut self,
        table_name: &'e str,
    ) -> BoxFuture<'e, Result<(), MigrateError>> {
        Box::pin(async move {
            // Firebird doesn't have IF NOT EXISTS for CREATE TABLE in older versions.
            // Check if the table exists first.
            let exists: Option<(i32,)> = query_as(AssertSqlSafe(format!(
                "SELECT 1 FROM RDB$RELATIONS WHERE RDB$RELATION_NAME = '{}'",
                table_name.to_uppercase()
            )))
            .fetch_optional(&mut *self)
            .await?;

            if exists.is_none() {
                self.execute(AssertSqlSafe(format!(
                    r#"
CREATE TABLE {table_name} (
    version BIGINT NOT NULL PRIMARY KEY,
    description VARCHAR(255) NOT NULL,
    installed_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
    success SMALLINT NOT NULL,
    checksum BLOB NOT NULL,
    execution_time BIGINT NOT NULL
)
                    "#
                )))
                .await?;
            }

            Ok(())
        })
    }

    fn dirty_version<'e>(
        &'e mut self,
        table_name: &'e str,
    ) -> BoxFuture<'e, Result<Option<i64>, MigrateError>> {
        Box::pin(async move {
            let row: Option<(i64,)> = query_as(AssertSqlSafe(format!(
                "SELECT FIRST 1 version FROM {table_name} WHERE success = 0 ORDER BY version"
            )))
            .fetch_optional(self)
            .await?;

            Ok(row.map(|r| r.0))
        })
    }

    fn list_applied_migrations<'e>(
        &'e mut self,
        table_name: &'e str,
    ) -> BoxFuture<'e, Result<Vec<AppliedMigration>, MigrateError>> {
        Box::pin(async move {
            let rows: Vec<(i64, Vec<u8>)> = query_as(AssertSqlSafe(format!(
                "SELECT version, checksum FROM {table_name} ORDER BY version"
            )))
            .fetch_all(self)
            .await?;

            let migrations = rows
                .into_iter()
                .map(|(version, checksum)| AppliedMigration {
                    version,
                    checksum: checksum.into(),
                })
                .collect();

            Ok(migrations)
        })
    }

    fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
        // Firebird does not have advisory locks.
        // We rely on the transaction isolation to prevent concurrent migrations.
        Box::pin(async move { Ok(()) })
    }

    fn unlock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
        Box::pin(async move { Ok(()) })
    }

    fn apply<'e>(
        &'e mut self,
        table_name: &'e str,
        migration: &'e Migration,
    ) -> BoxFuture<'e, Result<Duration, MigrateError>> {
        Box::pin(async move {
            let mut tx = Connection::begin(self).await?;
            let start = Instant::now();

            let _ = query(AssertSqlSafe(format!(
                "INSERT INTO {table_name} ( version, description, success, checksum, execution_time ) \
                 VALUES ( ?, ?, 0, ?, -1 )"
            )))
            .bind(migration.version)
            .bind(&*migration.description)
            .bind(&*migration.checksum)
            .execute(&mut *tx)
            .await?;

            let _: crate::FirebirdQueryResult = Executor::execute(
                &mut *tx,
                migration.sql.clone(),
            )
            .await
            .map_err(|e| MigrateError::ExecuteMigration(e, migration.version))?;

            let _ = query(AssertSqlSafe(format!(
                "UPDATE {table_name} SET success = 1 WHERE version = ?"
            )))
            .bind(migration.version)
            .execute(&mut *tx)
            .await?;

            tx.commit().await?;

            let elapsed = start.elapsed();

            #[allow(clippy::cast_possible_truncation)]
            let _ = query(AssertSqlSafe(format!(
                "UPDATE {table_name} SET execution_time = ? WHERE version = ?"
            )))
            .bind(elapsed.as_nanos() as i64)
            .bind(migration.version)
            .execute(self)
            .await?;

            Ok(elapsed)
        })
    }

    fn revert<'e>(
        &'e mut self,
        table_name: &'e str,
        migration: &'e Migration,
    ) -> BoxFuture<'e, Result<Duration, MigrateError>> {
        Box::pin(async move {
            let mut tx = Connection::begin(self).await?;
            let start = Instant::now();

            let _ = query(AssertSqlSafe(format!(
                "UPDATE {table_name} SET success = 0 WHERE version = ?"
            )))
            .bind(migration.version)
            .execute(&mut *tx)
            .await?;

            let _: crate::FirebirdQueryResult = Executor::execute(
                &mut *tx,
                migration.sql.clone(),
            )
            .await?;

            let _ = query(AssertSqlSafe(format!(
                "DELETE FROM {table_name} WHERE version = ?"
            )))
            .bind(migration.version)
            .execute(&mut *tx)
            .await?;

            tx.commit().await?;

            let elapsed = start.elapsed();

            Ok(elapsed)
        })
    }
}