factstr-sqlite 0.4.0

Embedded SQLite store for FACTSTR with append, query, streams, and durable streams.
Documentation
mod support;

use std::sync::{
    Arc,
    atomic::{AtomicBool, Ordering},
};
use std::thread;

use factstr::{EventFilter, EventQuery, EventStore, NewEvent};
use factstr_conformance as store_conformance;
use factstr_sqlite::SqliteStore;
use serde_json::json;

use support::TemporaryDatabaseFile;

#[test]
fn query_returns_events_in_ascending_order() {
    support::run_store_test(store_conformance::query_returns_events_in_ascending_order);
}

#[test]
fn query_records_include_occurred_at() {
    support::run_store_test(store_conformance::query_records_include_occurred_at);
}

#[test]
fn query_with_min_sequence_number_only_returns_events_after_that_sequence() {
    support::run_store_test(
        store_conformance::query_with_min_sequence_number_only_returns_events_after_that_sequence,
    );
}

#[test]
fn query_against_an_empty_store_returns_explicit_empty_result() {
    support::run_store_test(
        store_conformance::query_against_an_empty_store_returns_explicit_empty_result,
    );
}

#[test]
fn current_context_version_for_payload_filtered_queries_uses_the_full_matching_context() {
    support::run_store_test(
        store_conformance::current_context_version_for_payload_filtered_queries_uses_the_full_matching_context,
    );
}

#[test]
fn min_sequence_number_does_not_narrow_current_context_version_for_payload_filtered_queries() {
    support::run_store_test(
        store_conformance::min_sequence_number_does_not_narrow_current_context_version_for_payload_filtered_queries,
    );
}

#[test]
fn all_events_query_and_filtered_query_report_their_own_context_versions() {
    support::run_store_test(
        store_conformance::all_events_query_and_filtered_query_report_their_own_context_versions,
    );
}

#[test]
fn query_result_stays_self_consistent_while_concurrent_appends_happen() {
    let database_file = TemporaryDatabaseFile::new("query-snapshot-consistency");
    let query_store = SqliteStore::open(database_file.path()).expect("sqlite store should open");
    let append_store = SqliteStore::open(database_file.path()).expect("sqlite store should open");

    query_store
        .append(vec![
            NewEvent::new("account-opened", json!({ "accountId": "a1" })),
            NewEvent::new("account-opened", json!({ "accountId": "a2" })),
        ])
        .expect("initial append should succeed");

    let should_stop = Arc::new(AtomicBool::new(false));
    let append_should_stop = Arc::clone(&should_stop);

    let append_thread = thread::spawn(move || {
        let mut next_account_number = 3_u64;

        while !append_should_stop.load(Ordering::Relaxed) {
            append_store
                .append(vec![NewEvent::new(
                    "account-opened",
                    json!({ "accountId": format!("a{next_account_number}") }),
                )])
                .expect("concurrent append should succeed");
            next_account_number += 1;
        }
    });

    let filtered_query = EventQuery::all()
        .with_filters([EventFilter::for_event_types(["account-opened"])])
        .with_min_sequence_number(1);

    for _ in 0..200 {
        let query_result = query_store
            .query(&filtered_query)
            .expect("query should succeed while appends happen");

        if let Some(last_returned_sequence_number) = query_result.last_returned_sequence_number {
            let current_context_version = query_result
                .current_context_version
                .expect("matching context should exist when rows were returned");

            assert!(
                last_returned_sequence_number <= current_context_version,
                "query result combined inconsistent snapshots: last returned sequence was {last_returned_sequence_number}, current context version was {current_context_version}"
            );
        }
    }

    should_stop.store(true, Ordering::Relaxed);
    append_thread
        .join()
        .expect("append thread should stop without panicking");
}