#![allow(clippy::print_stdout, clippy::disallowed_methods)]
use batpak::event::{StoredEvent, TypedReactive};
use batpak::prelude::*;
use batpak::store::{CausationRef, ReactionBatch, ReactorConfig};
use std::sync::Arc;
use std::time::{Duration, Instant};
fn wait_for(cond: impl Fn() -> bool, timeout: Duration) -> Result<(), &'static str> {
let deadline = Instant::now() + timeout;
while Instant::now() < deadline {
if cond() {
return Ok(());
}
std::thread::sleep(Duration::from_millis(10));
}
Err("timed out waiting for condition")
}
#[derive(serde::Serialize, serde::Deserialize, EventPayload)]
#[batpak(category = 0xA, type_id = 1)]
struct PayloadA {
n: u64,
}
#[derive(serde::Serialize, serde::Deserialize, EventPayload)]
#[batpak(category = 0xA, type_id = 2)]
struct PayloadB {
derived_from: u64,
doubled: u64,
}
struct Doubler {
seen: u64,
reaction_coord: Coordinate,
}
impl TypedReactive<PayloadA> for Doubler {
type Error = StoreError;
fn react(
&mut self,
event: &StoredEvent<PayloadA>,
out: &mut ReactionBatch,
_witness: Option<&batpak::store::AtLeastOnce>,
) -> Result<(), Self::Error> {
self.seen += 1;
out.push_typed(
self.reaction_coord.clone(),
&PayloadB {
derived_from: event.event.payload.n,
doubled: event.event.payload.n * 2,
},
CausationRef::None,
)
}
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let dir = tempfile::tempdir()?;
let store = Arc::new(Store::open(StoreConfig::new(dir.path()))?);
let reaction_coord = Coordinate::new("entity:reactions", "scope:example")?;
let source = Coordinate::new("entity:sources", "scope:example")?;
let handle = store.react_loop_typed::<PayloadA, _>(
&Region::all(),
ReactorConfig::default(),
Doubler {
seen: 0,
reaction_coord,
},
)?;
for n in [1, 2, 3, 7] {
store.append_typed(&source, &PayloadA { n })?;
}
wait_for(
|| store.by_fact_typed::<PayloadB>().len() >= 4,
Duration::from_secs(5),
)
.expect("reactor reacted in time");
if let Err(e) = handle.stop_and_join() {
return Err(format!("reactor join failed: {e}").into());
}
let reactions = store.by_fact_typed::<PayloadB>();
println!(
"Typed reactor emitted {} reactions for 4 source events:",
reactions.len()
);
for entry in &reactions {
let stored = store.get(batpak::id::EventId::from(entry.event_id()))?;
println!(
" reaction event_id={} payload={}",
entry.event_id(),
stored.event.payload
);
}
drop(store);
Ok(())
}