// batpak 0.5.0
//
// Event sourcing with causal graphs and policy gates. Sync API, zero async.
// Documentation
use batpak::prelude::*;
use batpak::store::cursor::{CursorWorkerAction, CursorWorkerConfig};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

/// Cursor-worker example: append three events and wait for a worker to see them.
///
/// Steps, in order:
/// 1. Open a `Store` inside a fresh temporary directory.
/// 2. Start a cursor worker over `Region::entity("player:cursor")` with a batch
///    size of 1 and a 5 ms idle sleep. Its callback bumps a shared counter on
///    every invocation and returns `CursorWorkerAction::Stop` once the counter
///    has reached 3.
/// 3. Append three events (`{"n": 0}`, `{"n": 1}`, `{"n": 2}`) at the
///    `player:cursor` / `room:worker` coordinate.
/// 4. Poll the counter every 10 ms, for at most 5 seconds.
/// 5. Join the worker and print how many callback invocations were counted.
///
/// # Errors
///
/// Propagates any error from the temp dir, the store, the coordinate, the
/// worker start-up, the appends, or the join. Returns the message
/// `"cursor worker example timed out"` when the counter has not reached 3
/// before the 5 second deadline.
#[allow(clippy::print_stdout)] // this example reports its result with `println!` on purpose
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let dir = tempfile::tempdir()?;
    let store = Arc::new(Store::open(StoreConfig::new(dir.path()))?);
    let coord = Coordinate::new("player:cursor", "room:worker")?;
    let kind = EventKind::custom(0xF, 6);
    let processed = Arc::new(AtomicUsize::new(0));

    // Worker inputs, named up front so the `cursor_worker` call stays short.
    let region = Region::entity("player:cursor");
    let config = CursorWorkerConfig {
        batch_size: 1,
        idle_sleep: Duration::from_millis(5),
        ..CursorWorkerConfig::default()
    };
    // The callback owns its own handle to the counter; `main` keeps `processed`.
    let counter = Arc::clone(&processed);

    // NOTE(review): the callback presumably runs on a thread owned by the
    // worker (main polls and later joins it) — confirm against the store API.
    let worker = store.cursor_worker(&region, config, move |_batch, _store| {
        // `fetch_add` yields the value *before* the increment, hence the `+ 1`.
        match counter.fetch_add(1, Ordering::SeqCst) + 1 {
            total if total >= 3 => CursorWorkerAction::Stop,
            _ => CursorWorkerAction::Continue,
        }
    })?;

    for n in 0..3u32 {
        let payload = serde_json::json!({"n": n});
        store.append(&coord, kind, &payload)?;
    }

    // Bounded wait so a stalled worker turns into an error instead of a hang.
    let deadline = Instant::now() + Duration::from_secs(5);
    loop {
        if processed.load(Ordering::SeqCst) >= 3 {
            break;
        }
        if Instant::now() >= deadline {
            return Err("cursor worker example timed out".into());
        }
        std::thread::sleep(Duration::from_millis(10));
    }

    worker.join()?;

    let total = processed.load(Ordering::SeqCst);
    println!(
        "cursor worker processed {} event(s) through the guaranteed-delivery path",
        total
    );

    Ok(())
}