use std::collections::BTreeMap;
use std::panic::AssertUnwindSafe;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{Receiver, SyncSender, sync_channel};
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use arc_swap::ArcSwap;
use crate::committer_batch::{
BatchDrain, BatchLimits, CommitBatching, drain_contiguous_batch, flush_and_publish_batch,
};
use crate::durable_provider::DurableProvider;
use crate::error::{GraphError, GraphResult};
use crate::graph::SeleneGraph;
use crate::index_provider::IndexProvider;
use crate::write_txn::{CommitOutcome, SealedCommit};
/// Capacity of the bounded work channel feeding the committer thread; once this
/// many items are queued and not yet received, further `send`s block.
const WORK_CHANNEL_CAPACITY: usize = 1024;
/// A unit of work for the committer thread. Items are published strictly in
/// ascending `seal_seq` order, regardless of the order in which they arrive.
pub(crate) enum Work {
    /// A sealed write transaction to publish.
    Commit {
        /// The sealed transaction; its `seal_seq` fixes its publish position.
        sealed: SealedCommit,
        /// Capacity-1 channel on which the committer sends this commit's result.
        reply: SyncSender<GraphResult<CommitOutcome>>,
    },
    /// Replace the published snapshot with a compacted graph.
    Compact {
        /// Position of this item in the publish order.
        seal_seq: u64,
        /// Graph stored as the new published snapshot.
        dense: Arc<SeleneGraph>,
        /// Handed back to the submitter when the swap succeeds.
        report: crate::CompactionReport,
        /// Capacity-1 channel on which the committer sends the result.
        reply: SyncSender<GraphResult<crate::CompactionReport>>,
    },
    /// Replace the published snapshot with the `rebuilt` graph.
    VectorIndexRebuild {
        /// Position of this item in the publish order.
        seal_seq: u64,
        /// Graph stored as the new published snapshot.
        rebuilt: Arc<SeleneGraph>,
        /// Handed back to the submitter when the swap succeeds.
        report: crate::VectorIndexRebuildReport,
        /// Capacity-1 channel on which the committer sends the result.
        reply: SyncSender<GraphResult<crate::VectorIndexRebuildReport>>,
    },
}
impl Work {
    /// The position this item occupies in the committer's publish order.
    pub(crate) fn seal_seq(&self) -> u64 {
        let seq = match self {
            Work::Commit { sealed, .. } => &sealed.seal_seq,
            Work::Compact { seal_seq, .. } | Work::VectorIndexRebuild { seal_seq, .. } => {
                seal_seq
            }
        };
        *seq
    }

    /// `true` for items that swap in a whole replacement snapshot (compaction or
    /// vector index rebuild) instead of applying a commit.
    pub(crate) fn is_snapshot_maintenance(&self) -> bool {
        match self {
            Work::Commit { .. } => false,
            Work::Compact { .. } | Work::VectorIndexRebuild { .. } => true,
        }
    }
}
/// Shared state moved into the committer thread when it is spawned.
pub(crate) struct CommitterHandles {
    /// The published snapshot; snapshot-maintenance work stores its replacement
    /// graph here.
    pub(crate) snapshot: Arc<ArcSwap<SeleneGraph>>,
    // NOTE(review): not read in this chunk — presumably consumed by the batch
    // publishing path in `committer_batch`; confirm.
    pub(crate) schema_version: Arc<AtomicU64>,
    // NOTE(review): not read in this chunk — presumably used when publishing
    // commit batches; confirm.
    pub(crate) providers: Arc<[Arc<dyn IndexProvider>]>,
    // NOTE(review): not read in this chunk — presumably the durable sinks written
    // during batch flushes; confirm.
    pub(crate) durable_providers: Vec<Arc<dyn DurableProvider>>,
    /// Batching policy, resolved into `BatchLimits` when the thread starts.
    pub(crate) batching: CommitBatching,
}
/// Cloneable submission handle to the committer thread.
///
/// Every clone holds a sender on the work channel, so the committer thread
/// keeps running until all clones (and the owning `CommitterThread`'s own
/// sender) are dropped.
#[derive(Clone)]
pub(crate) struct Committer {
    /// Work channel into the committer thread.
    sender: SyncSender<Work>,
    /// Set once the committer has failed; submissions check it and fail fast.
    poisoned: Arc<std::sync::atomic::AtomicBool>,
    /// Shared counter from which seal sequence numbers are allocated.
    next_seal_seq: Arc<AtomicU64>,
}
/// Owner of the committer thread. Hands out [`Committer`] handles and joins the
/// thread when dropped.
pub(crate) struct CommitterThread {
    /// This owner's sender; `None` only after `drop` has released it.
    sender: Option<SyncSender<Work>>,
    /// Poison flag shared with the thread and every handle.
    poisoned: Arc<std::sync::atomic::AtomicBool>,
    /// Seal sequence counter shared with every handle.
    next_seal_seq: Arc<AtomicU64>,
    /// Join handle for the thread, taken (once) when this owner is dropped.
    join: Mutex<Option<JoinHandle<()>>>,
}
impl CommitterThread {
    /// Starts the `selene-committer` thread and returns its owner.
    ///
    /// # Panics
    ///
    /// Panics if the operating system refuses to spawn the thread.
    pub(crate) fn spawn(handles: CommitterHandles) -> Self {
        let poisoned = Arc::new(std::sync::atomic::AtomicBool::new(false));
        let (sender, receiver) = sync_channel::<Work>(WORK_CHANNEL_CAPACITY);
        let worker_poisoned = Arc::clone(&poisoned);
        let worker = std::thread::Builder::new()
            .name("selene-committer".to_owned())
            .spawn(move || run_committer(receiver, handles, &worker_poisoned))
            .expect("committer thread spawns");
        Self {
            sender: Some(sender),
            poisoned,
            next_seal_seq: Arc::new(AtomicU64::new(0)),
            join: Mutex::new(Some(worker)),
        }
    }

