use super::*;
use tempfile::TempDir;
#[test]
fn next_active_segment_id_is_one_past_latest_existing_segment() -> Result<(), StoreError> {
let dir = TempDir::new()?;
crate::store::platform::fs::write_derivative_file_atomically(
dir.path(),
&dir.path().join(segment::segment_filename(1)),
"test segment",
b"",
)?;
crate::store::platform::fs::write_derivative_file_atomically(
dir.path(),
&dir.path().join(segment::segment_filename(7)),
"test segment",
b"",
)?;
assert_eq!(
next_active_segment_id(dir.path())?,
8,
"PROPERTY: reader active segment must be one past the highest existing segment so the last sealed segment remains mmap-eligible"
);
Ok(())
}
#[cfg(feature = "dangerous-test-hooks")]
#[test]
fn cooperative_open_spawns_zero_writer_threads_but_threaded_spawns_one() {
use crate::store::platform::spawn::{JobHandle, Spawn, ThreadSpawn};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
struct CountingSpawn {
count: Arc<AtomicUsize>,
inner: ThreadSpawn,
}
impl Spawn for CountingSpawn {
fn spawn(
&self,
name: String,
stack_size: Option<usize>,
body: Box<dyn FnOnce() + Send + 'static>,
) -> Result<Box<dyn JobHandle>, crate::store::platform::spawn::SpawnError> {
self.count.fetch_add(1, Ordering::Release);
self.inner.spawn(name, stack_size, body)
}
}
let threaded_count = Arc::new(AtomicUsize::new(0));
let threaded_dir = TempDir::new().expect("temp dir");
let threaded_spawner: Arc<dyn Spawn> = Arc::new(CountingSpawn {
count: Arc::clone(&threaded_count),
inner: ThreadSpawn,
});
let threaded =
Store::open(StoreConfig::new(threaded_dir.path()).with_spawner(threaded_spawner))
.expect("open threaded");
threaded.close().expect("close threaded");
assert!(
threaded_count.load(Ordering::Acquire) >= 1,
"PROPERTY: the THREADED writer must spawn at least one OS thread (proves the counter bites)"
);
let coop_count = Arc::new(AtomicUsize::new(0));
let coop_dir = TempDir::new().expect("temp dir");
let coop_spawner: Arc<dyn Spawn> = Arc::new(CountingSpawn {
count: Arc::clone(&coop_count),
inner: ThreadSpawn,
});
let coop =
Store::open_cooperative(StoreConfig::new(coop_dir.path()).with_spawner(coop_spawner))
.expect("open cooperative");
let coord = Coordinate::new("entity:coop-seam", "scope:test").expect("coord");
let _ = coop
.append(
&coord,
EventKind::custom(0xC, 0x0B),
&serde_json::json!({ "x": 1 }),
)
.expect("cooperative append drives inline");
coop.close().expect("close cooperative");
assert_eq!(
coop_count.load(Ordering::Acquire),
0,
"PROPERTY: the COOPERATIVE writer must spawn ZERO OS threads — it runs inline via the reply-funnel pump"
);
}
#[test]
fn highest_index_hlc_reports_non_origin_point_for_appended_entry() {
let dir = TempDir::new().expect("temp dir");
let store = Store::open(StoreConfig::new(dir.path())).expect("open store");
let coord = Coordinate::new("entity:highest-hlc", "scope:test").expect("coord");
let receipt = store
.append(
&coord,
EventKind::custom(0xF, 0x77),
&serde_json::json!({"x": 1}),
)
.expect("append");
let point = highest_index_hlc(&store.index);
assert_eq!(
point.global_sequence, receipt.global_sequence,
"PROPERTY: highest_index_hlc must observe the committed entry's global sequence"
);
assert!(
point > HlcPoint::ORIGIN,
"PROPERTY: highest_index_hlc must not collapse a non-empty index to origin/default"
);
store.close().expect("close");
}
#[test]
fn run_open_chain_verification_decides_crc_skip_recompute_pass_and_fail_closed(
) -> Result<(), StoreError> {
let dir = TempDir::new()?;
{
let store = Store::open(StoreConfig::new(dir.path()))?;
run_open_chain_verification(&store)?;
let _ = store.append(
&Coordinate::new("alice", "scope")?,
EventKind::custom(0xF, 0x012),
&serde_json::json!({ "n": 1 }),
)?;
store.close()?;
}
let store = Store::open(
StoreConfig::new(dir.path()).with_chain_verification(ChainVerification::Recompute),
)?;
run_open_chain_verification(&store)?;
store.close()?;
use crate::id::EventId;
assert!(
chain_verification_failure(&ChainVerificationReport::default()).is_none(),
"an intact report must not fail the open"
);
let tampered = ChainVerificationReport {
events_checked: 4,
content_hash_mismatches: vec![EventId::from_u128(1), EventId::from_u128(2)],
dangling_links: vec![EventId::from_u128(3)],
};
let failure = chain_verification_failure(&tampered);
assert!(
matches!(
failure,
Some(StoreError::ChainVerificationFailed {
content_hash_mismatches: 2,
dangling_links: 1,
})
),
"a non-intact report must fail closed with both integrity counts, got {failure:?}"
);
Ok(())
}
#[test]
fn last_close_hlc_reports_the_recorded_close_point_not_default_or_open() -> Result<(), StoreError> {
let dir = TempDir::new()?;
{
let store = Store::open(StoreConfig::new(dir.path()))?;
let _ = store.append(
&Coordinate::new("entity:lc", "scope:lc")?,
EventKind::custom(0xF, 0x33),
&serde_json::json!({ "n": 1 }),
)?;
store.close()?;
}
let store = Store::open(StoreConfig::new(dir.path()))?;
let expected = store
.index
.all_entries()
.into_iter()
.filter(|e| e.kind == EventKind::SYSTEM_CLOSE_COMPLETED)
.map(|e| HlcPoint {
wall_ms: e.wall_ms,
global_sequence: e.global_sequence,
})
.reduce(HlcPoint::max_by_sequence)
.expect("a close event must be present after reopen");
let got = last_close_hlc(&store.index)?;
assert_eq!(
got, expected,
"last_close_hlc must return the recorded SYSTEM_CLOSE_COMPLETED point"
);
assert!(got.global_sequence > 0, "the close point is not the origin");
let max_all = highest_index_hlc(&store.index);
assert!(
max_all.global_sequence > got.global_sequence,
"premise: a later OPEN event outranks the close, so a kind-filter flip is observable"
);
store.close()?;
Ok(())
}
#[test]
fn validate_bootstrap_hlc_rejects_open_below_either_floor() {
let open = HlcPoint {
wall_ms: 5,
global_sequence: 5,
};
let higher_recovered = HlcPoint {
wall_ms: 10,
global_sequence: 10,
};
let lower_close = HlcPoint {
wall_ms: 1,
global_sequence: 1,
};
let result = validate_bootstrap_hlc(open, higher_recovered, lower_close);
assert!(
matches!(
result,
Err(StoreError::InvariantViolation {
kind: StoreInvariant::BootstrapHlcOutOfOrder { .. }
})
),
"an open_hlc below max_recovered_hlc must fail even when above last_close_hlc, got {result:?}"
);
validate_bootstrap_hlc(higher_recovered, higher_recovered, lower_close)
.expect("an open at/above both floors must validate");
}
#[test]
fn bootstrap_open_hlc_carries_the_recovered_frontier_sequence() -> Result<(), StoreError> {
let dir = TempDir::new()?;
let store = Store::open(StoreConfig::new(dir.path()))?;
let _ = store.append(
&Coordinate::new("entity:boot", "scope:boot")?,
EventKind::custom(0xF, 0x44),
&serde_json::json!({ "n": 1 }),
)?;
let recovered = highest_index_hlc(&store.index);
assert!(
recovered.global_sequence > 0,
"premise: the store has committed events"
);
let open_hlc = bootstrap_open_hlc(&store.runtime, &store.index)?;
assert_eq!(
open_hlc.global_sequence, recovered.global_sequence,
"the bootstrap open point must carry the recovered frontier's sequence"
);
store.close()?;
Ok(())
}
#[cfg(feature = "payload-encryption")]
#[test]
fn payload_key_count_reflects_rehydrated_keyset_and_absence() -> Result<(), StoreError> {
use crate::coordinate::Coordinate;
use crate::id::EventId;
use crate::store::{scope_for, KeyScope, KeyScopeGranularity, KeyStore};
let gran = KeyScopeGranularity::PerEntity;
let dir = TempDir::new()?;
let scope = |entity: &str| -> KeyScope {
scope_for(
gran,
&Coordinate::new(entity, "scope:k").expect("coord"),
EventKind::custom(0xF, 1),
EventId::from(1u128),
)
};
let mut ks = KeyStore::new(gran);
let _ = ks.get_or_create(&scope("entity:a")).expect("mint a");
let _ = ks.get_or_create(&scope("entity:b")).expect("mint b");
ks.flush(dir.path()).expect("flush keyset");
let store = Store::open(StoreConfig::new(dir.path()).with_payload_encryption(gran))?;
assert_eq!(
store.payload_key_count(),
Some(2),
"an encrypted open must rehydrate the two flushed keys (not None, Some(0), or Some(1))"
);
store.close()?;
let plain = Store::open(StoreConfig::new(dir.path()))?;
assert_eq!(
plain.payload_key_count(),
None,
"a store opened without encryption holds no keyset"
);
plain.close()?;
Ok(())
}