eventful-sql-server 0.2.1

A library for event sourcing in Rust
Documentation
use bb8::Pool;
use bb8_tiberius::ConnectionManager;
use chrono::Utc;
use eventful::event_sourced_system::EventSourcedSystem;
use eventful::{JournalStore, SnapshotStore};
use eventful_sql_server::{build_connection, SqlServerJournalStore, SqlServerSnapshotStore};
use eventful_test_shared::{CatCommand, CatEvent, CatModel, InMemoryAnimalCache, InMemoryAnimalPersistenceStore, TestAnimalStreamContext};
use tokio::sync::OnceCell;

/// Holds the outcome of the test-database cleanup that `init_store` runs through
/// `get_or_init`, so the cleanup is not repeated by every test in this binary.
static INIT_DATABASE: OnceCell<Result<(), String>> = OnceCell::const_new();

/// Builds a connection pool for the test database and returns a journal store and a
/// snapshot store that share it.
///
/// The connection string is read from the `SQL_SERVER_TEST_DATABASE` environment variable.
/// The `journal`, `snapshot` and `named_offset` tables are emptied through `INIT_DATABASE`,
/// whose stored result is reused by later callers instead of repeating the cleanup.
///
/// # Errors
///
/// Returns the stringified error if the pool cannot be built, or if the table cleanup
/// failed. A cleanup failure is stored in `INIT_DATABASE`, so every later caller receives
/// the same error rather than running against tables that were never emptied.
///
/// # Panics
///
/// Panics if `SQL_SERVER_TEST_DATABASE` is not set.
async fn init_store() -> Result<(SqlServerJournalStore, SqlServerSnapshotStore), String> {
    /// Deletes every row from the tables used by the tests.
    async fn initialise_test_database(pool: &Pool<ConnectionManager>) -> Result<(), String> {
        println!("Initialising test database");
        let mut conn = pool.get().await.map_err(|e| e.to_string())?;
        conn.execute("delete from journal", &[]).await.map_err(|e| e.to_string())?;
        conn.execute("delete from snapshot", &[]).await.map_err(|e| e.to_string())?;
        conn.execute("delete from named_offset", &[]).await.map_err(|e| e.to_string())?;
        println!("Built test connection pool and initialised test database");
        Ok(())
    }

    println!("Building test connection pool");

    let connection_string =
        std::env::var("SQL_SERVER_TEST_DATABASE").expect("SQL_SERVER_TEST_DATABASE environment variable must be set");
    let pool = build_connection(&connection_string).await.map_err(|e| e.to_string())?;

    // The cell stores the cleanup's `Result`; propagate it instead of discarding it so a
    // failed cleanup fails the calling test rather than being silently ignored.
    INIT_DATABASE
        .get_or_init(|| async { initialise_test_database(&pool).await })
        .await
        .clone()?;

    // The last store takes ownership of the pool handle; only the first needs a clone.
    Ok((SqlServerJournalStore { pool: pool.clone() }, SqlServerSnapshotStore { pool }))
}

/// Asserts that the journal store reports an initial offset of 0.
///
/// A store error is converted to its string form and returned to the caller.
async fn can_get_initial_sequence_number_for_empty_database(store: &SqlServerJournalStore) -> Result<(), String> {
    let offset = match store.get_initial_offset().await {
        Ok(value) => value,
        Err(err) => return Err(err.to_string()),
    };
    assert_eq!(0, offset, "expected initial sequence number to be 0");
    Ok(())
}

// Fixture values shared by the journal test helpers in this file.

// Entity type names.
const ENT1: &str = "entity_type_1";
const ENT2: &str = "entity_type_2";

// Event type names.
const EVT1: &str = "event_type_1";
const EVT2: &str = "event_type_2";
const EVT3: &str = "event_type_3";

// Event payloads, stored as raw bytes.
const EVD1: &[u8] = b"event_data_1";
const EVD2: &[u8] = b"event_data_2";
const EVD3: &[u8] = b"event_data_3";
const EVD4: &[u8] = b"event_data_4";
const EVD5: &[u8] = b"event_data_5";

// Persistence ids.
const PID1: &str = "persistence_id_1";
const PID2: &str = "persistence_id_2";

/// Persists five fixture events to the journal, all stamped with the same current UTC time.
///
/// The events are written in table order; the first store error stops the run and is
/// returned in its string form.
async fn can_write_journal_events(store: &SqlServerJournalStore) -> Result<(), String> {
    let timestamp = Utc::now();

    // (entity type, event type, payload, persistence id, trailing integer argument)
    let rows = [
        (ENT1, EVT1, EVD1, PID1, 0),
        (ENT1, EVT2, EVD2, PID1, 1),
        (ENT1, EVT1, EVD3, PID2, 2),
        (ENT2, EVT3, EVD4, PID1, 3),
        (ENT1, EVT3, EVD5, PID1, 4),
    ];

    for &(entity_type, event_type, payload, persistence_id, position) in &rows {
        store
            .persist_event_to_journal(entity_type, event_type, &timestamp, payload, persistence_id, position)
            .await
            .map_err(|e| e.to_string())?;
    }

    Ok(())
}

