use super::*;
/// With `CommitBatching::Off`, three single-node commits followed by one
/// property-index creation must produce four durable writes, each paired with
/// its own flush, and a subsequent `compact()` must add neither writes nor
/// flushes.
#[test]
fn t1_off_equals_brief1_fsync_count() {
    let sink = CountingDurable::new(b"CNT1");
    let graph = graph_with_durable(70_001, sink.clone(), CommitBatching::Off);

    // Each commit must report the next generation and be durable at that
    // same generation.
    for step in 0..3 {
        let expected_generation = step + 1;
        let mut writer = graph.begin_write();
        writer
            .mutator()
            .create_node(LabelSet::single(db_string("L")), PropertyMap::new())
            .unwrap();
        let committed = writer.commit().expect("commit ok");
        assert_eq!(committed.generation, expected_generation);
        assert_eq!(committed.durable_at, Some(expected_generation));
    }

    // The index creation is expected to account for the fourth write below.
    graph
        .create_property_index(db_string("L"), db_string("k"), crate::TypedIndexKind::I64)
        .expect("index create ok");

    let events = sink.events();
    assert_eq!(sink.write_count(), 4, "events: {events:?}");
    assert_eq!(
        sink.flush_count(),
        sink.write_count(),
        "OFF: exactly one flush per commit (events: {events:?})",
    );

    // The event log must be strictly Write, Flush, Write, Flush, ...; a
    // trailing odd event fails the two-element pattern as well.
    for pair in events.chunks(2) {
        let write_then_flush =
            matches!(pair, [DurableEvent::Write(_), DurableEvent::Flush(_)]);
        assert!(
            write_then_flush,
            "OFF interleaves write then flush per commit, got {pair:?}",
        );
    }

    // Compaction must leave both counters untouched.
    let (writes_before, flushes_before) = (sink.write_count(), sink.flush_count());
    graph.compact().expect("compact ok");
    assert_eq!(sink.write_count(), writes_before, "compact appends nothing");
    assert_eq!(
        sink.flush_count(),
        flushes_before,
        "compact issues zero flush calls",
    );
}
/// Seals transaction A, then B, but submits B first (from a helper thread)
/// and A afterwards. With `CommitBatching::Off` the test expects A to publish
/// as generation 1 and B as generation 2, with one write and one flush each.
#[test]
fn t1b_reverse_order_seal_pair_off_one_flush_each() {
    let sink = CountingDurable::new(b"CNT2");
    let graph = Arc::new(graph_with_durable(70_002, sink.clone(), CommitBatching::Off));

    // Seal A before B so that A holds the earlier position in seal order.
    let mut writer_a = graph.begin_write();
    writer_a
        .mutator()
        .create_node(LabelSet::single(db_string("A")), PropertyMap::new())
        .unwrap();
    let sealed_a = writer_a.seal(None, None).expect("A seals");

    let mut writer_b = graph.begin_write();
    writer_b
        .mutator()
        .create_node(LabelSet::single(db_string("B")), PropertyMap::new())
        .unwrap();
    let sealed_b = writer_b.seal(None, None).expect("B seals");

    // Submit B first on its own thread.
    let b_submitter = {
        let graph = Arc::clone(&graph);
        thread::spawn(move || {
            graph
                .submit_sealed_for_test(sealed_b)
                .expect("B publishes")
        })
    };

    // Presumably gives the B thread a chance to reach its submit call before
    // A is submitted; this is a scheduling hint, not a guarantee.
    (0..1_000).for_each(|_| thread::yield_now());

    let published_a = graph
        .submit_sealed_for_test(sealed_a)
        .expect("A publishes");
    let published_b = b_submitter.join().expect("B thread ok");

    assert_eq!(published_a.generation, 1);
    assert_eq!(published_b.generation, 2);
    assert_eq!(sink.write_count(), 2);
    assert_eq!(sink.flush_count(), 2);
    assert_eq!(graph.read().node_count(), 2);
}
/// Runs `THREADS` writers, each performing `PER_THREAD` single-node commits
/// against a graph configured with `CommitBatching::DEFAULT_ON`, and checks
/// that every commit was written exactly once while strictly fewer flushes
/// than writes were issued.
#[test]
fn t2_on_path_groups_fsyncs() {
    const THREADS: usize = 8;
    const PER_THREAD: usize = 64;

    let sink = CountingDurable::new(b"CNT3");
    let graph = Arc::new(graph_with_durable(
        70_003,
        sink.clone(),
        CommitBatching::DEFAULT_ON,
    ));
    // All writers wait on the barrier so they start committing together.
    let start_line = Arc::new(Barrier::new(THREADS));

    thread::scope(|scope| {
        for thread_idx in 0..THREADS {
            let graph = Arc::clone(&graph);
            let start_line = Arc::clone(&start_line);
            scope.spawn(move || {
                start_line.wait();
                for commit_idx in 0..PER_THREAD {
                    // Distinct value per (thread, commit) pair.
                    let key = (thread_idx * PER_THREAD + commit_idx) as i64;
                    let mut writer = graph.begin_write();
                    writer
                        .mutator()
                        .create_node(
                            LabelSet::single(db_string("N")),
                            PropertyMap::from_pairs([(
                                db_string("k"),
                                selene_core::Value::Int(key),
                            )])
                            .unwrap(),
                        )
                        .unwrap();
                    let committed = writer.commit().expect("commit ok");
                    assert!(committed.durable_at.is_some());
                }
            });
        }
    });

    let total = THREADS * PER_THREAD;
    assert_eq!(sink.write_count(), total, "every commit appended once");
    assert_eq!(graph.read().node_count(), total);
    assert_eq!(graph.read().meta.generation, total as u64);
    // Grouping is only observable as "fewer flushes than writes".
    assert!(
        sink.flush_count() < sink.write_count(),
        "ON grouped fsyncs: {} flushes for {} writes",
        sink.flush_count(),
        sink.write_count(),
    );
}
/// Test-double index provider that records, in observation order, the id of
/// every `Change::NodeCreated` passed to its `on_change` hook.
struct GenOrderProvider {
/// Tag returned from `provider_tag`.
tag: ProviderTag,
/// Raw ids (`id.get()`) of created nodes, appended as changes arrive.
seen: Mutex<Vec<u64>>,
}
impl GenOrderProvider {
    /// Builds a provider carrying the given four-byte tag and an empty
    /// observation log, wrapped in an `Arc`.
    fn new(tag: &[u8; 4]) -> Arc<Self> {
        let provider = Self {
            tag: ProviderTag(*tag),
            seen: Mutex::new(Vec::new()),
        };
        Arc::new(provider)
    }
}
impl IndexProvider for GenOrderProvider {
    /// Returns the tag supplied at construction.
    fn provider_tag(&self) -> ProviderTag {
        self.tag
    }

