Skip to main content

selene_graph/
write_txn.rs

1//! Write transaction RAII handle per spec 03 sections 4 and 6.
2
3use std::sync::mpsc::SyncSender;
4use std::sync::{
5    Arc,
6    atomic::{AtomicBool, AtomicU64, Ordering},
7};
8use std::time::Instant;
9
10use arc_swap::ArcSwap;
11use parking_lot::{MutexGuard, RwLockWriteGuard};
12use selene_core::{Change, HlcTimestamp, metrics};
13
14use crate::committer::Committer;
15use crate::durable_provider::DurableProvider;
16use crate::error::{GraphError, GraphResult};
17use crate::graph::SeleneGraph;
18use crate::id_allocator::IdAllocator;
19use crate::index_provider::IndexProvider;
20use crate::mutator::Mutator;
21use crate::type_validator::TypeWarning;
22
/// Non-fatal graph commit warning.
///
/// Wraps one [`TypeWarning`] produced by commit-time type validation so it can
/// be carried in [`CommitOutcome::warnings`] next to a successful commit.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CommitWarning {
    /// Closed-graph validation warning.
    pub warning: TypeWarning,
}
29
/// Result metadata returned after a successful commit.
///
/// Built by `publish_appended` once the commit's snapshot has been stored.
#[derive(Clone, Debug, PartialEq)]
pub struct CommitOutcome {
    /// Published graph generation.
    pub generation: u64,
    /// Changes produced by the mutation funnel (the persisted change list, not
    /// the truncate-expanded fan-out view).
    pub changes: Vec<Change>,
    /// Opaque caller-supplied principal bytes, echoed back exactly as they were
    /// passed to `commit_with_principal`. The same bytes are handed to every
    /// durable provider's `write_commit`.
    pub principal: Option<Arc<[u8]>>,
    /// Highest durable sequence assigned by commit-critical providers, or
    /// `None` when no durable provider is registered.
    pub durable_at: Option<u64>,
    /// Next node ID after commit.
    pub next_node_id: u64,
    /// Next edge ID after commit.
    pub next_edge_id: u64,
    /// Non-fatal warnings produced during commit validation.
    pub warnings: Vec<CommitWarning>,
}
48
/// A frozen, owned, `Send + 'static` commit bundle handed from a session thread
/// to the single committer thread (v1.2 multi-writer, BRIEF 1).
///
/// Produced by [`WriteTxn::seal`] **after** the generation/meta bump + GG02
/// validation have run under the write lock on the session thread (so error
/// timing is unchanged). The write lock + allocator guards are released when
/// `seal` returns, because it consumes the transaction. The bundle contains the
/// fully-built next snapshot plus everything the committer needs to run the
/// durable+publish tail — **no guards, no graph reference, no borrow**. The
/// committer never re-validates, re-allocates ids, or re-applies a change list;
/// it just stamps the HLC, appends to the WAL, publishes the frozen snapshot,
/// and bumps the schema epoch.
///
/// The HLC timestamp is deliberately **not** stamped here: the committer stamps
/// it per bundle in **seal-sequence** drain order so HLC is monotonic in commit
/// order (== publish order == seal order). Stamping it on the session thread
/// would break that monotonicity once seal-order and stamp-order diverge.
///
/// # Why a `seal_seq` (publish-order correctness, P0 fix)
///
/// `seal()` consumes the [`WriteTxn`], so the write lock + allocator guards drop
/// as it returns — **before** the caller enqueues the bundle. Two sessions can
/// therefore seal in lock order (A then B) yet `send()` in the opposite order
/// (B before A) if A is preempted between lock-release and send. A naive FIFO
/// committer would then publish B's gen-`N+1` snapshot before A's gen-`N`
/// snapshot, regressing the published snapshot and losing A's older view under
/// B — a D10 serializability violation. To prevent it, `seal()` stamps a
/// strictly-monotonic `seal_seq` **while still holding the write lock** (so
/// seal-seq order == lock-acquisition order == the intended total order), and
/// the committer publishes strictly in `seal_seq` order via a reorder buffer,
/// regardless of channel arrival order. Compaction takes a `seal_seq` from the
/// same counter under the same lock, so a compact can never be reordered ahead
/// of an earlier-sealed commit.
pub(crate) struct SealedCommit {
    /// Strictly-monotonic publish-order key, allocated under the write lock in
    /// [`WriteTxn::seal`]. The committer publishes in ascending `seal_seq`.
    pub(crate) seal_seq: u64,
    /// Fully-built next snapshot, frozen under the session's write lock.
    pub(crate) next_snapshot: Arc<SeleneGraph>,
    /// Persisted change list (the WAL/changeset payload).
    pub(crate) changes: Vec<Change>,
    /// Truncate-expanded fan-out view, built on the session thread, or `None`
    /// when no truncate/reset expansion is staged (the common path).
    pub(crate) fanout_changes: Option<Vec<Change>>,
    /// Opaque caller-supplied principal bytes for the WAL entry header (D12).
    pub(crate) principal: Option<Arc<[u8]>>,
    /// Whether the change list contains a `Change::SchemaChanged`, i.e. whether
    /// publishing this commit bumps the schema epoch.
    pub(crate) schema_changed: bool,
    /// Already-bumped graph generation, read back from the working graph after
    /// the bump in `seal`.
    pub(crate) generation: u64,
    /// Next node id after this commit (peeked under the lock).
    pub(crate) next_node_id: u64,
    /// Next edge id after this commit (peeked under the lock).
    pub(crate) next_edge_id: u64,
    /// Non-fatal validation warnings collected during seal.
    pub(crate) warnings: Vec<CommitWarning>,
}
105
106/// Compile-time proof that [`SealedCommit`] is `Send + 'static`, so it can be
107/// moved to the committer thread. A guard or borrow leaking into the struct
108/// would fail this assertion at compile time.
109const _: fn() = || {
110    fn assert_send_static<T: Send + 'static>() {}
111    assert_send_static::<SealedCommit>();
112};
113
/// RAII owner of the single graph write lock.
///
/// Since v1.2 (BRIEF 1) the transaction no longer holds the snapshot cell,
/// schema-version, or provider handles — those moved to the single committer
/// thread, which the transaction reaches via the cheap `Committer` submit
/// handle. The transaction still owns the write lock + allocator guards for the
/// duration of execution and releases them when `seal` consumes it.
///
/// Dropping a transaction whose `pre_txn` is still `Some` restores the graph
/// to that pre-transaction state (see the `Drop` impl).
pub struct WriteTxn<'g> {
    /// Write guard over the shared graph `Arc`. Mutations go through
    /// `Arc::make_mut` on this slot (see `guard_mut`).
    pub(crate) guard: RwLockWriteGuard<'g, Arc<SeleneGraph>>,
    /// Submit handle for the committer; also the source of `seal_seq` values.
    pub(crate) committer: Committer,
    /// The graph `Arc` as it was when the transaction opened. While `Some`,
    /// `Drop` writes it back into `guard` (rollback); `seal` sets it to `None`
    /// once the commit can no longer fail.
    pub(crate) pre_txn: Option<Arc<SeleneGraph>>,
    /// Guard over the id allocator; `seal` peeks the next node/edge ids from it.
    pub(crate) allocator: MutexGuard<'g, IdAllocator>,
    /// Index-provider registry, retained so `Mutator::index_provider_by_tag`
    /// can resolve a provider during execution. Shares the one frozen
    /// registry allocation with `SharedGraph` and the committer — handing it
    /// to a transaction is a refcount bump, not a `Vec` clone.
    pub(crate) providers: Arc<[Arc<dyn IndexProvider>]>,
    /// Changes accumulated since the transaction opened; becomes the persisted
    /// change list of the sealed commit.
    pub(crate) changes: Vec<Change>,
    /// Per-truncate per-row tombstone expansions, keyed by the index of the
    /// declarative truncate change in [`Self::changes`] that produced them.
    ///
    /// BRIEF-150 / deletion-reclamation audit Item 11. The WAL/changeset carries
    /// only the O(1) declarative `NodesOfTypeTruncated`/`EdgesOfTypeTruncated`
    /// change, but live index-provider fan-out must observe the same per-row
    /// `NodeDeleted`/`EdgeDeleted` multiset a `MATCH (n:L) DETACH DELETE n` would
    /// emit (so derived state is reclaimed without leaks). The mutator
    /// snapshots the matched ids while it still holds the store and stages their
    /// tombstones here; commit substitutes each truncate change with its staged
    /// expansion before fan-out.
    pub(crate) truncate_expansions: Vec<(usize, Vec<Change>)>,
    /// Non-fatal warnings collected so far; `seal` appends validation warnings
    /// here, skipping any that are already present.
    pub(crate) warnings: Vec<CommitWarning>,
}
146
147impl<'g> WriteTxn<'g> {
148    pub(crate) fn new(
149        guard: RwLockWriteGuard<'g, Arc<SeleneGraph>>,
150        committer: Committer,
151        allocator: MutexGuard<'g, IdAllocator>,
152        providers: Arc<[Arc<dyn IndexProvider>]>,
153    ) -> Self {
154        let pre_txn = Some(Arc::clone(&*guard));
155        Self {
156            guard,
157            committer,
158            pre_txn,
159            allocator,
160            providers,
161            changes: Vec::new(),
162            truncate_expansions: Vec::new(),
163            warnings: Vec::new(),
164        }
165    }
166
167    /// Borrow a mutator tied to this transaction.
168    #[must_use]
169    pub fn mutator(&mut self) -> Mutator<'_, 'g> {
170        Mutator::new(self)
171    }
172
173    /// Borrow the transaction-local working graph.
174    #[must_use]
175    pub fn read(&self) -> &SeleneGraph {
176        self.guard.as_ref()
177    }
178
179    pub(crate) fn guard_mut(&mut self) -> &mut SeleneGraph {
180        Arc::make_mut(&mut *self.guard)
181    }
182
183    /// Commit without caller principal bytes.
184    pub fn commit(self) -> GraphResult<CommitOutcome> {
185        self.commit_with_principal(None)
186    }
187
188    /// Commit with optional caller-owned principal bytes for D12 audit replay.
189    ///
190    /// Since v1.2 (BRIEF 1) commit is **seal-and-handover**: this method runs
191    /// `seal` on the calling thread (generation/meta bump + GG02
192    /// validation under the write lock, then **lock release**), then submits the
193    /// resulting `SealedCommit` to the per-graph single committer thread and
194    /// blocks until it is durable + visible. The public contract is unchanged —
195    /// "`commit()` returns ⇒ durable + visible" — only the internal threading
196    /// model differs.
197    ///
198    /// GG02 closed-graph violations still abort here, on the calling thread,
199    /// before any handoff, so error timing is identical to v1.0/v1.1.
200    ///
201    /// Registered index providers are notified by the committer after the new
202    /// snapshot is published; the same-thread re-entrancy guard now protects the
203    /// committer thread (one committer ⇒ still sound). Same-thread re-entrant
204    /// provider calls into `SharedGraph::begin_write()` are detected via the
205    /// thread-local fanout counter and panic with a clear message; the
206    /// committer's fan-out boundary catches those panics (along with
207    /// callback-internal panics and returned errors) so a single misbehaving
208    /// provider can never crash the committer thread.
209    ///
210    /// # Errors
211    ///
212    /// Returns the GG02 / validation error from `seal`, or a
213    /// [`GraphError::Durable`] if the WAL append failed or the committer thread
214    /// is no longer running.
215    #[tracing::instrument(
216        name = "selene.graph.commit",
217        skip(self, principal),
218        fields(change_count = self.change_count())
219    )]
220    pub fn commit_with_principal(self, principal: Option<Arc<[u8]>>) -> GraphResult<CommitOutcome> {
221        // Clone the submit handle BEFORE sealing — `seal()` consumes `self`
222        // (dropping the write lock + allocator guards as it returns), so the
223        // session releases the lock strictly before it enqueues and blocks on
224        // `recv()`. The committer never takes the write lock (compaction builds
225        // on the caller thread too, since v1.2 BRIEF 1 P0 fix), so a session is
226        // never simultaneously lock-holding and recv-blocked.
227        let committer = self.committer.clone();
228        let sealed = self.seal(principal, None)?;
229        committer.submit_commit(sealed)
230    }
231
232    /// Run the under-lock half of commit and hand back an owned, `Send`
233    /// [`SealedCommit`] for the committer thread (v1.2 multi-writer, BRIEF 1).
234    ///
235    /// Runs, on the calling thread under the held write lock: schema-change
236    /// detection, the generation/meta bump (`generation += 1` + write the next
237    /// ids into `GraphMeta`), and GG02 closed-graph validation — which **still
238    /// aborts synchronously here** (the `?` propagates and `Drop` rolls back the
239    /// generation bump, exactly as in v1.0/v1.1). It then samples the optional
240    /// BRIEF-117 cancellation token **while the lock is still held and before
241    /// `Drop` is disarmed** (see below), allocates the strictly-monotonic
242    /// `seal_seq` under the lock, disarms `Drop` (`pre_txn = None`), clones the
243    /// now-frozen next snapshot, `mem::take`s the change list / truncate
244    /// expansions / warnings, builds the truncate-expanded fan-out view, and
245    /// returns. The write lock + allocator guards drop as this method returns
246    /// (it consumes `self`).
247    ///
248    /// # Cancellation cut-line (BRIEF-117, P0 fix)
249    ///
250    /// The cancellation token is sampled **here, under the write lock, before
251    /// disarming `Drop`** — not on the committer before the WAL append. In the
252    /// seal-and-handover model multiple commits can be sealed-but-unpublished at
253    /// once, each forking `*shared` off the previous; a commit's mutation is
254    /// therefore already woven into `*shared` (and possibly built upon by a
255    /// later seal) by the time the committer would run a pre-WAL check, so a
256    /// committer-side cancel could not surgically remove it without poisoning
257    /// the engine. Sampling under the lock means a cancelled commit is rolled
258    /// back by `Drop` exactly like a GG02 abort — `*shared` is restored, no
259    /// `seal_seq` is consumed, nothing is enqueued — so the cut-line's
260    /// guarantee ("no append, no publish, exactly as an aborted transaction
261    /// would leave it") is *literally* true. A cancel observed after `seal`
262    /// returns is too late: the commit is already in flight and irrevocable.
263    ///
264    /// The HLC timestamp is **not** stamped here (see [`SealedCommit`]); the
265    /// committer stamps it per bundle in seal-sequence drain order.
266    ///
267    /// # Errors
268    ///
269    /// Returns [`GraphError::Cancelled`] when `cancel` is set at entry (rolled
270    /// back via `Drop`), or the GG02 / closed-graph validation error
271    /// ([`GraphError::TypeViolation`]) when a change violates the bound type.
272    pub(crate) fn seal(
273        mut self,
274        principal: Option<Arc<[u8]>>,
275        cancel: Option<&AtomicBool>,
276    ) -> GraphResult<SealedCommit> {
277        debug_assert!(
278            self.pre_txn.is_some(),
279            "pre_txn must be present at seal entry"
280        );
281
282        let schema_changed = self
283            .changes
284            .iter()
285            .any(|change| matches!(change, Change::SchemaChanged { .. }));
286        let next_node_id = self.allocator.peek_next_node();
287        let next_edge_id = self.allocator.peek_next_edge();
288        {
289            let graph = self.guard_mut();
290            graph.meta.generation = graph
291                .meta
292                .generation
293                .checked_add(1)
294                .expect("graph generation exhausted");
295            graph.meta.next_node_id = next_node_id;
296            graph.meta.next_edge_id = next_edge_id;
297        }
298
299        let generation = self.read().meta.generation;
300
301        let mut validation_warnings = Vec::new();
302        if let Some(type_def) = self.read().meta.bound_type.as_deref() {
303            for change in &self.changes {
304                validation_warnings.extend(
305                    // NB: `?` here returns Err with the generation bump still
306                    // applied to the guard-Arc; `Drop` then restores `pre_txn`
307                    // and undoes the bump — error timing + rollback unchanged.
308                    crate::type_validator::validate_change(change, self.read(), type_def)?
309                        .into_iter()
310                        .map(|warning| CommitWarning { warning }),
311                );
312            }
313            if schema_changed {
314                validation_warnings.extend(
315                    crate::type_validator::validate_entity_state(self.read(), type_def)?
316                        .into_iter()
317                        .map(|warning| CommitWarning { warning }),
318                );
319            } else {
320                crate::type_validator::validate_unique_property_changes(
321                    &self.changes,
322                    self.read(),
323                    type_def,
324                )?;
325            }
326        }
327        for warning in validation_warnings {
328            if !self.warnings.contains(&warning) {
329                self.warnings.push(warning);
330            }
331        }
332
333        // BRIEF-117 cut-line: sample the cancellation token while the lock is
334        // still held and `Drop` is still armed. A cancel here returns Err with
335        // the generation bump + staged mutations still on the guard-Arc; `Drop`
336        // then restores `pre_txn`, rolling everything back exactly as a GG02
337        // abort or an aborted transaction would. Nothing is enqueued or
338        // published, and no `seal_seq` is consumed.
339        if let Some(flag) = cancel
340            && flag.load(Ordering::Acquire)
341        {
342            return Err(GraphError::Cancelled);
343        }
344
345        // Allocate the publish-order key under the lock so seal-seq order equals
346        // lock-acquisition order (the intended total order). Done after every
347        // fallible step so an aborted seal consumes no sequence number, keeping
348        // the committer's reorder sequence gap-free.
349        let seal_seq = self.committer.next_seal_seq();
350
351        // Disarm Drop-rollback: from here the commit is handed to the committer
352        // and the in-place mutations become the published state.
353        self.pre_txn = None;
354        // Freeze the next snapshot under the lock. The committer publishes this
355        // exact Arc and never rebuilds it.
356        let next_snapshot = Arc::clone(&*self.guard);
357
358        let changes = std::mem::take(&mut self.changes);
359        let truncate_expansions = std::mem::take(&mut self.truncate_expansions);
360        let warnings = std::mem::take(&mut self.warnings);
361
362        // BRIEF-150 / audit Item 11: build the fan-out-only truncate-expanded
363        // view on the session thread so the committer holds a fully-owned
364        // bundle. `None` on the common (non-truncate) path → zero allocation.
365        let fanout_changes = expand_truncates_for_fanout(&changes, &truncate_expansions);
366
367        Ok(SealedCommit {
368            seal_seq,
369            next_snapshot,
370            changes,
371            fanout_changes,
372            principal,
373            schema_changed,
374            generation,
375            next_node_id,
376            next_edge_id,
377            warnings,
378        })
379    }
380
381    /// Roll back graph changes via `Drop` and release the write lock.
382    pub fn rollback(self) {}
383
384    /// Number of changes accumulated since this transaction opened.
385    #[must_use]
386    pub fn change_count(&self) -> usize {
387        self.changes.len()
388    }
389
390    /// Whether this transaction has accumulated schema-changing work.
391    #[must_use]
392    pub fn has_schema_changes(&self) -> bool {
393        self.changes
394            .iter()
395            .any(|change| matches!(change, Change::SchemaChanged { .. }))
396    }
397}
398
399impl Drop for WriteTxn<'_> {
400    fn drop(&mut self) {
401        if let Some(prior) = self.pre_txn.take() {
402            *self.guard = prior;
403        }
404    }
405}
406
407fn commit_timestamp(durable_providers: &[Arc<dyn DurableProvider>]) -> HlcTimestamp {
408    durable_providers
409        .first()
410        .map_or_else(HlcTimestamp::zero, |provider| provider.next_timestamp())
411}
412
/// A sealed commit whose durable bytes have been **appended** to every durable
/// provider but **not yet fsynced or published** (v1.2 multi-writer, BRIEF 2).
///
/// This is the intermediate state between Stage 1 ([`append_sealed`]) and
/// Stage 3 ([`publish_appended`]) of the group-commit pipeline. The committer
/// forms a contiguous run of `AppendedCommit`s, runs ONE group flush
/// ([`flush_durables`] — the R1 fsync-before-publish barrier) over the whole
/// run, then publishes + acks each in `seal_seq` order. Because the flush is the
/// single barrier for the whole batch, an appended-but-unflushed commit is
/// **never** published or acked — it is only ever lost atomically with the rest
/// of its run on a crash (durable-before-visible).
///
/// It owns the reply [`SyncSender`] (moved in by the committer when it pops the
/// `Work::Commit`) so the Stage-4 ack is a single drain in `seal_seq` order with
/// no parallel-`Vec` correlation between commits and their reply channels.
///
/// All post-append/pre-publish state carried here was frozen under the session's
/// write lock in [`WriteTxn::seal`]; the committer never re-validates,
/// re-allocates ids, or re-applies a change list.
pub(crate) struct AppendedCommit {
    /// Fully-built next snapshot, frozen under the session's write lock. The
    /// committer stores this exact `Arc` in Stage 3 and never rebuilds it.
    pub(crate) next_snapshot: Arc<SeleneGraph>,
    /// Persisted change list, returned in the [`CommitOutcome`].
    pub(crate) changes: Vec<Change>,
    /// Truncate-expanded fan-out view, or `None` on the common (non-truncate)
    /// path (then fan-out uses `changes` directly).
    pub(crate) fanout_changes: Option<Vec<Change>>,
    /// Opaque caller-supplied principal bytes (D12), returned in the outcome.
    pub(crate) principal: Option<Arc<[u8]>>,
    /// Whether the change list bumps the schema epoch (store-before-schema-bump).
    pub(crate) schema_changed: bool,
    /// Already-bumped graph generation.
    pub(crate) generation: u64,
    /// Next node id after this commit.
    pub(crate) next_node_id: u64,
    /// Next edge id after this commit.
    pub(crate) next_edge_id: u64,
    /// Non-fatal validation warnings.
    pub(crate) warnings: Vec<CommitWarning>,
    /// Highest durable sequence assigned across the durable providers during
    /// [`append_sealed`]; `None` when there were no durable providers.
    /// Observable only after the group flush + publish.
    pub(crate) durable_at: Option<u64>,
    /// The reply channel for this commit's waiter. [`append_sealed`] leaves it
    /// `None`; the committer sets it when it pops the `Work::Commit`. Drained
    /// (acked) in Stage 4, in `seal_seq` order. [`publish_appended`] does not
    /// use it.
    pub(crate) reply: Option<SyncSender<GraphResult<CommitOutcome>>>,
    /// `Instant` captured at append time so commit-duration metrics span the
    /// full durable+publish tail (recorded in [`publish_appended`]).
    pub(crate) started: Instant,
}
463
464/// Compile-time proof that [`AppendedCommit`] is `Send + 'static`, so the
465/// committer can hold a batch of them across the group flush.
466const _: fn() = || {
467    fn assert_send_static<T: Send + 'static>() {}
468    assert_send_static::<AppendedCommit>();
469};
470
471/// Stage 1 — **append** a sealed commit's durable bytes to every durable
472/// provider, with fsync **deferred** (v1.2 multi-writer, BRIEF 2). Performs no
473/// store, no schema bump, and no fan-out: the snapshot is not yet visible.
474///
475/// This is the first third of the pre-BRIEF-2 `publish_sealed` body, split at
476/// the append/store seam so the committer can append a whole contiguous run
477/// before fsyncing it once (group commit). Ordering within Stage 1 is verbatim:
478/// 1. Debug-only index-consistency assertion on the frozen `next_snapshot`,
479///    BEFORE any durable append. A detected violation aborts with nothing
480///    durable and nothing visible (it poisons via the committer's
481///    `catch_unwind`, but no WAL entry was written and the published cell never
482///    advanced, so a reopen is clean). Asserting after the store (pre-fix) would
483///    return `Err` for a commit that *did* persist — a P2 inversion.
484/// 2. Stamp the HLC in committer seal-sequence order so HLC is monotonic in
485///    commit order (== publish order).
486/// 3. WAL-first: `write_commit` for each durable provider. Under the BRIEF-2
487///    `OnFlushOnly` policy this append does **not** fsync — the committer's
488///    later [`flush_durables`] is the single fsync for the whole run. The
489///    returned per-provider sequence is folded into `durable_at`.
490///
491/// The split makes durability **fsync-gated, not append-gated**: an appended
492/// commit is durable only after the group [`flush_durables`] returns `Ok`. The
493/// committer holds the append's bytes-but-no-fsync state in the returned
494/// [`AppendedCommit`] and publishes/acks strictly after the barrier.
495///
496/// # Failure ⇒ engine poison (handled by the committer)
497///
498/// A returned `Err` is a **post-seal** failure: `seal()` already wove this
499/// commit's mutation into `*shared` (and a later seal may have forked off it),
500/// so the live graph cannot be surgically rolled back. The committer poisons the
501/// engine, Errs this commit + every already-appended batch member (whose
502/// appended-but-unflushed bytes are correct to lose on reopen), and drains the
503/// buffer. The durable WAL never fsynced any of them, so a reopen heals.
504///
505/// # Errors
506///
507/// Returns [`GraphError::Durable`] if a durable provider's `write_commit`
508/// failed.
509pub(crate) fn append_sealed(
510    sealed: SealedCommit,
511    durable_providers: &[Arc<dyn DurableProvider>],
512) -> GraphResult<AppendedCommit> {
513    let started = Instant::now();
514    let SealedCommit {
515        seal_seq: _,
516        next_snapshot,
517        changes,
518        fanout_changes,
519        principal,
520        schema_changed,
521        generation,
522        next_node_id,
523        next_edge_id,
524        warnings,
525    } = sealed;
526
527    // (1) Debug-only structural net on the frozen snapshot, BEFORE any durable
528    // append. The snapshot is immutable from seal time, so this is just as sound
529    // as asserting after the store — but a detected violation now aborts with
530    // nothing durable and nothing visible (no `Err`-but-committed inversion). A
531    // pure read of the immutable snapshot — never re-enters begin_write.
532    // Compiled out in release builds.
533    #[cfg(debug_assertions)]
534    if let Err(reason) = next_snapshot.assert_indexes_consistent() {
535        panic!("selene-graph: pre-publish index consistency violation: {reason}");
536    }
537
538    // (2) Stamp the HLC in committer seal-sequence order (monotonic in commit
539    // order).
540    let timestamp = commit_timestamp(durable_providers);
541
542    // (3) WAL-first append. Under OnFlushOnly (BRIEF 2) this does NOT fsync —
543    // the committer's group flush is the single barrier. A returned error
544    // poisons the committer (the session-thread seal already mutated `*shared`
545    // and cannot be rolled back).
546    let mut durable_at: Option<u64> = None;
547    for durable in durable_providers {
548        let seq = durable
549            .write_commit(principal.as_ref(), &changes, timestamp)
550            .map_err(|error| GraphError::Durable {
551                reason: format!("{}: {error}", durable.provider_tag()),
552            })?;
553        durable_at = Some(durable_at.map_or(seq, |highest| highest.max(seq)));
554    }
555
556    Ok(AppendedCommit {
557        next_snapshot,
558        changes,
559        fanout_changes,
560        principal,
561        schema_changed,
562        generation,
563        next_node_id,
564        next_edge_id,
565        warnings,
566        durable_at,
567        reply: None,
568        started,
569    })
570}
571
572/// Stage 2 — **flush** every durable provider, fsyncing the whole contiguous run
573/// of appended commits in one barrier (the R1 fsync-before-publish barrier,
574/// v1.2 multi-writer, BRIEF 2).
575///
576/// Called by the committer exactly once per drained run of [`append_sealed`]s,
577/// strictly **before** any of the run's commits are published or acked. After it
578/// returns `Ok`, every byte appended in the run is durable, so publishing the
579/// snapshots (Stage 3) and delivering `durable_at` (Stage 4) cannot expose a
580/// not-yet-durable commit (durable-before-visible). When `commit_batching=Off`
581/// the run length is capped at 1, so this is exactly one fsync per commit at the
582/// same order point as `EveryN(1)`'s append-time fsync — behaviorally identical
583/// to BRIEF 1.
584///
585/// # Failure ⇒ engine poison (handled by the committer)
586///
587/// A flush error means the run's appended bytes may not be durable. The
588/// committer publishes **nothing** from the run, poisons the engine, and Errs
589/// every member — reopen lets WAL recovery decide truth, and no caller was told
590/// "durable."
591///
592/// # Errors
593///
594/// Returns [`GraphError::Durable`] if a durable provider's `flush` failed.
595pub(crate) fn flush_durables(durable_providers: &[Arc<dyn DurableProvider>]) -> GraphResult<()> {
596    for durable in durable_providers {
597        durable.flush().map_err(|error| GraphError::Durable {
598            reason: format!("{}: {error}", durable.provider_tag()),
599        })?;
600    }
601    Ok(())
602}
603
604/// Stage 3+4 — make an appended (and now group-flushed) commit **visible** and
605/// build its [`CommitOutcome`] (v1.2 multi-writer, BRIEF 2). **Infallible.**
606///
607/// Called by the committer only after [`flush_durables`] returned `Ok` for the
608/// whole run, in `seal_seq` order. Infallibility is load-bearing: by the time we
609/// store the snapshot the commit is already durable, so there is no honest way
610/// to return `Err` here — returning `Result` would reintroduce the P2
611/// "returns-Err-but-actually-published" inversion. The committer still wraps the
612/// call in `catch_unwind`, so a `store`/debug-assert PANIC still poisons.
613///
614/// Ordering (verbatim from the pre-BRIEF-2 `publish_sealed` tail):
615/// 1. Publish: `snapshot.store(next_snapshot)` (the ArcSwap linearization point;
616///    the committer is its sole writer).
617/// 2. store-before-schema-bump (PR #127 P1): bump `schema_version` strictly
618///    after the store, so a reader seeing the new epoch also sees the new
619///    snapshot.
620/// 3. No-op provider fan-out under the [`crate::reentry::FanoutGuard`] (now
621///    committer-thread-local — sound with one committer).
622/// 4. Metrics + build the [`CommitOutcome`] (carrying the `durable_at` computed
623///    in [`append_sealed`]).
624pub(crate) fn publish_appended(
625    appended: AppendedCommit,
626    snapshot: &ArcSwap<SeleneGraph>,
627    schema_version: &AtomicU64,
628    providers: &[Arc<dyn IndexProvider>],
629) -> CommitOutcome {
630    let AppendedCommit {
631        next_snapshot,
632        changes,
633        fanout_changes,
634        principal,
635        schema_changed,
636        generation,
637        next_node_id,
638        next_edge_id,
639        warnings,
640        durable_at,
641        reply: _,
642        started,
643    } = appended;
644
645    // Test-only injection seam for the Stage-3 publish-panic path (BRIEF 2 crash
646    // matrix item 6). In production this compiles to nothing. It panics BEFORE the
647    // store so the panicking member never publishes — matching the real
648    // store/debug-assert panic this branch defends against — letting a test drive
649    // the "member i of a multi-member batch panics ⇒ i acked-and-visible,
650    // panicking i+1 + remaining i+2.. Err'd, poisoned, drained" path that
651    // `notify_providers`' panic-swallowing makes otherwise unreachable.
652    #[cfg(test)]
653    publish_panic_inject::maybe_panic();
654
655    // (1) Publish — the sole ArcSwap writer.
656    snapshot.store(Arc::clone(&next_snapshot));
657    // (2) Publish the schema-version bump AFTER snapshot.store so any reader
658    // observing the new epoch is guaranteed to also observe the new snapshot
659    // (Codex PR #127 auto-review P1). Reverse ordering would let a reader read
660    // `epoch=N` then load the prior snapshot, planning against stale schema.
661    if schema_changed {
662        schema_version.fetch_add(1, Ordering::AcqRel);
663    }
664
665    // (3) No-op provider fan-out. The FanoutGuard's thread-local counter now
666    // guards the COMMITTER thread: a provider that re-enters begin_write on the
667    // committer thread panics before locking, and the boundary below catches
668    // it. Safe with exactly one committer (v1.2 design §7.7).
669    let fanout: &[Change] = fanout_changes.as_deref().unwrap_or(&changes);
670    {
671        let _fanout_guard = crate::reentry::FanoutGuard::enter();
672        crate::provider_fanout::notify_providers(providers, generation, fanout);
673    }
674
675    // (4) Metrics + outcome.
676    metrics::counter_inc(metrics::COMMITS_TOTAL);
677    metrics::histogram_record(
678        metrics::COMMIT_DURATION_SECONDS,
679        started.elapsed().as_secs_f64(),
680    );
681    metrics::gauge_set(metrics::GRAPH_NODES, next_snapshot.node_count() as f64);
682    metrics::gauge_set(metrics::GRAPH_EDGES, next_snapshot.edge_count() as f64);
683
684    CommitOutcome {
685        generation,
686        changes,
687        principal,
688        durable_at,
689        next_node_id,
690        next_edge_id,
691        warnings,
692    }
693}
694
695/// Build a fan-out-only change list that substitutes each declarative truncate
696/// change with the per-row tombstones the mutator staged for it.
697///
698/// Returns `None` when no truncate expansions are staged, so the common
699/// (non-truncate) commit path fans out the persisted `changes` slice directly
700/// with zero allocation. When expansions are present, every truncate change at
701/// a staged index is replaced by its expansion (in order), and a truncate
702/// change with no staged expansion (an empty-label no-op) is simply dropped
703/// from fan-out — index providers see no tombstones because no rows were removed.
704fn expand_truncates_for_fanout(
705    changes: &[Change],
706    expansions: &[(usize, Vec<Change>)],
707) -> Option<Vec<Change>> {
708    if expansions.is_empty() {
709        return None;
710    }
711    let mut view = Vec::with_capacity(changes.len());
712    for (index, change) in changes.iter().enumerate() {
713        match change {
714            // BRIEF-152: GraphReset is fanned out as its staged per-row
715            // tombstones too, alongside the BRIEF-150 truncate variants, so
716            // live index providers reclaim derived state for every wiped
717            // node/edge without seeing the bare declarative reset on the commit
718            // path. WAL recovery still replays the persisted declarative change.
719            Change::NodesOfTypeTruncated { .. }
720            | Change::EdgesOfTypeTruncated { .. }
721            | Change::GraphReset { .. } => {
722                if let Some((_, expansion)) = expansions.iter().find(|(staged, _)| *staged == index)
723                {
724                    view.extend(expansion.iter().cloned());
725                }
726                // A truncate/reset change with no staged expansion removed zero
727                // rows (empty/absent label, or a reset of an empty graph); it
728                // contributes nothing to fan-out.
729            }
730            other => view.push(other.clone()),
731        }
732    }
733    Some(view)
734}
735
/// Test-only hook that makes [`publish_appended`] panic on a chosen call, so the
/// committer's handling of a panic in the middle of a multi-member publish can
/// be driven deterministically from a test.
///
/// A panicking [`IndexProvider`] callback cannot reach that path on its own:
/// [`crate::provider_fanout::notify_providers`] catches panics per callback, and
/// `snapshot.store` does not panic. The countdown lives in thread-local state, so
/// arming it affects only the thread that called [`arm`]. The whole module is
/// compiled only under `cfg(test)`.
#[cfg(test)]
pub(crate) mod publish_panic_inject {
    use std::cell::Cell;

