use async_trait::async_trait;
use reinhardt_db::migrations::registry::LocalRegistry;
use reinhardt_db::migrations::{Migration, MigrationRepository, MigrationSource, Result};
use rstest::*;
use std::collections::HashMap;
#[cfg(feature = "testcontainers")]
use crate::fixtures::testcontainers::postgres_container;
#[cfg(feature = "testcontainers")]
use reinhardt_db::migrations::executor::DatabaseMigrationExecutor;
#[cfg(feature = "testcontainers")]
use reinhardt_db::migrations::{DatabaseConnection, MigrationError, Operation};
#[cfg(feature = "testcontainers")]
use std::sync::Arc;
#[cfg(feature = "testcontainers")]
use testcontainers::{ContainerAsync, GenericImage};
#[fixture]
pub fn migration_registry() -> LocalRegistry {
LocalRegistry::new()
}
pub struct TestMigrationSource {
migrations: Vec<Migration>,
}
impl TestMigrationSource {
pub fn new() -> Self {
Self {
migrations: Vec::new(),
}
}
pub fn with_migrations(migrations: Vec<Migration>) -> Self {
Self { migrations }
}
pub fn add_migration(&mut self, migration: Migration) {
self.migrations.push(migration);
}
pub fn clear(&mut self) {
self.migrations.clear();
}
pub fn len(&self) -> usize {
self.migrations.len()
}
pub fn is_empty(&self) -> bool {
self.migrations.is_empty()
}
}
impl Default for TestMigrationSource {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl MigrationSource for TestMigrationSource {
async fn all_migrations(&self) -> Result<Vec<Migration>> {
Ok(self.migrations.clone())
}
}
pub struct InMemoryRepository {
migrations: HashMap<(String, String), Migration>,
}
impl InMemoryRepository {
pub fn new() -> Self {
Self {
migrations: HashMap::new(),
}
}
pub fn with_migrations(migrations: Vec<Migration>) -> Self {
let mut repo = Self::new();
for migration in migrations {
let key = (migration.app_label.to_string(), migration.name.to_string());
repo.migrations.insert(key, migration);
}
repo
}
pub fn clear(&mut self) {
self.migrations.clear();
}
pub fn len(&self) -> usize {
self.migrations.len()
}
pub fn is_empty(&self) -> bool {
self.migrations.is_empty()
}
}
impl Default for InMemoryRepository {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl MigrationRepository for InMemoryRepository {
async fn save(&mut self, migration: &Migration) -> Result<()> {
let key = (migration.app_label.to_string(), migration.name.to_string());
self.migrations.insert(key, migration.clone());
Ok(())
}
async fn get(&self, app_label: &str, name: &str) -> Result<Migration> {
let key = (app_label.to_string(), name.to_string());
self.migrations.get(&key).cloned().ok_or_else(|| {
reinhardt_db::migrations::MigrationError::NotFound(format!("{}.{}", app_label, name))
})
}
async fn list(&self, app_label: &str) -> Result<Vec<Migration>> {
Ok(self
.migrations
.values()
.filter(|m| m.app_label == app_label)
.cloned()
.collect())
}
async fn delete(&mut self, app_label: &str, name: &str) -> Result<()> {
let key = (app_label.to_string(), name.to_string());
self.migrations.remove(&key).ok_or_else(|| {
reinhardt_db::migrations::MigrationError::NotFound(format!("{}.{}", app_label, name))
})?;
Ok(())
}
}
#[fixture]
pub fn test_migration_source() -> TestMigrationSource {
TestMigrationSource::new()
}
#[fixture]
pub fn in_memory_repository() -> InMemoryRepository {
InMemoryRepository::new()
}
#[cfg(feature = "testcontainers")]
pub type MigrationExecutorFixture = (
DatabaseMigrationExecutor,
ContainerAsync<GenericImage>,
Arc<sqlx::PgPool>,
u16,
String,
);
#[cfg(feature = "testcontainers")]
pub struct PostgresTableCreator {
executor: DatabaseMigrationExecutor,
pool: Arc<sqlx::PgPool>,
container: ContainerAsync<GenericImage>,
port: u16,
url: String,
}
#[cfg(feature = "testcontainers")]
impl PostgresTableCreator {
pub fn new(
executor: DatabaseMigrationExecutor,
container: ContainerAsync<GenericImage>,
pool: Arc<sqlx::PgPool>,
port: u16,
url: String,
) -> Self {
Self {
executor,
pool,
container,
port,
url,
}
}
pub async fn apply(&mut self, schema: Vec<Operation>) -> Result<()> {
let mut migration = Migration::new("0001_test_schema", "testapp");
for operation in schema {
migration = migration.add_operation(operation);
}
self.executor
.apply_migrations(&[migration])
.await
.expect("Failed to apply test schema migrations");
Ok(())
}
pub fn pool(&self) -> &Arc<sqlx::PgPool> {
&self.pool
}
pub fn url(&self) -> &str {
&self.url
}
pub fn port(&self) -> u16 {
self.port
}
pub fn container(&self) -> &ContainerAsync<GenericImage> {
&self.container
}
pub async fn insert_data(
&self,
table: &str,
columns: Vec<&str>,
values: Vec<Vec<reinhardt_query::prelude::Value>>,
) -> Result<()> {
use reinhardt_query::prelude::{Alias, PostgresQueryBuilder, Query, QueryStatementBuilder};
for row_values in values {
let mut query = Query::insert();
query
.into_table(Alias::new(table))
.columns(columns.iter().map(|&c| Alias::new(c)));
query.values_panic(row_values);
let sql = query.to_string(PostgresQueryBuilder::new());
sqlx::query(&sql)
.execute(self.pool.as_ref())
.await
.map_err(MigrationError::SqlError)?;
}
Ok(())
}
pub async fn execute_sql(&self, sql: &str) -> Result<sqlx::postgres::PgQueryResult> {
sqlx::query(sql)
.execute(self.pool.as_ref())
.await
.map_err(MigrationError::SqlError)
}
pub async fn begin_transaction(&self) -> Result<sqlx::Transaction<'_, sqlx::Postgres>> {
self.pool.begin().await.map_err(MigrationError::SqlError)
}
}
#[cfg(feature = "testcontainers")]
#[fixture]
pub async fn migration_executor(
#[future] postgres_container: (ContainerAsync<GenericImage>, Arc<sqlx::PgPool>, u16, String),
) -> MigrationExecutorFixture {
let (container, pool, port, url) = postgres_container.await;
let connection = DatabaseConnection::connect_postgres(&url)
.await
.expect("Failed to connect to test database");
let executor = DatabaseMigrationExecutor::new(connection);
(executor, container, pool, port, url)
}
#[cfg(feature = "testcontainers")]
#[fixture]
pub async fn postgres_table_creator(
#[future] migration_executor: MigrationExecutorFixture,
) -> PostgresTableCreator {
let (executor, container, pool, port, url) = migration_executor.await;
PostgresTableCreator::new(executor, container, pool, port, url)
}
#[cfg(test)]
mod tests {
use super::*;
use reinhardt_db::migrations::Migration;
use reinhardt_db::migrations::registry::MigrationRegistry;
#[rstest]
fn test_migration_registry_fixture(migration_registry: LocalRegistry) {
assert!(migration_registry.all_migrations().is_empty());
}
#[rstest]
fn test_registry_isolation_between_tests(migration_registry: LocalRegistry) {
assert_eq!(migration_registry.all_migrations().len(), 0);
migration_registry
.register(Migration::new("0001_initial", "test_app"))
.unwrap();
assert_eq!(migration_registry.all_migrations().len(), 1);
}
#[rstest]
fn test_another_isolated_test(migration_registry: LocalRegistry) {
assert_eq!(migration_registry.all_migrations().len(), 0);
}
#[cfg(feature = "testcontainers")]
mod testcontainer_fixtures {
use super::*;
use reinhardt_db::migrations::{ColumnDefinition, DatabaseType, FieldType, Operation};
#[rstest]
#[tokio::test]
async fn test_migration_executor_fixture(
#[future] migration_executor: MigrationExecutorFixture,
) {
let (executor, _container, _pool, _port, _url) = migration_executor.await;
assert_eq!(executor.database_type(), DatabaseType::Postgres);
}
#[rstest]
#[tokio::test]
async fn test_postgres_table_creator_fixture(
#[future] postgres_table_creator: PostgresTableCreator,
) {
let mut creator = postgres_table_creator.await;
let schema = vec![Operation::CreateTable {
name: "fixture_test_table".to_string(),
columns: vec![
ColumnDefinition::new("id", FieldType::Integer),
ColumnDefinition::new("value", FieldType::Text),
],
constraints: vec![],
without_rowid: None,
interleave_in_parent: None,
partition: None,
}];
creator.apply(schema).await.unwrap();
let pool = creator.pool();
let result = sqlx::query("SELECT * FROM fixture_test_table")
.fetch_all(pool.as_ref())
.await
.unwrap();
assert!(result.is_empty());
}
}
}