factstr-sqlite 0.5.0

Embedded SQLite store for FACTSTR with append, query, streams, and durable streams.
Documentation
mod support;

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

use factstr::{EventFilter, EventQuery, EventStore, HandleStream, NewEvent};
use factstr_conformance as store_conformance;
use factstr_sqlite::SqliteStore;
use serde_json::json;

use support::TemporaryDatabaseFile;

#[test]
fn stream_all_handler_receives_a_future_committed_batch() {
    support::run_store_test(
        store_conformance::stream_all_handler_receives_a_future_committed_batch,
    );
}

#[test]
fn stream_does_not_replay_historical_events() {
    support::run_store_test(store_conformance::stream_does_not_replay_historical_events);
}

#[test]
fn two_streams_receive_the_same_committed_batches() {
    support::run_store_test(store_conformance::two_streams_receive_the_same_committed_batches);
}

#[test]
fn stream_batches_arrive_in_commit_order() {
    support::run_store_test(store_conformance::stream_batches_arrive_in_commit_order);
}

#[test]
fn append_if_conflict_emits_no_delivery() {
    support::run_store_test(store_conformance::append_if_conflict_emits_no_delivery);
}

#[test]
fn unsubscribing_one_stream_does_not_break_delivery_for_others() {
    support::run_store_test(
        store_conformance::unsubscribing_one_stream_does_not_break_delivery_for_others,
    );
}

#[test]
fn stream_delivery_preserves_the_committed_batch_shape() {
    support::run_store_test(store_conformance::stream_delivery_preserves_the_committed_batch_shape);
}

#[test]
fn filtered_stream_with_event_type_receives_only_matching_future_events() {
    support::run_store_test(
        store_conformance::filtered_stream_with_event_type_receives_only_matching_future_events,
    );
}

#[test]
fn filtered_stream_with_payload_predicate_receives_only_matching_future_events() {
    support::run_store_test(
        store_conformance::filtered_stream_with_payload_predicate_receives_only_matching_future_events,
    );
}

#[test]
fn filtered_stream_non_matching_commit_produces_no_delivery() {
    support::run_store_test(
        store_conformance::filtered_stream_non_matching_commit_produces_no_delivery,
    );
}

#[test]
fn filtered_stream_mixed_committed_batch_yields_one_filtered_batch() {
    support::run_store_test(
        store_conformance::filtered_stream_mixed_committed_batch_yields_one_filtered_batch,
    );
}

#[test]
fn filtered_stream_preserves_event_order_inside_delivered_batch() {
    support::run_store_test(
        store_conformance::filtered_stream_preserves_event_order_inside_delivered_batch,
    );
}

#[test]
fn append_if_conflict_emits_no_filtered_stream_delivery() {
    support::run_store_test(
        store_conformance::append_if_conflict_emits_no_filtered_stream_delivery,
    );
}

#[test]
fn differently_filtered_streams_observe_the_same_commit_differently() {
    support::run_store_test(
        store_conformance::differently_filtered_streams_observe_the_same_commit_differently,
    );
}

#[test]
fn handler_failure_does_not_roll_back_append_success() {
    support::run_store_test(store_conformance::handler_failure_does_not_roll_back_append_success);
}