    /// Accepts any section without inspecting it.
    fn read_section(&self, _sub: SubTag, _bytes: &[u8]) -> Result<(), ProviderError> {
        Ok(())
    }

    /// Always produces an empty section payload.
    fn write_section(&self, _sub: SubTag) -> Result<Vec<u8>, ProviderError> {
        Ok(Vec::new())
    }

    /// Appends the raw id of each created node to `seen`; every other kind of
    /// change is ignored. Always returns `Ok(())`.
    fn on_change(&self, change: &Change) -> Result<(), ProviderError> {
        let Change::NodeCreated { id, .. } = change else {
            return Ok(());
        };
        self.seen.lock().unwrap().push(id.get());
        Ok(())
    }

    /// This provider declares no sub-tags.
    fn declared_sub_tags(&self) -> &[SubTag] {
        &[]
    }
}
/// Seals three transactions A, B, C in that order, submits them in reverse
/// (C and B from helper threads, A from the test thread), and checks that
/// generations and the provider's observed node ids follow seal order.
#[test]
fn t3_order_sensitive_batch_publishes_in_seal_order() {
    let sink = CountingDurable::new(b"CNT4");
    let observer = GenOrderProvider::new(b"GORD");
    let graph = Arc::new(graph_with_durable_and_provider(
        70_004,
        sink,
        observer.clone(),
        on(8, 8 * 1024 * 1024),
    ));

    // Seal A, B, C one after another, remembering each created node id.
    let mut node_ids = Vec::new();
    let mut sealed_txns = Vec::new();
    for label in ["A", "B", "C"] {
        let mut writer = graph.begin_write();
        let node_id = writer
            .mutator()
            .create_node(LabelSet::single(db_string(label)), PropertyMap::new())
            .unwrap();
        node_ids.push(node_id);
        sealed_txns.push(writer.seal(None, None).expect("seals"));
    }
    let mut in_seal_order = sealed_txns.into_iter();
    let sealed_a = in_seal_order.next().unwrap();
    let sealed_b = in_seal_order.next().unwrap();
    let sealed_c = in_seal_order.next().unwrap();

    // Scheduling hint only: presumably lets a just-spawned submitter reach
    // its submit call before the next one starts.
    let let_others_run = || (0..1_000).for_each(|_| thread::yield_now());

    // Submit in reverse seal order: C, then B, then A.
    let graph_for_c = Arc::clone(&graph);
    let c_submitter =
        thread::spawn(move || graph_for_c.submit_sealed_for_test(sealed_c).expect("C"));
    let_others_run();

    let graph_for_b = Arc::clone(&graph);
    let b_submitter =
        thread::spawn(move || graph_for_b.submit_sealed_for_test(sealed_b).expect("B"));
    let_others_run();

    let published_a = graph.submit_sealed_for_test(sealed_a).expect("A");
    let published_b = b_submitter.join().unwrap();
    let published_c = c_submitter.join().unwrap();

    assert_eq!(published_a.generation, 1);
    assert_eq!(published_b.generation, 2);
    assert_eq!(published_c.generation, 3);

    let snapshot = graph.read();
    assert_eq!(snapshot.meta.generation, 3);
    assert_eq!(snapshot.node_count(), 3);

    // The provider must have seen node creations in seal order.
    let seen = observer.seen.lock().unwrap().clone();
    let expected: Vec<u64> = node_ids.iter().map(|node_id| node_id.get()).collect();
    assert_eq!(seen, expected, "fan-out observed seal-order publish");
}
/// Seals transactions 0, 1, 2; publishes 0, then submits 2 from a helper
/// thread while 1 is still outstanding. The snapshot generation must still
/// read 1 at that point, and once 1 is submitted both remaining transactions
/// must complete as generations 2 and 3.
#[test]
fn t4_gap_ends_batch_no_deadlock() {
    let sink = CountingDurable::new(b"CNT5");
    let graph = Arc::new(graph_with_durable(70_005, sink, on(8, 8 * 1024 * 1024)));

    // Opens a write transaction, creates one node with `label`, and seals it.
    let seal_one = |label: &'static str| {
        let mut writer = graph.begin_write();
        writer
            .mutator()
            .create_node(LabelSet::single(db_string(label)), PropertyMap::new())
            .unwrap();
        writer.seal(None, None)
    };
    let sealed0 = seal_one("Z0").expect("0");
    let sealed1 = seal_one("Z1").expect("1");
    let sealed2 = seal_one("Z2").expect("2");

    let published0 = graph.submit_sealed_for_test(sealed0).expect("0 publishes");
    assert_eq!(published0.generation, 1);

    // Submit 2 while 1 has not been submitted yet, leaving a gap.
    let graph_for_two = Arc::clone(&graph);
    let two_submitter =
        thread::spawn(move || graph_for_two.submit_sealed_for_test(sealed2).expect("2"));
    // Scheduling hint only; the assertion below holds whether or not the
    // helper thread has reached its submit call yet.
    (0..1_000).for_each(|_| thread::yield_now());
    assert_eq!(graph.read().meta.generation, 1, "2 not published over gap");

    // Filling the gap must let both 1 and 2 complete.
    let published1 = graph.submit_sealed_for_test(sealed1).expect("1 publishes");
    let published2 = two_submitter.join().expect("2 thread ok");
    assert_eq!(published1.generation, 2);
    assert_eq!(published2.generation, 3);
    assert_eq!(graph.read().node_count(), 3);
}