use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use selene_core::{Change, GraphId, HlcTimestamp, LabelSet, PropertyMap};
use crate::SharedGraph;
use crate::durable_provider::DurableProvider;
use crate::error::GraphError;
use crate::index_provider::{ProviderError, ProviderTag};
/// Test convenience: converts `value` through `selene_core::db_string`.
///
/// # Panics
///
/// Panics with "string fits DB string cap" when the conversion does not
/// produce a value.
fn db_string(value: &str) -> selene_core::DbString {
    let converted = selene_core::db_string(value);
    converted.expect("string fits DB string cap")
}
/// Synthetic durable provider whose `write_commit` always panics.
///
/// Used by the tests in this file to exercise the path where the code
/// invoking the provider unwinds instead of returning.
struct PanicOnWriteCommit;

impl DurableProvider for PanicOnWriteCommit {
    /// Fixed four-byte tag identifying this synthetic provider.
    fn provider_tag(&self) -> ProviderTag {
        ProviderTag(*b"BOOM")
    }

    /// Never returns: every call panics, whatever the arguments.
    fn write_commit(
        &self,
        _caller: Option<&Arc<[u8]>>,
        _batch: &[Change],
        _at: HlcTimestamp,
    ) -> Result<u64, ProviderError> {
        panic!("synthetic committer-body panic in write_commit");
    }
}
/// Builds a `SharedGraph` over a fresh `SeleneGraph` with the given `id`,
/// whose only durable provider is [`PanicOnWriteCommit`] and whose commit
/// batching is `CommitBatching::Off`.
///
/// # Panics
///
/// Panics if the graph cannot be constructed.
fn graph_with_panicking_durable(id: u64) -> SharedGraph {
    let base = crate::SeleneGraph::new(GraphId::new(id));
    let provider = Arc::new(PanicOnWriteCommit) as Arc<dyn DurableProvider>;
    let built = SharedGraph::from_graph_with_core_and_durables(
        base,
        Vec::new(),
        vec![provider],
        None,
        None,
        crate::committer_batch::CommitBatching::Off,
    );
    built.expect("graph builds with synthetic durable provider")
}
/// Synthetic durable provider that fails its first `write_commit` call with
/// `ProviderError::Inconsistent` and succeeds on every later call.
struct FailFirstWriteCommit {
    /// Number of `write_commit` calls observed so far.
    calls: AtomicU64,
}

impl DurableProvider for FailFirstWriteCommit {
    /// Fixed four-byte tag identifying this synthetic provider.
    fn provider_tag(&self) -> ProviderTag {
        ProviderTag(*b"FRST")
    }