#[test]
fn append_if_success_delivers_one_committed_batch() {
    let database_file = TemporaryDatabaseFile::new("subscriptions-append-if-success");
    let store = SqliteStore::open(database_file.path()).expect("sqlite store should open");
    let delivery_log = Arc::new(Mutex::new(Vec::new()));

    let _subscription = store
        .stream_all(recording_handle(Arc::clone(&delivery_log)))
        .expect("subscribe_all should succeed");

    store
        .append(vec![NewEvent::new(
            "account-opened",
            json!({ "accountId": "a1" }),
        )])
        .expect("append should succeed");

    let append_result = store
        .append_if(
            vec![
                NewEvent::new("account-renamed", json!({ "accountId": "a1" })),
                NewEvent::new("account-credited", json!({ "accountId": "a1" })),
            ],
            &EventQuery::all().with_filters([
                EventFilter::default().with_payload_predicates([json!({ "accountId": "a1" })])
            ]),
            Some(1),
        )
        .expect("conditional append should succeed");

    assert_eq!(append_result.first_sequence_number, 2);
    assert_eq!(append_result.last_sequence_number, 3);

    let delivered_batches = wait_for_delivery_count(&delivery_log, 2);
    assert_eq!(delivered_batches.len(), 2);
    assert_eq!(delivered_batches[1].len(), 2);
    assert_eq!(delivered_batches[1][0].sequence_number, 2);
    assert_eq!(delivered_batches[1][1].sequence_number, 3);
}

#[test]
fn empty_append_emits_no_delivery() {
    let database_file = TemporaryDatabaseFile::new("subscriptions-empty-append");
    let store = SqliteStore::open(database_file.path()).expect("sqlite store should open");
    let delivery_log = Arc::new(Mutex::new(Vec::new()));

    let _subscription = store
        .stream_all(recording_handle(Arc::clone(&delivery_log)))
        .expect("subscribe_all should succeed");

    let error = store
        .append(Vec::new())
        .expect_err("empty append should fail");
    assert_eq!(error, factstr::EventStoreError::EmptyAppend);
    assert_no_delivery(&delivery_log);
}

#[test]
fn already_snapshotted_delivery_may_arrive_after_unsubscribe_but_future_commits_do_not() {
    let database_file = TemporaryDatabaseFile::new("subscriptions-unsubscribe-snapshot");
    let store = SqliteStore::open(database_file.path()).expect("sqlite store should open");
    let delivered_batches = Arc::new(Mutex::new(Vec::new()));
    let invocation_count = Arc::new(AtomicUsize::new(0));
    let (first_handler_started_sender, first_handler_started_receiver) = mpsc::channel();
    let release_first_handler = Arc::new((Mutex::new(false), std::sync::Condvar::new()));

    let subscription = store
        .stream_all(HandleStream::new({
            let delivered_batches = Arc::clone(&delivered_batches);
            let invocation_count = Arc::clone(&invocation_count);
            let release_first_handler = Arc::clone(&release_first_handler);
            move |event_records| {
                let delivered_batches = Arc::clone(&delivered_batches);
                let invocation_count = Arc::clone(&invocation_count);
                let release_first_handler = Arc::clone(&release_first_handler);
                let first_handler_started_sender = first_handler_started_sender.clone();

                async move {
                    delivered_batches
                        .lock()
                        .expect("delivery log lock should succeed")
                        .push(event_records);

                    if invocation_count.fetch_add(1, Ordering::SeqCst) == 0 {
                        let _ = first_handler_started_sender.send(());
                        let (lock, condvar) = &*release_first_handler;
                        let mut released = lock.lock().expect("handler gate lock should succeed");
                        while !*released {
                            released = condvar
                                .wait(released)
                                .expect("handler gate wait should succeed");
                        }
                    }

                    Ok(())
                }
            }
        }))
        .expect("subscribe_all should succeed");

    store
        .append(vec![NewEvent::new(
            "account-opened",
            json!({ "accountId": "a1" }),
        )])
        .expect("first append should succeed");

    first_handler_started_receiver
        .recv_timeout(Duration::from_secs(1))
        .expect("first delivery should start and block");

    store
        .append(vec![NewEvent::new(
            "account-credited",
            json!({ "accountId": "a1" }),
        )])
        .expect("second append should succeed while the first handler is blocked");

    subscription.unsubscribe();

    store
        .append(vec![NewEvent::new(
            "account-closed",
            json!({ "accountId": "a1" }),
        )])
        .expect("third append should succeed after unsubscribe");

    let (lock, condvar) = &*release_first_handler;
    *lock.lock().expect("handler gate lock should succeed") = true;
    condvar.notify_all();

    let deadline = Instant::now() + Duration::from_secs(1);
    loop {
        let delivered_batches = delivered_batches
            .lock()
            .expect("delivery log lock should succeed")
            .clone();

        if delivered_batches.len() == 2 {
            let delivered_sequences = delivered_batches
                .iter()
                .map(|batch| batch[0].sequence_number)
                .collect::<Vec<_>>();
            assert_eq!(delivered_sequences, vec![1, 2]);
            return;
        }

        assert!(
            Instant::now() < deadline,
            "expected exactly two delivered batches after unsubscribe, got {}",
            delivered_batches.len()
        );

        thread::sleep(Duration::from_millis(10));
    }
}

