#![allow(
clippy::unwrap_used,
clippy::disallowed_methods,
clippy::cast_possible_truncation,
clippy::needless_borrows_for_generic_args,
clippy::panic
)]
use batpak::event::Reactive;
mod support;
use batpak::store::{Store, StoreConfig, StoreError};
use std::sync::Arc;
use support::prelude::*;
use tempfile::TempDir;
#[path = "support/small_store.rs"]
mod small_store_support;
/// Opens a store through the shared `small_segment_store` helper.
///
/// Returns the `TempDir` together with the `Store`; callers bind the directory
/// (e.g. as `_dir`) so it stays alive for the duration of the test
/// (presumably it backs the store's files — confirm in `support/small_store.rs`).
///
/// # Panics
///
/// Panics with "small segment store" if the helper returns an error.
fn test_store() -> (TempDir, Store) {
    let opened = small_store_support::small_segment_store();
    opened.expect("small segment store")
}
/// Checks that `Pipeline::commit_bypass` drives the supplied commit closure so
/// the bypassed proposal can be read back from the store, and that the
/// resulting `Committed` value still exposes the bypass audit with the reason
/// name supplied by the `BypassReason` implementation.
#[test]
fn pipeline_commit_bypass_persists() {
    use batpak::pipeline::bypass::BypassReason;

    /// Minimal bypass reason; its `name()` is what the audit assertion checks.
    struct TestBypass;

    impl BypassReason for TestBypass {
        fn name(&self) -> &'static str {
            "test-bypass"
        }

        fn justification(&self) -> &'static str {
            "testing commit_bypass"
        }
    }

    let (_dir, store) = test_store();
    let target = Coordinate::new("entity:bypass", "scope:test").expect("valid coord");
    let bypass_kind = EventKind::custom(0xF, 1);

    // Turn the proposal into a bypass receipt tied to `TestBypass`.
    let receipt = Pipeline::<()>::bypass(
        Proposal::new(serde_json::json!({"bypassed": true})),
        &TestBypass,
    );

    // The commit closure is where the payload actually reaches the store.
    let outcome: Committed<serde_json::Value> =
        Pipeline::<()>::commit_bypass(receipt, |payload| -> Result<_, StoreError> {
            let appended = store.append(&target, bypass_kind, &payload)?;
            CommitMetadata::from_append_receipt(&appended)
        })
        .expect("commit_bypass");

    let persisted_id = outcome.event_id();
    let audit = outcome
        .bypass_audit()
        .expect("commit_bypass should retain bypass audit");

    // Read the event back by id to prove it was persisted.
    let stored = store
        .get(batpak::id::EventId::from(persisted_id))
        .expect("get");

    assert_eq!(
        stored.event.event_kind(),
        bypass_kind,
        "PROPERTY: commit_bypass must persist the event through the store.\n\
         Investigate: src/pipeline/mod.rs commit_bypass.\n\
         Common causes: commit_fn not called, payload not forwarded.\n\
         Run: cargo test --test store_advanced pipeline_commit_bypass_persists"
    );
    assert_eq!(
        audit.reason,
        "test-bypass",
        "PROPERTY: commit_bypass must retain the bypass audit reason alongside the persisted event."
    );

    store.close().expect("close");
}
/// Verifies that a reactor registered through `Store::react_loop` produces
/// reaction events.
///
/// The test registers `TestReactor` for the `entity:trigger` region, appends a
/// triggering event of kind `custom(0xA, 1)`, then polls the store until an
/// event shows up under `entity:reactions` or a 5-second deadline expires.
/// The first reaction found must have kind `custom(0xA, 2)`.
#[test]
fn react_loop_spawns_and_processes() {
    use batpak::event::sourcing::Reactive;

    /// Emits one `custom(0xA, 2)` event under `entity:reactions` for every
    /// `custom(0xA, 1)` event it sees; ignores everything else.
    struct TestReactor;

    impl Reactive<serde_json::Value> for TestReactor {
        fn react(
            &self,
            event: &batpak::prelude::Event<serde_json::Value>,
        ) -> Vec<(Coordinate, EventKind, serde_json::Value)> {
            if event.event_kind() == EventKind::custom(0xA, 1) {
                vec![(
                    Coordinate::new("entity:reactions", "scope:test").expect("valid"),
                    EventKind::custom(0xA, 2),
                    serde_json::json!({"reacted_to": event.event_id().to_string()}),
                )]
            } else {
                vec![]
            }
        }
    }

    let dir = TempDir::new().expect("create temp dir");
    let config = StoreConfig::new(dir.path())
        .with_segment_max_bytes(4096)
        .with_sync_every_n_events(1);
    let store = Arc::new(Store::open(config).expect("open store"));

    // Register the reactor before appending so the trigger event is observed.
    // The handle is kept bound until the end of the test.
    let region = Region::entity("entity:trigger");
    let _handle = store
        .react_loop(&region, TestReactor)
        .expect("spawn reactor");

    let coord = Coordinate::new("entity:trigger", "scope:test").expect("valid coord");
    store
        .append(
            &coord,
            EventKind::custom(0xA, 1),
            &serde_json::json!({"trigger": true}),
        )
        .expect("append");

    // The reaction is produced asynchronously, so poll with a hard deadline
    // instead of asserting immediately.
    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
    let reactions = loop {
        let r = store.query(&Region::entity("entity:reactions"));
        if !r.is_empty() {
            break r;
        }
        if std::time::Instant::now() >= deadline {
            panic!(
                "PROPERTY: react_loop must produce reaction events when the reactor emits them. \
                 Got nothing after 5s deadline. \
                 Investigate: src/store/mod.rs react_loop, src/event/sourcing.rs Reactive."
            );
        }
        std::thread::yield_now();
    };

    // Indexing is safe: the loop above only exits with a non-empty result.
    assert_eq!(
        reactions[0].event_kind(),
        EventKind::custom(0xA, 2),
        "PROPERTY: reaction event must have the kind returned by the reactor.\n\
         Investigate: src/store/mod.rs react_loop.\n\
         Run: cargo test --test store_advanced react_loop_spawns_and_processes"
    );

    store.sync().expect("sync");
}
/// Reactor that answers every `custom(0xA, 1)` event with a single
/// `custom(0xA, 2)` reaction addressed to `order:reactions` / `scope:test`.
struct OrderReactor;

