selene_graph/write_txn.rs
1//! Write transaction RAII handle per spec 03 sections 4 and 6.
2
3use std::sync::mpsc::SyncSender;
4use std::sync::{
5 Arc,
6 atomic::{AtomicBool, AtomicU64, Ordering},
7};
8use std::time::Instant;
9
10use arc_swap::ArcSwap;
11use parking_lot::{MutexGuard, RwLockWriteGuard};
12use selene_core::{Change, HlcTimestamp, metrics};
13
14use crate::committer::Committer;
15use crate::durable_provider::DurableProvider;
16use crate::error::{GraphError, GraphResult};
17use crate::graph::SeleneGraph;
18use crate::id_allocator::IdAllocator;
19use crate::index_provider::IndexProvider;
20use crate::mutator::Mutator;
21use crate::type_validator::TypeWarning;
22
23/// Non-fatal graph commit warning.
24#[derive(Clone, Debug, Eq, PartialEq)]
25pub struct CommitWarning {
26 /// Closed-graph validation warning.
27 pub warning: TypeWarning,
28}
29
30/// Result metadata returned after a successful commit.
31#[derive(Clone, Debug, PartialEq)]
32pub struct CommitOutcome {
33 /// Published graph generation.
34 pub generation: u64,
35 /// Changes produced by the mutation funnel.
36 pub changes: Vec<Change>,
37 /// Opaque caller-supplied principal bytes for future WAL headers.
38 pub principal: Option<Arc<[u8]>>,
39 /// Highest durable sequence assigned by commit-critical providers.
40 pub durable_at: Option<u64>,
41 /// Next node ID after commit.
42 pub next_node_id: u64,
43 /// Next edge ID after commit.
44 pub next_edge_id: u64,
45 /// Non-fatal warnings produced during commit validation.
46 pub warnings: Vec<CommitWarning>,
47}
48
49/// A frozen, owned, `Send + 'static` commit bundle handed from a session thread
50/// to the single committer thread (v1.2 multi-writer, BRIEF 1).
51///
52/// Produced by [`WriteTxn::seal`] **after** the generation/meta bump + GG02
53/// validation have run under the write lock on the session thread (so error
54/// timing is unchanged), and after the lock + allocator guards have been
55/// released. It contains the fully-built next snapshot plus everything the
56/// committer needs to run the durable+publish tail — **no guards, no graph
57/// reference, no borrow**. The committer never re-validates, re-allocates ids,
58/// or re-applies a change list; it just stamps the HLC, appends to the WAL,
59/// publishes the frozen snapshot, and bumps the schema epoch.
60///
61/// The HLC timestamp is deliberately **not** stamped here: the committer stamps
62/// it per bundle in **seal-sequence** drain order so HLC is monotonic in commit
63/// order (== publish order == seal order). Stamping it on the session thread
64/// would break that monotonicity once seal-order and stamp-order diverge.
65///
66/// # Why a `seal_seq` (publish-order correctness, P0 fix)
67///
68/// `seal()` consumes the [`WriteTxn`], so the write lock + allocator guards drop
69/// as it returns — **before** the caller enqueues the bundle. Two sessions can
70/// therefore seal in lock order (A then B) yet `send()` in the opposite order
71/// (B before A) if A is preempted between lock-release and send. A naive FIFO
72/// committer would then publish B's gen-`N+1` snapshot before A's gen-`N`
73/// snapshot, regressing the published snapshot and losing A's older view under
74/// B — a D10 serializability violation. To prevent it, `seal()` stamps a
75/// strictly-monotonic `seal_seq` **while still holding the write lock** (so
76/// seal-seq order == lock-acquisition order == the intended total order), and
77/// the committer publishes strictly in `seal_seq` order via a reorder buffer,
78/// regardless of channel arrival order. Compaction takes a `seal_seq` from the
79/// same counter under the same lock, so a compact can never be reordered ahead
80/// of an earlier-sealed commit.
81pub(crate) struct SealedCommit {
82 /// Strictly-monotonic publish-order key, allocated under the write lock in
83 /// [`WriteTxn::seal`]. The committer publishes in ascending `seal_seq`.
84 pub(crate) seal_seq: u64,
85 /// Fully-built next snapshot, frozen under the session's write lock.
86 pub(crate) next_snapshot: Arc<SeleneGraph>,
87 /// Persisted change list (the WAL/changeset payload).
88 pub(crate) changes: Vec<Change>,
89 /// Truncate-expanded fan-out view, built on the session thread, or `None`
90 /// when no truncate/reset expansion is staged (the common path).
91 pub(crate) fanout_changes: Option<Vec<Change>>,
92 /// Opaque caller-supplied principal bytes for the WAL entry header (D12).
93 pub(crate) principal: Option<Arc<[u8]>>,
94 /// Whether the change list bumps the schema epoch.
95 pub(crate) schema_changed: bool,
96 /// Already-bumped graph generation.
97 pub(crate) generation: u64,
98 /// Next node id after this commit (peeked under the lock).
99 pub(crate) next_node_id: u64,
100 /// Next edge id after this commit (peeked under the lock).
101 pub(crate) next_edge_id: u64,
102 /// Non-fatal validation warnings collected during seal.
103 pub(crate) warnings: Vec<CommitWarning>,
104}
105
106/// Compile-time proof that [`SealedCommit`] is `Send + 'static`, so it can be
107/// moved to the committer thread. A guard or borrow leaking into the struct
108/// would fail this assertion at compile time.
109const _: fn() = || {
110 fn assert_send_static<T: Send + 'static>() {}
111 assert_send_static::<SealedCommit>();
112};
113
114/// RAII owner of the single graph write lock.
115///
116/// Since v1.2 (BRIEF 1) the transaction no longer holds the snapshot cell,
117/// schema-version, or provider handles — those moved to the single committer
118/// thread, which the transaction reaches via the cheap `Committer` submit
119/// handle. The transaction still owns the write lock + allocator guards for the
120/// duration of execution and releases them when `seal` consumes it.
121pub struct WriteTxn<'g> {
122 pub(crate) guard: RwLockWriteGuard<'g, Arc<SeleneGraph>>,
123 pub(crate) committer: Committer,
124 pub(crate) pre_txn: Option<Arc<SeleneGraph>>,
125 pub(crate) allocator: MutexGuard<'g, IdAllocator>,
126 /// Index-provider registry, retained so `Mutator::index_provider_by_tag`
127 /// can resolve a provider during execution. Shares the one frozen
128 /// registry allocation with `SharedGraph` and the committer — handing it
129 /// to a transaction is a refcount bump, not a `Vec` clone.
130 pub(crate) providers: Arc<[Arc<dyn IndexProvider>]>,
131 pub(crate) changes: Vec<Change>,
132 /// Per-truncate per-row tombstone expansions, keyed by the index of the
133 /// declarative truncate change in [`Self::changes`] that produced them.
134 ///
135 /// BRIEF-150 / deletion-reclamation audit Item 11. The WAL/changeset carries
136 /// only the O(1) declarative `NodesOfTypeTruncated`/`EdgesOfTypeTruncated`
137 /// change, but live index-provider fan-out must observe the same per-row
138 /// `NodeDeleted`/`EdgeDeleted` multiset a `MATCH (n:L) DETACH DELETE n` would
139 /// emit (so derived state is reclaimed without leaks). The mutator
140 /// snapshots the matched ids while it still holds the store and stages their
141 /// tombstones here; commit substitutes each truncate change with its staged
142 /// expansion before fan-out.
143 pub(crate) truncate_expansions: Vec<(usize, Vec<Change>)>,
144 pub(crate) warnings: Vec<CommitWarning>,
145}
146
147impl<'g> WriteTxn<'g> {
148 pub(crate) fn new(
149 guard: RwLockWriteGuard<'g, Arc<SeleneGraph>>,
150 committer: Committer,
151 allocator: MutexGuard<'g, IdAllocator>,
152 providers: Arc<[Arc<dyn IndexProvider>]>,
153 ) -> Self {
154 let pre_txn = Some(Arc::clone(&*guard));
155 Self {
156 guard,
157 committer,
158 pre_txn,
159 allocator,
160 providers,
161 changes: Vec::new(),
162 truncate_expansions: Vec::new(),
163 warnings: Vec::new(),
164 }
165 }
166
167 /// Borrow a mutator tied to this transaction.
168 #[must_use]
169 pub fn mutator(&mut self) -> Mutator<'_, 'g> {
170 Mutator::new(self)
171 }
172
173 /// Borrow the transaction-local working graph.
174 #[must_use]
175 pub fn read(&self) -> &SeleneGraph {
176 self.guard.as_ref()
177 }
178
179 pub(crate) fn guard_mut(&mut self) -> &mut SeleneGraph {
180 Arc::make_mut(&mut *self.guard)
181 }
182
183 /// Commit without caller principal bytes.
184 pub fn commit(self) -> GraphResult<CommitOutcome> {
185 self.commit_with_principal(None)
186 }
187
188 /// Commit with optional caller-owned principal bytes for D12 audit replay.
189 ///
190 /// Since v1.2 (BRIEF 1) commit is **seal-and-handover**: this method runs
191 /// `seal` on the calling thread (generation/meta bump + GG02
192 /// validation under the write lock, then **lock release**), then submits the
193 /// resulting `SealedCommit` to the per-graph single committer thread and
194 /// blocks until it is durable + visible. The public contract is unchanged —
195 /// "`commit()` returns ⇒ durable + visible" — only the internal threading
196 /// model differs.
197 ///
198 /// GG02 closed-graph violations still abort here, on the calling thread,
199 /// before any handoff, so error timing is identical to v1.0/v1.1.
200 ///
201 /// Registered index providers are notified by the committer after the new
202 /// snapshot is published; the same-thread re-entrancy guard now protects the
203 /// committer thread (one committer ⇒ still sound). Same-thread re-entrant
204 /// provider calls into `SharedGraph::begin_write()` are detected via the
205 /// thread-local fanout counter and panic with a clear message; the
206 /// committer's fan-out boundary catches those panics (along with
207 /// callback-internal panics and returned errors) so a single misbehaving
208 /// provider can never crash the committer thread.
209 ///
210 /// # Errors
211 ///
212 /// Returns the GG02 / validation error from `seal`, or a
213 /// [`GraphError::Durable`] if the WAL append failed or the committer thread
214 /// is no longer running.
215 #[tracing::instrument(
216 name = "selene.graph.commit",
217 skip(self, principal),
218 fields(change_count = self.change_count())
219 )]
220 pub fn commit_with_principal(self, principal: Option<Arc<[u8]>>) -> GraphResult<CommitOutcome> {
221 // Clone the submit handle BEFORE sealing — `seal()` consumes `self`
222 // (dropping the write lock + allocator guards as it returns), so the
223 // session releases the lock strictly before it enqueues and blocks on
224 // `recv()`. The committer never takes the write lock (compaction builds
225 // on the caller thread too, since v1.2 BRIEF 1 P0 fix), so a session is
226 // never simultaneously lock-holding and recv-blocked.
227 let committer = self.committer.clone();
228 let sealed = self.seal(principal, None)?;
229 committer.submit_commit(sealed)
230 }
231
232 /// Run the under-lock half of commit and hand back an owned, `Send`
233 /// [`SealedCommit`] for the committer thread (v1.2 multi-writer, BRIEF 1).
234 ///
235 /// Runs, on the calling thread under the held write lock: schema-change
236 /// detection, the generation/meta bump (`generation += 1` + write the next
237 /// ids into `GraphMeta`), and GG02 closed-graph validation — which **still
238 /// aborts synchronously here** (the `?` propagates and `Drop` rolls back the
239 /// generation bump, exactly as in v1.0/v1.1). It then samples the optional
240 /// BRIEF-117 cancellation token **while the lock is still held and before
241 /// `Drop` is disarmed** (see below), allocates the strictly-monotonic
242 /// `seal_seq` under the lock, disarms `Drop` (`pre_txn = None`), clones the
243 /// now-frozen next snapshot, `mem::take`s the change list / truncate
244 /// expansions / warnings, builds the truncate-expanded fan-out view, and
245 /// returns. The write lock + allocator guards drop as this method returns
246 /// (it consumes `self`).
247 ///
248 /// # Cancellation cut-line (BRIEF-117, P0 fix)
249 ///
250 /// The cancellation token is sampled **here, under the write lock, before
251 /// disarming `Drop`** — not on the committer before the WAL append. In the
252 /// seal-and-handover model multiple commits can be sealed-but-unpublished at
253 /// once, each forking `*shared` off the previous; a commit's mutation is
254 /// therefore already woven into `*shared` (and possibly built upon by a
255 /// later seal) by the time the committer would run a pre-WAL check, so a
256 /// committer-side cancel could not surgically remove it without poisoning
257 /// the engine. Sampling under the lock means a cancelled commit is rolled
258 /// back by `Drop` exactly like a GG02 abort — `*shared` is restored, no
259 /// `seal_seq` is consumed, nothing is enqueued — so the cut-line's
260 /// guarantee ("no append, no publish, exactly as an aborted transaction
261 /// would leave it") is *literally* true. A cancel observed after `seal`
262 /// returns is too late: the commit is already in flight and irrevocable.
263 ///
264 /// The HLC timestamp is **not** stamped here (see [`SealedCommit`]); the
265 /// committer stamps it per bundle in seal-sequence drain order.
266 ///
267 /// # Errors
268 ///
269 /// Returns [`GraphError::Cancelled`] when `cancel` is set at entry (rolled
270 /// back via `Drop`), or the GG02 / closed-graph validation error
271 /// ([`GraphError::TypeViolation`]) when a change violates the bound type.
272 pub(crate) fn seal(
273 mut self,
274 principal: Option<Arc<[u8]>>,
275 cancel: Option<&AtomicBool>,
276 ) -> GraphResult<SealedCommit> {
277 debug_assert!(
278 self.pre_txn.is_some(),
279 "pre_txn must be present at seal entry"
280 );
281
282 let schema_changed = self
283 .changes
284 .iter()
285 .any(|change| matches!(change, Change::SchemaChanged { .. }));
286 let next_node_id = self.allocator.peek_next_node();
287 let next_edge_id = self.allocator.peek_next_edge();
288 {
289 let graph = self.guard_mut();
290 graph.meta.generation = graph
291 .meta
292 .generation
293 .checked_add(1)
294 .expect("graph generation exhausted");
295 graph.meta.next_node_id = next_node_id;
296 graph.meta.next_edge_id = next_edge_id;
297 }
298
299 let generation = self.read().meta.generation;
300
301 let mut validation_warnings = Vec::new();
302 if let Some(type_def) = self.read().meta.bound_type.as_deref() {
303 for change in &self.changes {
304 validation_warnings.extend(
305 // NB: `?` here returns Err with the generation bump still
306 // applied to the guard-Arc; `Drop` then restores `pre_txn`
307 // and undoes the bump — error timing + rollback unchanged.
308 crate::type_validator::validate_change(change, self.read(), type_def)?
309 .into_iter()
310 .map(|warning| CommitWarning { warning }),
311 );
312 }
313 if schema_changed {
314 validation_warnings.extend(
315 crate::type_validator::validate_entity_state(self.read(), type_def)?
316 .into_iter()
317 .map(|warning| CommitWarning { warning }),
318 );
319 } else {
320 crate::type_validator::validate_unique_property_changes(
321 &self.changes,
322 self.read(),
323 type_def,
324 )?;
325 }
326 }
327 for warning in validation_warnings {
328 if !self.warnings.contains(&warning) {
329 self.warnings.push(warning);
330 }
331 }
332
333 // BRIEF-117 cut-line: sample the cancellation token while the lock is
334 // still held and `Drop` is still armed. A cancel here returns Err with
335 // the generation bump + staged mutations still on the guard-Arc; `Drop`
336 // then restores `pre_txn`, rolling everything back exactly as a GG02
337 // abort or an aborted transaction would. Nothing is enqueued or
338 // published, and no `seal_seq` is consumed.
339 if let Some(flag) = cancel
340 && flag.load(Ordering::Acquire)
341 {
342 return Err(GraphError::Cancelled);
343 }
344
345 // Allocate the publish-order key under the lock so seal-seq order equals
346 // lock-acquisition order (the intended total order). Done after every
347 // fallible step so an aborted seal consumes no sequence number, keeping
348 // the committer's reorder sequence gap-free.
349 let seal_seq = self.committer.next_seal_seq();
350
351 // Disarm Drop-rollback: from here the commit is handed to the committer
352 // and the in-place mutations become the published state.
353 self.pre_txn = None;
354 // Freeze the next snapshot under the lock. The committer publishes this
355 // exact Arc and never rebuilds it.
356 let next_snapshot = Arc::clone(&*self.guard);
357
358 let changes = std::mem::take(&mut self.changes);
359 let truncate_expansions = std::mem::take(&mut self.truncate_expansions);
360 let warnings = std::mem::take(&mut self.warnings);
361
362 // BRIEF-150 / audit Item 11: build the fan-out-only truncate-expanded
363 // view on the session thread so the committer holds a fully-owned
364 // bundle. `None` on the common (non-truncate) path → zero allocation.
365 let fanout_changes = expand_truncates_for_fanout(&changes, &truncate_expansions);
366
367 Ok(SealedCommit {
368 seal_seq,
369 next_snapshot,
370 changes,
371 fanout_changes,
372 principal,
373 schema_changed,
374 generation,
375 next_node_id,
376 next_edge_id,
377 warnings,
378 })
379 }
380
381 /// Roll back graph changes via `Drop` and release the write lock.
382 pub fn rollback(self) {}
383
384 /// Number of changes accumulated since this transaction opened.
385 #[must_use]
386 pub fn change_count(&self) -> usize {
387 self.changes.len()
388 }
389
390 /// Whether this transaction has accumulated schema-changing work.
391 #[must_use]
392 pub fn has_schema_changes(&self) -> bool {
393 self.changes
394 .iter()
395 .any(|change| matches!(change, Change::SchemaChanged { .. }))
396 }
397}
398
399impl Drop for WriteTxn<'_> {
400 fn drop(&mut self) {
401 if let Some(prior) = self.pre_txn.take() {
402 *self.guard = prior;
403 }
404 }
405}
406
407fn commit_timestamp(durable_providers: &[Arc<dyn DurableProvider>]) -> HlcTimestamp {
408 durable_providers
409 .first()
410 .map_or_else(HlcTimestamp::zero, |provider| provider.next_timestamp())
411}
412
413/// A sealed commit whose durable bytes have been **appended** to every durable
414/// provider but **not yet fsynced or published** (v1.2 multi-writer, BRIEF 2).
415///
416/// This is the intermediate state between Stage 1 ([`append_sealed`]) and
417/// Stage 3 ([`publish_appended`]) of the group-commit pipeline. The committer
418/// forms a contiguous run of `AppendedCommit`s, runs ONE group flush
419/// ([`flush_durables`] — the R1 fsync-before-publish barrier) over the whole
420/// run, then publishes + acks each in `seal_seq` order. Because the flush is the
421/// single barrier for the whole batch, an appended-but-unflushed commit is
422/// **never** published or acked — it is only ever lost atomically with the rest
423/// of its run on a crash (durable-before-visible).
424///
425/// It owns the reply [`SyncSender`] (moved in by the committer when it pops the
426/// `Work::Commit`) so the Stage-4 ack is a single drain in `seal_seq` order with
427/// no parallel-`Vec` correlation between commits and their reply channels.
428///
429/// All post-append/pre-publish state carried here was frozen under the session's
430/// write lock in [`WriteTxn::seal`]; the committer never re-validates,
431/// re-allocates ids, or re-applies a change list.
432pub(crate) struct AppendedCommit {
433 /// Fully-built next snapshot, frozen under the session's write lock. The
434 /// committer stores this exact `Arc` in Stage 3 and never rebuilds it.
435 pub(crate) next_snapshot: Arc<SeleneGraph>,
436 /// Persisted change list, returned in the [`CommitOutcome`].
437 pub(crate) changes: Vec<Change>,
438 /// Truncate-expanded fan-out view, or `None` on the common (non-truncate)
439 /// path (then fan-out uses `changes` directly).
440 pub(crate) fanout_changes: Option<Vec<Change>>,
441 /// Opaque caller-supplied principal bytes (D12), returned in the outcome.
442 pub(crate) principal: Option<Arc<[u8]>>,
443 /// Whether the change list bumps the schema epoch (store-before-schema-bump).
444 pub(crate) schema_changed: bool,
445 /// Already-bumped graph generation.
446 pub(crate) generation: u64,
447 /// Next node id after this commit.
448 pub(crate) next_node_id: u64,
449 /// Next edge id after this commit.
450 pub(crate) next_edge_id: u64,
451 /// Non-fatal validation warnings.
452 pub(crate) warnings: Vec<CommitWarning>,
453 /// Highest durable sequence assigned across the durable providers during
454 /// [`append_sealed`]. Observable only after the group flush + publish.
455 pub(crate) durable_at: Option<u64>,
456 /// The reply channel for this commit's waiter, set by the committer when it
457 /// pops the `Work::Commit`. Drained (acked) in Stage 4, in `seal_seq` order.
458 pub(crate) reply: Option<SyncSender<GraphResult<CommitOutcome>>>,
459 /// `Instant` captured at append time so commit-duration metrics span the
460 /// full durable+publish tail (recorded in [`publish_appended`]).
461 pub(crate) started: Instant,
462}
463
464/// Compile-time proof that [`AppendedCommit`] is `Send + 'static`, so the
465/// committer can hold a batch of them across the group flush.
466const _: fn() = || {
467 fn assert_send_static<T: Send + 'static>() {}
468 assert_send_static::<AppendedCommit>();
469};
470
471/// Stage 1 — **append** a sealed commit's durable bytes to every durable
472/// provider, with fsync **deferred** (v1.2 multi-writer, BRIEF 2). Performs no
473/// store, no schema bump, and no fan-out: the snapshot is not yet visible.
474///
475/// This is the first third of the pre-BRIEF-2 `publish_sealed` body, split at
476/// the append/store seam so the committer can append a whole contiguous run
477/// before fsyncing it once (group commit). Ordering within Stage 1 is verbatim:
478/// 1. Debug-only index-consistency assertion on the frozen `next_snapshot`,
479/// BEFORE any durable append. A detected violation aborts with nothing
480/// durable and nothing visible (it poisons via the committer's
481/// `catch_unwind`, but no WAL entry was written and the published cell never
482/// advanced, so a reopen is clean). Asserting after the store (pre-fix) would
483/// return `Err` for a commit that *did* persist — a P2 inversion.
484/// 2. Stamp the HLC in committer seal-sequence order so HLC is monotonic in
485/// commit order (== publish order).
486/// 3. WAL-first: `write_commit` for each durable provider. Under the BRIEF-2
487/// `OnFlushOnly` policy this append does **not** fsync — the committer's
488/// later [`flush_durables`] is the single fsync for the whole run. The
489/// returned per-provider sequence is folded into `durable_at`.
490///
491/// The split makes durability **fsync-gated, not append-gated**: an appended
492/// commit is durable only after the group [`flush_durables`] returns `Ok`. The
493/// committer holds the append's bytes-but-no-fsync state in the returned
494/// [`AppendedCommit`] and publishes/acks strictly after the barrier.
495///
496/// # Failure ⇒ engine poison (handled by the committer)
497///
498/// A returned `Err` is a **post-seal** failure: `seal()` already wove this
499/// commit's mutation into `*shared` (and a later seal may have forked off it),
500/// so the live graph cannot be surgically rolled back. The committer poisons the
501/// engine, Errs this commit + every already-appended batch member (whose
502/// appended-but-unflushed bytes are correct to lose on reopen), and drains the
503/// buffer. The durable WAL never fsynced any of them, so a reopen heals.
504///
505/// # Errors
506///
507/// Returns [`GraphError::Durable`] if a durable provider's `write_commit`
508/// failed.
509pub(crate) fn append_sealed(
510 sealed: SealedCommit,
511 durable_providers: &[Arc<dyn DurableProvider>],
512) -> GraphResult<AppendedCommit> {
513 let started = Instant::now();
514 let SealedCommit {
515 seal_seq: _,
516 next_snapshot,
517 changes,
518 fanout_changes,
519 principal,
520 schema_changed,
521 generation,
522 next_node_id,
523 next_edge_id,
524 warnings,
525 } = sealed;
526
527 // (1) Debug-only structural net on the frozen snapshot, BEFORE any durable
528 // append. The snapshot is immutable from seal time, so this is just as sound
529 // as asserting after the store — but a detected violation now aborts with
530 // nothing durable and nothing visible (no `Err`-but-committed inversion). A
531 // pure read of the immutable snapshot — never re-enters begin_write.
532 // Compiled out in release builds.
533 #[cfg(debug_assertions)]
534 if let Err(reason) = next_snapshot.assert_indexes_consistent() {
535 panic!("selene-graph: pre-publish index consistency violation: {reason}");
536 }
537
538 // (2) Stamp the HLC in committer seal-sequence order (monotonic in commit
539 // order).
540 let timestamp = commit_timestamp(durable_providers);
541
542 // (3) WAL-first append. Under OnFlushOnly (BRIEF 2) this does NOT fsync —
543 // the committer's group flush is the single barrier. A returned error
544 // poisons the committer (the session-thread seal already mutated `*shared`
545 // and cannot be rolled back).
546 let mut durable_at: Option<u64> = None;
547 for durable in durable_providers {
548 let seq = durable
549 .write_commit(principal.as_ref(), &changes, timestamp)
550 .map_err(|error| GraphError::Durable {
551 reason: format!("{}: {error}", durable.provider_tag()),
552 })?;
553 durable_at = Some(durable_at.map_or(seq, |highest| highest.max(seq)));
554 }
555
556 Ok(AppendedCommit {
557 next_snapshot,
558 changes,
559 fanout_changes,
560 principal,
561 schema_changed,
562 generation,
563 next_node_id,
564 next_edge_id,
565 warnings,
566 durable_at,
567 reply: None,
568 started,
569 })
570}
571
572/// Stage 2 — **flush** every durable provider, fsyncing the whole contiguous run
573/// of appended commits in one barrier (the R1 fsync-before-publish barrier,
574/// v1.2 multi-writer, BRIEF 2).
575///
576/// Called by the committer exactly once per drained run of [`append_sealed`]s,
577/// strictly **before** any of the run's commits are published or acked. After it
578/// returns `Ok`, every byte appended in the run is durable, so publishing the
579/// snapshots (Stage 3) and delivering `durable_at` (Stage 4) cannot expose a
580/// not-yet-durable commit (durable-before-visible). When `commit_batching=Off`
581/// the run length is capped at 1, so this is exactly one fsync per commit at the
582/// same order point as `EveryN(1)`'s append-time fsync — behaviorally identical
583/// to BRIEF 1.
584///
585/// # Failure ⇒ engine poison (handled by the committer)
586///
587/// A flush error means the run's appended bytes may not be durable. The
588/// committer publishes **nothing** from the run, poisons the engine, and Errs
589/// every member — reopen lets WAL recovery decide truth, and no caller was told
590/// "durable."
591///
592/// # Errors
593///
594/// Returns [`GraphError::Durable`] if a durable provider's `flush` failed.
595pub(crate) fn flush_durables(durable_providers: &[Arc<dyn DurableProvider>]) -> GraphResult<()> {
596 for durable in durable_providers {
597 durable.flush().map_err(|error| GraphError::Durable {
598 reason: format!("{}: {error}", durable.provider_tag()),
599 })?;
600 }
601 Ok(())
602}
603
604/// Stage 3+4 — make an appended (and now group-flushed) commit **visible** and
605/// build its [`CommitOutcome`] (v1.2 multi-writer, BRIEF 2). **Infallible.**
606///
607/// Called by the committer only after [`flush_durables`] returned `Ok` for the
608/// whole run, in `seal_seq` order. Infallibility is load-bearing: by the time we
609/// store the snapshot the commit is already durable, so there is no honest way
610/// to return `Err` here — returning `Result` would reintroduce the P2
611/// "returns-Err-but-actually-published" inversion. The committer still wraps the
612/// call in `catch_unwind`, so a `store`/debug-assert PANIC still poisons.
613///
614/// Ordering (verbatim from the pre-BRIEF-2 `publish_sealed` tail):
615/// 1. Publish: `snapshot.store(next_snapshot)` (the ArcSwap linearization point;
616/// the committer is its sole writer).
617/// 2. store-before-schema-bump (PR #127 P1): bump `schema_version` strictly
618/// after the store, so a reader seeing the new epoch also sees the new
619/// snapshot.
620/// 3. No-op provider fan-out under the [`crate::reentry::FanoutGuard`] (now
621/// committer-thread-local — sound with one committer).
622/// 4. Metrics + build the [`CommitOutcome`] (carrying the `durable_at` computed
623/// in [`append_sealed`]).
624pub(crate) fn publish_appended(
625 appended: AppendedCommit,
626 snapshot: &ArcSwap<SeleneGraph>,
627 schema_version: &AtomicU64,
628 providers: &[Arc<dyn IndexProvider>],
629) -> CommitOutcome {
630 let AppendedCommit {
631 next_snapshot,
632 changes,
633 fanout_changes,
634 principal,
635 schema_changed,
636 generation,
637 next_node_id,
638 next_edge_id,
639 warnings,
640 durable_at,
641 reply: _,
642 started,
643 } = appended;
644
645 // Test-only injection seam for the Stage-3 publish-panic path (BRIEF 2 crash
646 // matrix item 6). In production this compiles to nothing. It panics BEFORE the
647 // store so the panicking member never publishes — matching the real
648 // store/debug-assert panic this branch defends against — letting a test drive
649 // the "member i of a multi-member batch panics ⇒ i acked-and-visible,
650 // panicking i+1 + remaining i+2.. Err'd, poisoned, drained" path that
651 // `notify_providers`' panic-swallowing makes otherwise unreachable.
652 #[cfg(test)]
653 publish_panic_inject::maybe_panic();
654
655 // (1) Publish — the sole ArcSwap writer.
656 snapshot.store(Arc::clone(&next_snapshot));
657 // (2) Publish the schema-version bump AFTER snapshot.store so any reader
658 // observing the new epoch is guaranteed to also observe the new snapshot
659 // (Codex PR #127 auto-review P1). Reverse ordering would let a reader read
660 // `epoch=N` then load the prior snapshot, planning against stale schema.
661 if schema_changed {
662 schema_version.fetch_add(1, Ordering::AcqRel);
663 }
664
665 // (3) No-op provider fan-out. The FanoutGuard's thread-local counter now
666 // guards the COMMITTER thread: a provider that re-enters begin_write on the
667 // committer thread panics before locking, and the boundary below catches
668 // it. Safe with exactly one committer (v1.2 design §7.7).
669 let fanout: &[Change] = fanout_changes.as_deref().unwrap_or(&changes);
670 {
671 let _fanout_guard = crate::reentry::FanoutGuard::enter();
672 crate::provider_fanout::notify_providers(providers, generation, fanout);
673 }
674
675 // (4) Metrics + outcome.
676 metrics::counter_inc(metrics::COMMITS_TOTAL);
677 metrics::histogram_record(
678 metrics::COMMIT_DURATION_SECONDS,
679 started.elapsed().as_secs_f64(),
680 );
681 metrics::gauge_set(metrics::GRAPH_NODES, next_snapshot.node_count() as f64);
682 metrics::gauge_set(metrics::GRAPH_EDGES, next_snapshot.edge_count() as f64);
683
684 CommitOutcome {
685 generation,
686 changes,
687 principal,
688 durable_at,
689 next_node_id,
690 next_edge_id,
691 warnings,
692 }
693}
694
695/// Build a fan-out-only change list that substitutes each declarative truncate
696/// change with the per-row tombstones the mutator staged for it.
697///
698/// Returns `None` when no truncate expansions are staged, so the common
699/// (non-truncate) commit path fans out the persisted `changes` slice directly
700/// with zero allocation. When expansions are present, every truncate change at
701/// a staged index is replaced by its expansion (in order), and a truncate
702/// change with no staged expansion (an empty-label no-op) is simply dropped
703/// from fan-out — index providers see no tombstones because no rows were removed.
704fn expand_truncates_for_fanout(
705 changes: &[Change],
706 expansions: &[(usize, Vec<Change>)],
707) -> Option<Vec<Change>> {
708 if expansions.is_empty() {
709 return None;
710 }
711 let mut view = Vec::with_capacity(changes.len());
712 for (index, change) in changes.iter().enumerate() {
713 match change {
714 // BRIEF-152: GraphReset is fanned out as its staged per-row
715 // tombstones too, alongside the BRIEF-150 truncate variants, so
716 // live index providers reclaim derived state for every wiped
717 // node/edge without seeing the bare declarative reset on the commit
718 // path. WAL recovery still replays the persisted declarative change.
719 Change::NodesOfTypeTruncated { .. }
720 | Change::EdgesOfTypeTruncated { .. }
721 | Change::GraphReset { .. } => {
722 if let Some((_, expansion)) = expansions.iter().find(|(staged, _)| *staged == index)
723 {
724 view.extend(expansion.iter().cloned());
725 }
726 // A truncate/reset change with no staged expansion removed zero
727 // rows (empty/absent label, or a reset of an empty graph); it
728 // contributes nothing to fan-out.
729 }
730 other => view.push(other.clone()),
731 }
732 }
733 Some(view)
734}
735
/// Test-only seam that raises a panic from inside [`publish_appended`]
/// (Stage 3) on a chosen publish ordinal, so that the committer's multi-member
/// publish-panic poison-and-drain branch can be exercised deterministically.
///
/// A test has no other way into that branch. A misbehaving [`IndexProvider`]
/// whose `on_change` panics is swallowed by the per-callback `catch_unwind` in
/// [`crate::provider_fanout::notify_providers`], and `snapshot.store` does not
/// panic, so through the public API the Stage-3 panic path cannot be reached.
/// The counter is keyed on the committer thread (the only caller of
/// `publish_appended`), which makes "panic on the Nth publish of this run"
/// deterministic. The module exists only under `cfg(test)`; production builds
/// contain neither the injection point nor any overhead.
#[cfg(test)]
pub(crate) mod publish_panic_inject {
    use std::cell::Cell;

