use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use selene_core::{Change, DbString, GraphId, HlcTimestamp, LabelSet, PropertyMap};
use crate::committer_batch::CommitBatching;
use crate::durable_provider::DurableProvider;
use crate::error::GraphError;
use crate::index_provider::{IndexProvider, ProviderError, ProviderTag, SubTag};
use crate::{SeleneGraph, SharedGraph};
/// Converts a test literal into a [`DbString`] via `selene_core::db_string`.
///
/// Panics with "test string fits DB string cap" when the conversion is
/// rejected.
fn db_string(value: &str) -> DbString {
    let converted = selene_core::db_string(value);
    converted.expect("test string fits DB string cap")
}
/// Creates a fresh, empty scratch directory under the system temp dir and
/// returns its path.
///
/// The leaf name is `selene-brief2-{name}-{pid}-{nanos}`, where `nanos` is the
/// current wall-clock time since the Unix epoch. Any directory already at that
/// path is removed first (best effort; the removal error is ignored).
///
/// Panics if the system clock is before the Unix epoch or the directory cannot
/// be created.
fn temp_dir(name: &str) -> PathBuf {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    let pid = std::process::id();
    let leaf = format!("selene-brief2-{name}-{pid}-{}", since_epoch.as_nanos());

    let mut path = std::env::temp_dir();
    path.push(leaf);

    // Ignore the result: the directory normally does not exist yet.
    let _ = std::fs::remove_dir_all(&path);
    std::fs::create_dir_all(&path).unwrap();
    path
}
/// Shorthand for `CommitBatching::On` with the given limits.
///
/// Panics if `max_commits` is zero, because it is converted to a
/// `NonZeroUsize`.
fn on(max_commits: usize, max_bytes: u64) -> CommitBatching {
    let max_commits = NonZeroUsize::new(max_commits).unwrap();
    CommitBatching::On {
        max_commits,
        max_bytes,
    }
}
/// One successful call recorded by `CountingDurable`, kept in the order the
/// calls were logged.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DurableEvent {
/// A successful `write_commit`; carries the sequence number it returned.
Write(u64),
/// A successful `flush`; carries the value of the sequence counter read at
/// flush time.
Flush(u64),
}
/// Synthetic `DurableProvider` that records every successful write and flush
/// and can be configured to fail on demand.
struct CountingDurable {
/// Tag returned from `provider_tag`.
tag: ProviderTag,
/// Sequence counter; incremented once per successful `write_commit`.
seq: AtomicU64,
/// 1-based write ordinal at which `write_commit` returns an error; `0`
/// disables write failure injection.
fail_write_on_call: usize,
/// When `true`, every `flush` returns an error and records no event.
fail_flush: AtomicBool,
/// Log of successful writes and flushes, in call order.
events: Mutex<Vec<DurableEvent>>,
}
impl CountingDurable {
    /// Common constructor behind `new`, `fail_write_on` and `fail_flush`.
    ///
    /// `fail_write_on_call == 0` disables write failure injection.
    fn with_failure_modes(
        tag: &[u8; 4],
        fail_write_on_call: usize,
        fail_flush: bool,
    ) -> Arc<Self> {
        let provider = Self {
            tag: ProviderTag(*tag),
            seq: AtomicU64::new(0),
            fail_write_on_call,
            fail_flush: AtomicBool::new(fail_flush),
            events: Mutex::new(Vec::new()),
        };
        Arc::new(provider)
    }

    /// Provider that never injects a failure.
    fn new(tag: &[u8; 4]) -> Arc<Self> {
        Self::with_failure_modes(tag, 0, false)
    }

    /// Provider whose `write_commit` fails when the 1-based write ordinal
    /// equals `call`.
    fn fail_write_on(tag: &[u8; 4], call: usize) -> Arc<Self> {
        Self::with_failure_modes(tag, call, false)
    }

    /// Provider whose `flush` always fails.
    fn fail_flush(tag: &[u8; 4]) -> Arc<Self> {
        Self::with_failure_modes(tag, 0, true)
    }

