use std::time::Duration;
use thal::runtime::Molecule;
use thal::value::Value;
use thal::Reactor;
const PROGRAM: &str = r#"
molecule Failure {
id: Int
ts: Timestamp
primary_key: id
}
molecule Edit {
id: Int
ts: Timestamp
primary_key: id
}
molecule Suspect {
failure_id: Int
edit_id: Int
primary_key: (failure_id, edit_id)
}
reaction FindSuspects {
when: Failure(f), Edit(e)
where: e.ts > f.ts
emit: Suspect { failure_id: f.id, edit_id: e.id }
}
"#;
fn failure(id: i64, ts: i64) -> Molecule {
Molecule::builder("Failure")
.field("id", Value::Int(id))
.field("ts", Value::Timestamp(ts))
.build()
}
fn edit(id: i64, ts: i64) -> Molecule {
Molecule::builder("Edit")
.field("id", Value::Int(id))
.field("ts", Value::Timestamp(ts))
.build()
}
#[tokio::test(flavor = "multi_thread")]
async fn join_with_where_filter() {
let program = thal::load_str(PROGRAM).expect("load");
let reactor = Reactor::new(program);
let store = reactor.store();
let handle = reactor.handle();
let task = tokio::spawn(reactor.run());
handle.emit(failure(1, 100)).await.expect("emit f1");
handle.emit(failure(2, 200)).await.expect("emit f2");
handle.emit(edit(10, 50)).await.expect("emit e10");
handle.emit(edit(11, 150)).await.expect("emit e11");
handle.emit(edit(12, 300)).await.expect("emit e12");
tokio::time::sleep(Duration::from_millis(100)).await;
task.abort();
let suspects = store.scan_by_name("Suspect");
assert_eq!(suspects.len(), 3, "got: {suspects:#?}");
let mut pairs: Vec<(i64, i64)> = suspects
.iter()
.map(|s| {
let f = s.fields["failure_id"].as_int().expect("failure_id is Int");
let e = s.fields["edit_id"].as_int().expect("edit_id is Int");
(f, e)
})
.collect();
pairs.sort();
assert_eq!(pairs, vec![(1, 11), (1, 12), (2, 12)]);
}