use std::time::Duration;
use std::time::Instant;
use futures_core::future::BoxFuture;
use sqlx_core::migrate::*;
use sqlx_core::sql_str::AssertSqlSafe;
use sqlx_core::connection::Connection;
use sqlx_core::error::Error;
use sqlx_core::executor::Executor;
use sqlx_core::query::query;
use sqlx_core::query_as::query_as;
use crate::connection::AssertSend;
use crate::{Firebird, FirebirdConnection};
impl MigrateDatabase for Firebird {
fn create_database(url: &str) -> impl std::future::Future<Output = Result<(), Error>> + Send + '_ {
AssertSend(async move {
let _ = firebirust::ConnectionAsync::create_database_url(url)
.await
.map_err(|e| Error::Protocol(format!("{:?}", e)))?;
Ok(())
})
}
fn database_exists(url: &str) -> impl std::future::Future<Output = Result<bool, Error>> + Send + '_ {
AssertSend(async move {
match firebirust::ConnectionAsync::connect_url(url).await {
Ok(_) => Ok(true),
Err(_) => Ok(false),
}
})
}
fn drop_database(url: &str) -> impl std::future::Future<Output = Result<(), Error>> + Send + '_ {
AssertSend(async move {
let mut conn = firebirust::ConnectionAsync::connect_url(url)
.await
.map_err(|e| Error::Protocol(format!("{:?}", e)))?;
conn.execute_batch("DROP DATABASE")
.await
.map_err(|e| Error::Protocol(format!("{:?}", e)))?;
Ok(())
})
}
}
impl Migrate for FirebirdConnection {
fn create_schema_if_not_exists<'e>(
&'e mut self,
_schema_name: &'e str,
) -> BoxFuture<'e, Result<(), MigrateError>> {
Box::pin(async move { Ok(()) })
}
fn ensure_migrations_table<'e>(
&'e mut self,
table_name: &'e str,
) -> BoxFuture<'e, Result<(), MigrateError>> {
Box::pin(async move {
let exists: Option<(i32,)> = query_as(AssertSqlSafe(format!(
"SELECT 1 FROM RDB$RELATIONS WHERE RDB$RELATION_NAME = '{}'",
table_name.to_uppercase()
)))
.fetch_optional(&mut *self)
.await?;
if exists.is_none() {
self.execute(AssertSqlSafe(format!(
r#"
CREATE TABLE {table_name} (
version BIGINT NOT NULL PRIMARY KEY,
description VARCHAR(255) NOT NULL,
installed_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
success SMALLINT NOT NULL,
checksum BLOB NOT NULL,
execution_time BIGINT NOT NULL
)
"#
)))
.await?;
}
Ok(())
})
}
fn dirty_version<'e>(
&'e mut self,
table_name: &'e str,
) -> BoxFuture<'e, Result<Option<i64>, MigrateError>> {
Box::pin(async move {
let row: Option<(i64,)> = query_as(AssertSqlSafe(format!(
"SELECT FIRST 1 version FROM {table_name} WHERE success = 0 ORDER BY version"
)))
.fetch_optional(self)
.await?;
Ok(row.map(|r| r.0))
})
}
fn list_applied_migrations<'e>(
&'e mut self,
table_name: &'e str,
) -> BoxFuture<'e, Result<Vec<AppliedMigration>, MigrateError>> {
Box::pin(async move {
let rows: Vec<(i64, Vec<u8>)> = query_as(AssertSqlSafe(format!(
"SELECT version, checksum FROM {table_name} ORDER BY version"
)))
.fetch_all(self)
.await?;
let migrations = rows
.into_iter()
.map(|(version, checksum)| AppliedMigration {
version,
checksum: checksum.into(),
})
.collect();
Ok(migrations)
})
}
fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
Box::pin(async move { Ok(()) })
}
fn unlock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
Box::pin(async move { Ok(()) })
}
fn apply<'e>(
&'e mut self,
table_name: &'e str,
migration: &'e Migration,
) -> BoxFuture<'e, Result<Duration, MigrateError>> {
Box::pin(async move {
let mut tx = Connection::begin(self).await?;
let start = Instant::now();
let _ = query(AssertSqlSafe(format!(
"INSERT INTO {table_name} ( version, description, success, checksum, execution_time ) \
VALUES ( ?, ?, 0, ?, -1 )"
)))
.bind(migration.version)
.bind(&*migration.description)
.bind(&*migration.checksum)
.execute(&mut *tx)
.await?;
let _: crate::FirebirdQueryResult = Executor::execute(
&mut *tx,
migration.sql.clone(),
)
.await
.map_err(|e| MigrateError::ExecuteMigration(e, migration.version))?;
let _ = query(AssertSqlSafe(format!(
"UPDATE {table_name} SET success = 1 WHERE version = ?"
)))
.bind(migration.version)
.execute(&mut *tx)
.await?;
tx.commit().await?;
let elapsed = start.elapsed();
#[allow(clippy::cast_possible_truncation)]
let _ = query(AssertSqlSafe(format!(
"UPDATE {table_name} SET execution_time = ? WHERE version = ?"
)))
.bind(elapsed.as_nanos() as i64)
.bind(migration.version)
.execute(self)
.await?;
Ok(elapsed)
})
}
fn revert<'e>(
&'e mut self,
table_name: &'e str,
migration: &'e Migration,
) -> BoxFuture<'e, Result<Duration, MigrateError>> {
Box::pin(async move {
let mut tx = Connection::begin(self).await?;
let start = Instant::now();
let _ = query(AssertSqlSafe(format!(
"UPDATE {table_name} SET success = 0 WHERE version = ?"
)))
.bind(migration.version)
.execute(&mut *tx)
.await?;
let _: crate::FirebirdQueryResult = Executor::execute(
&mut *tx,
migration.sql.clone(),
)
.await?;
let _ = query(AssertSqlSafe(format!(
"DELETE FROM {table_name} WHERE version = ?"
)))
.bind(migration.version)
.execute(&mut *tx)
.await?;
tx.commit().await?;
let elapsed = start.elapsed();
Ok(elapsed)
})
}
}