use std::collections::BTreeMap;
use std::panic::AssertUnwindSafe;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{Receiver, SyncSender, sync_channel};
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use arc_swap::ArcSwap;
use crate::committer_batch::{
BatchDrain, BatchLimits, CommitBatching, drain_contiguous_batch, flush_and_publish_batch,
};
use crate::durable_provider::DurableProvider;
use crate::error::{GraphError, GraphResult};
use crate::graph::SeleneGraph;
use crate::index_provider::IndexProvider;
use crate::write_txn::{CommitOutcome, SealedCommit};
/// Capacity of the bounded work channel feeding the committer thread; once this many
/// items are queued, `SyncSender::send` in the `Committer::submit_*` methods blocks.
const WORK_CHANNEL_CAPACITY: usize = 1024;
/// One unit of work for the committer thread.
///
/// Every variant carries a seal sequence number that fixes its position in the publish
/// order, and a `reply` channel (created with capacity 1 by the `Committer::submit_*`
/// methods) on which the committer sends the outcome back to the submitter.
pub(crate) enum Work {
    /// A sealed write transaction; handled in batches by `drain_contiguous_batch` /
    /// `flush_and_publish_batch`.
    Commit {
        sealed: SealedCommit,
        reply: SyncSender<GraphResult<CommitOutcome>>,
    },
    /// Replace the published snapshot with the compacted graph `dense`; `report` is
    /// sent back unchanged on success.
    Compact {
        seal_seq: u64,
        dense: Arc<SeleneGraph>,
        report: crate::CompactionReport,
        reply: SyncSender<GraphResult<crate::CompactionReport>>,
    },
    /// Replace the published snapshot with `rebuilt` (vector indexes rebuilt); `report`
    /// is sent back unchanged on success.
    VectorIndexRebuild {
        seal_seq: u64,
        rebuilt: Arc<SeleneGraph>,
        report: crate::VectorIndexRebuildReport,
        reply: SyncSender<GraphResult<crate::VectorIndexRebuildReport>>,
    },
}
impl Work {
pub(crate) fn seal_seq(&self) -> u64 {
match self {
Work::Commit { sealed, .. } => sealed.seal_seq,
Work::Compact { seal_seq, .. } => *seal_seq,
Work::VectorIndexRebuild { seal_seq, .. } => *seal_seq,
}
}
pub(crate) fn is_snapshot_maintenance(&self) -> bool {
matches!(self, Work::Compact { .. } | Work::VectorIndexRebuild { .. })
}
}
/// Shared state moved into the committer thread by `CommitterThread::spawn`.
pub(crate) struct CommitterHandles {
    /// Currently published graph snapshot; snapshot-maintenance work replaces it with
    /// `ArcSwap::store`.
    pub(crate) snapshot: Arc<ArcSwap<SeleneGraph>>,
    /// NOTE(review): not read directly in this file; presumably used by the
    /// `committer_batch` helpers, which receive `&handles` — confirm.
    pub(crate) schema_version: Arc<AtomicU64>,
    /// NOTE(review): not read directly in this file; presumably used by the
    /// `committer_batch` helpers — confirm.
    pub(crate) providers: Arc<[Arc<dyn IndexProvider>]>,
    /// NOTE(review): not read directly in this file; presumably used by the
    /// `committer_batch` helpers — confirm.
    pub(crate) durable_providers: Vec<Arc<dyn DurableProvider>>,
    /// Batching policy, turned into `BatchLimits` when the committer loop starts.
    pub(crate) batching: CommitBatching,
}
/// Cloneable submission handle for the committer thread.
#[derive(Clone)]
pub(crate) struct Committer {
    /// Producer side of the bounded work channel.
    sender: SyncSender<Work>,
    /// Set to `true` (by `unwrap_protected`) once a commit or maintenance step fails or
    /// panics; submissions check it and fail fast with `committer_dead()`.
    poisoned: Arc<std::sync::atomic::AtomicBool>,
    /// Seal-sequence counter shared with the owning `CommitterThread`.
    next_seal_seq: Arc<AtomicU64>,
}
/// Owner of the committer thread; dropping it releases its sender and joins the thread.
pub(crate) struct CommitterThread {
    /// Owning producer handle; `spawn` sets it to `Some` and `Drop` resets it to `None`
    /// so the channel can disconnect.
    sender: Option<SyncSender<Work>>,
    /// Poison flag shared with the committer thread and every `Committer` handle.
    poisoned: Arc<std::sync::atomic::AtomicBool>,
    /// Seal-sequence counter shared with every `Committer` handle.
    next_seal_seq: Arc<AtomicU64>,
    /// Join handle of the committer thread, taken in `Drop`.
    join: Mutex<Option<JoinHandle<()>>>,
}
impl CommitterThread {
    /// Starts the `selene-committer` thread running `run_committer` over `handles`.
    ///
    /// # Panics
    /// Panics if the OS refuses to create the thread.
    pub(crate) fn spawn(handles: CommitterHandles) -> Self {
        let poisoned = Arc::new(std::sync::atomic::AtomicBool::new(false));
        let worker_poisoned = Arc::clone(&poisoned);
        let (sender, receiver) = sync_channel::<Work>(WORK_CHANNEL_CAPACITY);
        let worker = std::thread::Builder::new()
            .name(String::from("selene-committer"))
            .spawn(move || run_committer(receiver, handles, &worker_poisoned))
            .expect("committer thread spawns");
        Self {
            sender: Some(sender),
            poisoned,
            next_seal_seq: Arc::new(AtomicU64::new(0)),
            join: Mutex::new(Some(worker)),
        }
    }

