evento-sql 2.0.0-alpha.15

SQL database implementations for the evento event sourcing library.
Documentation
use evento::sql::{RwSqlite, Sql};
use sqlx::{
    sqlite::{SqliteConnectOptions, SqlitePoolOptions},
    SqlitePool,
};
use std::str::FromStr;

mod pool;

/// Converts a pair of SQLite pools into an [`RwSqlite`] executor.
///
/// Each pool is wrapped in an `evento::Sqlite` and the two wrappers are converted together,
/// keeping the tuple order of `pools` (element `0` first, element `1` second).
fn rw_from_pools(pools: (SqlitePool, SqlitePool)) -> RwSqlite {
    let first = evento::Sqlite::from(pools.0);
    let second = evento::Sqlite::from(pools.1);

    (first, second).into()
}

/// Runs `evento_test::routing_key` against the executor created for the `routing_key` key.
#[tokio::test]
async fn sqlite_routing_key() -> anyhow::Result<()> {
    let evento = create_sqlite_executor("routing_key").await?;
    evento_test::routing_key(&evento).await
}

/// Runs `evento_test::load` against the executor created for the `load` key.
#[tokio::test]
async fn sqlite_load() -> anyhow::Result<()> {
    let evento = create_sqlite_executor("load").await?;
    evento_test::load(&evento).await
}

/// Runs `evento_test::load_multiple_aggregator` against the executor created for the
/// `load_multiple_aggregator` key.
#[tokio::test]
async fn sqlite_load_multiple_aggregator() -> anyhow::Result<()> {
    let evento = create_sqlite_executor("load_multiple_aggregator").await?;
    evento_test::load_multiple_aggregator(&evento).await
}

/// Runs `evento_test::load_with_snapshot` against the executor created for the
/// `load_with_snapshot` key.
#[tokio::test]
async fn sqlite_load_with_snapshot() -> anyhow::Result<()> {
    let evento = create_sqlite_executor("load_with_snapshot").await?;
    evento_test::load_with_snapshot(&evento).await
}

/// Runs `evento_test::invalid_original_version` against the executor created for the
/// `invalid_original_version` key.
#[tokio::test]
async fn sqlite_invalid_original_version() -> anyhow::Result<()> {
    let evento = create_sqlite_executor("invalid_original_version").await?;
    evento_test::invalid_original_version(&evento).await
}

/// Runs `evento_test::subscriber_running` with a `Sql<sqlx::Sqlite>` executor converted from
/// the pool created for the `subscriber_running` key.
#[tokio::test]
async fn sqlite_subscriber_running() -> anyhow::Result<()> {
    let sqlite_pool = create_sqlite_pool("subscriber_running").await?;
    let executor: Sql<sqlx::Sqlite> = sqlite_pool.into();
    evento_test::subscriber_running::<Sql<sqlx::Sqlite>>(&executor).await
}

/// Runs `evento_test::subscribe` with a `Sql<sqlx::Sqlite>` executor converted from the pool
/// created for the `subscribe` key.
#[tokio::test]
async fn sqlite_subscribe() -> anyhow::Result<()> {
    let sqlite_pool = create_sqlite_pool("subscribe").await?;
    let executor: Sql<sqlx::Sqlite> = sqlite_pool.into();
    evento_test::subscribe::<Sql<sqlx::Sqlite>>(&executor).await
}

/// Runs `evento_test::subscribe_routing_key` with a `Sql<sqlx::Sqlite>` executor converted
/// from the pool created for the `subscribe_routing_key` key.
#[tokio::test]
async fn sqlite_subscribe_routing_key() -> anyhow::Result<()> {
    let sqlite_pool = create_sqlite_pool("subscribe_routing_key").await?;
    let executor: Sql<sqlx::Sqlite> = sqlite_pool.into();
    evento_test::subscribe_routing_key::<Sql<sqlx::Sqlite>>(&executor).await
}

/// Runs `evento_test::subscribe_default` with a `Sql<sqlx::Sqlite>` executor converted from
/// the pool created for the `subscribe_default` key.
#[tokio::test]
async fn sqlite_subscribe_default() -> anyhow::Result<()> {
    let sqlite_pool = create_sqlite_pool("subscribe_default").await?;
    let executor: Sql<sqlx::Sqlite> = sqlite_pool.into();
    evento_test::subscribe_default::<Sql<sqlx::Sqlite>>(&executor).await
}

