use batpak::prelude::*;
use std::collections::HashSet;
#[test]
fn batch_append_reaction_batch() {
    // A reaction batch ties every item to one trigger event via shared
    // correlation/causation ids and must return one receipt per item.
    let tmp = tempfile::tempdir().expect("create temp dir for reaction batch test");
    let store = Store::open(StoreConfig::new(tmp.path()))
        .expect("open store for reaction batch test");

    // Append the trigger event the reactions will reference.
    let trigger_coord = Coordinate::new("user", "trigger").expect("valid trigger coordinate");
    let trigger_receipt = store
        .append(
            &trigger_coord,
            EventKind::DATA,
            &serde_json::json!({"trigger": true}),
        )
        .expect("append trigger event for reaction batch");

    // Build three reaction items against a separate coordinate.
    let reaction_coord = Coordinate::new("user", "reactions").expect("valid reaction coordinate");
    let mut reactions = Vec::with_capacity(3);
    for idx in 0..3 {
        let item = BatchAppendItem::new(
            reaction_coord.clone(),
            EventKind::DATA,
            &serde_json::json!({"reaction": idx}),
            AppendOptions::default(),
            CausationRef::None,
        )
        .expect("construct reaction batch item");
        reactions.push(item);
    }

    // Correlation and causation both point at the trigger event.
    let receipts = store
        .append_reaction_batch(trigger_receipt.event_id, trigger_receipt.event_id, reactions)
        .expect("append reaction batch");
    assert_eq!(receipts.len(), 3);
}
#[test]
fn batch_config_max_bytes() {
    // A batch whose total serialized size stays under the configured
    // batch_max_bytes cap must be accepted by the writer.
    let tmp = tempfile::tempdir().expect("create temp dir for batch_max_bytes test");
    // Fix: the config/store lines were crammed onto a single source line;
    // split into two statements per rustfmt convention.
    let config = StoreConfig::new(tmp.path()).with_batch_max_bytes(1024 * 1024);
    let store = Store::open(config).expect("open store for batch_max_bytes test");
    let coord = Coordinate::new("test", "bytes").expect("valid bytes coordinate");
    // 10 items of ~100 bytes of payload each — comfortably under the 1 MiB cap.
    let items: Vec<BatchAppendItem> = (0..10)
        .map(|i| {
            BatchAppendItem::new(
                coord.clone(),
                EventKind::DATA,
                &serde_json::json!({"index": i, "data": "x".repeat(100)}),
                AppendOptions::default(),
                CausationRef::None,
            )
            .expect("construct item under batch_max_bytes")
        })
        .collect();
    let result = store.append_batch(items);
    assert!(result.is_ok(), "batch under max_bytes should succeed");
}
#[test]
fn batch_empty_is_noop_and_store_remains_usable() {
    // An empty batch must be a harmless no-op: Ok with zero receipts, and
    // the writer must remain fully usable for subsequent appends.
    let tmp = tempfile::tempdir().expect("create temp dir for empty-batch test");
    let store = Store::open(StoreConfig::new(tmp.path()).with_batch_max_size(4))
        .expect("open store for empty-batch test");

    let outcome = store.append_batch(Vec::new());
    assert!(
        outcome.is_ok(),
        "PROPERTY: an empty batch must succeed as a no-op (writer must \
tolerate zero items without panicking or returning an error). \
Investigate: src/store/writer.rs handle_append_batch validate_batch \
early-return for empty input."
    );
    let receipts = outcome.expect("empty batch ok");
    assert!(
        receipts.is_empty(),
        "PROPERTY: an empty batch must return zero receipts, got {}",
        receipts.len()
    );

    // The store must still accept a normal append afterwards.
    let coord = Coordinate::new("test", "atomicity").expect("valid atomicity coordinate");
    let receipt = store
        .append(&coord, EventKind::DATA, &serde_json::json!({"test": true}))
        .expect("append post-empty-batch event");
    assert!(
        receipt.event_id != 0,
        "PROPERTY: after an empty batch, the next append must succeed \
and produce a non-zero event_id (the writer must not be in a \
broken state). Got event_id = 0."
    );

    // Exactly the one real append may be visible — no phantom entries.
    let visible_count = store.cursor_guaranteed(&Region::all()).poll_batch(10).len();
    assert_eq!(
        visible_count, 1,
        "PROPERTY: after empty batch + one append, exactly 1 event must \
be visible. Got {visible_count}. The empty batch must not have \
exposed any phantom entries."
    );
}
#[test]
fn batch_oversized_item_no_partial_visibility() {
// Atomicity: a batch that exceeds the byte cap must fail as a whole —
// no item becomes visible and no sequence slot is consumed.
let tmp = tempfile::tempdir().expect("create temp dir");
let config = StoreConfig::new(tmp.path())
.with_batch_max_bytes(2 * 1024)
.with_batch_max_size(8);
let store = Store::open(config).expect("open store");
let coord = Coordinate::new("entity:atomic", "scope:test").expect("valid coord");
// Three small items that would individually fit under the 2 KiB cap...
let mut items: Vec<BatchAppendItem> = (0..3)
.map(|i| {
BatchAppendItem::new(
coord.clone(),
EventKind::DATA,
&serde_json::json!({"i": i, "small": true}),
AppendOptions::default(),
CausationRef::None,
)
.expect("small item builds")
})
.collect();
// ...plus one ~4 KiB payload that pushes the batch total past the cap.
// The item builder itself does not enforce the cap; the writer does.
let oversized_payload = serde_json::json!({"big": "x".repeat(4 * 1024)});
items.push(
BatchAppendItem::new(
coord.clone(),
EventKind::DATA,
&oversized_payload,
AppendOptions::default(),
CausationRef::None,
)
.expect("item builder doesn't enforce batch byte cap; writer does"),
);
// The whole batch must be rejected during validation.
let result = store.append_batch(items);
assert!(
matches!(result, Err(StoreError::BatchFailed { .. })),
"PROPERTY: a batch whose total bytes exceed batch_max_bytes must \
fail with BatchFailed; got {result:?}. \
Investigate: src/store/writer.rs validate_batch byte-cap branch \
and StoreError::BatchFailed mapping for the Validating stage."
);
// No item of the failed batch may be visible to readers.
let visible_count = store
.cursor_guaranteed(&Region::all())
.poll_batch(100)
.len();
assert_eq!(
visible_count, 0,
"PROPERTY: BATCH ATOMICITY VIOLATION — a batch that failed during \
validation must not expose ANY of its items to readers. Found \
{visible_count} visible events; expected 0. \
Investigate: src/store/writer.rs handle_append_batch must validate \
BEFORE reserving sequences and writing frames, OR must roll back \
all visibility on failure. src/store/index.rs publish() must not \
have advanced the watermark."
);
// The failed batch must not have burned sequence slots either: the next
// append must land at sequence 0, and the store must still be usable.
let post_failure = store
.append(
&coord,
EventKind::DATA,
&serde_json::json!({"recovery": true}),
)
.expect("store usable after failed batch");
assert_eq!(
post_failure.sequence, 0,
"PROPERTY: the first event after a failed batch must occupy \
sequence 0 — the failed batch must not have burned any sequence \
slots that would shift the next append's sequence. Got sequence \
{}. Investigate: src/store/writer.rs validate_batch ordering \
relative to reserve_sequences.",
post_failure.sequence
);
}
#[test]
fn batch_atomicity_full_visibility_on_success() {
    // After a successful batch append, every receipt's event_id must be
    // visible through a guaranteed cursor over the whole region.
    let tmp = tempfile::tempdir().expect("create temp dir for full-visibility test");
    let store = Store::open(StoreConfig::new(tmp.path()))
        .expect("open store for full-visibility test");
    let coord = Coordinate::new("user", "profile").expect("valid profile coordinate");

    let mut items = Vec::with_capacity(5);
    for index in 0..5 {
        items.push(
            BatchAppendItem::new(
                coord.clone(),
                EventKind::DATA,
                &serde_json::json!({"index": index}),
                AppendOptions::default(),
                CausationRef::None,
            )
            .expect("construct full-visibility batch item"),
        );
    }
    let receipts = store
        .append_batch(items)
        .expect("append full-visibility batch");
    assert_eq!(receipts.len(), 5);

    // Collect the visible ids and verify every receipt is among them.
    let visible: HashSet<_> = store
        .cursor_guaranteed(&Region::all())
        .poll_batch(10)
        .into_iter()
        .map(|entry| entry.event_id)
        .collect();
    for receipt in &receipts {
        assert!(
            visible.contains(&receipt.event_id),
            "event {} should be visible",
            receipt.event_id
        );
    }
}
#[test]
fn batch_marker_invisible() {
    // Internal batch envelope records must not leak: a one-item batch
    // surfaces exactly one DATA entry to readers and nothing else.
    let tmp = tempfile::tempdir().expect("create temp dir for marker invisibility test");
    let store = Store::open(StoreConfig::new(tmp.path()))
        .expect("open store for marker invisibility test");
    let coord = Coordinate::new("test", "marker").expect("valid marker coordinate");
    let item = BatchAppendItem::new(
        coord.clone(),
        EventKind::DATA,
        &serde_json::json!({"data": 1}),
        AppendOptions::default(),
        CausationRef::None,
    )
    .expect("construct marker invisibility item");
    store
        .append_batch(vec![item])
        .expect("append batch with invisible marker envelope");
    let entries = store.cursor_guaranteed(&Region::all()).poll_batch(10);
    assert_eq!(entries.len(), 1);
    assert_eq!(entries[0].kind, EventKind::DATA);
}
#[test]
fn batch_intra_batch_causation() {
    // CausationRef::PriorItem(0) on the second item must resolve to the
    // first item's assigned event_id once the batch commits.
    let tmp = tempfile::tempdir().expect("create temp dir for intra-batch causation test");
    let store = Store::open(StoreConfig::new(tmp.path()))
        .expect("open store for intra-batch causation test");
    let coord = Coordinate::new("chain", "test").expect("valid chain coordinate");
    let root = BatchAppendItem::new(
        coord.clone(),
        EventKind::DATA,
        &serde_json::json!({"seq": 1}),
        AppendOptions::default(),
        CausationRef::None,
    )
    .expect("construct first causation item");
    let dependent = BatchAppendItem::new(
        coord.clone(),
        EventKind::DATA,
        &serde_json::json!({"seq": 2}),
        AppendOptions::default(),
        CausationRef::PriorItem(0),
    )
    .expect("construct second causation item");
    let receipts = store
        .append_batch(vec![root, dependent])
        .expect("append causation-linked batch");
    assert_eq!(receipts.len(), 2);
    let entries = store.cursor_guaranteed(&Region::all()).poll_batch(10);
    assert_eq!(entries.len(), 2);
    // The second entry's causation must point back at the first entry.
    assert_eq!(entries[1].causation_id, Some(entries[0].event_id));
}
#[test]
fn batch_size_limits() {
    // Three items against batch_max_size = 2 must be rejected during
    // validation with BatchFailed { stage: Validation, .. }.
    let tmp = tempfile::tempdir().expect("create temp dir for batch size limit test");
    let store = Store::open(StoreConfig::new(tmp.path()).with_batch_max_size(2))
        .expect("open store for batch size limit test");
    let coord = Coordinate::new("limit", "test").expect("valid limit coordinate");
    let mut items = Vec::new();
    for i in 0..3 {
        items.push(
            BatchAppendItem::new(
                coord.clone(),
                EventKind::DATA,
                &serde_json::json!({"i": i}),
                AppendOptions::default(),
                CausationRef::None,
            )
            .expect("construct oversized batch item"),
        );
    }
    let err = store.append_batch(items).expect_err(
        "PROPERTY: a batch exceeding batch_max_size must fail. \
Investigate: src/store/writer.rs validate_batch size check.",
    );
    assert!(
        matches!(
            err,
            StoreError::BatchFailed {
                stage: BatchStage::Validation,
                ..
            }
        ),
        "PROPERTY: batch size violation must be reported as \
BatchFailed{{stage: Validation, ..}}, got {err:?}"
    );
}
#[cfg(feature = "dangerous-test-hooks")]
#[test]
fn batch_restart_recovery_discards_incomplete_after_begin() {
use batpak::store::CountdownInjector;
// Recovery: a fault fired right after the batch BEGIN marker is written
// simulates a crash before any item commits; on restart the incomplete
// batch must be discarded, leaving only the baseline event visible.
let tmp = tempfile::tempdir().expect("create temp dir for after-begin recovery test");
let coord = Coordinate::new("crash", "test").expect("valid crash-test coordinate");
// Phase 1: commit one baseline event with a clean store, then close it.
let config = StoreConfig::new(tmp.path());
let store = Store::open(config).expect("open baseline store for after-begin recovery");
let _receipt1 = store
.append(&coord, EventKind::DATA, &serde_json::json!({"seq": 1}))
.expect("append baseline event before injected fault");
drop(store);
// Phase 2: reopen with the after-begin fault injector armed.
let mut config = StoreConfig::new(tmp.path());
config.fault_injector = Some(std::sync::Arc::new(CountdownInjector::after_batch_begin()));
let store = Store::open(config).expect("open fault-injected store after begin");
let items = vec![
BatchAppendItem::new(
coord.clone(),
EventKind::DATA,
&serde_json::json!({"seq": 2}),
AppendOptions::default(),
CausationRef::None,
)
.expect("construct second event in after-begin recovery batch"),
BatchAppendItem::new(
coord.clone(),
EventKind::DATA,
&serde_json::json!({"seq": 3}),
AppendOptions::default(),
CausationRef::None,
)
.expect("construct third event in after-begin recovery batch"),
];
// The injected fault must surface as an error from append_batch.
let result = store.append_batch(items);
let err = result.expect_err(
"PROPERTY: fault injection at BatchBeginWritten must propagate as a \
BatchFailed or FaultInjected error.",
);
assert!(
matches!(err, StoreError::BatchFailed { .. })
|| matches!(err, StoreError::FaultInjected(_)),
"PROPERTY: BatchBeginWritten fault must surface as BatchFailed or \
FaultInjected, got {err:?}"
);
drop(store);
// Phase 3: reopen cleanly; only the baseline event may survive recovery.
let config = StoreConfig::new(tmp.path());
let store = Store::open(config).expect("reopen store after begin-fault recovery");
let mut cursor = store.cursor_guaranteed(&Region::all());
let entries = cursor.poll_batch(10);
assert_eq!(
entries.len(),
1,
"incomplete batch should be discarded on recovery"
);
assert_eq!(entries[0].kind, EventKind::DATA);
}
#[cfg(feature = "dangerous-test-hooks")]
#[test]
fn batch_restart_recovery_discards_incomplete_mid_items() {
use batpak::store::CountdownInjector;
// Recovery: a fault fired after the first item frame is written (but
// before the batch commits) must cause ALL partially-written items to
// be discarded on restart — not just the unwritten ones.
let tmp = tempfile::tempdir().expect("create temp dir for mid-items recovery test");
let coord = Coordinate::new("crash", "test").expect("valid crash-test coordinate");
// Open directly with the injector armed: this store has no baseline
// events, so after recovery the log must be completely empty.
let mut config = StoreConfig::new(tmp.path());
config.fault_injector = Some(std::sync::Arc::new(CountdownInjector::after_batch_items(1)));
let store = Store::open(config).expect("open fault-injected store mid-items");
let items = vec![
BatchAppendItem::new(
coord.clone(),
EventKind::DATA,
&serde_json::json!({"seq": 1}),
AppendOptions::default(),
CausationRef::None,
)
.expect("construct first item in mid-items fault batch"),
BatchAppendItem::new(
coord.clone(),
EventKind::DATA,
&serde_json::json!({"seq": 2}),
AppendOptions::default(),
CausationRef::None,
)
.expect("construct second item in mid-items fault batch"),
BatchAppendItem::new(
coord.clone(),
EventKind::DATA,
&serde_json::json!({"seq": 3}),
AppendOptions::default(),
CausationRef::None,
)
.expect("construct third item in mid-items fault batch"),
];
// The mid-batch fault must surface as an error.
let result = store.append_batch(items);
let err = result.expect_err(
"PROPERTY: fault injection mid-batch must propagate as a BatchFailed \
or FaultInjected error.",
);
assert!(
matches!(err, StoreError::BatchFailed { .. })
|| matches!(err, StoreError::FaultInjected(_)),
"PROPERTY: mid-batch fault must surface as BatchFailed or \
FaultInjected, got {err:?}"
);
drop(store);
// Reopen cleanly: the partially-written batch must leave zero events.
let config = StoreConfig::new(tmp.path());
let store = Store::open(config).expect("reopen store after mid-items fault recovery");
let mut cursor = store.cursor_guaranteed(&Region::all());
let entries = cursor.poll_batch(10);
assert_eq!(entries.len(), 0, "partial batch items should be discarded");
}
#[test]
fn batch_both_markers_invisible() {
    // Neither the batch BEGIN nor the COMMIT marker may leak to readers:
    // a one-item batch yields exactly one visible DATA entry, and no
    // entry in the result set is a system event.
    let tmp = tempfile::tempdir().expect("create temp dir for marker invisibility pair test");
    let store = Store::open(StoreConfig::new(tmp.path()))
        .expect("open store for both-markers invisible test");
    let coord = Coordinate::new("test", "markers").expect("valid markers coordinate");
    let item = BatchAppendItem::new(
        coord.clone(),
        EventKind::DATA,
        &serde_json::json!({"data": 1}),
        AppendOptions::default(),
        CausationRef::None,
    )
    .expect("construct marker-pair invisibility item");
    store
        .append_batch(vec![item])
        .expect("append batch for both-markers invisible test");
    let entries = store.cursor_guaranteed(&Region::all()).poll_batch(10);
    assert_eq!(entries.len(), 1);
    assert_eq!(entries[0].kind, EventKind::DATA);
    for entry in &entries {
        assert!(
            !entry.kind.is_system(),
            "system events should not be visible"
        );
    }
}
#[cfg(feature = "dangerous-test-hooks")]
#[test]
fn batch_fsync_ambiguity_discards_uncommitted() {
    use batpak::store::{CountdownAction, CountdownInjector, InjectionPoint, SyncMode};
    // Fsync ambiguity rule: if a crash happens during the batch fsync, the
    // batch may or may not have reached disk; recovery must treat it as
    // uncommitted and discard it, keeping only the pre-established event.
    let tmp = tempfile::tempdir().expect("create temp dir for fsync ambiguity test");
    let coord = Coordinate::new("fsync", "test").expect("valid fsync test coordinate");
    // Phase 1: commit a baseline event with a clean store, then close.
    let config = StoreConfig::new(tmp.path());
    let store = Store::open(config).expect("open baseline store for fsync ambiguity test");
    let _receipt = store
        .append(&coord, EventKind::DATA, &serde_json::json!({"pre": 1}))
        .expect("append pre-established event for fsync ambiguity test");
    drop(store);
    // Phase 2: reopen with SyncAll and an injector that simulates power
    // loss at the BatchFsync point.
    // Fix: the sync-mode and injector assignments were crammed onto a
    // single source line; split into separate statements per rustfmt style.
    let mut config = StoreConfig::new(tmp.path());
    config.sync.mode = SyncMode::SyncAll;
    config.fault_injector = Some(std::sync::Arc::new(
        CountdownInjector::new(
            1,
            CountdownAction::Fail("simulated power loss during fsync"),
        )
        .with_filter(|p| matches!(p, InjectionPoint::BatchFsync { .. })),
    ));
    let store = Store::open(config).expect("open fault-injected store for fsync ambiguity");
    let items = vec![
        BatchAppendItem::new(
            coord.clone(),
            EventKind::DATA,
            &serde_json::json!({"batch": 1}),
            AppendOptions::default(),
            CausationRef::None,
        )
        .expect("construct first fsync ambiguity batch item"),
        BatchAppendItem::new(
            coord.clone(),
            EventKind::DATA,
            &serde_json::json!({"batch": 2}),
            AppendOptions::default(),
            CausationRef::None,
        )
        .expect("construct second fsync ambiguity batch item"),
    ];
    // The injected fsync fault must surface as an error.
    let result = store.append_batch(items);
    let err = result.expect_err(
        "PROPERTY: a fault injected during BatchFsync must propagate as an \
error. Investigate: src/store/writer.rs handle_append_batch fsync \
site fault injection point.",
    );
    assert!(
        matches!(
            err,
            StoreError::BatchFailed {
                stage: BatchStage::Syncing,
                ..
            }
        ) || matches!(err, StoreError::FaultInjected(_)),
        "PROPERTY: BatchFsync fault must surface as BatchFailed{{stage: \
Syncing}} or FaultInjected, got {err:?}"
    );
    drop(store);
    // Phase 3: reopen cleanly; only the pre-established event may remain,
    // and its payload must be intact.
    let config = StoreConfig::new(tmp.path());
    let store = Store::open(config).expect("reopen store after fsync ambiguity fault");
    let mut cursor = store.cursor_guaranteed(&Region::all());
    let entries = cursor.poll_batch(10);
    assert_eq!(
        entries.len(),
        1,
        "un-fsynced batch must be discarded per fsync ambiguity rule"
    );
    assert_eq!(
        store
            .get(entries[0].event_id)
            .expect("load recovered pre-established event after fsync ambiguity")
            .event
            .payload["pre"],
        serde_json::json!(1)
    );
}
#[cfg(feature = "dangerous-test-hooks")]
#[test]
fn batch_recovery_system_remains_coherent() {
use batpak::store::CountdownInjector;
// After discarding a faulted batch on recovery, the store must remain
// fully coherent: per-entity history intact, sequences continuous, new
// appends and batches accepted, and per-entity clocks contiguous.
let tmp = tempfile::tempdir().expect("create temp dir for recovery coherence test");
let coord_a = Coordinate::new("entity_a", "scope").expect("valid entity_a coordinate");
let coord_b = Coordinate::new("entity_b", "scope").expect("valid entity_b coordinate");
// Phase 1: commit one baseline event on entity_a, then close.
let config = StoreConfig::new(tmp.path());
let store = Store::open(config).expect("open baseline store for recovery coherence test");
let receipt_a1 = store
.append(&coord_a, EventKind::DATA, &serde_json::json!({"seq": 1}))
.expect("append baseline entity_a event for recovery coherence test");
drop(store);
// Phase 2: reopen with a mid-batch fault and attempt a cross-entity
// batch (entity_a + entity_b). The fault result itself is not asserted
// here; the recovery behavior below is what this test pins down.
let mut config = StoreConfig::new(tmp.path());
config.fault_injector = Some(std::sync::Arc::new(CountdownInjector::after_batch_items(1)));
let store = Store::open(config).expect("open fault-injected store for recovery coherence");
let items = vec![
BatchAppendItem::new(
coord_a.clone(),
EventKind::DATA,
&serde_json::json!({"seq": 2}),
AppendOptions::default(),
CausationRef::None,
)
.expect("construct faulted entity_a batch item"),
BatchAppendItem::new(
coord_b.clone(),
EventKind::DATA,
&serde_json::json!({"other": 1}),
AppendOptions::default(),
CausationRef::None,
)
.expect("construct faulted entity_b batch item"),
];
let _ = store.append_batch(items);
drop(store);
// Phase 3: reopen cleanly and verify per-entity state.
let config = StoreConfig::new(tmp.path());
let store = Store::open(config).expect("reopen store for recovery coherence verification");
// entity_a keeps only its committed baseline event.
let mut cursor_a = store.cursor_guaranteed(&Region::entity(coord_a.entity()));
let entries_a = cursor_a.poll_batch(10);
assert_eq!(
entries_a.len(),
1,
"entity_a should have only committed event"
);
assert_eq!(entries_a[0].global_sequence, receipt_a1.sequence);
// entity_b got nothing — its only event was in the discarded batch.
let mut cursor_b = store.cursor_guaranteed(&Region::entity(coord_b.entity()));
let entries_b = cursor_b.poll_batch(10);
assert!(
entries_b.is_empty(),
"entity_b should have no events (batch discarded)"
);
// The global sequence must continue without a gap from the baseline.
let receipt_new = store
.append(&coord_a, EventKind::DATA, &serde_json::json!({"seq": 3}))
.expect("append post-recovery entity_a event");
assert_eq!(
receipt_new.sequence,
receipt_a1.sequence + 1,
"sequence should continue"
);
// Batches must also work again post-recovery, including on entity_b.
let batch_items = vec![
BatchAppendItem::new(
coord_b.clone(),
EventKind::DATA,
&serde_json::json!({"new": 1}),
AppendOptions::default(),
CausationRef::None,
)
.expect("construct first post-recovery entity_b batch item"),
BatchAppendItem::new(
coord_b.clone(),
EventKind::DATA,
&serde_json::json!({"new": 2}),
AppendOptions::default(),
CausationRef::None,
)
.expect("construct second post-recovery entity_b batch item"),
];
let batch_receipts = store
.append_batch(batch_items)
.expect("append post-recovery entity_b batch");
assert_eq!(batch_receipts.len(), 2);
// Global view: baseline + post-recovery append + 2 batch items = 4.
let mut cursor_all = store.cursor_guaranteed(&Region::all());
let all_entries = cursor_all.poll_batch(10);
assert_eq!(all_entries.len(), 4, "should have all committed events");
// For every entity touched, the per-entity clock must be contiguous
// (0, 1, 2, ...) with no holes left by the discarded batch.
for entry in &all_entries {
if entry.clock > 0 {
let mut entity_cursor = store.cursor_guaranteed(&Region::entity(entry.coord.entity()));
let entity_entries = entity_cursor.poll_batch(10);
for (i, e) in entity_entries.iter().enumerate() {
assert_eq!(e.clock as usize, i, "entity clock should be contiguous");
}
}
}
}
#[cfg(feature = "dangerous-test-hooks")]
#[test]
fn batch_subscription_atomicity_no_partial_visibility() {
use batpak::store::CountdownInjector;
// Subscribers must never observe items from a faulted batch: the writer
// must broadcast only after the atomic publish, which never happens for
// a batch that fails mid-write.
let tmp = tempfile::tempdir().expect("create temp dir for subscription atomicity test");
let coord = Coordinate::new("sub", "test").expect("valid subscription test coordinate");
// Helper: drain all pending notifications and return how many there were.
fn drain(sub: &batpak::store::Subscription) -> usize {
let mut count = 0;
while sub.receiver().try_recv().is_ok() {
count += 1;
}
count
}
// Phase 1: sanity-check the notification path with a clean store —
// one append must yield exactly one immediately-drainable notification.
let mut config = StoreConfig::new(tmp.path());
let store =
Store::open(config.clone()).expect("open baseline store for subscription atomicity");
let sub = store.subscribe_lossy(&Region::all());
store
.append(&coord, EventKind::DATA, &serde_json::json!({"pre": 1}))
.expect("append pre-crash subscription event");
let pre_crash_count = drain(&sub);
assert_eq!(
pre_crash_count, 1,
"PROPERTY: a successful append must produce exactly one subscriber \
notification, drainable immediately. Got {pre_crash_count}. \
Investigate: src/store/writer.rs handle_append broadcast site, \
and ensure append() blocks until AFTER the broadcast."
);
drop(store);
// Phase 2: reopen with a mid-batch fault armed and a fresh subscriber.
config.fault_injector = Some(std::sync::Arc::new(CountdownInjector::after_batch_items(1)));
let store = Store::open(config).expect("open fault-injected store for subscription atomicity");
let sub = store.subscribe_lossy(&Region::all());
let items = vec![
BatchAppendItem::new(
coord.clone(),
EventKind::DATA,
&serde_json::json!({"batch": 1}),
AppendOptions::default(),
CausationRef::None,
)
.expect("construct first subscription atomicity batch item"),
BatchAppendItem::new(
coord.clone(),
EventKind::DATA,
&serde_json::json!({"batch": 2}),
AppendOptions::default(),
CausationRef::None,
)
.expect("construct second subscription atomicity batch item"),
];
let result = store.append_batch(items);
let _err = result.expect_err(
"PROPERTY: batch with after_batch_items(1) fault must fail. If this \
passes, fault injection is silently swallowed.",
);
// Drain AFTER the failed append_batch returns: zero notifications allowed.
let notifications_received = drain(&sub);
drop(store);
assert_eq!(
notifications_received, 0,
"PROPERTY: BATCH SUBSCRIPTION ATOMICITY VIOLATION — a faulted batch \
must produce ZERO subscriber notifications. Got {notifications_received}. \
The writer must broadcast notifications only AFTER the atomic publish, \
and the publish must never happen for a faulted batch. \
Investigate: src/store/writer.rs handle_append_batch ordering of \
publish() and broadcast_batch_notifications()."
);
// Phase 3: recovery check — only the pre-established event survives.
let config = StoreConfig::new(tmp.path());
let store = Store::open(config).expect("reopen store after subscription atomicity fault");
let mut cursor = store.cursor_guaranteed(&Region::all());
let entries = cursor.poll_batch(10);
assert_eq!(entries.len(), 1, "only pre-established event visible");
assert_eq!(
store
.get(entries[0].event_id)
.expect("load recovered pre-established subscription event")
.event
.payload["pre"],
serde_json::json!(1)
);
}
#[cfg(feature = "dangerous-test-hooks")]
#[test]
fn fault_injector_after_commit_before_fsync() {
    // Unit test of the injector itself: after_commit_before_fsync must
    // fire on BatchCommitWritten and ignore begin/item points.
    // (Check order preserved from the original: commit first, since the
    // countdown's state could depend on prior checks.)
    use batpak::store::{CountdownInjector, FaultInjector, InjectionPoint};
    let injector = CountdownInjector::after_commit_before_fsync();
    let commit = InjectionPoint::BatchCommitWritten { batch_id: 1 };
    assert!(injector.check(commit).is_some());
    let begin = InjectionPoint::BatchBeginWritten {
        batch_id: 1,
        item_count: 5,
    };
    assert!(injector.check(begin).is_none());
    let item = InjectionPoint::BatchItemWritten {
        batch_id: 1,
        item_index: 0,
        total_items: 5,
    };
    assert!(injector.check(item).is_none());
}
#[cfg(feature = "dangerous-test-hooks")]
#[test]
fn batch_cross_segment_fault_recovery() {
use batpak::store::{CountdownAction, CountdownInjector, InjectionPoint};
// A batch large enough to force a segment rotation (tiny 1 KiB segments,
// ~300-byte items) that faults mid-write must be fully discarded on
// recovery, even though its frames straddle segment files.
let tmp = tempfile::tempdir().expect("create temp dir for cross-segment recovery test");
let coord = Coordinate::new("cross", "seg").expect("valid cross-segment coordinate");
// Phase 1: baseline store with a ~400-byte event, then close.
let mut config = StoreConfig::new(tmp.path());
config.segment_max_bytes = 1024;
let store = Store::open(config).expect("open baseline store for cross-segment recovery");
let large_payload = serde_json::json!({"data": "x".repeat(400) });
let _ = store
.append(&coord, EventKind::DATA, &large_payload)
.expect("append baseline large payload before cross-segment fault");
drop(store);
// Phase 2: reopen with the same tiny segment size and a fault that fires
// when the third item frame (item_index == 2) is written — by then the
// batch should have crossed a segment boundary.
let mut config = StoreConfig::new(tmp.path());
config.segment_max_bytes = 1024;
config.fault_injector = Some(std::sync::Arc::new(
CountdownInjector::new(1, CountdownAction::Fail("crash at segment rotation"))
.with_filter(|p| matches!(p, InjectionPoint::BatchItemWritten { item_index: 2, .. })),
));
let store = Store::open(config).expect("open fault-injected store for cross-segment recovery");
let items: Vec<_> = (0..5)
.map(|i| {
BatchAppendItem::new(
coord.clone(),
EventKind::DATA,
&serde_json::json!({"item": i, "pad": "y".repeat(300)}),
AppendOptions::default(),
CausationRef::None,
)
.expect("construct cross-segment batch item")
})
.collect();
// Error value is not asserted here; recovery behavior is the subject.
let _ = store.append_batch(items);
drop(store);
// Phase 3: reopen cleanly — only the baseline event survives, and the
// store must accept fresh appends.
let config = StoreConfig::new(tmp.path());
let store = Store::open(config).expect("reopen store after cross-segment fault recovery");
let mut cursor = store.cursor_guaranteed(&Region::all());
let entries = cursor.poll_batch(10);
assert_eq!(
entries.len(),
1,
"cross-segment partial batch should be fully discarded"
);
let new_receipt = store
.append(
&coord,
EventKind::DATA,
&serde_json::json!({"after": "recovery"}),
)
.expect("append event after cross-segment recovery");
assert!(
new_receipt.sequence > 0,
"new appends should work after cross-segment recovery"
);
}
#[cfg(feature = "dangerous-test-hooks")]
#[test]
fn batch_publish_atomicity_no_partial_read_during_insert() {
use batpak::store::{CountdownAction, CountdownInjector, InjectionPoint};
use std::sync::Arc;
// A fault at BatchPrePublish halts the batch after its frames are
// written but before the atomic publish — readers must still see ONLY
// the baseline event, proving visibility is gated on publish.
let tmp = tempfile::tempdir().expect("create temp dir for publish atomicity test");
let coord = Coordinate::new("batch_vis", "test").expect("valid coordinate");
// Phase 1: commit a baseline event, then close.
let config = StoreConfig::new(tmp.path());
let store = Store::open(config).expect("open baseline store");
let pre = store
.append(
&coord,
EventKind::DATA,
&serde_json::json!({"baseline": true}),
)
.expect("append baseline event");
drop(store);
// Phase 2: reopen with a fault armed at the BatchPrePublish point.
let mut config = StoreConfig::new(tmp.path());
config.fault_injector = Some(Arc::new(
CountdownInjector::new(1, CountdownAction::Fail("halt before publish"))
.with_filter(|p| matches!(p, InjectionPoint::BatchPrePublish { .. })),
));
// NOTE(review): the Arc wrapper around Store appears unnecessary here —
// no threads are spawned in this test; harmless but could be dropped.
let store = Arc::new(Store::open(config).expect("open fault-injected store"));
let batch_size = 10usize;
let items: Vec<_> = (0..batch_size)
.map(|i| {
BatchAppendItem::new(
coord.clone(),
EventKind::DATA,
&serde_json::json!({"batch_item": i}),
AppendOptions::default(),
CausationRef::None,
)
.expect("construct batch item")
})
.collect();
let result = store.append_batch(items);
let err = result.expect_err(
"PROPERTY: a batch with a BatchPrePublish fault injection must fail. \
If this passes, fault injection is being silently swallowed.",
);
assert!(
matches!(err, StoreError::BatchFailed { .. })
|| matches!(err, StoreError::FaultInjected(_)),
"PROPERTY: BatchPrePublish fault must surface as BatchFailed or \
FaultInjected, got {err:?}"
);
// Without reopening, query the same store: the unpublished frames must
// be invisible and only the baseline event may be returned.
let region = Region::entity("batch_vis");
let entries = store.query(&region);
assert_eq!(
entries.len(),
1,
"PROPERTY: after BatchPrePublish fault, readers must see 0 batch entries.\n\
Expected only the baseline event (id={}), but got {} entries.\n\
Investigate: src/store/index.rs read methods must filter by visible_sequence.\n\
Common causes: read method missing visibility filter, publish() called before fault point.",
pre.event_id,
entries.len(),
);
assert_eq!(
entries[0].event_id, pre.event_id,
"the single visible entry must be the pre-batch baseline event"
);
}
#[test]
fn batch_publish_atomicity_concurrent_reader_sees_zero_or_all() {
use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, Ordering as MemOrd};
use std::sync::Arc;
use std::thread;
// Race a reader thread against 50 batch appends: every count the reader
// observes must equal pre_count + k * batch_size for some whole k —
// i.e. batches become visible atomically, never partially.
let tmp = tempfile::tempdir().expect("create temp dir for concurrent atomicity test");
let coord = Coordinate::new("concurrent_atom", "scope").expect("valid coordinate");
let config = StoreConfig::new(tmp.path());
let store = Arc::new(Store::open(config).expect("open store"));
// Seed 3 baseline events so the allowed-counts set has a nonzero base.
let pre_count: usize = 3;
for i in 0..pre_count {
store
.append(&coord, EventKind::DATA, &serde_json::json!({"pre": i}))
.expect("append baseline event");
}
let stop = Arc::new(AtomicBool::new(false));
let region = Region::entity("concurrent_atom");
let r_store = Arc::clone(&store);
let r_stop = Arc::clone(&stop);
let r_region = region.clone();
// Reader thread: spin on query() recording every distinct count seen.
// One final query after the stop flag guarantees the terminal count is
// observed even if the loop exits before the last batch lands.
let reader = thread::Builder::new()
.name("atomic-batch-reader".into())
.spawn(move || {
let mut observations: HashSet<usize> = HashSet::new();
while !r_stop.load(MemOrd::Acquire) {
let count = r_store.query(&r_region).len();
observations.insert(count);
}
observations.insert(r_store.query(&r_region).len());
observations
})
.expect("spawn reader thread");
// Writer side: 50 batches of 7 items each, all on the same entity.
let batch_size: usize = 7;
let num_batches: usize = 50;
for _ in 0..num_batches {
let items: Vec<BatchAppendItem> = (0..batch_size)
.map(|i| {
BatchAppendItem::new(
coord.clone(),
EventKind::DATA,
&serde_json::json!({"batch_item": i}),
AppendOptions::default(),
CausationRef::None,
)
.expect("construct batch item")
})
.collect();
store.append_batch(items).expect("batch append");
}
stop.store(true, MemOrd::Release);
let observed = reader.join().expect("reader thread joined cleanly");
// Allowed counts: baseline plus any whole number of complete batches.
let allowed: HashSet<usize> = (0..=num_batches)
.map(|k| pre_count + k * batch_size)
.collect();
let bad: Vec<usize> = observed.difference(&allowed).copied().collect();
assert!(
bad.is_empty(),
"PROPERTY: reader must only ever observe pre_count + k * batch_size.\n\
Observed counts not in the allowed set: {bad:?}\n\
Allowed set: {allowed:?}\n\
All observed: {observed:?}\n\
A partial batch was visible — INV-BATCH-ATOMIC-VISIBILITY violated.\n\
Investigate: src/store/index.rs SequenceGate visibility filter +\n\
src/store/writer.rs handle_append_batch publish ordering.",
);
// Sanity: the baseline count is observed before any batch commits
// (the reader starts before the first append_batch call).
assert!(
observed.contains(&pre_count),
"expected to observe at least the pre-batch baseline count {pre_count}, observed {observed:?}",
);
// Sanity: the final post-stop query guarantees the terminal count.
let terminal = pre_count + num_batches * batch_size;
assert!(
observed.contains(&terminal),
"expected to observe the terminal count {terminal}, observed {observed:?}",
);
}
#[cfg(feature = "blake3")]
#[test]
fn batch_multi_item_same_entity_hash_chain_is_continuous() {
// Regression test: three same-entity items in one batch must form a
// continuous hash chain (each prev_hash == predecessor's event_hash),
// with distinct per-item event_hash values and a walkable ancestry.
let tmp = tempfile::tempdir().expect("create temp dir for hash chain regression");
let store = Store::open(StoreConfig::new(tmp.path())).expect("open store");
let coord = Coordinate::new("regress", "hashchain").expect("valid coord");
// Distinct payloads per item (nonce) so event hashes must differ.
let items: Vec<BatchAppendItem> = (0..3)
.map(|i| {
BatchAppendItem::new(
coord.clone(),
EventKind::DATA,
&serde_json::json!({"step": i, "nonce": format!("regress-{i}")}),
AppendOptions::default(),
CausationRef::None,
)
.expect("construct batch item")
})
.collect();
let receipts = store.append_batch(items).expect("batch append");
assert_eq!(receipts.len(), 3, "all three items must be committed");
let entries = store.query(&Region::entity("regress"));
assert_eq!(entries.len(), 3, "query must surface all three batch items");
// Order by per-entity clock so indices match insertion order.
let mut entries = entries;
entries.sort_by_key(|e| e.clock);
let h0 = entries[0].hash_chain.event_hash;
let h1 = entries[1].hash_chain.event_hash;
let h2 = entries[2].hash_chain.event_hash;
// Pairwise-distinct hashes rule out the collapsed-hash regression.
assert_ne!(
h0, h1,
"PROPERTY: distinct payloads must produce distinct event_hash values \
in the in-memory IndexEntry. Buggy stage_batch_index_entries collapsed \
every same-entity entry's event_hash to the LAST item's hash via the \
shared entity_prev_hashes map."
);
assert_ne!(h1, h2, "second pair of event_hash values must be distinct");
assert_ne!(h0, h2, "first/third event_hash values must be distinct");
assert_ne!(
h0, [0u8; 32],
"blake3 of a non-empty payload must be non-zero"
);
// Chain continuity: each prev_hash equals the predecessor's event_hash.
assert_eq!(
entries[1].hash_chain.prev_hash, h0,
"PROPERTY: items[1].prev_hash MUST equal items[0].event_hash. \
Buggy precompute_batch_items inserted [0; 32] into entity_prev_hashes \
before the real hash was computed, so this assertion would fail with \
actual = [0; 32]."
);
assert_eq!(
entries[2].hash_chain.prev_hash, h1,
"PROPERTY: items[2].prev_hash MUST equal items[1].event_hash. Same bug."
);
assert_eq!(
entries[0].hash_chain.prev_hash, [0u8; 32],
"items[0] is the genesis for the entity, so prev_hash is the all-zero \
sentinel. (Entity has no prior history in this test.)"
);
// The ancestry walk from the last item must traverse the full chain in
// reverse insertion order — a broken chain would stop early.
let walked = store.walk_ancestors(receipts[2].event_id, 8);
let walked_ids: Vec<u128> = walked.iter().map(|s| s.event.event_id()).collect();
let expected: Vec<u128> = vec![
receipts[2].event_id,
receipts[1].event_id,
receipts[0].event_id,
];
assert_eq!(
walked_ids, expected,
"PROPERTY: walk_ancestors from the last batch item must yield all \
three items in reverse insertion order. Buggy hash chain breaks the \
traversal at the [0; 32] terminator after step 1."
);
}
#[test]
fn batch_survives_unclean_shutdown_without_sidx_footer() {
    // Regression: a batch whose BEGIN/frames/COMMIT records are all durable on
    // disk must stay visible after an unclean shutdown that destroyed the
    // segment's SIDX footer ([INV-BATCH-ATOMIC-VISIBILITY]).
    let tmp = tempfile::tempdir().expect("create temp dir for unclean-shutdown regression");
    let data_dir = tmp.path().to_path_buf();
    let coord = Coordinate::new("regress", "no-sidx").expect("valid coord");

    // Phase 1: commit a 3-item batch and close cleanly so the footer exists.
    {
        let store = Store::open(StoreConfig::new(&data_dir)).expect("open store");
        let mut batch = Vec::with_capacity(3);
        for step in 0..3 {
            let item = BatchAppendItem::new(
                coord.clone(),
                EventKind::DATA,
                &serde_json::json!({"step": step}),
                AppendOptions::default(),
                CausationRef::None,
            )
            .expect("construct item");
            batch.push(item);
        }
        let receipts = store.append_batch(batch).expect("batch append must succeed");
        assert_eq!(receipts.len(), 3, "baseline: all 3 items committed");
        store.close().expect("clean close");
    }

    // Phase 2: simulate the unclean shutdown — drop the index checkpoint and
    // chop everything from the string-table offset onward (the SIDX footer)
    // off the single segment file.
    let _ = std::fs::remove_file(data_dir.join("index.ckpt"));
    let segments: Vec<_> = std::fs::read_dir(&data_dir)
        .expect("read data_dir")
        .flatten()
        .filter(|e| e.path().extension().is_some_and(|x| x == "fbat"))
        .collect();
    assert_eq!(
        segments.len(),
        1,
        "exactly one segment file expected before truncation"
    );
    let seg_path = segments[0].path();
    let bytes = std::fs::read(&seg_path).expect("read segment file");
    assert!(
        bytes.len() >= 16,
        "segment file must be at least 16 bytes (SIDX trailer length)"
    );
    // Trailer layout (last 16 bytes): [0..8] little-endian string-table
    // offset, [12..16] the magic b"SIDX".
    let trailer = &bytes[bytes.len() - 16..];
    assert_eq!(
        &trailer[12..16],
        b"SIDX",
        "clean close must have written the SIDX footer (sanity check before truncation)"
    );
    let string_table_offset = u64::from_le_bytes(
        trailer[0..8]
            .try_into()
            .expect("SIDX trailer offset is exactly 8 bytes"),
    );
    let keep = usize::try_from(string_table_offset).expect("offset fits in usize");
    std::fs::write(&seg_path, &bytes[..keep]).expect("truncate SIDX footer off segment");

    // Phase 3: reopen and verify the committed batch is still fully visible.
    let store = Store::open(StoreConfig::new(&data_dir)).expect("reopen after truncation");
    let recovered = store.query(&Region::entity("regress"));
    assert_eq!(
        recovered.len(),
        3,
        "PROPERTY: a durably-committed batch (BEGIN+frames+COMMIT all on disk) \
must survive an unclean shutdown that stripped the SIDX footer. The \
old reader.rs:707 discard branch silently dropped all 3 entries when \
has_sidx_footer == false, violating [INV-BATCH-ATOMIC-VISIBILITY]."
    );
    // Payloads must round-trip bit-exactly, not merely be countable.
    let mut steps: Vec<i64> = recovered
        .iter()
        .filter_map(|e| {
            let stored = store.get(e.event_id).ok()?;
            stored.event.payload["step"].as_i64()
        })
        .collect();
    steps.sort_unstable();
    assert_eq!(
        steps,
        vec![0, 1, 2],
        "recovered batch payloads must round-trip exactly"
    );
}
#[test]
fn batch_wall_ms_monotonic_under_regressing_clock() {
    use std::sync::atomic::{AtomicI64, Ordering};
    use std::sync::Arc;

    // Regression: under a clock that moves BACKWARD on every sample, batch
    // appends must still clamp wall_ms monotonically non-decreasing per
    // entity, and stream (clock) order must agree with append order.
    let tick = Arc::new(AtomicI64::new(2_000_000_000_000));
    let clock_tick = Arc::clone(&tick);
    // Each call regresses the clock by 10s worth of ms (10_000).
    let clock: Arc<dyn Fn() -> i64 + Send + Sync> =
        Arc::new(move || clock_tick.fetch_sub(10_000, Ordering::SeqCst));

    let tmp = tempfile::tempdir().expect("create temp dir for wall_ms regression");
    let store = Store::open(StoreConfig::new(tmp.path()).with_clock(Some(clock)))
        .expect("open store with regressing clock");
    let coord = Coordinate::new("regress", "wallms").expect("valid coord");

    // Establish the entity's prior wall_ms high-water mark with one single
    // (non-batch) append.
    let pre = store
        .append(&coord, EventKind::DATA, &serde_json::json!({"pre": true}))
        .expect("pre-establish single event");
    let pre_entry = store
        .get(pre.event_id)
        .expect("load pre-established event")
        .event;
    let pre_wall_ms = pre_entry.header.position.wall_ms;

    let items: Vec<BatchAppendItem> = (0..3)
        .map(|i| {
            BatchAppendItem::new(
                coord.clone(),
                EventKind::DATA,
                &serde_json::json!({"batch_step": i}),
                AppendOptions::default(),
                CausationRef::None,
            )
            .expect("construct batch item")
        })
        .collect();
    store.append_batch(items).expect("batch append must succeed");

    let mut entries = store.query(&Region::entity("regress"));
    entries.sort_by_key(|e| e.clock);
    assert_eq!(entries.len(), 4, "1 single + 3 batch items expected");
    for (idx, entry) in entries.iter().enumerate() {
        assert!(
            entry.wall_ms >= pre_wall_ms,
            "PROPERTY: batch item {idx} wall_ms ({}) must NOT regress below \
the entity's prior wall_ms ({pre_wall_ms}). Buggy precompute \
never applied raw_ms.max(last_ms) for batches, so a regressing \
clock would write wall_ms < pre_wall_ms and reorder stream() \
results.",
            entry.wall_ms
        );
    }

    // `sequences` is in stream (clock) order; comparing it against a sorted
    // copy asserts it was already sorted, i.e. clock order == append order.
    // (A previous revision called `sequences.sort_by_key(|_| 0)` before the
    // comparison — a no-op, since a stable sort with a constant key preserves
    // order, but it read as though the left-hand side were being pre-sorted,
    // which would have made the assertion vacuous. Removed.)
    let sequences: Vec<u64> = entries.iter().map(|e| e.global_sequence).collect();
    let sorted_sequences = {
        let mut s = sequences.clone();
        s.sort_unstable();
        s
    };
    assert_eq!(
        sequences, sorted_sequences,
        "PROPERTY: stream-order (clock) and append-order (global_sequence) \
must agree per entity. A wall_ms regression in a batch breaks this \
invariant by inserting batch items at a lower BTreeMap key."
    );
}