    /// Snapshot (copy) of the event log at the time of the call.
    fn events(&self) -> Vec<DurableEvent> {
        let log = self.events.lock().unwrap();
        log.clone()
    }

    /// Number of `Write` events logged so far.
    fn write_count(&self) -> usize {
        let mut writes = 0usize;
        for event in self.events() {
            if let DurableEvent::Write(_) = event {
                writes += 1;
            }
        }
        writes
    }

    /// Number of `Flush` events logged so far.
    fn flush_count(&self) -> usize {
        let mut flushes = 0usize;
        for event in self.events() {
            if let DurableEvent::Flush(_) = event {
                flushes += 1;
            }
        }
        flushes
    }

    /// Length of the longest run of consecutive `Write` events that is not
    /// interrupted by a `Flush`.
    fn max_batch_size(&self) -> usize {
        let (longest, _current) = self.events().into_iter().fold(
            (0usize, 0usize),
            |(longest, current), event| match event {
                DurableEvent::Write(_) => (longest.max(current + 1), current + 1),
                // A flush closes the current run.
                DurableEvent::Flush(_) => (longest, 0),
            },
        );
        longest
    }
}
impl DurableProvider for CountingDurable {
    fn provider_tag(&self) -> ProviderTag {
        self.tag
    }

    /// Assigns the next sequence number and logs a `Write` event, unless the
    /// configured failure ordinal is hit.
    ///
    /// The ordinal is derived from the number of `Write` events already
    /// logged, so a failed call is not counted.
    fn write_commit(
        &self,
        _principal: Option<&Arc<[u8]>>,
        _changes: &[Change],
        _timestamp: HlcTimestamp,
    ) -> Result<u64, ProviderError> {
        let attempt = self.write_count() + 1;
        let injection_enabled = self.fail_write_on_call != 0;
        if injection_enabled && attempt == self.fail_write_on_call {
            let reason = "synthetic write_commit failure".to_owned();
            return Err(ProviderError::Inconsistent { reason });
        }

        let assigned = self.seq.fetch_add(1, Ordering::SeqCst) + 1;
        {
            let mut log = self.events.lock().unwrap();
            log.push(DurableEvent::Write(assigned));
        }
        Ok(assigned)
    }