/// Runs `evento_test::subscribe_multiple_aggregator` with a `Sql<sqlx::Sqlite>` executor
/// converted from the pool created for the `subscribe_multiple_aggregator` key.
#[tokio::test]
async fn sqlite_subscribe_multiple_aggregator() -> anyhow::Result<()> {
    let sqlite_pool = create_sqlite_pool("subscribe_multiple_aggregator").await?;
    let executor: Sql<sqlx::Sqlite> = sqlite_pool.into();
    evento_test::subscribe_multiple_aggregator::<Sql<sqlx::Sqlite>>(&executor).await
}

/// Runs `evento_test::subscribe_routing_key_multiple_aggregator` with a `Sql<sqlx::Sqlite>`
/// executor converted from the pool created for the
/// `subscribe_routing_key_multiple_aggregator` key.
#[tokio::test]
async fn sqlite_subscribe_routing_key_multiple_aggregator() -> anyhow::Result<()> {
    let sqlite_pool = create_sqlite_pool("subscribe_routing_key_multiple_aggregator").await?;
    let executor: Sql<sqlx::Sqlite> = sqlite_pool.into();
    evento_test::subscribe_routing_key_multiple_aggregator::<Sql<sqlx::Sqlite>>(&executor).await
}

/// Runs `evento_test::subscribe_default_multiple_aggregator` with a `Sql<sqlx::Sqlite>`
/// executor converted from the pool created for the `subscribe_default_multiple_aggregator`
/// key.
#[tokio::test]
async fn sqlite_subscribe_default_multiple_aggregator() -> anyhow::Result<()> {
    let sqlite_pool = create_sqlite_pool("subscribe_default_multiple_aggregator").await?;
    let executor: Sql<sqlx::Sqlite> = sqlite_pool.into();
    evento_test::subscribe_default_multiple_aggregator::<Sql<sqlx::Sqlite>>(&executor).await
}

/// Runs `evento_test::all_commands` with a `Sql<sqlx::Sqlite>` executor converted from the
/// pool created for the `all_commands` key.
#[tokio::test]
async fn sqlite_all_commands() -> anyhow::Result<()> {
    let sqlite_pool = create_sqlite_pool("all_commands").await?;
    let executor: Sql<sqlx::Sqlite> = sqlite_pool.into();
    evento_test::all_commands::<Sql<sqlx::Sqlite>>(&executor).await
}

/// Runs `evento_test::routing_key` against the read/write executor created for the
/// `routing_key` key.
#[tokio::test]
async fn rw_sqlite_routing_key() -> anyhow::Result<()> {
    let rw = create_rw_sqlite_executor("routing_key").await?;
    evento_test::routing_key(&rw).await
}

/// Runs `evento_test::load` against the read/write executor created for the `load` key.
#[tokio::test]
async fn rw_sqlite_load() -> anyhow::Result<()> {
    let rw = create_rw_sqlite_executor("load").await?;
    evento_test::load(&rw).await
}

/// Runs `evento_test::load_multiple_aggregator` against the read/write executor created for
/// the `load_multiple_aggregator` key.
#[tokio::test]
async fn rw_sqlite_load_multiple_aggregator() -> anyhow::Result<()> {
    let rw = create_rw_sqlite_executor("load_multiple_aggregator").await?;
    evento_test::load_multiple_aggregator(&rw).await
}

/// Runs `evento_test::load_with_snapshot` against the read/write executor created for the
/// `load_with_snapshot` key.
#[tokio::test]
async fn rw_sqlite_load_with_snapshot() -> anyhow::Result<()> {
    let rw = create_rw_sqlite_executor("load_with_snapshot").await?;
    evento_test::load_with_snapshot(&rw).await
}

/// Runs `evento_test::invalid_original_version` against the read/write executor created for
/// the `invalid_original_version` key.
#[tokio::test]
async fn rw_sqlite_invalid_original_version() -> anyhow::Result<()> {
    let rw = create_rw_sqlite_executor("invalid_original_version").await?;
    evento_test::invalid_original_version(&rw).await
}