/// Reads the journal from offset 0 and asserts that it yields the five fixture payloads
/// paired with offsets 0 through 4, in that order.
async fn can_read_all_journal_events(store: &SqlServerJournalStore) -> Result<(), String> {
    let journal = store.get_events_from_journal(0).await.map_err(|e| e.to_string())?;

    let expected = [(0, EVD1), (1, EVD2), (2, EVD3), (3, EVD4), (4, EVD5)];
    assert_eq!(journal.len(), expected.len());
    for (index, &(offset, payload)) in expected.iter().enumerate() {
        assert_eq!(journal[index], (offset, payload.to_vec()));
    }

    Ok(())
}

/// Reads the journal starting at offset 2 and asserts that exactly the entries with
/// offsets 2, 3 and 4 come back, in that order.
async fn can_read_journal_events_from_offset(store: &SqlServerJournalStore) -> Result<(), String> {
    let journal = store.get_events_from_journal(2).await.map_err(|e| e.to_string())?;

    let expected = [(2, EVD3), (3, EVD4), (4, EVD5)];
    assert_eq!(journal.len(), expected.len());
    for (index, &(offset, payload)) in expected.iter().enumerate() {
        assert_eq!(journal[index], (offset, payload.to_vec()));
    }

    Ok(())
}

/// Loads the events for three (entity type, persistence id) pairs and asserts that each
/// lookup returns exactly the payloads expected for that pair, in order.
async fn can_get_events_for_entity(store: &SqlServerJournalStore) -> Result<(), String> {
    // (entity type, persistence id, payloads expected for that pair)
    let cases: [(&str, &str, &[&[u8]]); 3] = [
        (ENT1, PID1, &[EVD1, EVD2, EVD5]),
        (ENT1, PID2, &[EVD3]),
        (ENT2, PID1, &[EVD4]),
    ];

    for &(entity_type, persistence_id, wanted) in &cases {
        let loaded = store
            .load_entity_events(entity_type, persistence_id, 0)
            .await
            .map_err(|e| e.to_string())?;

        assert_eq!(loaded.len(), wanted.len());
        for (index, payload) in wanted.iter().enumerate() {
            assert_eq!(loaded[index], payload.to_vec());
        }
    }

    Ok(())
}

/// Writes four snapshots under distinct ids and fails if any write returns an error.
///
/// NOTE(review): the read-back assertions below are commented out, so despite its name
/// this test currently only exercises the write path.
#[tokio::test]
async fn can_write_and_read_snapshots() -> Result<(), String> {
    let (_, snapshot_store) = init_store().await?;

    let sd1 = "This is the first snapshot data for entity type 1";
    let sd2 = "This is the second snapshot data for entity type 1";
    let sd3 = "This is the first snapshot data for entity type 2";
    let sd4 = "This is the second snapshot data for entity type 2";

    // (snapshot id, snapshot payload)
    let writes = [
        ("entity-type-1|sd-1", sd1),
        ("entity-type-1|sd-2", sd2),
        ("entity-type-2|sd-1", sd3),
        ("entity-type-2|sd-2", sd4),
    ];
    for &(snapshot_id, data) in &writes {
        snapshot_store.write_snapshot(snapshot_id, 0, &data).await.map_err(|e| e.to_string())?;
    }

    // assert_eq!(snapshot_store.read_snapshot::<String>("entity-type-1|sd-1").await.map_err(|e| e.to_string())?.unwrap().value, sd1.to_string());
    // assert_eq!(snapshot_store.read_snapshot::<String>("entity-type-1|sd-2").await.map_err(|e| e.to_string())?.unwrap().value, sd2.to_string());
    // assert_eq!(snapshot_store.read_snapshot::<String>("entity-type-2|sd-1").await.map_err(|e| e.to_string())?.unwrap().value, sd3.to_string());
    // assert_eq!(snapshot_store.read_snapshot::<String>("entity-type-2|sd-2").await.map_err(|e| e.to_string())?.unwrap().value, sd4.to_string());
    Ok(())
}

/// Runs the journal checks in a fixed order against one store.
///
/// The order matters: the empty-database check runs before any event is written, and the
/// read checks run after the write step has persisted the fixture events.
#[tokio::test]
async fn journal_tests() -> Result<(), String> {
    let (journal, _) = init_store().await?;

    // Must run before anything is written.
    can_get_initial_sequence_number_for_empty_database(&journal).await?;

    // Populate the journal, then verify every read path against what was written.
    can_write_journal_events(&journal).await?;
    can_read_all_journal_events(&journal).await?;
    can_read_journal_events_from_offset(&journal).await?;
    can_get_events_for_entity(&journal).await?;

    Ok(())
}