    /// Logs a `Flush` event carrying the current sequence counter and returns
    /// that value, or fails without logging when `fail_flush` is set.
    fn flush(&self) -> Result<Option<u64>, ProviderError> {
        let must_fail = self.fail_flush.load(Ordering::SeqCst);
        if must_fail {
            let reason = "synthetic flush failure".to_owned();
            return Err(ProviderError::Inconsistent { reason });
        }

        let high_water = self.seq.load(Ordering::SeqCst);
        {
            let mut log = self.events.lock().unwrap();
            log.push(DurableEvent::Flush(high_water));
        }
        Ok(Some(high_water))
    }
}
/// Builds a `SharedGraph` over an empty graph with id `id`, no index
/// providers, the single `durable` provider, and the given batching mode.
///
/// Panics with "graph builds with synthetic durable provider" if construction
/// fails.
fn graph_with_durable(
    id: u64,
    durable: Arc<dyn DurableProvider>,
    batching: CommitBatching,
) -> SharedGraph {
    let graph = SeleneGraph::new(GraphId::new(id));
    let durables = vec![durable];
    let built = SharedGraph::from_graph_with_core_and_durables(
        graph,
        Vec::new(),
        durables,
        None,
        None,
        batching,
    );
    built.expect("graph builds with synthetic durable provider")
}
/// Builds a `SharedGraph` over an empty graph with id `id`, one index
/// `provider`, one `durable` provider, and the given batching mode.
///
/// Panics with "graph builds with synthetic durable + provider" if
/// construction fails.
fn graph_with_durable_and_provider(
    id: u64,
    durable: Arc<dyn DurableProvider>,
    provider: Arc<dyn IndexProvider>,
    batching: CommitBatching,
) -> SharedGraph {
    let graph = SeleneGraph::new(GraphId::new(id));
    let index_providers = vec![provider];
    let durables = vec![durable];
    let built = SharedGraph::from_graph_with_core_and_durables(
        graph,
        index_providers,
        durables,
        None,
        None,
        batching,
    );
    built.expect("graph builds with synthetic durable + provider")
}
/// With batching off, every commit produces exactly one write followed by one
/// flush, and `compact` adds neither.
#[test]
fn t1_off_equals_brief1_fsync_count() {
    let durable = CountingDurable::new(b"CNT1");
    let shared = graph_with_durable(70_001, durable.clone(), CommitBatching::Off);

    // Three node-creating commits; generation and durable sequence both
    // advance by one each time.
    for expected in 1..=3 {
        let mut txn = shared.begin_write();
        txn.mutator()
            .create_node(LabelSet::single(db_string("L")), PropertyMap::new())
            .unwrap();
        let outcome = txn.commit().expect("commit ok");
        assert_eq!(outcome.generation, expected);
        assert_eq!(outcome.durable_at, Some(expected));
    }

    // The index creation is expected to show up as a fourth write.
    shared
        .create_property_index(db_string("L"), db_string("k"), crate::TypedIndexKind::I64)
        .expect("index create ok");

    let events = durable.events();
    let writes = durable.write_count();
    assert_eq!(writes, 4, "events: {events:?}");
    assert_eq!(
        durable.flush_count(),
        durable.write_count(),
        "OFF: exactly one flush per commit (events: {events:?})",
    );
    for pair in events.chunks(2) {
        let write_then_flush = matches!(pair, [DurableEvent::Write(_), DurableEvent::Flush(_)]);
        assert!(
            write_then_flush,
            "OFF interleaves write then flush per commit, got {pair:?}",
        );
    }

    // Compaction must not touch the durable provider at all.
    let (writes_before, flushes_before) = (durable.write_count(), durable.flush_count());
    shared.compact().expect("compact ok");
    assert_eq!(
        durable.write_count(),
        writes_before,
        "compact appends nothing",
    );
    assert_eq!(
        durable.flush_count(),
        flushes_before,
        "compact issues zero flush calls",
    );
}
/// Seals A then B, submits them in the opposite order (B first, on another
/// thread), and checks that with batching off they still publish as
/// generations 1 and 2 with one write and one flush each.
#[test]
fn t1b_reverse_order_seal_pair_off_one_flush_each() {
let durable = CountingDurable::new(b"CNT2");
let shared = Arc::new(graph_with_durable(
70_002,
durable.clone(),
CommitBatching::Off,
));
// Seal A before B so A holds the earlier position in seal order.
let mut txn_a = shared.begin_write();
txn_a
.mutator()
.create_node(LabelSet::single(db_string("A")), PropertyMap::new())
.unwrap();
let sealed_a = txn_a.seal(None, None).expect("A seals");
let mut txn_b = shared.begin_write();
txn_b
.mutator()
.create_node(LabelSet::single(db_string("B")), PropertyMap::new())
.unwrap();
let sealed_b = txn_b.seal(None, None).expect("B seals");
// Submit B first, from a separate thread.
let shared_b = Arc::clone(&shared);
let b_thread = thread::spawn(move || {
shared_b
.submit_sealed_for_test(sealed_b)
.expect("B publishes")
});
// Best effort only: yielding gives the B thread a chance to run before A is
// submitted, but does not guarantee it has done so.
for _ in 0..1_000 {
thread::yield_now();
}
let outcome_a = shared
.submit_sealed_for_test(sealed_a)
.expect("A publishes");
let outcome_b = b_thread.join().expect("B thread ok");
// Generations follow seal order, not submit order.
assert_eq!(outcome_a.generation, 1);
assert_eq!(outcome_b.generation, 2);
assert_eq!(durable.write_count(), 2);
assert_eq!(durable.flush_count(), 2);
assert_eq!(shared.read().node_count(), 2);
}
/// With batching on, 8 threads x 64 commits must each append exactly once
/// while the provider sees strictly fewer flushes than writes.
#[test]
fn t2_on_path_groups_fsyncs() {
    const THREADS: usize = 8;
    const PER_THREAD: usize = 64;

    let durable = CountingDurable::new(b"CNT3");
    let shared = Arc::new(graph_with_durable(
        70_003,
        durable.clone(),
        CommitBatching::DEFAULT_ON,
    ));
    // All writers start committing together.
    let barrier = Arc::new(Barrier::new(THREADS));

    thread::scope(|scope| {
        for thread_idx in 0..THREADS {
            let shared = Arc::clone(&shared);
            let barrier = Arc::clone(&barrier);
            scope.spawn(move || {
                barrier.wait();
                // Each thread owns a disjoint range of property values.
                let first_key = thread_idx * PER_THREAD;
                for key in first_key..first_key + PER_THREAD {
                    let mut txn = shared.begin_write();
                    let properties = PropertyMap::from_pairs([(
                        db_string("k"),
                        selene_core::Value::Int(key as i64),
                    )])
                    .unwrap();
                    txn.mutator()
                        .create_node(LabelSet::single(db_string("N")), properties)
                        .unwrap();
                    let outcome = txn.commit().expect("commit ok");
                    assert!(outcome.durable_at.is_some());
                }
            });
        }
    });

    let total = THREADS * PER_THREAD;
    let writes = durable.write_count();
    assert_eq!(writes, total, "every commit appended once");
    assert_eq!(shared.read().node_count(), total);
    assert_eq!(shared.read().meta.generation, total as u64);

    let flushes = durable.flush_count();
    assert!(
        flushes < writes,
        "ON grouped fsyncs: {} flushes for {} writes",
        flushes,
        writes,
    );
}
/// Index provider that records the ids of created nodes in the order its
/// `on_change` hook observes them.
struct GenOrderProvider {
/// Tag returned from `provider_tag`.
tag: ProviderTag,
/// Node ids from `Change::NodeCreated`, in observation order.
seen: Mutex<Vec<u64>>,
}
impl GenOrderProvider {
    /// Creates a provider with the given tag and an empty observation log.
    fn new(tag: &[u8; 4]) -> Arc<Self> {
        let tag = ProviderTag(*tag);
        let seen = Mutex::new(Vec::new());
        Arc::new(Self { tag, seen })
    }
}
impl IndexProvider for GenOrderProvider {
    fn provider_tag(&self) -> ProviderTag {
        self.tag
    }