/// Runs `evento_test::subscriber_running` with an `RwSqlite` executor built by
/// `rw_from_pools` from the pool pair created for the `subscriber_running` key.
#[tokio::test]
async fn rw_sqlite_subscriber_running() -> anyhow::Result<()> {
    let pools = create_rw_sqlite_pool("subscriber_running").await?;
    let executor = rw_from_pools(pools);
    evento_test::subscriber_running::<RwSqlite>(&executor).await
}

/// Runs `evento_test::subscribe` with an `RwSqlite` executor built by `rw_from_pools` from
/// the pool pair created for the `subscribe` key.
#[tokio::test]
async fn rw_sqlite_subscribe() -> anyhow::Result<()> {
    let pools = create_rw_sqlite_pool("subscribe").await?;
    let executor = rw_from_pools(pools);
    evento_test::subscribe::<RwSqlite>(&executor).await
}

/// Runs `evento_test::subscribe_routing_key` with an `RwSqlite` executor built by
/// `rw_from_pools` from the pool pair created for the `subscribe_routing_key` key.
#[tokio::test]
async fn rw_sqlite_subscribe_routing_key() -> anyhow::Result<()> {
    let pools = create_rw_sqlite_pool("subscribe_routing_key").await?;
    let executor = rw_from_pools(pools);
    evento_test::subscribe_routing_key::<RwSqlite>(&executor).await
}

/// Runs `evento_test::subscribe_default` with an `RwSqlite` executor built by
/// `rw_from_pools` from the pool pair created for the `subscribe_default` key.
#[tokio::test]
async fn rw_sqlite_subscribe_default() -> anyhow::Result<()> {
    let pools = create_rw_sqlite_pool("subscribe_default").await?;
    let executor = rw_from_pools(pools);
    evento_test::subscribe_default::<RwSqlite>(&executor).await
}

/// Runs `evento_test::subscribe_multiple_aggregator` with an `RwSqlite` executor built by
/// `rw_from_pools` from the pool pair created for the `subscribe_multiple_aggregator` key.
#[tokio::test]
async fn rw_sqlite_subscribe_multiple_aggregator() -> anyhow::Result<()> {
    let pools = create_rw_sqlite_pool("subscribe_multiple_aggregator").await?;
    let executor = rw_from_pools(pools);
    evento_test::subscribe_multiple_aggregator::<RwSqlite>(&executor).await
}

/// Runs `evento_test::subscribe_routing_key_multiple_aggregator` with an `RwSqlite` executor
/// built by `rw_from_pools` from the pool pair created for the
/// `subscribe_routing_key_multiple_aggregator` key.
#[tokio::test]
async fn rw_sqlite_subscribe_routing_key_multiple_aggregator() -> anyhow::Result<()> {
    let pools = create_rw_sqlite_pool("subscribe_routing_key_multiple_aggregator").await?;
    let executor = rw_from_pools(pools);
    evento_test::subscribe_routing_key_multiple_aggregator::<RwSqlite>(&executor).await
}

/// Runs `evento_test::subscribe_default_multiple_aggregator` with an `RwSqlite` executor
/// built by `rw_from_pools` from the pool pair created for the
/// `subscribe_default_multiple_aggregator` key.
#[tokio::test]
async fn rw_sqlite_subscribe_default_multiple_aggregator() -> anyhow::Result<()> {
    let pools = create_rw_sqlite_pool("subscribe_default_multiple_aggregator").await?;
    let executor = rw_from_pools(pools);
    evento_test::subscribe_default_multiple_aggregator::<RwSqlite>(&executor).await
}

/// Runs `evento_test::all_commands` with an `RwSqlite` executor built by `rw_from_pools`
/// from the pool pair created for the `all_commands` key.
#[tokio::test]
async fn rw_sqlite_all_commands() -> anyhow::Result<()> {
    let pools = create_rw_sqlite_pool("all_commands").await?;
    let executor = rw_from_pools(pools);
    evento_test::all_commands::<RwSqlite>(&executor).await
}

/// Hands the pool created for the `forward_asc` key to `pool::forward_asc`.
#[tokio::test]
async fn sqlite_forward_asc() -> anyhow::Result<()> {
    pool::forward_asc(create_sqlite_pool("forward_asc").await?).await
}

/// Hands the pool created for the `forward_desc` key to `pool::forward_desc`.
#[tokio::test]
async fn sqlite_forward_desc() -> anyhow::Result<()> {
    pool::forward_desc(create_sqlite_pool("forward_desc").await?).await
}