impl batpak::event::Reactive<serde_json::Value> for OrderReactor {
    /// Returns the reactions for `event`: exactly one entry when the event kind
    /// is `custom(0xA, 1)`, otherwise an empty vector. The reaction payload
    /// records the triggering event's id under the `reacted_to` key.
    fn react(
        &self,
        event: &Event<serde_json::Value>,
    ) -> Vec<(Coordinate, EventKind, serde_json::Value)> {
        // Guard clause: anything that is not the trigger kind yields no reactions.
        if event.event_kind() != EventKind::custom(0xA, 1) {
            return Vec::new();
        }

        let destination = Coordinate::new("order:reactions", "scope:test").expect("valid");
        let payload = serde_json::json!({"reacted_to": event.event_id().to_string()});
        vec![(destination, EventKind::custom(0xA, 2), payload)]
    }
}
/// Walks the manual subscribe -> react -> append pattern end to end.
///
/// Steps:
/// 1. Subscribe (lossy) to every region before anything is written.
/// 2. Append a root event of kind `custom(0xA, 1)` from a separate writer thread.
/// 3. Receive the notification, rebuild an `Event` from its fields and hand it
///    to `OrderReactor`.
/// 4. Append each reaction via `append_reaction`, using the root event's id as
///    both correlation and causation id.
/// 5. Expect `stats().event_count` to be 3 (per the assertion message: one
///    lifecycle event plus the root event and its reaction).
#[test]
fn reactive_subscribe_react_append_pattern() {
    let dir = TempDir::new().expect("temp dir");
    let config = StoreConfig::new(dir.path()).with_sync_every_n_events(1);
    let store = Arc::new(Store::open(config).expect("open"));
    let coord = Coordinate::new("order:1", "scope:test").expect("valid");
    let kind = EventKind::custom(0xA, 1);

    // Subscribe before the write so the root event's notification is not missed.
    let region = Region::all();
    let sub = store.subscribe_lossy(&region);

    let store_w = Arc::clone(&store);
    let coord_w = coord.clone();
    let writer = std::thread::Builder::new()
        .name("store-advanced-reactive-writer".into())
        .spawn(move || {
            store_w
                .append(&coord_w, kind, &serde_json::json!({"item": "widget"}))
                .expect("append root")
        })
        .expect("spawn reactive writer thread");
    let root_receipt = writer.join().expect("writer thread");

    let rx = sub.receiver();
    let notif = rx
        .recv_timeout(std::time::Duration::from_secs(2))
        .expect("should receive notification");

    // The event is rebuilt from the notification's fields with a Null payload;
    // OrderReactor only reads the event kind and event id.
    let reactor = OrderReactor;
    let header = EventHeader::new(
        notif.event_id,
        notif.correlation_id,
        notif.causation_id,
        0,
        DagPosition::root(),
        0,
        notif.kind,
    );
    let event = Event::<serde_json::Value>::new(header, serde_json::Value::Null);
    let reactions = reactor.react(&event);
    assert_eq!(
        reactions.len(),
        1,
        "PROPERTY: OrderReactor must produce exactly 1 reaction for a create_order event.\n\
         Investigate: src/event/sourcing.rs Reactive trait react() method.\n\
         Common causes: react() returning an empty vec because event_kind comparison \
         fails, or EventKind::custom encoding mismatch between writer and reactor.\n\
         Run: cargo test --test store_advanced reactive_subscribe_react_append_pattern"
    );

    // Link every reaction back to the root event through correlation/causation ids.
    for (react_coord, react_kind, react_payload) in reactions {
        store
            .append_reaction(
                &react_coord,
                react_kind,
                &react_payload,
                batpak::id::CorrelationId::from(u128::from(root_receipt.event_id)),
                batpak::id::CausationId::from(u128::from(root_receipt.event_id)),
            )
            .expect("append reaction");
    }

    let stats = store.stats();
    assert_eq!(
        stats.event_count, 3,
        "PROPERTY: After root event + 1 reaction, store must contain the lifecycle event plus those 2 user-visible events.\n\
         Investigate: src/store/mod.rs Store::append_reaction() src/event/sourcing.rs.\n\
         Common causes: append_reaction() not writing to the store, or stats.event_count \
         not counting reaction events that go to a different coordinate.\n\
         Run: cargo test --test store_advanced reactive_subscribe_react_append_pattern"
    );

    store.sync().expect("sync");
}