    /// Accepts and ignores any section.
    fn read_section(&self, _sub: SubTag, _bytes: &[u8]) -> Result<(), ProviderError> {
        Ok(())
    }

    /// Always produces an empty section.
    fn write_section(&self, _sub: SubTag) -> Result<Vec<u8>, ProviderError> {
        Ok(Vec::new())
    }

    /// Appends the id of each created node to `seen`; all other changes are
    /// ignored.
    fn on_change(&self, change: &Change) -> Result<(), ProviderError> {
        let id = match change {
            Change::NodeCreated { id, .. } => id,
            _ => return Ok(()),
        };
        self.seen.lock().unwrap().push(id.get());
        Ok(())
    }

    /// This provider declares no sub-tags.
    fn declared_sub_tags(&self) -> &[SubTag] {
        &[]
    }
}
/// Seals A, B, C in order, submits them as C, B, A, and checks that both the
/// assigned generations and the provider's observed node ids follow seal
/// order.
#[test]
fn t3_order_sensitive_batch_publishes_in_seal_order() {
let durable = CountingDurable::new(b"CNT4");
let provider = GenOrderProvider::new(b"GORD");
let shared = Arc::new(graph_with_durable_and_provider(
70_004,
durable,
provider.clone(),
on(8, 8 * 1024 * 1024),
));
// `ids` keeps the created node ids in seal order; it is the expected
// observation order asserted at the end.
let mut sealeds = Vec::new();
let mut ids = Vec::new();
for label in ["A", "B", "C"] {
let mut txn = shared.begin_write();
let id = txn
.mutator()
.create_node(LabelSet::single(db_string(label)), PropertyMap::new())
.unwrap();
ids.push(id);
sealeds.push(txn.seal(None, None).expect("seals"));
}
// Popping from the back yields C, then B, then A.
let sealed_c = sealeds.pop().unwrap();
let sealed_b = sealeds.pop().unwrap();
let sealed_a = sealeds.pop().unwrap();
// Submit in reverse seal order: C and B on their own threads, A last on
// this thread. The yield loops are best effort; they do not guarantee the
// spawned thread has already submitted.
let s_c = Arc::clone(&shared);
let c_thread = thread::spawn(move || s_c.submit_sealed_for_test(sealed_c).expect("C"));
for _ in 0..1_000 {
thread::yield_now();
}
let s_b = Arc::clone(&shared);
let b_thread = thread::spawn(move || s_b.submit_sealed_for_test(sealed_b).expect("B"));
for _ in 0..1_000 {
thread::yield_now();
}
let outcome_a = shared.submit_sealed_for_test(sealed_a).expect("A");
let outcome_b = b_thread.join().unwrap();
let outcome_c = c_thread.join().unwrap();
assert_eq!(outcome_a.generation, 1);
assert_eq!(outcome_b.generation, 2);
assert_eq!(outcome_c.generation, 3);
let snap = shared.read();
assert_eq!(snap.meta.generation, 3);
assert_eq!(snap.node_count(), 3);
// The provider's `on_change` log must list the nodes in seal order.
let seen = provider.seen.lock().unwrap().clone();
let expected: Vec<u64> = ids.iter().map(|id| id.get()).collect();
assert_eq!(seen, expected, "fan-out observed seal-order publish");
}
/// Submits sealed transactions 0 and 2 while 1 is still missing, checks that
/// 2 is not published across the gap, then submits 1 and expects both to
/// complete as generations 2 and 3.
#[test]
fn t4_gap_ends_batch_no_deadlock() {
let durable = CountingDurable::new(b"CNT5");
let shared = Arc::new(graph_with_durable(70_005, durable, on(8, 8 * 1024 * 1024)));
// Seal three transactions in order: 0, 1, 2.
let mut s0 = shared.begin_write();
s0.mutator()
.create_node(LabelSet::single(db_string("Z0")), PropertyMap::new())
.unwrap();
let sealed0 = s0.seal(None, None).expect("0");
let mut s1 = shared.begin_write();
s1.mutator()
.create_node(LabelSet::single(db_string("Z1")), PropertyMap::new())
.unwrap();
let sealed1 = s1.seal(None, None).expect("1");
let mut s2 = shared.begin_write();
s2.mutator()
.create_node(LabelSet::single(db_string("Z2")), PropertyMap::new())
.unwrap();
let sealed2 = s2.seal(None, None).expect("2");
let outcome0 = shared.submit_sealed_for_test(sealed0).expect("0 publishes");
assert_eq!(outcome0.generation, 1);
// Submit 2 on another thread while 1 has not been submitted yet.
let s_two = Arc::clone(&shared);
let two_thread = thread::spawn(move || s_two.submit_sealed_for_test(sealed2).expect("2"));
// Best effort: give the spawned thread a chance to submit before checking.
for _ in 0..1_000 {
thread::yield_now();
}
assert_eq!(shared.read().meta.generation, 1, "2 not published over gap");
// Filling the gap must let both 1 and 2 finish; a hang here would show up as
// `join` never returning.
let outcome1 = shared.submit_sealed_for_test(sealed1).expect("1 publishes");
let outcome2 = two_thread.join().expect("2 thread ok");
assert_eq!(outcome1.generation, 2);
assert_eq!(outcome2.generation, 3);
assert_eq!(shared.read().node_count(), 3);
}
/// Five sealed commits are enqueued with the third `write_commit` rigged to
/// fail; every waiter must get an error, nothing may be published, and a
/// later commit must fail with `GraphError::Durable`.
#[test]
fn t5_partial_batch_append_failure_errs_all_and_poisons() {
    let durable = CountingDurable::fail_write_on(b"CNT6", 3);
    let shared = Arc::new(graph_with_durable(
        70_006,
        durable.clone(),
        on(8, 8 * 1024 * 1024),
    ));

    let mut sealeds = Vec::new();
    for label in ["a", "b", "c", "d", "e"] {
        let mut txn = shared.begin_write();
        txn.mutator()
            .create_node(LabelSet::single(db_string(label)), PropertyMap::new())
            .unwrap();
        let sealed = txn.seal(None, None).expect("seals");
        sealeds.push(sealed);
    }

    // Hold back the first sealed commit; enqueue the rest newest-first, then
    // the first one last.
    let first = sealeds.remove(0);
    let later_replies: Vec<_> = sealeds
        .into_iter()
        .rev()
        .map(|sealed| {
            shared
                .submit_sealed_async_for_test(sealed)
                .expect("later seq enqueued")
        })
        .collect();
    let first_reply = shared
        .submit_sealed_async_for_test(first)
        .expect("seq 0 enqueued");

    let wait = Duration::from_secs(10);
    let first_result = first_reply
        .recv_timeout(wait)
        .expect("seq 0 waiter did not hang");
    let later_errs = later_replies
        .into_iter()
        .map(|reply| reply.recv_timeout(wait).expect("later waiter did not hang"))
        .filter(|result| result.is_err())
        .count();
    let err_count = usize::from(first_result.is_err()) + later_errs;

    assert_eq!(err_count, 5, "all 5 members Err (incl. already-appended)");
    assert_eq!(shared.read().meta.generation, 0);
    assert_eq!(shared.read().node_count(), 0);

    // After the failure, a fresh commit must be rejected.
    let mut txn = shared.begin_write();
    txn.mutator()
        .create_node(LabelSet::new(), PropertyMap::new())
        .unwrap();
    let post_poison = txn.commit();
    assert!(
        matches!(post_poison, Err(GraphError::Durable { .. })),
        "post-poison commit fails fast",
    );
}
/// With a provider whose `flush` always fails, all four concurrently
/// submitted commits must return an error and nothing may be published.
#[test]
fn t6_flush_failure_poisons_all() {
    let durable = CountingDurable::fail_flush(b"CNT7");
    let shared = Arc::new(graph_with_durable(
        70_007,
        durable.clone(),
        on(8, 8 * 1024 * 1024),
    ));

    let mut sealeds = Vec::new();
    for label in ["a", "b", "c", "d"] {
        let mut txn = shared.begin_write();
        txn.mutator()
            .create_node(LabelSet::single(db_string(label)), PropertyMap::new())
            .unwrap();
        sealeds.push(txn.seal(None, None).expect("seals"));
    }

    let deadline = Instant::now() + Duration::from_secs(10);

    // One submitter thread per sealed commit, spawned in seal order.
    let handles: Vec<_> = sealeds
        .into_iter()
        .map(|sealed| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || shared.submit_sealed_for_test(sealed))
        })
        .collect();

    let err_count = handles
        .into_iter()
        .map(|handle| handle.join().expect("no panic"))
        .filter(|outcome| outcome.is_err())
        .count();

    assert!(Instant::now() < deadline, "no waiter hung");
    assert_eq!(err_count, 4, "all 4 Err on flush failure");
    assert_eq!(shared.read().meta.generation, 0, "nothing published");
    // A failing flush records no `Flush` event.
    assert_eq!(durable.flush_count(), 0);
}
/// Index provider that arms the publish panic injector the first time it
/// observes a node creation.
struct ArmPublishPanicProvider {
/// Tag returned from `provider_tag`.
tag: ProviderTag,
/// Set once the injector has been armed, so arming happens only once.
armed: AtomicBool,
}
impl ArmPublishPanicProvider {
    /// Creates a provider with the given tag that has not armed the injector
    /// yet.
    fn new(tag: &[u8; 4]) -> Arc<Self> {
        let tag = ProviderTag(*tag);
        let armed = AtomicBool::new(false);
        Arc::new(Self { tag, armed })
    }
}
impl IndexProvider for ArmPublishPanicProvider {
    fn provider_tag(&self) -> ProviderTag {
        self.tag
    }

