selene-db-graph 1.3.0

In-memory property-graph storage core (ArcSwap + imbl CoW, label/typed indexes, write funnel) for selene-db.
//! Single per-graph commit thread — the sole publisher of the live snapshot.
//!
//! # Why a dedicated committer (v1.2 multi-writer, BRIEF 1)
//!
//! Before v1.2 every writer thread published its own snapshot under the held
//! write lock (`WriteTxn::commit_with_principal`). v1.2 splits commit into two
//! halves (see [`crate::write_txn::WriteTxn::seal`]):
//!
//! 1. **seal** (session thread, under the lock): generation/meta bump + GG02
//!    validation + build the frozen next snapshot, then **release the lock**.
//! 2. **publish tail** (this committer thread, FIFO): HLC stamp → WAL append →
//!    `snapshot.store` → store-before-schema-bump → no-op provider fan-out.
//!
//! The committer is the **sole writer of the [`ArcSwap`] snapshot cell**. That
//! single-writer + seal-sequence-ordered discipline is what preserves D10
//! strict-serializability once `seal()` drops the write lock early.
//!
//! # Publish order == seal order (the P0 correctness invariant)
//!
//! `seal()` consumes the [`crate::WriteTxn`], so the write lock drops as it
//! returns — *before* the caller enqueues the bundle. Two sessions can seal in
//! lock order (A then B) yet `send()` in the opposite order, so raw channel
//! arrival order is **not** seal order. To publish in the correct total order
//! anyway, each publishable unit is stamped with a strictly-monotonic `seal_seq`
//! **under the write lock**, and the committer publishes strictly in ascending
//! `seal_seq` via a reorder buffer ([`run_committer`]). Channel arrival order
//! only governs *when* an item reaches the buffer, never the order it publishes.
//! **This is a new, load-bearing, NOT type-enforced invariant** — a second
//! committer or a second `ArcSwap` writer anywhere would silently break
//! serializability (see the v1.2 design §4 "the one honest shift"). Every
//! snapshot publisher routes here; the rerouting completeness is grep-gated and
//! load-bearing.
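//!
/*!
A minimal sketch of the reorder-buffer idea described above, not the crate's
own `run_committer`. `ReorderBuffer` and `accept` are hypothetical names used
only for illustration, and the real committer additionally appends, flushes,
and acks. The sketch only shows how keying arrivals by `seal_seq` turns an
arbitrary arrival order into a gap-free ascending publish order.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the committer's reorder buffer: items are keyed
/// by their `seal_seq` and released only as a gap-free ascending run.
pub struct ReorderBuffer<T> {
    next_publish_seq: u64,
    pending: BTreeMap<u64, T>,
}

impl<T> ReorderBuffer<T> {
    pub fn new() -> Self {
        Self {
            next_publish_seq: 0,
            pending: BTreeMap::new(),
        }
    }

    /// Buffer one arrival, then return every item that is now publishable,
    /// in ascending `seal_seq` order.
    pub fn accept(&mut self, seal_seq: u64, item: T) -> Vec<T> {
        self.pending.insert(seal_seq, item);
        let mut ready = Vec::new();
        // Release items only while the next expected sequence is present.
        while let Some(item) = self.pending.remove(&self.next_publish_seq) {
            ready.push(item);
            self.next_publish_seq += 1;
        }
        ready
    }
}
```

If session B (`seal_seq` 1) sends before session A (`seal_seq` 0), B's item
waits in the buffer and both are released together, A first, once A arrives.
*/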
//!
//! # No committer-held write lock (deadlock surface removed)
//!
//! Compaction also follows seal-and-handover: `SharedGraph::compact` acquires
//! the write lock on the **caller** thread, allocates a `seal_seq`, densifies
//! the live graph, writes it back into `*shared`, releases the lock, and hands
//! the committer a *pre-built* dense snapshot. The committer therefore **never**
//! takes the write lock for any work item — eliminating the
//! send-under-lock/queued-compact deadlock entirely (a session is never
//! simultaneously lock-holding and blocked on the committer).
//!
//! # Group commit (v1.2 multi-writer, BRIEF 2)
//!
//! The committer now drives WAL durability in [`SyncPolicy::OnFlushOnly`]
//! (forced by the builder — see [`crate::SharedGraphBuilder::with_wal`]) and is
//! the **sole fsync caller**. Each loop iteration forms a contiguous-`seal_seq`
//! run of commits ([`crate::committer_batch::drain_contiguous_batch`] — Stage 1
//! append, fsync deferred), runs ONE group flush
//! ([`crate::write_txn::flush_durables`] — Stage 2, the R1
//! fsync-before-publish barrier), then publishes + acks each member in
//! `seal_seq` order ([`crate::write_txn::publish_appended`] — Stage 3/4). The
//! run length is capped at 1 when [`CommitBatching::Off`] (the default) and at
//! N when [`CommitBatching::On`], so **OFF is the degenerate `N=1` case of the
//! identical batched code** — one append + one fsync + one publish + one ack,
//! the same syscalls in the same order as BRIEF 1's `EveryN(1)`.
//! Snapshot-maintenance work is a hard flush boundary: it is never co-batched
//! (F2 — its replacement snapshot already contains every lower-`seal_seq`
//! commit's mutation, so the pending commit run must be flushed + published
//! first to keep durable-before-visible).
//!
//! [`SyncPolicy::OnFlushOnly`]: crate::SyncPolicy::OnFlushOnly
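//!
/*!
A minimal sketch of how a contiguous commit run might be formed, under stated
assumptions: `Item` is a simplified stand-in for the work enum, and
`take_commit_run` is a hypothetical helper. The crate's own
`drain_contiguous_batch` is not shown in this file and also performs the WAL
append. The sketch illustrates two points from the text: batching off is the
`max_run = 1` case of the same loop, and maintenance work ends a run.

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified work item: a WAL-appending commit or WAL-free
/// snapshot maintenance.
#[derive(Debug, PartialEq)]
pub enum Item {
    Commit(&'static str),
    Maintenance(&'static str),
}

/// Remove the contiguous run of commits starting at `*next_seq`, stopping at
/// a sequence gap, at a maintenance item, or after `max_run` commits.
pub fn take_commit_run(
    pending: &mut BTreeMap<u64, Item>,
    next_seq: &mut u64,
    max_run: usize,
) -> Vec<Item> {
    let mut run = Vec::new();
    while run.len() < max_run {
        // A gap or a maintenance item at the head both end the run.
        if !matches!(pending.get(&*next_seq), Some(Item::Commit(_))) {
            break;
        }
        if let Some(item) = pending.remove(&*next_seq) {
            run.push(item);
            *next_seq += 1;
        }
    }
    run
}
```

A caller would flush once per returned run and then publish its members in
order; a maintenance item left at the head is handled separately, on its own.
*/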

use std::collections::BTreeMap;
use std::panic::AssertUnwindSafe;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{Receiver, SyncSender, sync_channel};
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;

use arc_swap::ArcSwap;

use crate::committer_batch::{
    BatchDrain, BatchLimits, CommitBatching, drain_contiguous_batch, flush_and_publish_batch,
};
use crate::durable_provider::DurableProvider;
use crate::error::{GraphError, GraphResult};
use crate::graph::SeleneGraph;
use crate::index_provider::IndexProvider;
use crate::write_txn::{CommitOutcome, SealedCommit};

/// Bound on the inbound work queue (global back-pressure). A full channel
/// blocks the enqueuing session — natural global back-pressure with no
/// semaphore. Sized generously so steady-state sessions never block on a
/// healthy committer, while still capping fan-in memory.
const WORK_CHANNEL_CAPACITY: usize = 1024;

/// Work submitted to the committer thread, each tagged with its publish-order
/// `seal_seq` (allocated under the write lock by the caller).
///
/// `Commit` carries a fully-built, frozen [`SealedCommit`] (no lock, no graph
/// reference): the committer never re-validates, re-allocates ids, or re-applies
/// a change list. Snapshot-maintenance variants carry a *pre-built* replacement
/// snapshot (built on the caller thread under the lock, like a commit) — the
/// committer never touches the write lock.
///
/// Index DDL is **not** a distinct variant: `create_property_index_named` /
/// `drop_property_index` build + `seal()` their `WriteTxn` on the caller thread
/// (releasing the lock) exactly like any other write, then submit a
/// `Work::Commit`.
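///
/**
A minimal sketch of the request/reply shape that each variant below follows,
assuming a simplified hypothetical `Job` type in place of the real variants:
the work item carries its own one-slot reply channel, and the submitter blocks
on it until the worker acks. `run_worker` and `submit` are illustrative names,
not this crate's API.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

/// Hypothetical work item: a payload plus the channel on which to ack it.
pub struct Job {
    pub payload: u64,
    pub reply: SyncSender<Result<u64, String>>,
}

/// Worker loop: process each job and ack it on its private reply channel.
/// Exits when every sender has been dropped.
pub fn run_worker(receiver: Receiver<Job>) {
    while let Ok(job) = receiver.recv() {
        let result = job
            .payload
            .checked_mul(2)
            .ok_or_else(|| "overflow".to_owned());
        // The submitter may have gone away; a failed ack is ignored.
        let _ = job.reply.send(result);
    }
}

/// Submit one job and block until the worker acks it.
pub fn submit(sender: &SyncSender<Job>, payload: u64) -> Result<u64, String> {
    let (reply_tx, reply_rx) = sync_channel(1);
    sender
        .send(Job {
            payload,
            reply: reply_tx,
        })
        .map_err(|_| "worker is gone".to_owned())?;
    // A dropped reply sender surfaces here as an error, not as a hang.
    reply_rx.recv().map_err(|_| "worker is gone".to_owned())?
}
```
*/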
pub(crate) enum Work {
    /// Publish a pre-sealed commit (the common path: autocommit, explicit-txn
    /// terminal COMMIT, and index DDL).
    Commit {
        sealed: SealedCommit,
        reply: SyncSender<GraphResult<CommitOutcome>>,
    },
    /// Publish a pre-built dense compacted snapshot. Built + written into
    /// `*shared` on the caller thread under the lock; the committer only swaps
    /// the [`ArcSwap`] cell in `seal_seq` order.
    Compact {
        seal_seq: u64,
        dense: Arc<SeleneGraph>,
        report: crate::CompactionReport,
        reply: SyncSender<GraphResult<crate::CompactionReport>>,
    },
    /// Publish a pre-built snapshot with rebuilt vector indexes. This is pure
    /// derived-state reclamation: no WAL append, no schema bump, no provider
    /// fan-out.
    VectorIndexRebuild {
        seal_seq: u64,
        rebuilt: Arc<SeleneGraph>,
        report: crate::VectorIndexRebuildReport,
        reply: SyncSender<GraphResult<crate::VectorIndexRebuildReport>>,
    },
}

impl Work {
    /// The publish-order key under which the reorder buffer releases this item.
    pub(crate) fn seal_seq(&self) -> u64 {
        match self {
            Work::Commit { sealed, .. } => sealed.seal_seq,
            Work::Compact { seal_seq, .. } => *seal_seq,
            Work::VectorIndexRebuild { seal_seq, .. } => *seal_seq,
        }
    }

    /// Return true for WAL-free snapshot replacement work.
    pub(crate) fn is_snapshot_maintenance(&self) -> bool {
        matches!(self, Work::Compact { .. } | Work::VectorIndexRebuild { .. })
    }
}

/// Long-lived Arc handles the committer thread needs to publish.
///
/// These are clones of the [`crate::SharedGraph`] internals. The committer owns
/// them for its whole life; it is the only thread that calls `snapshot.store`.
/// It does **not** hold the write lock (`shared`) — compaction builds on the
/// caller thread — so no `RwLock` handle is needed here.
pub(crate) struct CommitterHandles {
    /// The published-snapshot cell. The committer is its sole writer.
    pub(crate) snapshot: Arc<ArcSwap<SeleneGraph>>,
    /// Plan-cache schema epoch, bumped strictly after `snapshot.store`.
    pub(crate) schema_version: Arc<AtomicU64>,
    /// Fan-out (no-op in production) providers. Shares the construction-time
    /// frozen registry allocation with [`crate::SharedGraph`].
    pub(crate) providers: Arc<[Arc<dyn IndexProvider>]>,
    /// Commit-critical durable providers (WAL). The committer is their sole
    /// `write_commit`/`flush` caller, which is what makes the BRIEF 2
    /// `OnFlushOnly` toggle committer-exclusive.
    pub(crate) durable_providers: Vec<Arc<dyn DurableProvider>>,
    /// Group-commit policy (BRIEF 2). `Off` (the default) caps each drained run
    /// at one commit ⇒ one append + one fsync per commit (BRIEF-1 behavior);
    /// `On` coalesces a contiguous run into one group flush.
    pub(crate) batching: CommitBatching,
}

/// Handle to the per-graph committer thread, owned by [`crate::SharedGraph`].
///
/// Cloned into every [`crate::WriteTxn`] so `commit()`/`commit_with_principal()`
/// can seal-and-submit without a back-reference to the graph. Dropping the last
/// handle closes the inbound channel; the committer thread then drains and
/// exits, and its [`JoinHandle`] is joined by [`crate::SharedGraph`]'s `Drop`.
#[derive(Clone)]
pub(crate) struct Committer {
    sender: SyncSender<Work>,
    /// Set true if the committer thread died (panic) or a post-seal commit
    /// failed durably. Subsequent submits fail fast with [`GraphError::Durable`]
    /// instead of blocking forever on a `recv()` whose `SyncSender` was dropped,
    /// or trusting an in-memory graph that diverged from the published snapshot.
    poisoned: Arc<std::sync::atomic::AtomicBool>,
    /// Strictly-monotonic publish-order allocator. Each `seal()` / `compact()`
    /// takes the next value **under the write lock**, so the sequence order is
    /// the lock-acquisition (total) order. The committer publishes in this
    /// order via its reorder buffer.
    next_seal_seq: Arc<AtomicU64>,
}

/// Owner-side committer state held by [`crate::SharedGraph`]: the canonical
/// submit handle plus the join handle so the thread is shut down cleanly on drop.
///
/// The canonical [`SyncSender`] lives here in an `Option`; [`Self::handle`]
/// hands out cheap clones to each [`WriteTxn`](crate::WriteTxn). On drop the
/// canonical sender is taken (dropped) first; once every `WriteTxn`-held clone
/// is also gone (they borrow `&SharedGraph`, so they are dropped before
/// `SharedGraph` itself), the channel disconnects, the committer's `recv()`
/// returns `Err`, and the loop exits — then we join the thread.
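///
/**
A minimal sketch of the shutdown sequence described above, assuming a
hypothetical `Worker` type that sums `u64` values instead of publishing
snapshots. It shows the mechanism only: once the canonical sender and every
clone are dropped, `recv()` returns `Err`, the loop ends, and the owner joins.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread::JoinHandle;

/// Hypothetical owner of a worker thread that exits when its channel closes.
pub struct Worker {
    sender: Option<SyncSender<u64>>,
    join: Option<JoinHandle<u64>>,
}

impl Worker {
    pub fn spawn() -> Self {
        let (sender, receiver) = sync_channel::<u64>(16);
        let join = std::thread::spawn(move || {
            let mut sum = 0;
            // `recv` returns `Err` once every sender clone has been dropped.
            while let Ok(value) = receiver.recv() {
                sum += value;
            }
            sum
        });
        Self {
            sender: Some(sender),
            join: Some(join),
        }
    }

    /// Hand out a clone of the canonical sender.
    pub fn handle(&self) -> SyncSender<u64> {
        self.sender.clone().expect("sender live")
    }

    /// Drop the canonical sender, then join; returns the worker's final sum.
    pub fn shutdown(mut self) -> u64 {
        self.sender = None;
        self.join
            .take()
            .expect("not yet joined")
            .join()
            .expect("worker panicked")
    }
}
```

Note that `shutdown` would block if a cloned handle were still alive, which is
why the text above relies on every clone being dropped first.
*/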
pub(crate) struct CommitterThread {
    /// The canonical sender — the single structural sender owned here. Cloned
    /// for each `WriteTxn`. Taken (dropped) first on shutdown.
    sender: Option<SyncSender<Work>>,
    poisoned: Arc<std::sync::atomic::AtomicBool>,
    next_seal_seq: Arc<AtomicU64>,
    join: Mutex<Option<JoinHandle<()>>>,
}

impl CommitterThread {
    /// Spawn the committer thread for a graph and return its owner-side handle.
    pub(crate) fn spawn(handles: CommitterHandles) -> Self {
        let (sender, receiver) = sync_channel::<Work>(WORK_CHANNEL_CAPACITY);
        let poisoned = Arc::new(std::sync::atomic::AtomicBool::new(false));
        // seal_seq starts at 0; the committer's reorder buffer expects the first
        // published item to carry seal_seq 0 (its `next_publish_seq` init).
        let next_seal_seq = Arc::new(AtomicU64::new(0));
        let thread_poisoned = Arc::clone(&poisoned);
        let join = std::thread::Builder::new()
            .name("selene-committer".to_owned())
            .spawn(move || run_committer(receiver, handles, &thread_poisoned))
            .expect("committer thread spawns");
        Self {
            sender: Some(sender),
            poisoned,
            next_seal_seq,
            join: Mutex::new(Some(join)),
        }
    }

    /// Hand out a cheaply-cloneable submit handle bound to this committer.
    pub(crate) fn handle(&self) -> Committer {
        Committer {
            sender: self.sender.clone().expect("committer sender live"),
            poisoned: Arc::clone(&self.poisoned),
            next_seal_seq: Arc::clone(&self.next_seal_seq),
        }
    }
}

impl Drop for CommitterThread {
    fn drop(&mut self) {
        // Drop the canonical sender so the channel closes once every
        // WriteTxn-held clone is also dropped (those borrow &SharedGraph and so
        // are already gone when SharedGraph drops). The committer's recv() then
        // returns Err and the loop exits.
        self.sender = None;
        if let Some(join) = self.join.lock().expect("committer join lock").take() {
            let _ = join.join();
        }
    }
}

impl Committer {
    /// Allocate the next strictly-monotonic publish-order key.
    ///
    /// MUST be called while holding the write lock (in `seal()` / `compact()`),
    /// so the allocation order equals lock-acquisition order. `Relaxed` is sound
    /// for the counter itself: the cross-thread ordering comes from the write
    /// lock's release/acquire, not from this atomic.
    pub(crate) fn next_seal_seq(&self) -> u64 {
        self.next_seal_seq.fetch_add(1, Ordering::Relaxed)
    }

    /// Seal-and-submit a commit, blocking until it is durable + visible.
    ///
    /// The caller MUST have released the write lock (i.e. `sealed` came from a
    /// consumed [`crate::WriteTxn`]) **before** calling this — `seal()` does
    /// exactly that. The committer publishes strictly in `sealed.seal_seq`
    /// order, so channel arrival order (which can differ from seal order) does
    /// not affect the published total order.
    pub(crate) fn submit_commit(&self, sealed: SealedCommit) -> GraphResult<CommitOutcome> {
        if self.poisoned.load(Ordering::Acquire) {
            return Err(committer_dead());
        }
        let (reply_tx, reply_rx) = sync_channel::<GraphResult<CommitOutcome>>(1);
        self.sender
            .send(Work::Commit {
                sealed,
                reply: reply_tx,
            })
            .map_err(|_| committer_dead())?;
        // BLOCKS until the committer publishes and acks, so a session never
        // observes its own commit before linearization.
        reply_rx.recv().map_err(|_| committer_dead())?
    }

    #[cfg(test)]
    pub(crate) fn submit_commit_async_for_test(
        &self,
        sealed: SealedCommit,
    ) -> GraphResult<Receiver<GraphResult<CommitOutcome>>> {
        if self.poisoned.load(Ordering::Acquire) {
            return Err(committer_dead());
        }
        let (reply_tx, reply_rx) = sync_channel::<GraphResult<CommitOutcome>>(1);
        self.sender
            .send(Work::Commit {
                sealed,
                reply: reply_tx,
            })
            .map_err(|_| committer_dead())?;
        Ok(reply_rx)
    }

    /// Submit a pre-built dense compacted snapshot, blocking until the committer
    /// publishes it (in `seal_seq` order) or reports an error.
    ///
    /// The dense graph is built + written into `*shared` on the caller thread
    /// under the write lock (see [`crate::SharedGraph::compact`]); the committer
    /// only swaps the `ArcSwap` cell. Because the `seal_seq` was allocated under
    /// the same lock, a compact can never be reordered ahead of an
    /// earlier-sealed commit, so the published snapshot never regresses to a
    /// stale (non-dense) layout (P1 fix).
    pub(crate) fn submit_compact(
        &self,
        seal_seq: u64,
        dense: Arc<SeleneGraph>,
        report: crate::CompactionReport,
    ) -> GraphResult<crate::CompactionReport> {
        if self.poisoned.load(Ordering::Acquire) {
            return Err(committer_dead());
        }
        let (reply_tx, reply_rx) = sync_channel::<GraphResult<crate::CompactionReport>>(1);
        self.sender
            .send(Work::Compact {
                seal_seq,
                dense,
                report,
                reply: reply_tx,
            })
            .map_err(|_| committer_dead())?;
        reply_rx.recv().map_err(|_| committer_dead())?
    }

    /// Submit a pre-built snapshot with rebuilt vector indexes.
    pub(crate) fn submit_vector_index_rebuild(
        &self,
        seal_seq: u64,
        rebuilt: Arc<SeleneGraph>,
        report: crate::VectorIndexRebuildReport,
    ) -> GraphResult<crate::VectorIndexRebuildReport> {
        if self.poisoned.load(Ordering::Acquire) {
            return Err(committer_dead());
        }
        let (reply_tx, reply_rx) = sync_channel::<GraphResult<crate::VectorIndexRebuildReport>>(1);
        self.sender
            .send(Work::VectorIndexRebuild {
                seal_seq,
                rebuilt,
                report,
                reply: reply_tx,
            })
            .map_err(|_| committer_dead())?;
        reply_rx.recv().map_err(|_| committer_dead())?
    }
}

/// Error returned to every waiter when the committer thread is gone (panicked
/// or shutting down). Maps to GQLSTATUS `5GQL0` like any durable failure.
pub(crate) fn committer_dead() -> GraphError {
    GraphError::Durable {
        reason: "commit thread is no longer running; the graph must be reopened".to_owned(),
    }
}

/// Committer thread entry point: drain [`Work`] into a reorder buffer and
/// publish strictly in `seal_seq` order, batching contiguous commits into one
/// group fsync (v1.2 multi-writer, BRIEF 2).
///
/// Items can arrive out of seal order (the lock drops inside `seal()`, before
/// the caller's `send`). The committer buffers each arrival keyed by `seal_seq`
/// and only publishes the contiguous run starting at `next_publish_seq`. This
/// makes publish order == seal order == lock-acquisition order regardless of
/// channel arrival order — the P0 publish-ordering invariant.
///
/// Each contiguous-run pass:
/// 1. If the head is snapshot-maintenance work, publish it **solo** (store the
///    replacement Arc, no append/flush/schema-bump/fan-out) — a hard flush
///    boundary (F2).
/// 2. Otherwise form the contiguous commit run
///    ([`drain_contiguous_batch`] — append, fsync deferred), then
///    [`flush_and_publish_batch`] (ONE group flush == the R1 barrier, then
///    publish + ack each member in `seal_seq` order).
///
/// With [`CommitBatching::Off`] each run is capped at one commit, so this is the
/// degenerate `N=1` case of the identical code — one append + one fsync + one
/// publish + one ack per commit (BRIEF-1 behavior).
fn run_committer(
    receiver: Receiver<Work>,
    handles: CommitterHandles,
    poisoned: &Arc<std::sync::atomic::AtomicBool>,
) {
    // The seal_seq the next publish must carry. Allocation starts at 0, so the
    // first published item is seal_seq 0; thereafter strictly +1 with no gaps
    // (an aborted/cancelled seal never consumes a seal_seq — see
    // `WriteTxn::seal`, which allocates only after every fallible step).
    let mut next_publish_seq: u64 = 0;
    let mut reorder: BTreeMap<u64, Work> = BTreeMap::new();
    let limits = BatchLimits::resolve(handles.batching);
    loop {
        // Block for the next arrival. Channel-closed => owner dropped => exit.
        // A clean shutdown drops the canonical sender only after every
        // WriteTxn-held clone is gone, so no in-flight commit can still be
        // mid-seal when the channel closes; any buffered-but-unpublishable item
        // (a gap that will never fill) is dropped, which Errs its waiter via the
        // dropped reply sender — correct, since shutdown means no more commits.
        let work = match receiver.recv() {
            Ok(work) => work,
            Err(_) => return,
        };
        reorder.insert(work.seal_seq(), work);

        // Drain every contiguous run now available, in seal_seq order.
        while reorder.contains_key(&next_publish_seq) {
            // F2: snapshot maintenance at head publishes solo (empty batch by
            // construction — all lower seqs are already durable + visible, so
            // this needs zero flush calls). It is never co-batched with commits.
            if reorder
                .get(&next_publish_seq)
                .is_some_and(Work::is_snapshot_maintenance)
            {
                let Some(work) = reorder.remove(&next_publish_seq) else {
                    unreachable!("checked maintenance work at next_publish_seq above");
                };
                publish_snapshot_maintenance(work, poisoned, &handles);
                next_publish_seq += 1;
                if poisoned.load(Ordering::Acquire) {
                    drain_buffer_with_error(&mut reorder);
                    return;
                }
                continue;
            }

            // Phase 1: form the contiguous commit run (append, fsync deferred).
            match drain_contiguous_batch(
                &receiver,
                &mut reorder,
                &mut next_publish_seq,
                limits,
                &handles,
                poisoned,
            ) {
                BatchDrain::Run { batch } => {
                    // Phase 2 (R1 barrier) + Phase 3/4 (publish + ack). Returns
                    // true when poisoned + drained ⇒ stop.
                    if flush_and_publish_batch(batch, &mut reorder, &handles, poisoned) {
                        return;
                    }
                }
                BatchDrain::AppendFailed { appended } => {
                    // A Stage-1 append failed: the failed waiter was already
                    // Err'd inside drain_contiguous_batch; here we Err every
                    // already-appended member (their unflushed bytes are correct
                    // to lose on reopen), drain the buffer, and exit. Nothing in
                    // the run was flushed or published.
                    crate::committer_batch::ack_appended_with_error(appended);
                    drain_buffer_with_error(&mut reorder);
                    return;
                }
            }
        }
    }
}

/// Publish WAL-free snapshot maintenance (F2 hard flush boundary): store the
/// replacement Arc only — no append, no flush, no schema-bump, no fan-out.
/// Wrapped in `catch_unwind`; a `store` panic poisons (see
/// [`unwrap_protected`]). All lower `seal_seq` commits are already durable +
/// visible by the time maintenance reaches head, so it needs zero flush calls.
fn publish_snapshot_maintenance(
    work: Work,
    poisoned: &Arc<std::sync::atomic::AtomicBool>,
    handles: &CommitterHandles,
) {
    match work {
        Work::Compact {
            seal_seq: _,
            dense,
            report,
            reply,
        } => {
            let result =
                run_protected(|| publish_replacement_snapshot(&dense, &handles.snapshot, report));
            let result = unwrap_protected(result, poisoned);
            let _ = reply.send(result);
        }
        Work::VectorIndexRebuild {
            seal_seq: _,
            rebuilt,
            report,
            reply,
        } => {
            let result =
                run_protected(|| publish_replacement_snapshot(&rebuilt, &handles.snapshot, report));
            let result = unwrap_protected(result, poisoned);
            let _ = reply.send(result);
        }
        Work::Commit { .. } => {
            unreachable!("publish_snapshot_maintenance is never called with Work::Commit");
        }
    }
}

/// Publish a pre-built replacement snapshot. Pure space reclamation: no WAL
/// append, no schema bump, no fan-out — only the `ArcSwap` swap.
///
/// The replacement graph's structural consistency was already verified or
/// rebuilt on the caller thread, **before** it was written into `*shared` — so a
/// broken replacement never reaches this point. Re-asserting here, after the
/// store, would risk the same "returns-Err-but-actually-published" inversion
/// the commit path avoids (P2), for zero added coverage. Wrapped in
/// `catch_unwind` by the caller only so a `store` panic still poisons rather
/// than aborts the committer thread.
fn publish_replacement_snapshot<T>(
    replacement: &Arc<SeleneGraph>,
    snapshot: &ArcSwap<SeleneGraph>,
    report: T,
) -> GraphResult<T> {
    snapshot.store(Arc::clone(replacement));
    Ok(report)
}

/// Fail every buffered waiter with `committer_dead` so no reply `SyncSender` is
/// dropped silently (which would end its waiter's `recv()` with a bare
/// `RecvError` and no explicit cause). Called once the committer is poisoned
/// and about to exit.
pub(crate) fn drain_buffer_with_error(reorder: &mut BTreeMap<u64, Work>) {
    for (_, work) in std::mem::take(reorder) {
        match work {
            Work::Commit { reply, .. } => {
                let _ = reply.send(Err(committer_dead()));
            }
            Work::Compact { reply, .. } => {
                let _ = reply.send(Err(committer_dead()));
            }
            Work::VectorIndexRebuild { reply, .. } => {
                let _ = reply.send(Err(committer_dead()));
            }
        }
    }
}

/// Run a committer body inside `catch_unwind`. parking_lot does not poison, so
/// a panic leaves locks usable; the engine is poisoned at a higher level
/// instead (no further commits trusted).
pub(crate) fn run_protected<T>(
    body: impl FnOnce() -> GraphResult<T>,
) -> Result<GraphResult<T>, Box<dyn std::any::Any + Send>> {
    std::panic::catch_unwind(AssertUnwindSafe(body))
}

/// Convert a `catch_unwind` result into a `GraphResult`, poisoning the committer
/// on a panic **or a returned error** so subsequent submits fail fast.
///
/// A returned `Err` from a publish is a *post-seal* failure: `seal()` already
/// wove the commit's mutation into `*shared` (and a later seal may have forked
/// off it), so the live in-memory graph cannot be surgically rolled back to
/// exclude only the failed commit. Poisoning is therefore the only consistent
/// recovery — the durable WAL never received the failed entry, so a reopen
/// (recovery) heals the divergence. This restores the pre-v1.2 invariant that a
/// commit reporting `Err` never leaks into the published snapshot or any later
/// commit's baseline (the failed-`write_commit` regression, P0).
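///
/**
A minimal sketch of the poison-on-failure pattern described above, assuming
plain `String` errors and a hypothetical `run_and_poison` helper that fuses the
roles of `run_protected` and `unwrap_protected`. It omits logging and uses its
own payload formatting rather than this crate's `panic_payload::describe`.

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::atomic::{AtomicBool, Ordering};

/// Run `body`, converting both a returned error and a panic into `Err` and
/// setting `poisoned` in either case.
pub fn run_and_poison<T>(
    poisoned: &AtomicBool,
    body: impl FnOnce() -> Result<T, String>,
) -> Result<T, String> {
    match catch_unwind(AssertUnwindSafe(body)) {
        Ok(Ok(value)) => Ok(value),
        Ok(Err(error)) => {
            // A returned error poisons too: state may already have advanced.
            poisoned.store(true, Ordering::Release);
            Err(error)
        }
        Err(payload) => {
            poisoned.store(true, Ordering::Release);
            // Panic payloads are usually `&str` or `String`.
            let text = payload
                .downcast_ref::<&str>()
                .map(|s| (*s).to_owned())
                .or_else(|| payload.downcast_ref::<String>().cloned())
                .unwrap_or_else(|| "non-string panic payload".to_owned());
            Err(format!("panicked: {text}"))
        }
    }
}
```

Submitters would check the flag with an `Acquire` load before enqueuing, so
they fail fast once any earlier publish has failed or panicked.
*/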
pub(crate) fn unwrap_protected<T>(
    result: Result<GraphResult<T>, Box<dyn std::any::Any + Send>>,
    poisoned: &Arc<std::sync::atomic::AtomicBool>,
) -> GraphResult<T> {
    match result {
        Ok(Ok(value)) => Ok(value),
        Ok(Err(error)) => {
            // Post-seal durable failure: the in-memory `*shared` already
            // advanced past this commit and cannot be unwound, so poison.
            poisoned.store(true, Ordering::Release);
            tracing::error!(
                error = %error,
                "selene-graph: commit failed after seal; engine poisoned, reopen required",
            );
            Err(error)
        }
        Err(payload) => {
            poisoned.store(true, Ordering::Release);
            let description = crate::panic_payload::describe(&payload);
            tracing::error!(
                payload = %description,
                "selene-graph: commit thread panicked; engine poisoned, reopen required",
            );
            // Open-risk #2 (split-brain): a panic between seal() and store()
            // can leave the live guard-Arc and the published snapshot
            // divergent for that commit. We cannot reconcile in-process; the
            // engine is poisoned and a reopen (recovery from the durable WAL,
            // which never saw the un-appended commit) restores consistency.
            Err(GraphError::Durable {
                reason: format!("commit thread panicked: {description}"),
            })
        }
    }
}

#[cfg(test)]
mod tests;