    /// Returns a new submission handle that shares this thread's channel, poison flag
    /// and seal-sequence counter.
    ///
    /// # Panics
    /// Panics if the owning sender has already been released (only `Drop` does that).
    pub(crate) fn handle(&self) -> Committer {
        let sender = self.sender.as_ref().expect("committer sender live").clone();
        Committer {
            sender,
            poisoned: Arc::clone(&self.poisoned),
            next_seal_seq: Arc::clone(&self.next_seal_seq),
        }
    }
}
impl Drop for CommitterThread {
    /// Releases the owning sender and waits for the committer thread to exit.
    ///
    /// The work channel only disconnects once every cloned `Committer` handle has been
    /// dropped too. Until then, `run_committer` keeps waiting in `recv`, and so does this
    /// join, unless the committer thread has already returned on its own (for example
    /// after the engine was poisoned).
    fn drop(&mut self) {
        // Give up our sender so the committer's `recv` can observe disconnection.
        self.sender = None;
        // `&mut self` already guarantees exclusive access, so no locking is needed.
        // A poisoned mutex must not turn into a panic here: panicking inside `drop`
        // while already unwinding aborts the process.
        let join = self
            .join
            .get_mut()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .take();
        if let Some(join) = join {
            // Best effort: the thread's own outcome (including a panic payload) is not
            // propagated out of `drop`.
            let _ = join.join();
        }
    }
}
impl Committer {
pub(crate) fn next_seal_seq(&self) -> u64 {
self.next_seal_seq.fetch_add(1, Ordering::Relaxed)
}
pub(crate) fn submit_commit(&self, sealed: SealedCommit) -> GraphResult<CommitOutcome> {
if self.poisoned.load(Ordering::Acquire) {
return Err(committer_dead());
}
let (reply_tx, reply_rx) = sync_channel::<GraphResult<CommitOutcome>>(1);
self.sender
.send(Work::Commit {
sealed,
reply: reply_tx,
})
.map_err(|_| committer_dead())?;
reply_rx.recv().map_err(|_| committer_dead())?
}
#[cfg(test)]
pub(crate) fn submit_commit_async_for_test(
&self,
sealed: SealedCommit,
) -> GraphResult<Receiver<GraphResult<CommitOutcome>>> {
if self.poisoned.load(Ordering::Acquire) {
return Err(committer_dead());
}
let (reply_tx, reply_rx) = sync_channel::<GraphResult<CommitOutcome>>(1);
self.sender
.send(Work::Commit {
sealed,
reply: reply_tx,
})
.map_err(|_| committer_dead())?;
Ok(reply_rx)
}
pub(crate) fn submit_compact(
&self,
seal_seq: u64,
dense: Arc<SeleneGraph>,
report: crate::CompactionReport,
) -> GraphResult<crate::CompactionReport> {
if self.poisoned.load(Ordering::Acquire) {
return Err(committer_dead());
}
let (reply_tx, reply_rx) = sync_channel::<GraphResult<crate::CompactionReport>>(1);
self.sender
.send(Work::Compact {
seal_seq,
dense,
report,
reply: reply_tx,
})
.map_err(|_| committer_dead())?;
reply_rx.recv().map_err(|_| committer_dead())?
}
pub(crate) fn submit_vector_index_rebuild(
&self,
seal_seq: u64,
rebuilt: Arc<SeleneGraph>,
report: crate::VectorIndexRebuildReport,
) -> GraphResult<crate::VectorIndexRebuildReport> {
if self.poisoned.load(Ordering::Acquire) {
return Err(committer_dead());
}
let (reply_tx, reply_rx) = sync_channel::<GraphResult<crate::VectorIndexRebuildReport>>(1);
self.sender
.send(Work::VectorIndexRebuild {
seal_seq,
rebuilt,
report,
reply: reply_tx,
})
.map_err(|_| committer_dead())?;
reply_rx.recv().map_err(|_| committer_dead())?
}
}
/// Error reported to every submitter once the committer thread can no longer make
/// progress.
pub(crate) fn committer_dead() -> GraphError {
    let reason = String::from("commit thread is no longer running; the graph must be reopened");
    GraphError::Durable { reason }
}
/// Main loop of the `selene-committer` thread.
///
/// Items may reach the channel out of seal order. Each one is therefore parked in
/// `reorder` (keyed by `Work::seal_seq`) and processed only once `next_publish_seq`
/// reaches it:
///
/// * snapshot-maintenance items (`Compact`, `VectorIndexRebuild`) are published one at
///   a time via `publish_snapshot_maintenance`;
/// * otherwise `drain_contiguous_batch` is given the receiver, the reorder buffer and
///   `next_publish_seq`, and the resulting batch goes to `flush_and_publish_batch`.
///
/// The thread returns when:
/// * `recv` fails because every sender has been dropped. Parked items are dropped with
///   `reorder`, which drops their reply senders, so waiting submitters see a receive
///   error (mapped to `committer_dead()` by `Committer`);
/// * a maintenance step leaves the engine poisoned. Remaining parked items are answered
///   with `committer_dead()`;
/// * `flush_and_publish_batch` returns `true`;
/// * the batch drain reports `AppendFailed`. `ack_appended_with_error` handles the
///   already-appended items and parked items are answered with `committer_dead()`.
///
/// NOTE(review): `BTreeMap::insert` replaces an existing entry. A duplicate seal sequence
/// would therefore silently drop the earlier item (and its reply sender). An item whose
/// sequence is already below `next_publish_seq` would never be processed. This relies
/// on seal numbers being unique and unpublished — confirm every submitter obtains them
/// from `Committer::next_seal_seq`.
fn run_committer(
    receiver: Receiver<Work>,
    handles: CommitterHandles,
    poisoned: &Arc<std::sync::atomic::AtomicBool>,
) {
    // Next seal sequence to publish; matches the counter's starting value (0) in
    // `CommitterThread::spawn`.
    let mut next_publish_seq: u64 = 0;
    // Items received ahead of their turn, keyed by seal sequence.
    let mut reorder: BTreeMap<u64, Work> = BTreeMap::new();
    let limits = BatchLimits::resolve(handles.batching);
    loop {
        let work = match receiver.recv() {
            Ok(work) => work,
            // All senders are gone: shut down.
            Err(_) => return,
        };
        reorder.insert(work.seal_seq(), work);
        // Process everything that is now contiguous with the published prefix.
        while reorder.contains_key(&next_publish_seq) {
            // Maintenance items are published individually, never batched.
            if reorder
                .get(&next_publish_seq)
                .is_some_and(Work::is_snapshot_maintenance)
            {
                let Some(work) = reorder.remove(&next_publish_seq) else {
                    unreachable!("checked maintenance work at next_publish_seq above");
                };
                publish_snapshot_maintenance(work, poisoned, &handles);
                next_publish_seq += 1;
                if poisoned.load(Ordering::Acquire) {
                    drain_buffer_with_error(&mut reorder);
                    return;
                }
                continue;
            }
            // NOTE(review): this loop only progresses if `drain_contiguous_batch`
            // consumes the entry at `next_publish_seq` and advances the counter;
            // presumably it does — confirm in `committer_batch`.
            match drain_contiguous_batch(
                &receiver,
                &mut reorder,
                &mut next_publish_seq,
                limits,
                &handles,
                poisoned,
            ) {
                BatchDrain::Run { batch } => {
                    // A `true` result ends the committer thread.
                    if flush_and_publish_batch(batch, &mut reorder, &handles, poisoned) {
                        return;
                    }
                }
                BatchDrain::AppendFailed { appended } => {
                    crate::committer_batch::ack_appended_with_error(appended);
                    drain_buffer_with_error(&mut reorder);
                    return;
                }
            }
        }
    }
}
fn publish_snapshot_maintenance(
work: Work,
poisoned: &Arc<std::sync::atomic::AtomicBool>,
handles: &CommitterHandles,
) {
match work {
Work::Compact {
seal_seq: _,
dense,
report,
reply,
} => {
let result =
run_protected(|| publish_replacement_snapshot(&dense, &handles.snapshot, report));
let result = unwrap_protected(result, poisoned);
let _ = reply.send(result);
}
Work::VectorIndexRebuild {
seal_seq: _,
rebuilt,
report,
reply,
} => {
let result =
run_protected(|| publish_replacement_snapshot(&rebuilt, &handles.snapshot, report));
let result = unwrap_protected(result, poisoned);
let _ = reply.send(result);
}
Work::Commit { .. } => {
unreachable!("publish_snapshot_maintenance is never called with Work::Commit");
}
}
}
/// Makes `replacement` the published snapshot and hands `report` back unchanged.
///
/// This never fails; it returns `GraphResult` so it can be passed to `run_protected`.
fn publish_replacement_snapshot<T>(
    replacement: &Arc<SeleneGraph>,
    snapshot: &ArcSwap<SeleneGraph>,
    report: T,
) -> GraphResult<T> {
    let fresh = Arc::clone(replacement);
    snapshot.store(fresh);
    Ok(report)
}
/// Empties `reorder`, answering every parked item with `committer_dead()` in ascending
/// seal order.
///
/// Send failures (submitter no longer waiting) are ignored.
pub(crate) fn drain_buffer_with_error(reorder: &mut BTreeMap<u64, Work>) {
    let parked = std::mem::take(reorder);
    parked.into_values().for_each(|pending| match pending {
        Work::Commit { reply, .. } => {
            let _ = reply.send(Err(committer_dead()));
        }
        Work::Compact { reply, .. } => {
            let _ = reply.send(Err(committer_dead()));
        }
        Work::VectorIndexRebuild { reply, .. } => {
            let _ = reply.send(Err(committer_dead()));
        }
    });
}
/// Runs `body`, catching a panic instead of letting it unwind the committer thread.
///
/// The outer `Err` carries the panic payload; the inner `GraphResult` is `body`'s own
/// result. `body` is asserted unwind-safe because the caller handles the aftermath by
/// poisoning the engine (see `unwrap_protected`).
pub(crate) fn run_protected<T>(
    body: impl FnOnce() -> GraphResult<T>,
) -> Result<GraphResult<T>, Box<dyn std::any::Any + Send>> {
    let unwind_guarded = AssertUnwindSafe(body);
    std::panic::catch_unwind(unwind_guarded)
}
/// Flattens the output of `run_protected` into a `GraphResult`.
///
/// Any error or caught panic sets `poisoned` (before logging) and is logged with
/// `tracing::error!`. A panic is converted into `GraphError::Durable`, with the payload
/// description in `reason`.
pub(crate) fn unwrap_protected<T>(
    result: Result<GraphResult<T>, Box<dyn std::any::Any + Send>>,
    poisoned: &Arc<std::sync::atomic::AtomicBool>,
) -> GraphResult<T> {
    let completed = match result {
        Ok(completed) => completed,
        Err(payload) => {
            poisoned.store(true, Ordering::Release);
            let description = crate::panic_payload::describe(&payload);
            tracing::error!(
                payload = %description,
                "selene-graph: commit thread panicked; engine poisoned, reopen required",
            );
            return Err(GraphError::Durable {
                reason: format!("commit thread panicked: {description}"),
            });
        }
    };
    completed.map_err(|error| {
        poisoned.store(true, Ordering::Release);
        tracing::error!(
            error = %error,
            "selene-graph: commit failed after seal; engine poisoned, reopen required",
        );
        error
    })
}
// Unit tests for this module, declared out of line and compiled only for test builds.
#[cfg(test)]
mod tests;