    /// Creates a new submission handle that shares this thread's work channel,
    /// poison flag and seal counter.
    ///
    /// # Panics
    ///
    /// Panics if this owner's sender has already been released.
    pub(crate) fn handle(&self) -> Committer {
        let sender = self.sender.as_ref().expect("committer sender live").clone();
        Committer {
            sender,
            poisoned: Arc::clone(&self.poisoned),
            next_seal_seq: Arc::clone(&self.next_seal_seq),
        }
    }
}
impl Drop for CommitterThread {
    /// Releases this owner's sender and waits for the committer thread to exit.
    ///
    /// The thread only observes a disconnected channel once *every* sender is
    /// gone, including those held by [`Committer`] clones. If any clone is still
    /// alive, the join below blocks until it is dropped.
    /// NOTE(review): confirm owners drop their handles before this value.
    fn drop(&mut self) {
        self.sender = None;
        // `&mut self` already gives exclusive access, so no lock is needed. A
        // poisoned mutex is recovered rather than unwrapped, because panicking
        // inside `drop` (possibly while already unwinding) aborts the process.
        let join = self
            .join
            .get_mut()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .take();
        if let Some(join) = join {
            // The thread's panic payload, if any, is deliberately discarded.
            let _ = join.join();
        }
    }
}
impl Committer {
pub(crate) fn next_seal_seq(&self) -> u64 {
self.next_seal_seq.fetch_add(1, Ordering::Relaxed)
}
pub(crate) fn submit_commit(&self, sealed: SealedCommit) -> GraphResult<CommitOutcome> {
if self.poisoned.load(Ordering::Acquire) {
return Err(committer_dead());
}
let (reply_tx, reply_rx) = sync_channel::<GraphResult<CommitOutcome>>(1);
self.sender
.send(Work::Commit {
sealed,
reply: reply_tx,
})
.map_err(|_| committer_dead())?;
reply_rx.recv().map_err(|_| committer_dead())?
}
#[cfg(test)]
pub(crate) fn submit_commit_async_for_test(
&self,
sealed: SealedCommit,
) -> GraphResult<Receiver<GraphResult<CommitOutcome>>> {
if self.poisoned.load(Ordering::Acquire) {
return Err(committer_dead());
}
let (reply_tx, reply_rx) = sync_channel::<GraphResult<CommitOutcome>>(1);
self.sender
.send(Work::Commit {
sealed,
reply: reply_tx,
})
.map_err(|_| committer_dead())?;
Ok(reply_rx)
}
pub(crate) fn submit_compact(
&self,
seal_seq: u64,
dense: Arc<SeleneGraph>,
report: crate::CompactionReport,
) -> GraphResult<crate::CompactionReport> {
if self.poisoned.load(Ordering::Acquire) {
return Err(committer_dead());
}
let (reply_tx, reply_rx) = sync_channel::<GraphResult<crate::CompactionReport>>(1);
self.sender
.send(Work::Compact {
seal_seq,
dense,
report,
reply: reply_tx,
})
.map_err(|_| committer_dead())?;
reply_rx.recv().map_err(|_| committer_dead())?
}
pub(crate) fn submit_vector_index_rebuild(
&self,
seal_seq: u64,
rebuilt: Arc<SeleneGraph>,
report: crate::VectorIndexRebuildReport,
) -> GraphResult<crate::VectorIndexRebuildReport> {
if self.poisoned.load(Ordering::Acquire) {
return Err(committer_dead());
}
let (reply_tx, reply_rx) = sync_channel::<GraphResult<crate::VectorIndexRebuildReport>>(1);
self.sender
.send(Work::VectorIndexRebuild {
seal_seq,
rebuilt,
report,
reply: reply_tx,
})
.map_err(|_| committer_dead())?;
reply_rx.recv().map_err(|_| committer_dead())?
}
}
/// The error reported to submitters once the committer thread cannot serve them.
pub(crate) fn committer_dead() -> GraphError {
    let reason =
        String::from("commit thread is no longer running; the graph must be reopened");
    GraphError::Durable { reason }
}
/// Body of the `selene-committer` thread.
///
/// Submissions can reach the channel in a different order from the one in
/// which they were sealed. Each item is therefore parked in `reorder`, keyed by
/// `seal_seq`, and nothing is published until the item whose `seal_seq` equals
/// `next_publish_seq` is present. Publication proceeds strictly in ascending
/// `seal_seq` order, starting at 0.
///
/// Snapshot-maintenance items (compaction / vector index rebuild) are published
/// one at a time. Commits are handed to `drain_contiguous_batch`, which is given
/// `&mut reorder` and `&mut next_publish_seq`.
/// NOTE(review): presumably it removes a contiguous run of commits (bounded by
/// `limits`) and advances the cursor past them; confirm in `committer_batch`.
///
/// The thread returns in three cases: every `Work` sender has been dropped, an
/// append fails, or the engine is poisoned. On the append-failure and
/// maintenance-poison paths, buffered waiters are failed with `committer_dead`.
fn run_committer(
    receiver: Receiver<Work>,
    handles: CommitterHandles,
    poisoned: &Arc<std::sync::atomic::AtomicBool>,
) {
    // Seal sequence of the next item allowed to publish.
    let mut next_publish_seq: u64 = 0;
    // Items that arrived ahead of their turn, ordered by seal sequence.
    let mut reorder: BTreeMap<u64, Work> = BTreeMap::new();
    let limits = BatchLimits::resolve(handles.batching);
    loop {
        let work = match receiver.recv() {
            Ok(work) => work,
            // Every sender is gone. Anything still in `reorder` is dropped on
            // return, which drops its reply senders, so each waiter's `recv`
            // errors out instead of hanging.
            Err(_) => return,
        };
        reorder.insert(work.seal_seq(), work);
        // Publish as long as the next item in seal order is available.
        while reorder.contains_key(&next_publish_seq) {
            if reorder
                .get(&next_publish_seq)
                .is_some_and(Work::is_snapshot_maintenance)
            {
                let Some(work) = reorder.remove(&next_publish_seq) else {
                    unreachable!("checked maintenance work at next_publish_seq above");
                };
                publish_snapshot_maintenance(work, poisoned, &handles);
                next_publish_seq += 1;
                // A failed swap poisons the engine: fail everything still
                // buffered and stop.
                if poisoned.load(Ordering::Acquire) {
                    drain_buffer_with_error(&mut reorder);
                    return;
                }
                continue;
            }
            match drain_contiguous_batch(
                &receiver,
                &mut reorder,
                &mut next_publish_seq,
                limits,
                &handles,
                poisoned,
            ) {
                BatchDrain::Run { batch } => {
                    // `true` means the committer must stop.
                    // NOTE(review): presumably set when the batch poisons the
                    // engine; confirm the callee fails buffered waiters.
                    if flush_and_publish_batch(batch, &mut reorder, &handles, poisoned) {
                        return;
                    }
                }
                BatchDrain::AppendFailed { appended } => {
                    crate::committer_batch::ack_appended_with_error(appended);
                    drain_buffer_with_error(&mut reorder);
                    return;
                }
            }
        }
    }
}
fn publish_snapshot_maintenance(
work: Work,
poisoned: &Arc<std::sync::atomic::AtomicBool>,
handles: &CommitterHandles,
) {
match work {
Work::Compact {
seal_seq: _,
dense,
report,
reply,
} => {
let result =
run_protected(|| publish_replacement_snapshot(&dense, &handles.snapshot, report));
let result = unwrap_protected(result, poisoned);
let _ = reply.send(result);
}
Work::VectorIndexRebuild {
seal_seq: _,
rebuilt,
report,
reply,
} => {
let result =
run_protected(|| publish_replacement_snapshot(&rebuilt, &handles.snapshot, report));
let result = unwrap_protected(result, poisoned);
let _ = reply.send(result);
}
Work::Commit { .. } => {
unreachable!("publish_snapshot_maintenance is never called with Work::Commit");
}
}
}
/// Stores `replacement` as the published snapshot and passes `report` through.
///
/// Always returns `Ok`; the `GraphResult` return type lets it run inside
/// `run_protected`.
fn publish_replacement_snapshot<T>(
    replacement: &Arc<SeleneGraph>,
    snapshot: &ArcSwap<SeleneGraph>,
    report: T,
) -> GraphResult<T> {
    let fresh = Arc::clone(replacement);
    snapshot.store(fresh);
    Ok(report)
}
pub(crate) fn drain_buffer_with_error(reorder: &mut BTreeMap<u64, Work>) {
for (_, work) in std::mem::take(reorder) {
match work {
Work::Commit { reply, .. } => {
let _ = reply.send(Err(committer_dead()));
}
Work::Compact { reply, .. } => {
let _ = reply.send(Err(committer_dead()));
}
Work::VectorIndexRebuild { reply, .. } => {
let _ = reply.send(Err(committer_dead()));
}
}
}
}
/// Runs `body`, converting an unwinding panic into an `Err` carrying the panic
/// payload instead of letting it escape.
pub(crate) fn run_protected<T>(
    body: impl FnOnce() -> GraphResult<T>,
) -> Result<GraphResult<T>, Box<dyn std::any::Any + Send>> {
    // `body` is run exactly once and its captured state is not reused after a
    // panic, so asserting unwind safety is acceptable here.
    let unwind_safe_body = AssertUnwindSafe(body);
    std::panic::catch_unwind(unwind_safe_body)
}
/// Flattens a `run_protected` result into a `GraphResult`.
///
/// A success passes through unchanged. A returned error or a caught panic
/// first sets `poisoned`, then logs, and is then reported as an `Err`. A panic
/// becomes a `GraphError::Durable` describing its payload.
pub(crate) fn unwrap_protected<T>(
    result: Result<GraphResult<T>, Box<dyn std::any::Any + Send>>,
    poisoned: &Arc<std::sync::atomic::AtomicBool>,
) -> GraphResult<T> {
    let payload = match result {
        Ok(Ok(value)) => return Ok(value),
        Ok(Err(error)) => {
            poisoned.store(true, Ordering::Release);
            tracing::error!(
                error = %error,
                "selene-graph: commit failed after seal; engine poisoned, reopen required",
            );
            return Err(error);
        }
        Err(payload) => payload,
    };

    poisoned.store(true, Ordering::Release);
    let description = crate::panic_payload::describe(&payload);
    tracing::error!(
        payload = %description,
        "selene-graph: commit thread panicked; engine poisoned, reopen required",
    );
    Err(GraphError::Durable {
        reason: format!("commit thread panicked: {description}"),
    })
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
    use selene_core::{Change, GraphId, HlcTimestamp, LabelSet, PropertyMap};
    use crate::SharedGraph;
    use crate::durable_provider::DurableProvider;
    use crate::error::GraphError;
    use crate::index_provider::{ProviderError, ProviderTag};

    /// Builds a `DbString`, panicking if the value exceeds the DB string cap.
    fn db_string(value: &str) -> selene_core::DbString {
        selene_core::db_string(value).expect("string fits DB string cap")
    }

    /// Returns a per-run scratch directory path under the system temp dir.
    ///
    /// The name is the prefix, the process id and a nanosecond wall-clock
    /// timestamp. It uses only characters that are valid in a path component on
    /// every platform. The `Debug` rendering of `Instant` is struct-like and
    /// contains `:`, which Windows rejects in file names.
    fn unique_temp_dir(prefix: &str) -> std::path::PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_nanos());
        std::env::temp_dir().join(format!("{prefix}-{}-{nanos}", std::process::id()))
    }

    /// Durable provider whose `write_commit` always panics.
    struct PanicOnWriteCommit;

    impl DurableProvider for PanicOnWriteCommit {
        fn provider_tag(&self) -> ProviderTag {
            ProviderTag(*b"BOOM")
        }

        fn write_commit(
            &self,
            _principal: Option<&Arc<[u8]>>,
            _changes: &[Change],
            _timestamp: HlcTimestamp,
        ) -> Result<u64, ProviderError> {
            panic!("synthetic committer-body panic in write_commit");
        }
    }

    /// Unbatched graph whose only durable provider panics on every commit.
    fn graph_with_panicking_durable(id: u64) -> SharedGraph {
        SharedGraph::from_graph_with_core_and_durables(
            crate::SeleneGraph::new(GraphId::new(id)),
            Vec::new(),
            vec![Arc::new(PanicOnWriteCommit) as Arc<dyn DurableProvider>],
            None,
            None,
            crate::committer_batch::CommitBatching::Off,
        )
        .expect("graph builds with synthetic durable provider")
    }

    /// Durable provider whose first `write_commit` returns an error and whose
    /// later calls succeed.
    struct FailFirstWriteCommit {
        calls: AtomicU64,
    }

    impl DurableProvider for FailFirstWriteCommit {
        fn provider_tag(&self) -> ProviderTag {
            ProviderTag(*b"FRST")
        }

        fn write_commit(
            &self,
            _principal: Option<&Arc<[u8]>>,
            _changes: &[Change],
            _timestamp: HlcTimestamp,
        ) -> Result<u64, ProviderError> {
            let n = self.calls.fetch_add(1, Ordering::SeqCst);
            if n == 0 {
                Err(ProviderError::Inconsistent {
                    reason: "synthetic first-commit durable failure".to_owned(),
                })
            } else {
                Ok(n)
            }
        }
    }

    /// Unbatched graph whose only durable provider fails its first commit.
    fn graph_with_fail_first_durable(id: u64) -> SharedGraph {
        SharedGraph::from_graph_with_core_and_durables(
            crate::SeleneGraph::new(GraphId::new(id)),
            Vec::new(),
            vec![Arc::new(FailFirstWriteCommit {
                calls: AtomicU64::new(0),
            }) as Arc<dyn DurableProvider>],
            None,
            None,
            crate::committer_batch::CommitBatching::Off,
        )
        .expect("graph builds with synthetic fail-first durable provider")
    }

    /// A seal cancelled before publish returns `Cancelled` and leaves the graph
    /// untouched; the next commit still gets generation 1 and WAL position 1.
    #[test]
    fn cancel_cutline_in_seal_rolls_back_with_no_burned_state() {
        let dir = unique_temp_dir("selene-committer-cancel");
        let _ = std::fs::remove_dir_all(&dir);
        std::fs::create_dir_all(&dir).unwrap();
        let wal_path = dir.join(selene_persist::DEFAULT_WAL_FILE_NAME);
        let shared = SharedGraph::builder(GraphId::new(91_001))
            .with_wal(&wal_path, selene_persist::WalConfig::default())
            .unwrap()
            .build()
            .unwrap();
        let mut txn = shared.begin_write();
        txn.mutator()
            .create_node(LabelSet::new(), PropertyMap::new())
            .unwrap();
        let flag = Arc::new(std::sync::atomic::AtomicBool::new(true));
        let err = match txn.seal(None, Some(&flag)) {
            Ok(_) => panic!("pre-publish cancel must return Err from seal"),
            Err(err) => err,
        };
        assert!(matches!(err, GraphError::Cancelled), "got {err:?}");
        assert_eq!(err.gqlstatus(), "5GQL2");
        assert_eq!(shared.read().node_count(), 0);
        assert_eq!(shared.read().meta.generation, 0);
        assert_eq!(shared.locked_generation_for_test(), 0);
        assert_eq!(
            shared.locked_arc_ptr_for_test(),
            Arc::as_ptr(&shared.read()),
            "live RwLock graph and published snapshot are the same Arc after a \
             cancelled seal — no divergence",
        );
        let mut txn = shared.begin_write();
        txn.mutator()
            .create_node(LabelSet::new(), PropertyMap::new())
            .unwrap();
        let outcome = txn.commit().unwrap();
        assert_eq!(outcome.durable_at, Some(1));
        assert_eq!(outcome.generation, 1);
        let _ = std::fs::remove_dir_all(&dir);
    }

    /// Once a seal succeeds with the token unset, setting the token afterwards
    /// does not undo the published commit.
    #[test]
    fn cancel_token_unset_at_seal_proceeds_and_is_irrevocable() {
        let shared = SharedGraph::new(GraphId::new(91_002));
        let mut txn = shared.begin_write();
        let id = txn
            .mutator()
            .create_node(LabelSet::new(), PropertyMap::new())
            .unwrap();
        let flag = Arc::new(std::sync::atomic::AtomicBool::new(false));
        let sealed = txn.seal(None, Some(&flag)).expect("uncancelled seal");
        let outcome = shared
            .submit_sealed_for_test(sealed)
            .expect("uncancelled commit publishes");
        flag.store(true, std::sync::atomic::Ordering::Release);
        assert_eq!(outcome.generation, 1);
        assert!(shared.read().is_node_alive(id));
    }

    /// A panicking durable write reports `Durable` and later commits fail fast.
    #[test]
    fn committer_panic_poisons_and_fails_all_waiters_without_hanging() {
        let shared = graph_with_panicking_durable(91_004);
        let mut txn = shared.begin_write();
        txn.mutator()
            .create_node(LabelSet::single(db_string("L")), PropertyMap::new())
            .unwrap();
        let first = txn.commit();
        assert!(
            matches!(first, Err(GraphError::Durable { .. })),
            "the panicking commit reports a Durable error, got {first:?}"
        );
        let deadline = Instant::now() + Duration::from_secs(5);
        let mut txn = shared.begin_write();
        txn.mutator()
            .create_node(LabelSet::new(), PropertyMap::new())
            .unwrap();
        let second = txn.commit();
        assert!(Instant::now() < deadline, "post-poison commit did not hang");
        assert!(
            matches!(second, Err(GraphError::Durable { .. })),
            "post-poison commit fails fast, got {second:?}"
        );
    }

    /// Every one of several concurrent committers gets an `Err` once the
    /// committer panics.
    #[test]
    fn concurrent_panicking_commits_all_err_without_hanging() {
        let shared = Arc::new(graph_with_panicking_durable(91_005));
        let deadline = Instant::now() + Duration::from_secs(10);
        let mut handles = Vec::new();
        for _ in 0..8 {
            let shared = Arc::clone(&shared);
            handles.push(std::thread::spawn(move || {
                let mut txn = shared.begin_write();
                txn.mutator()
                    .create_node(LabelSet::new(), PropertyMap::new())
                    .unwrap();
                txn.commit()
            }));
        }
        for h in handles {
            let result = h.join().expect("waiter thread did not panic");
            assert!(
                result.is_err(),
                "every waiter behind a poisoned committer gets Err, got {result:?}"
            );
        }
        assert!(Instant::now() < deadline, "no waiter hung after the panic");
    }

    /// A returned `write_commit` error poisons the engine, and the failed
    /// commit's changes never become visible.
    #[test]
    fn returned_write_commit_err_poisons_so_failed_commit_never_leaks() {
        let shared = graph_with_fail_first_durable(91_006);
        let mut txn = shared.begin_write();
        txn.mutator()
            .create_node(LabelSet::single(db_string("L")), PropertyMap::new())
            .unwrap();
        let first = txn.commit();
        assert!(
            matches!(first, Err(GraphError::Durable { .. })),
            "a returned write_commit Err surfaces as Durable, got {first:?}"
        );
        assert_eq!(shared.read().node_count(), 0);
        assert_eq!(shared.read().meta.generation, 0);
        let deadline = Instant::now() + Duration::from_secs(5);
        let mut txn = shared.begin_write();
        txn.mutator()
            .create_node(LabelSet::new(), PropertyMap::new())
            .unwrap();
        let second = txn.commit();
        assert!(Instant::now() < deadline, "post-poison commit did not hang");
        assert!(
            matches!(second, Err(GraphError::Durable { .. })),
            "post-poison commit fails fast (engine poisoned), got {second:?}"
        );
        assert_eq!(
            shared.read().node_count(),
            0,
            "the failed commit's node never leaked into the published snapshot",
        );
    }

    /// B is submitted before A but sealed after it; A must still publish first.
    #[test]
    fn reorder_buffer_publishes_in_seal_order_not_arrival_order() {
        let shared = Arc::new(SharedGraph::new(GraphId::new(91_007)));
        let mut txn_a = shared.begin_write();
        let a = txn_a
            .mutator()
            .create_node(LabelSet::single(db_string("A")), PropertyMap::new())
            .unwrap();
        let sealed_a = txn_a.seal(None, None).expect("A seals");
        let mut txn_b = shared.begin_write();
        let b = txn_b
            .mutator()
            .create_node(LabelSet::single(db_string("B")), PropertyMap::new())
            .unwrap();
        let sealed_b = txn_b.seal(None, None).expect("B seals");
        let shared_b = Arc::clone(&shared);
        let b_thread = std::thread::spawn(move || {
            shared_b
                .submit_sealed_for_test(sealed_b)
                .expect("B publishes after A")
        });
        // Give B a head start so it (most likely) arrives before A.
        for _ in 0..1000 {
            std::thread::yield_now();
        }
        let outcome_a = shared
            .submit_sealed_for_test(sealed_a)
            .expect("A publishes");
        let outcome_b = b_thread.join().expect("B thread did not panic");
        assert_eq!(outcome_a.generation, 1, "A is seal_seq 0 ⇒ generation 1");
        assert_eq!(outcome_b.generation, 2, "B is seal_seq 1 ⇒ generation 2");
        let snap = shared.read();
        assert_eq!(
            snap.meta.generation, 2,
            "final published gen == max seal_seq + 1"
        );
        assert!(
            snap.is_node_alive(a),
            "A's node survived in the final snapshot"
        );
        assert!(
            snap.is_node_alive(b),
            "B's node is present in the final snapshot"
        );
        assert_eq!(snap.node_count(), 2);
    }

    /// A compaction that arrives before an earlier-sealed commit must not be
    /// overwritten by that commit's stale, pre-compaction snapshot.
    #[test]
    fn compact_cannot_clobber_an_earlier_sealed_commit() {
        let shared = Arc::new(SharedGraph::new(GraphId::new(91_008)));
        {
            let mut txn = shared.begin_write();
            let mut ids = Vec::new();
            for _ in 0..20 {
                ids.push(
                    txn.mutator()
                        .create_node(LabelSet::single(db_string("S")), PropertyMap::new())
                        .unwrap(),
                );
            }
            txn.commit().unwrap();
            let mut txn = shared.begin_write();
            for id in &ids {
                txn.mutator().delete_node(*id).unwrap();
            }
            txn.commit().unwrap();
        }
        let mut txn_a = shared.begin_write();
        let a = txn_a
            .mutator()
            .create_node(LabelSet::single(db_string("A")), PropertyMap::new())
            .unwrap();
        let sealed_a = txn_a.seal(None, None).expect("A seals");
        let shared_c = Arc::clone(&shared);
        let compactor = std::thread::spawn(move || shared_c.compact().expect("compaction ok"));
        // Give the compactor a head start so it (most likely) arrives before A.
        for _ in 0..1000 {
            std::thread::yield_now();
        }
        let outcome_a = shared
            .submit_sealed_for_test(sealed_a)
            .expect("A publishes");
        let report = compactor.join().expect("compactor did not panic");
        assert_eq!(outcome_a.generation, 3);
        assert!(report.reclaimed_nodes >= 20, "report: {report:?}");
        let snap = shared.read();
        assert!(snap.is_node_alive(a));
        assert_eq!(snap.node_count(), 1, "only A is alive");
        assert_eq!(
            snap.node_store.len(),
            1,
            "published snapshot is dense — the compaction's reclamation was not \
             clobbered by A's stale pre-compaction snapshot",
        );
        snap.assert_indexes_consistent()
            .expect("published snapshot is structurally consistent");
    }
}