    /// Returns `Err` for call index 0; afterwards returns `Ok(index)`, where
    /// `index` is the zero-based count of calls made before this one.
    fn write_commit(
        &self,
        _caller: Option<&Arc<[u8]>>,
        _batch: &[Change],
        _at: HlcTimestamp,
    ) -> Result<u64, ProviderError> {
        match self.calls.fetch_add(1, Ordering::SeqCst) {
            0 => Err(ProviderError::Inconsistent {
                reason: String::from("synthetic first-commit durable failure"),
            }),
            call_index => Ok(call_index),
        }
    }
}
/// Builds a `SharedGraph` over a fresh `SeleneGraph` with the given `id`,
/// whose only durable provider is a [`FailFirstWriteCommit`] with its call
/// counter at zero, and whose commit batching is `CommitBatching::Off`.
///
/// # Panics
///
/// Panics if the graph cannot be constructed.
fn graph_with_fail_first_durable(id: u64) -> SharedGraph {
    let base = crate::SeleneGraph::new(GraphId::new(id));
    let provider = Arc::new(FailFirstWriteCommit {
        calls: AtomicU64::new(0),
    }) as Arc<dyn DurableProvider>;
    let built = SharedGraph::from_graph_with_core_and_durables(
        base,
        Vec::new(),
        vec![provider],
        None,
        None,
        crate::committer_batch::CommitBatching::Off,
    );
    built.expect("graph builds with synthetic fail-first durable provider")
}
/// A seal whose cancel flag is already set must fail with
/// `GraphError::Cancelled` and leave nothing behind: no node, no generation
/// bump, and the locked graph and published snapshot still the same `Arc`.
/// A following ordinary commit on the same WAL-backed graph must then report
/// `durable_at == Some(1)` and `generation == 1`.
#[test]
fn cancel_cutline_in_seal_rolls_back_with_no_burned_state() {
    // Build the scratch-directory name from integers only. The `Debug`
    // rendering of `Instant` is unspecified and platform-dependent, and it
    // contains spaces, braces and `:` — the colon is not a legal path
    // component character on Windows.
    let unique = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_nanos());
    let dir = std::env::temp_dir().join(format!(
        "selene-committer-cancel-{}-{unique}",
        std::process::id(),
    ));
    // Best-effort: clear leftovers from an earlier run before recreating.
    let _ = std::fs::remove_dir_all(&dir);
    std::fs::create_dir_all(&dir).unwrap();
    let wal_path = dir.join(selene_persist::DEFAULT_WAL_FILE_NAME);
    let shared = SharedGraph::builder(GraphId::new(91_001))
        .with_wal(&wal_path, selene_persist::WalConfig::default())
        .unwrap()
        .build()
        .unwrap();

    // Stage one node, then seal with a cancel flag that is already `true`.
    let mut txn = shared.begin_write();
    txn.mutator()
        .create_node(LabelSet::new(), PropertyMap::new())
        .unwrap();
    let flag = Arc::new(std::sync::atomic::AtomicBool::new(true));
    let err = match txn.seal(None, Some(&flag)) {
        Ok(_) => panic!("pre-publish cancel must return Err from seal"),
        Err(err) => err,
    };
    assert!(matches!(err, GraphError::Cancelled), "got {err:?}");
    assert_eq!(err.gqlstatus(), "5GQL2");

    // Nothing from the cancelled transaction is observable.
    assert_eq!(shared.read().node_count(), 0);
    assert_eq!(shared.read().meta.generation, 0);
    assert_eq!(shared.locked_generation_for_test(), 0);
    assert_eq!(
        shared.locked_arc_ptr_for_test(),
        Arc::as_ptr(&shared.read()),
        "live RwLock graph and published snapshot are the same Arc after a \
         cancelled seal — no divergence",
    );

    // The next commit is numbered as the first one.
    let mut txn = shared.begin_write();
    txn.mutator()
        .create_node(LabelSet::new(), PropertyMap::new())
        .unwrap();
    let outcome = txn.commit().unwrap();
    assert_eq!(outcome.durable_at, Some(1));
    assert_eq!(outcome.generation, 1);

    // Best-effort cleanup of the scratch directory.
    let _ = std::fs::remove_dir_all(&dir);
}
/// With the cancel flag unset at seal time, the seal and the submit both
/// succeed. Setting the flag only after the submit returned leaves the
/// outcome at generation 1 and the created node alive.
#[test]
fn cancel_token_unset_at_seal_proceeds_and_is_irrevocable() {
    let shared = SharedGraph::new(GraphId::new(91_002));

    let mut txn = shared.begin_write();
    let node = txn
        .mutator()
        .create_node(LabelSet::new(), PropertyMap::new())
        .unwrap();

    let cancel = Arc::new(std::sync::atomic::AtomicBool::new(false));
    let sealed = txn.seal(None, Some(&cancel)).expect("uncancelled seal");
    let submitted = shared.submit_sealed_for_test(sealed);
    let outcome = submitted.expect("uncancelled commit publishes");

    // Raise the flag only after the commit has already been submitted.
    cancel.store(true, Ordering::Release);

    assert_eq!(outcome.generation, 1);
    assert!(shared.read().is_node_alive(node));
}
/// With a durable provider that panics in `write_commit`, the first commit
/// must return `GraphError::Durable`, and a second commit must also return
/// `GraphError::Durable` within five seconds.
#[test]
fn committer_panic_poisons_and_fails_all_waiters_without_hanging() {
    let shared = graph_with_panicking_durable(91_004);

    let mut panicking_txn = shared.begin_write();
    panicking_txn
        .mutator()
        .create_node(LabelSet::single(db_string("L")), PropertyMap::new())
        .unwrap();
    let first = panicking_txn.commit();
    assert!(
        matches!(first, Err(GraphError::Durable { .. })),
        "the panicking commit reports a Durable error, got {first:?}"
    );

    // Time the follow-up commit. The check only runs once `commit` has
    // returned, so it bounds the duration of a commit that does return.
    let started = Instant::now();
    let mut follow_up_txn = shared.begin_write();
    follow_up_txn
        .mutator()
        .create_node(LabelSet::new(), PropertyMap::new())
        .unwrap();
    let second = follow_up_txn.commit();
    assert!(
        started.elapsed() < Duration::from_secs(5),
        "post-poison commit did not hang"
    );
    assert!(
        matches!(second, Err(GraphError::Durable { .. })),
        "post-poison commit fails fast, got {second:?}"
    );
}
/// Eight threads each commit one node against a graph whose durable provider
/// panics in `write_commit`. Every thread must return `Err` without itself
/// panicking, and all of them must be joined within ten seconds.
#[test]
fn concurrent_panicking_commits_all_err_without_hanging() {
    let shared = Arc::new(graph_with_panicking_durable(91_005));
    let started = Instant::now();

    // Collect eagerly so all eight threads are spawned before any is joined.
    let waiters: Vec<_> = (0..8)
        .map(|_| {
            let graph = Arc::clone(&shared);
            std::thread::spawn(move || {
                let mut txn = graph.begin_write();
                txn.mutator()
                    .create_node(LabelSet::new(), PropertyMap::new())
                    .unwrap();
                txn.commit()
            })
        })
        .collect();

    for waiter in waiters {
        let result = waiter.join().expect("waiter thread did not panic");
        assert!(
            result.is_err(),
            "every waiter behind a poisoned committer gets Err, got {result:?}"
        );
    }

    assert!(
        started.elapsed() < Duration::from_secs(10),
        "no waiter hung after the panic"
    );
}
/// With a durable provider that returns `Err` from its first `write_commit`,
/// the first commit must surface `GraphError::Durable` and publish nothing.
/// A second commit must also fail with `GraphError::Durable` within five
/// seconds, and the published snapshot must still contain no nodes.
#[test]
fn returned_write_commit_err_poisons_so_failed_commit_never_leaks() {
    let shared = graph_with_fail_first_durable(91_006);

    let mut failing_txn = shared.begin_write();
    failing_txn
        .mutator()
        .create_node(LabelSet::single(db_string("L")), PropertyMap::new())
        .unwrap();
    let first = failing_txn.commit();
    assert!(
        matches!(first, Err(GraphError::Durable { .. })),
        "a returned write_commit Err surfaces as Durable, got {first:?}"
    );
    assert_eq!(shared.read().node_count(), 0);
    assert_eq!(shared.read().meta.generation, 0);

    // Time the follow-up commit from just before its transaction begins.
    let started = Instant::now();
    let mut follow_up_txn = shared.begin_write();
    follow_up_txn
        .mutator()
        .create_node(LabelSet::new(), PropertyMap::new())
        .unwrap();
    let second = follow_up_txn.commit();
    assert!(
        started.elapsed() < Duration::from_secs(5),
        "post-poison commit did not hang"
    );
    assert!(
        matches!(second, Err(GraphError::Durable { .. })),
        "post-poison commit fails fast (engine poisoned), got {second:?}"
    );

    let visible_nodes = shared.read().node_count();
    assert_eq!(
        visible_nodes,
        0,
        "the failed commit's node never leaked into the published snapshot",
    );
}
/// Seals transaction A and then transaction B, but submits B from a separate
/// thread before A is submitted. A must still receive generation 1 and B
/// generation 2, and the final snapshot must hold both nodes.
#[test]
fn reorder_buffer_publishes_in_seal_order_not_arrival_order() {
    let shared = Arc::new(SharedGraph::new(GraphId::new(91_007)));

    // Seal A first ...
    let mut first_txn = shared.begin_write();
    let node_a = first_txn
        .mutator()
        .create_node(LabelSet::single(db_string("A")), PropertyMap::new())
        .unwrap();
    let sealed_a = first_txn.seal(None, None).expect("A seals");

    // ... and B second.
    let mut second_txn = shared.begin_write();
    let node_b = second_txn
        .mutator()
        .create_node(LabelSet::single(db_string("B")), PropertyMap::new())
        .unwrap();
    let sealed_b = second_txn.seal(None, None).expect("B seals");

    // B is submitted from its own thread, ahead of A.
    let graph_for_b = Arc::clone(&shared);
    let submit_b = std::thread::spawn(move || {
        let submitted = graph_for_b.submit_sealed_for_test(sealed_b);
        submitted.expect("B publishes after A")
    });

    // Best-effort head start for the B thread; yielding guarantees nothing.
    (0..1000).for_each(|_| std::thread::yield_now());

    let outcome_a = shared
        .submit_sealed_for_test(sealed_a)
        .expect("A publishes");
    let outcome_b = submit_b.join().expect("B thread did not panic");

    assert_eq!(outcome_a.generation, 1, "A is seal_seq 0 ⇒ generation 1");
    assert_eq!(outcome_b.generation, 2, "B is seal_seq 1 ⇒ generation 2");

    let published = shared.read();
    assert_eq!(
        published.meta.generation, 2,
        "final published gen == max seal_seq"
    );
    assert!(
        published.is_node_alive(node_a),
        "A's node survived in the final snapshot"
    );
    assert!(
        published.is_node_alive(node_b),
        "B's node is present in the final snapshot"
    );
    assert_eq!(published.node_count(), 2);
}
/// Creates and then deletes twenty nodes in two commits, seals transaction A
/// (one new node), starts `compact()` on another thread, and only then
/// submits A. A must publish at generation 3, the compaction must report at
/// least twenty reclaimed nodes, and the final snapshot must contain exactly
/// A's node with consistent indexes.
#[test]
fn compact_cannot_clobber_an_earlier_sealed_commit() {
    let shared = Arc::new(SharedGraph::new(GraphId::new(91_008)));

    // Two commits: create twenty "S" nodes, then delete all of them.
    {
        let mut create_txn = shared.begin_write();
        let doomed: Vec<_> = (0..20)
            .map(|_| {
                create_txn
                    .mutator()
                    .create_node(LabelSet::single(db_string("S")), PropertyMap::new())
                    .unwrap()
            })
            .collect();
        create_txn.commit().unwrap();

        let mut delete_txn = shared.begin_write();
        doomed.iter().copied().for_each(|id| {
            delete_txn.mutator().delete_node(id).unwrap();
        });
        delete_txn.commit().unwrap();
    }

    // Seal A before the compaction is started.
    let mut txn_a = shared.begin_write();
    let node_a = txn_a
        .mutator()
        .create_node(LabelSet::single(db_string("A")), PropertyMap::new())
        .unwrap();
    let sealed_a = txn_a.seal(None, None).expect("A seals");

    let graph_for_compaction = Arc::clone(&shared);
    let compactor =
        std::thread::spawn(move || graph_for_compaction.compact().expect("compaction ok"));

    // Best-effort head start for the compactor; yielding guarantees nothing.
    (0..1000).for_each(|_| std::thread::yield_now());

    let outcome_a = shared
        .submit_sealed_for_test(sealed_a)
        .expect("A publishes");
    let report = compactor.join().expect("compactor did not panic");

    assert_eq!(outcome_a.generation, 3);
    assert!(report.reclaimed_nodes >= 20, "report: {report:?}");

    let published = shared.read();
    assert!(published.is_node_alive(node_a));
    assert_eq!(published.node_count(), 1, "only A is alive");
    assert_eq!(
        published.node_store.len(),
        1,
        "published snapshot is dense — the compaction's reclamation was not \
         clobbered by A's stale pre-compaction snapshot",
    );
    published
        .assert_indexes_consistent()
        .expect("published snapshot is structurally consistent");
}