#[test]
fn panicking_handler_does_not_roll_back_append_success() {
    let database_file = TemporaryDatabaseFile::new("subscriptions-panic-isolation");
    let store = SqliteStore::open(database_file.path()).expect("sqlite store should open");
    let successful_delivery_log = Arc::new(Mutex::new(Vec::new()));

    let _panicking_subscription = store
        .stream_all(HandleStream::new(|_| async move {
            panic!("expected sqlite subscription panic")
        }))
        .expect("subscribe_all should succeed");
    let _successful_subscription = store
        .stream_all(recording_handle(Arc::clone(&successful_delivery_log)))
        .expect("subscribe_all should succeed");

    let append_result = store
        .append(vec![NewEvent::new(
            "account-opened",
            json!({ "accountId": "a1" }),
        )])
        .expect("append should still succeed");

    assert_eq!(append_result.first_sequence_number, 1);
    assert_eq!(append_result.last_sequence_number, 1);

    let query_result = store
        .query(&EventQuery::all())
        .expect("query should still succeed after handler panic");
    assert_eq!(query_result.event_records.len(), 1);
    assert_eq!(query_result.event_records[0].sequence_number, 1);

    let delivered_batches = wait_for_delivery_count(&successful_delivery_log, 1);
    assert_eq!(delivered_batches.len(), 1);
    assert_eq!(delivered_batches[0].len(), 1);
    assert_eq!(delivered_batches[0][0].sequence_number, 1);
}

fn recording_handle(
    delivery_log: Arc<Mutex<Vec<Vec<factstr::EventRecord>>>>,
) -> factstr::HandleStream {
    factstr::HandleStream::new(move |event_records| {
        let delivery_log = Arc::clone(&delivery_log);

        async move {
            delivery_log
                .lock()
                .expect("delivery log lock should succeed")
                .push(event_records);
            Ok(())
        }
    })
}

fn delivered_batches(
    delivery_log: &Arc<Mutex<Vec<Vec<factstr::EventRecord>>>>,
) -> Vec<Vec<factstr::EventRecord>> {
    delivery_log
        .lock()
        .expect("delivery log lock should succeed")
        .clone()
}

fn wait_for_delivery_count(
    delivery_log: &Arc<Mutex<Vec<Vec<factstr::EventRecord>>>>,
    expected_count: usize,
) -> Vec<Vec<factstr::EventRecord>> {
    let deadline = Instant::now() + Duration::from_secs(1);

    loop {
        let delivered_batches = delivered_batches(delivery_log);
        if delivered_batches.len() == expected_count {
            return delivered_batches;
        }

        assert!(
            Instant::now() < deadline,
            "expected {expected_count} delivered batches, got {}",
            delivered_batches.len()
        );

        thread::sleep(Duration::from_millis(10));
    }
}

fn assert_no_delivery(delivery_log: &Arc<Mutex<Vec<Vec<factstr::EventRecord>>>>) {
    let deadline = Instant::now() + Duration::from_millis(100);

    while Instant::now() < deadline {
        let delivered_batches = delivered_batches(delivery_log);
        assert!(
            delivered_batches.is_empty(),
            "expected no delivered batches, got {}",
            delivered_batches.len()
        );
        thread::sleep(Duration::from_millis(5));
    }
}