    thread_local! {
        /// Pending injection for the current thread. `None` means disarmed
        /// (the initial state); `Some(n)` is the number of [`maybe_panic`]
        /// calls left, including the one that panics.
        static COUNTDOWN: Cell<Option<u32>> = const { Cell::new(None) };
    }

    /// Arm the hook on the calling thread so that the `after`-th following
    /// [`maybe_panic`] call panics: `after = 1` hits the very next call,
    /// `after = 2` the one after that. `after = 0` behaves like `after = 1`.
    /// Re-arming replaces any countdown already in progress.
    pub(crate) fn arm(after: u32) {
        COUNTDOWN.with(|slot| slot.set(Some(after)));
    }

    /// Advance the countdown by one call and panic when it runs out.
    ///
    /// Does nothing while disarmed. The hook disarms itself before panicking,
    /// so each [`arm`] produces at most one panic.
    pub(crate) fn maybe_panic() {
        let should_fire = COUNTDOWN.with(|slot| match slot.get() {
            None => false,
            Some(remaining) => {
                let left = remaining.saturating_sub(1);
                if left == 0 {
                    slot.set(None);
                    true
                } else {
                    slot.set(Some(left));
                    false
                }
            }
        });
        if should_fire {
            panic!("selene-graph test: injected Stage-3 publish_appended panic");
        }
    }
}
783
784#[cfg(test)]
785mod tests;