/// Hands the pool created for the `backward_asc` key to `pool::backward_asc`.
#[tokio::test]
async fn sqlite_backward_asc() -> anyhow::Result<()> {
    pool::backward_asc(create_sqlite_pool("backward_asc").await?).await
}

/// Hands the pool created for the `backward_desc` key to `pool::backward_desc`.
#[tokio::test]
async fn sqlite_backward_desc() -> anyhow::Result<()> {
    pool::backward_desc(create_sqlite_pool("backward_desc").await?).await
}

/// Creates the read/write pool pair for `key` and wraps it in an `evento::Rw` executor.
///
/// # Errors
///
/// Propagates any error returned by `create_rw_sqlite_pool`.
async fn create_rw_sqlite_executor(
    key: impl Into<String>,
) -> anyhow::Result<evento::Rw<evento::Sqlite, evento::Sqlite>> {
    let pools = create_rw_sqlite_pool(key).await?;
    // `create_rw_sqlite_pool` returns the read-only pool first and the writer second.
    let reader = evento::Sqlite::from(pools.0);
    let writer = evento::Sqlite::from(pools.1);

    Ok((reader, writer).into())
}

/// Creates the SQLite pool for `key`, wraps it in `evento::Sqlite`, and converts that into an
/// `evento::Evento`.
///
/// # Errors
///
/// Propagates any error returned by `create_sqlite_pool`.
async fn create_sqlite_executor(key: impl Into<String>) -> anyhow::Result<evento::Evento> {
    let sqlite_pool = create_sqlite_pool(key).await?;
    let sqlite = evento::Sqlite::from(sqlite_pool);

    Ok(sqlite.into())
}

/// PRAGMA statements issued, in this order, against both pools built by
/// `create_rw_sqlite_pool`.
const RW_SQLITE_PRAGMAS: [&str; 6] = [
    "PRAGMA journal_mode = WAL",
    "PRAGMA busy_timeout = 5000",
    "PRAGMA synchronous = NORMAL",
    "PRAGMA cache_size = -20000",
    "PRAGMA foreign_keys = true",
    "PRAGMA temp_store = memory",
];

/// Executes every statement of `RW_SQLITE_PRAGMAS` through `pool`, stopping at the first error.
///
/// NOTE(review): a statement executed through a pool runs on whichever connection gets checked
/// out, so if the pool holds more than one connection the others may not receive these
/// settings — confirm against the pool configuration.
async fn apply_rw_sqlite_pragmas(pool: &SqlitePool) -> anyhow::Result<()> {
    for pragma in RW_SQLITE_PRAGMAS {
        sqlx::query(pragma).execute(pool).await?;
    }

    Ok(())
}

/// Creates a `(read, write)` pair of SQLite pools for the test database named `rw_{key}`.
///
/// The writer pool comes from `create_sqlite_pool`; the reader pool is opened afterwards with
/// `read_only(true)` on a URL built from the same format string, so both pools target the same
/// database file. The same PRAGMA list is applied to each pool, writer first.
///
/// # Returns
///
/// `(reader, writer)` — the read-only pool is the first element of the tuple.
///
/// # Errors
///
/// Returns an error if either pool cannot be created, the URL cannot be parsed, or any PRAGMA
/// statement fails.
async fn create_rw_sqlite_pool(key: impl Into<String>) -> anyhow::Result<(SqlitePool, SqlitePool)> {
    let key = key.into();
    let key = format!("rw_{key}");
    // Built before `key` is moved into `create_sqlite_pool`; must mirror that function's URL.
    let url = format!("sqlite:../target/tmp/test_sql_{key}.db");

    let w = create_sqlite_pool(key).await?;
    apply_rw_sqlite_pragmas(&w).await?;

    let options = SqliteConnectOptions::from_str(&url)?.read_only(true);

    let r = SqlitePoolOptions::new().connect_with(options).await?;
    apply_rw_sqlite_pragmas(&r).await?;

    Ok((r, w))
}

/// Builds the SQLite URL for the test database named after `key` and passes it to
/// `pool::create_pool`.
///
/// # Errors
///
/// Propagates any error returned by `pool::create_pool`.
async fn create_sqlite_pool(key: impl Into<String>) -> anyhow::Result<SqlitePool> {
    let key: String = key.into();

    pool::create_pool(format!("sqlite:../target/tmp/test_sql_{key}.db")).await
}