    /// Accepts and ignores any section.
    fn read_section(&self, _sub: SubTag, _bytes: &[u8]) -> Result<(), ProviderError> {
        Ok(())
    }

    /// Always produces an empty section.
    fn write_section(&self, _sub: SubTag) -> Result<Vec<u8>, ProviderError> {
        Ok(Vec::new())
    }

    /// On the first `Change::NodeCreated` seen, calls
    /// `publish_panic_inject::arm(1)`; later node creations and all other
    /// changes do nothing.
    fn on_change(&self, change: &Change) -> Result<(), ProviderError> {
        let is_node_created = matches!(change, Change::NodeCreated { .. });
        if is_node_created {
            // `swap` returns the previous value, so only the first caller
            // sees `false` and arms the injector.
            let already_armed = self.armed.swap(true, Ordering::SeqCst);
            if !already_armed {
                crate::write_txn::publish_panic_inject::arm(1);
            }
        }
        Ok(())
    }

    /// This provider declares no sub-tags.
    fn declared_sub_tags(&self) -> &[SubTag] {
        &[]
    }
}
/// Submits three sealed commits with a provider that arms the publish panic
/// injector on the first node creation. Expects the first member to succeed
/// as generation 1, the other two to fail, and a later commit to be rejected
/// with `GraphError::Durable`.
#[test]
fn t5b_publish_tail_panic_acks_member_errs_rest_and_poisons() {
let durable = CountingDurable::new(b"CNT8");
let provider = ArmPublishPanicProvider::new(b"ARMP");
let shared = Arc::new(graph_with_durable_and_provider(
70_008,
durable,
provider,
on(8, 8 * 1024 * 1024),
));
let mut sealeds = Vec::new();
for label in ["x", "y", "z"] {
let mut txn = shared.begin_write();
txn.mutator()
.create_node(LabelSet::single(db_string(label)), PropertyMap::new())
.unwrap();
sealeds.push(txn.seal(None, None).expect("seals"));
}
let deadline = Instant::now() + Duration::from_secs(10);
// Hold back the first sealed commit; submit the others newest-first on their
// own threads, then the first one on this thread.
let sealed_0 = sealeds.remove(0);
let mut handles = Vec::new();
while let Some(sealed) = sealeds.pop() {
let shared = Arc::clone(&shared);
handles.push(thread::spawn(move || shared.submit_sealed_for_test(sealed)));
// Best effort: let the spawned thread run before spawning the next one.
for _ in 0..1_000 {
thread::yield_now();
}
}
let r0 = shared.submit_sealed_for_test(sealed_0);
let mut later = Vec::new();
for handle in handles {
later.push(handle.join().expect("waiter thread did not panic"));
}
assert!(Instant::now() < deadline, "no waiter hung");
assert!(
r0.is_ok(),
"member 0 published before the panic, got {r0:?}"
);
assert_eq!(r0.unwrap().generation, 1);
assert_eq!(
later.iter().filter(|r| r.is_err()).count(),
2,
"the panicking member + the remaining member both Err",
);
// Only the first member's node and generation are visible.
assert_eq!(shared.read().node_count(), 1);
assert_eq!(shared.read().meta.generation, 1);
// A fresh commit after the injected panic must be rejected.
let mut txn = shared.begin_write();
txn.mutator()
.create_node(LabelSet::new(), PropertyMap::new())
.unwrap();
assert!(
matches!(txn.commit(), Err(GraphError::Durable { .. })),
"post-panic commit fails fast (engine poisoned)",
);
}
// Child test modules whose source lives in the files named by `#[path]`.
#[path = "committer_batch_wal_tests.rs"]
mod wal;
#[path = "committer_batch_estimate_tests.rs"]
mod estimate;