    thread_local! {
        /// Arming state. `None`, the default, means disarmed. `Some(remaining)`
        /// means armed: every [`maybe_panic`] call counts it down and the call
        /// that brings it to zero panics. Being thread-local, it only affects
        /// the thread that called [`arm`], and it clears itself on firing.
        static COUNTDOWN: Cell<Option<u32>> = const { Cell::new(None) };
    }

    /// Arm the injection so that the `after`-th [`maybe_panic`] call from now
    /// panics (`after = 1` ⇒ the next publish; `after = 2` ⇒ the one after).
    /// Call it on the committer thread, e.g. from within a provider `on_change`
    /// during an earlier publish of the same run.
    pub(crate) fn arm(after: u32) {
        COUNTDOWN.with(|slot| {
            slot.replace(Some(after));
        });
    }

    /// Do nothing when disarmed or when the countdown has not run out yet;
    /// panic when it has. Firing disarms first, so each [`arm`] injects exactly
    /// one panic.
    pub(crate) fn maybe_panic() {
        COUNTDOWN.with(|slot| match slot.get() {
            None => {}
            // The decrement saturates, so 0 and 1 both mean "fire on this call".
            Some(remaining) if remaining <= 1 => {
                slot.set(None);
                panic!("selene-graph test: injected Stage-3 publish_appended panic");
            }
            Some(remaining) => slot.set(Some(remaining - 1)),
        });
    }
}
783
784#[cfg(test)]
785mod tests;