sim-kernel 0.1.0-rc.1

SIM workspace package for sim kernel.
Documentation
use crate::{Cx, Error, Expr, NumberLiteral, Symbol};

use super::{
    CatalogRow, CatalogSnapshot, CatalogStore, CatalogTableSpec, CatalogTx, CatalogWritePolicy,
};

fn sym(name: &str) -> Symbol {
    Symbol::new(name)
}

fn table(name: &str) -> Symbol {
    Symbol::qualified("delta-test", name)
}

fn field(name: &str) -> Symbol {
    sym(name)
}

fn number(value: u64) -> Expr {
    Expr::Number(NumberLiteral {
        domain: Symbol::qualified("numbers", "i64"),
        canonical: value.to_string(),
    })
}

fn spec(name: &str, policy: CatalogWritePolicy) -> CatalogTableSpec {
    CatalogTableSpec::new(table(name), policy).with_required_fields(vec![field("value")])
}

fn row(table: &Symbol, key: &str, value: u64) -> CatalogRow {
    CatalogRow::new(table.clone(), sym(key)).with_data(field("value"), number(value))
}

fn commit_rows(store: &mut CatalogStore, rows: Vec<CatalogRow>) {
    let mut tx = CatalogTx::new();
    for row in rows {
        tx.put_row(row);
    }
    tx.commit(store).unwrap();
}

#[test]
fn delta_from_empty_to_loaded_registry_applies_cleanly() {
    let cx = Cx::stub();
    let loaded = CatalogStore::from_snapshot(cx.registry().catalog_snapshot()).unwrap();
    let delta = loaded.delta_since(0).unwrap();
    let mut replica = CatalogStore::new();

    replica.apply_delta(delta).unwrap();

    assert_eq!(
        CatalogSnapshot::from_store(&replica),
        CatalogSnapshot::from_store(&loaded)
    );
}

#[test]
fn stale_delta_is_rejected_without_mutating_target() {
    let mut source = CatalogStore::new();
    let table = table("stale");
    source
        .install_table(spec("stale", CatalogWritePolicy::Mutable))
        .unwrap();
    commit_rows(&mut source, vec![row(&table, "row", 1)]);
    let delta = source.delta_since(0).unwrap();
    let mut target = CatalogStore::new();
    target.epoch = 1;

    let before = CatalogSnapshot::from_store(&target);
    let err = target.apply_delta(delta).unwrap_err();

    assert!(matches!(
        err,
        Error::CatalogSchema { table, message }
            if table == Symbol::qualified("catalog", "delta")
                && message == "catalog delta source epoch mismatch"
    ));
    assert_eq!(CatalogSnapshot::from_store(&target), before);
}

#[test]
fn sealed_conflict_is_rejected_without_mutating_target() {
    let mut source = CatalogStore::new();
    let table = table("sealed");
    source
        .install_table(spec("sealed", CatalogWritePolicy::Sealed))
        .unwrap();
    commit_rows(&mut source, vec![row(&table, "row", 2)]);
    let delta = source.delta_since(0).unwrap();

    let mut target = CatalogStore::new();
    target
        .install_table(spec("sealed", CatalogWritePolicy::Sealed))
        .unwrap();
    target
        .rows
        .entry(table.clone())
        .or_default()
        .insert(sym("row"), row(&table, "row", 1));
    let before = CatalogSnapshot::from_store(&target);

    let err = target.apply_delta(delta).unwrap_err();

    assert!(matches!(
        err,
        Error::CatalogConflict {
            table: conflict_table,
            key
        } if conflict_table == table && key == sym("row")
    ));
    assert_eq!(CatalogSnapshot::from_store(&target), before);
}

#[test]
fn applying_incremental_delta_then_snapshot_equals_direct_snapshot() {
    let mut source = CatalogStore::new();
    let table = table("incremental");
    let sequence = sym("delta-seq/incremental");
    source
        .install_table(spec("incremental", CatalogWritePolicy::Mutable))
        .unwrap();
    commit_rows(&mut source, vec![row(&table, "first", 1)]);

    let mut replica = CatalogStore::new();
    replica.apply_delta(source.delta_since(0).unwrap()).unwrap();

    let mut tx = CatalogTx::new();
    tx.put_row(row(&table, "first", 2));
    tx.put_row(row(&table, "second", 3));
    tx.bump_sequence(sequence.clone(), 5);
    tx.commit(&mut source).unwrap();

    let delta = source.delta_since(replica.epoch()).unwrap();
    replica.apply_delta(delta).unwrap();

    assert_eq!(
        CatalogSnapshot::from_store(&replica),
        CatalogSnapshot::from_store(&source)
    );
    assert_eq!(replica.sequence(&sequence), Some(5));
}

#[test]
fn incompatible_table_specs_are_rejected() {
    let mut source = CatalogStore::new();
    let table = table("spec");
    source
        .install_table(spec("spec", CatalogWritePolicy::Mutable))
        .unwrap();
    commit_rows(&mut source, vec![row(&table, "row", 1)]);
    let delta = source.delta_since(0).unwrap();

    let mut target = CatalogStore::new();
    target
        .install_table(spec("spec", CatalogWritePolicy::Sealed))
        .unwrap();
    let err = target.apply_delta(delta).unwrap_err();

    assert!(matches!(
        err,
        Error::CatalogSchema {
            table: spec_table,
            message
        } if spec_table == table && message == "incompatible catalog table spec"
    ));
}