thal 0.0.1

Reactive semantic runtime — molecules, reactions, and effect actors for building LLM-backed applications as dataflow programs.
Documentation
use std::time::Duration;
use thal::runtime::Molecule;
use thal::value::Value;
use thal::Reactor;

// Program source handed to `thal::load_str` by the test in this file.
//
// It declares three molecules:
//   * `Failure` and `Edit`, each with an `Int` id (the primary key) and a
//     `Timestamp` field `ts`;
//   * `Suspect`, keyed by the pair `(failure_id, edit_id)`.
//
// The single reaction, `FindSuspects`, matches a `Failure` together with an
// `Edit`, keeps only the pairs where `e.ts > f.ts`, and emits a `Suspect`
// carrying the two ids.
const PROGRAM: &str = r#"
molecule Failure {
    id: Int
    ts: Timestamp
    primary_key: id
}

molecule Edit {
    id: Int
    ts: Timestamp
    primary_key: id
}

molecule Suspect {
    failure_id: Int
    edit_id:    Int
    primary_key: (failure_id, edit_id)
}

reaction FindSuspects {
    when:  Failure(f), Edit(e)
    where: e.ts > f.ts
    emit:  Suspect { failure_id: f.id, edit_id: e.id }
}
"#;

/// Builds a `Failure` molecule whose `id` field is `Value::Int(id)` and whose
/// `ts` field is `Value::Timestamp(ts)`.
fn failure(id: i64, ts: i64) -> Molecule {
    let id_value = Value::Int(id);
    let ts_value = Value::Timestamp(ts);

    Molecule::builder("Failure")
        .field("id", id_value)
        .field("ts", ts_value)
        .build()
}

/// Builds an `Edit` molecule whose `id` field is `Value::Int(id)` and whose
/// `ts` field is `Value::Timestamp(ts)`.
fn edit(id: i64, ts: i64) -> Molecule {
    let id_value = Value::Int(id);
    let ts_value = Value::Timestamp(ts);

    Molecule::builder("Edit")
        .field("id", id_value)
        .field("ts", ts_value)
        .build()
}

/// Checks that the `FindSuspects` reaction joins every `Failure` with every
/// `Edit` and keeps only the pairs whose edit timestamp is strictly greater
/// than the failure timestamp.
#[tokio::test(flavor = "multi_thread")]
async fn join_with_where_filter() {
    let reactor = Reactor::new(thal::load_str(PROGRAM).expect("load"));
    // Take the store and the handle before `run()` is handed the reactor.
    let (store, handle) = (reactor.store(), reactor.handle());
    let runner = tokio::spawn(reactor.run());

    // Two failures: id 1 at ts=100 and id 2 at ts=200.
    let failures = [
        (failure(1, 100), "emit f1"),
        (failure(2, 200), "emit f2"),
    ];
    for (molecule, label) in failures {
        handle.emit(molecule).await.expect(label);
    }

    // Three edits: ts=50 precedes both failures, ts=150 follows only
    // failure 1, and ts=300 follows both.
    let edits = [
        (edit(10, 50), "emit e10"),
        (edit(11, 150), "emit e11"),
        (edit(12, 300), "emit e12"),
    ];
    for (molecule, label) in edits {
        handle.emit(molecule).await.expect(label);
    }

    // Give the reactor a fixed window to process the join, then stop it.
    tokio::time::sleep(Duration::from_millis(100)).await;
    runner.abort();

    // Pairs that satisfy `e.ts > f.ts`:
    //   failure 1 (100) with edit 11 (150)
    //   failure 1 (100) with edit 12 (300)
    //   failure 2 (200) with edit 12 (300)
    // Pairs that do not:
    //   any failure with edit 10 (50)  -- the edit comes first
    //   failure 2 (200) with edit 11 (150) -- the edit comes first
    let suspects = store.scan_by_name("Suspect");
    assert_eq!(suspects.len(), 3, "got: {suspects:#?}");

    let mut pairs: Vec<(i64, i64)> = Vec::with_capacity(suspects.len());
    for suspect in suspects.iter() {
        let failure_id = suspect.fields["failure_id"]
            .as_int()
            .expect("failure_id is Int");
        let edit_id = suspect.fields["edit_id"]
            .as_int()
            .expect("edit_id is Int");
        pairs.push((failure_id, edit_id));
    }

    // Scan order is not relied on, so sort before comparing.
    pairs.sort_unstable();
    assert_eq!(pairs, [(1, 11), (1, 12), (2, 12)]);
}