Skip to main content

selene_graph/
write_txn.rs

1//! Write transaction RAII handle per spec 03 sections 4 and 6.
2
3use std::sync::{
4    Arc,
5    atomic::{AtomicBool, Ordering},
6};
7
8use parking_lot::{MutexGuard, RwLockWriteGuard};
9use selene_core::Change;
10
11use crate::committer::Committer;
12use crate::error::{GraphError, GraphResult};
13use crate::graph::SeleneGraph;
14use crate::id_allocator::IdAllocator;
15use crate::index_provider::IndexProvider;
16use crate::mutator::Mutator;
17use crate::type_validator::TypeWarning;
18
19mod pipeline;
20
21pub(crate) use pipeline::{AppendedCommit, append_sealed, flush_durables, publish_appended};
22
23#[cfg(test)]
24pub(crate) use pipeline::publish_panic_inject;
25
/// Non-fatal graph commit warning.
///
/// Thin wrapper around a [`TypeWarning`]. [`WriteTxn::seal`] builds one of
/// these for every warning the type validator returns and keeps only the
/// first occurrence of each distinct warning.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CommitWarning {
    /// Closed-graph validation warning.
    pub warning: TypeWarning,
}
32
/// Result metadata returned after a successful commit.
///
/// This is the `Ok` payload of [`WriteTxn::commit`] and
/// [`WriteTxn::commit_with_principal`].
#[derive(Clone, Debug, PartialEq)]
pub struct CommitOutcome {
    /// Published graph generation.
    pub generation: u64,
    /// Changes produced by the mutation funnel.
    pub changes: Vec<Change>,
    /// Opaque caller-supplied principal bytes for future WAL headers.
    pub principal: Option<Arc<[u8]>>,
    /// Highest durable sequence assigned by commit-critical providers.
    pub durable_at: Option<u64>,
    /// Next node ID after commit.
    pub next_node_id: u64,
    /// Next edge ID after commit.
    pub next_edge_id: u64,
    /// Non-fatal warnings produced during commit validation.
    pub warnings: Vec<CommitWarning>,
}
51
/// A frozen, owned, `Send + 'static` commit bundle handed from a session thread
/// to the single committer thread (v1.2 multi-writer, BRIEF 1).
///
/// Produced by [`WriteTxn::seal`] **after** the generation/meta bump + GG02
/// validation have run under the write lock on the session thread (so error
/// timing is unchanged), and after the lock + allocator guards have been
/// released. It contains the fully-built next snapshot plus everything the
/// committer needs to run the durable+publish tail — **no guards, no graph
/// reference, no borrow**. The committer never re-validates, re-allocates ids,
/// or re-applies a change list; it just stamps the HLC, appends to the WAL,
/// publishes the frozen snapshot, and bumps the schema epoch.
///
/// The HLC timestamp is deliberately **not** stamped here: the committer stamps
/// it per bundle in **seal-sequence** drain order so HLC is monotonic in commit
/// order (== publish order == seal order). Stamping it on the session thread
/// would break that monotonicity once seal-order and stamp-order diverge.
///
/// # Why a `seal_seq` (publish-order correctness, P0 fix)
///
/// `seal()` consumes the [`WriteTxn`], so the write lock + allocator guards drop
/// as it returns — **before** the caller enqueues the bundle. Two sessions can
/// therefore seal in lock order (A then B) yet `send()` in the opposite order
/// (B before A) if A is preempted between lock-release and send. A naive FIFO
/// committer would then publish B's gen-`N+1` snapshot before A's gen-`N`
/// snapshot, regressing the published snapshot and losing A's older view under
/// B — a D10 serializability violation. To prevent it, `seal()` stamps a
/// strictly-monotonic `seal_seq` **while still holding the write lock** (so
/// seal-seq order == lock-acquisition order == the intended total order), and
/// the committer publishes strictly in `seal_seq` order via a reorder buffer,
/// regardless of channel arrival order. Compaction takes a `seal_seq` from the
/// same counter under the same lock, so a compact can never be reordered ahead
/// of an earlier-sealed commit.
pub(crate) struct SealedCommit {
    /// Strictly-monotonic publish-order key, allocated under the write lock in
    /// [`WriteTxn::seal`]. The committer publishes in ascending `seal_seq`.
    pub(crate) seal_seq: u64,
    /// Fully-built next snapshot, frozen under the session's write lock.
    /// `seal` obtains it by cloning the `Arc` held in the write guard.
    pub(crate) next_snapshot: Arc<SeleneGraph>,
    /// Persisted change list (the WAL/changeset payload), moved out of the
    /// transaction by `seal`.
    pub(crate) changes: Vec<Change>,
    /// Truncate-expanded fan-out view, built on the session thread, or `None`
    /// when no truncate/reset expansion is staged (the common path).
    pub(crate) fanout_changes: Option<Vec<Change>>,
    /// Opaque caller-supplied principal bytes for the WAL entry header (D12).
    pub(crate) principal: Option<Arc<[u8]>>,
    /// Whether the change list bumps the schema epoch. `seal` sets this when
    /// any change is a `Change::SchemaChanged`.
    pub(crate) schema_changed: bool,
    /// Already-bumped graph generation (read back from the graph meta after
    /// `seal` incremented it).
    pub(crate) generation: u64,
    /// Next node id after this commit (peeked under the lock).
    pub(crate) next_node_id: u64,
    /// Next edge id after this commit (peeked under the lock).
    pub(crate) next_edge_id: u64,
    /// Non-fatal validation warnings collected during seal.
    pub(crate) warnings: Vec<CommitWarning>,
}
108
109/// Compile-time proof that [`SealedCommit`] is `Send + 'static`, so it can be
110/// moved to the committer thread. A guard or borrow leaking into the struct
111/// would fail this assertion at compile time.
112const _: fn() = || {
113    fn assert_send_static<T: Send + 'static>() {}
114    assert_send_static::<SealedCommit>();
115};
116
/// RAII owner of the single graph write lock.
///
/// Since v1.2 (BRIEF 1) the transaction no longer holds the snapshot cell,
/// schema-version, or provider handles — those moved to the single committer
/// thread, which the transaction reaches via the cheap `Committer` submit
/// handle. The transaction still owns the write lock + allocator guards for the
/// duration of execution and releases them when `seal` consumes it.
///
/// Dropping a transaction whose `pre_txn` is still `Some` writes that saved
/// `Arc` back through `guard`, discarding the transaction's graph mutations.
pub struct WriteTxn<'g> {
    /// Write guard over the shared graph `Arc`. Mutations go through
    /// `guard_mut` (`Arc::make_mut`); `Drop` assigns `pre_txn` back into it
    /// on rollback.
    pub(crate) guard: RwLockWriteGuard<'g, Arc<SeleneGraph>>,
    /// Submit handle to the committer. `seal` asks it for the next
    /// `seal_seq`; `commit_with_principal` clones it to submit the bundle.
    pub(crate) committer: Committer,
    /// `Arc` of the graph as it was when the transaction opened. `Some` means
    /// `Drop` will roll back to it; `seal` sets it to `None` once the commit
    /// can no longer fail.
    pub(crate) pre_txn: Option<Arc<SeleneGraph>>,
    /// Guard over the id allocator; `seal` peeks the next node/edge ids from it.
    pub(crate) allocator: MutexGuard<'g, IdAllocator>,
    /// Index-provider registry, retained so `Mutator::index_provider_by_tag`
    /// can resolve a provider during execution. Shares the one frozen
    /// registry allocation with `SharedGraph` and the committer — handing it
    /// to a transaction is a refcount bump, not a `Vec` clone.
    pub(crate) providers: Arc<[Arc<dyn IndexProvider>]>,
    /// Changes accumulated so far; moved into the `SealedCommit` by `seal`.
    pub(crate) changes: Vec<Change>,
    /// Per-truncate per-row tombstone expansions, keyed by the index of the
    /// declarative truncate change in [`Self::changes`] that produced them.
    ///
    /// BRIEF-150 / deletion-reclamation audit Item 11. The WAL/changeset carries
    /// only the O(1) declarative `NodesOfTypeTruncated`/`EdgesOfTypeTruncated`
    /// change, but live index-provider fan-out must observe the same per-row
    /// `NodeDeleted`/`EdgeDeleted` multiset a `MATCH (n:L) DETACH DELETE n` would
    /// emit (so derived state is reclaimed without leaks). The mutator
    /// snapshots the matched ids while it still holds the store and stages their
    /// tombstones here; commit substitutes each truncate change with its staged
    /// expansion before fan-out.
    pub(crate) truncate_expansions: Vec<(usize, Vec<Change>)>,
    /// Non-fatal warnings collected so far. `seal` appends de-duplicated
    /// validation warnings and moves the list into the `SealedCommit`.
    pub(crate) warnings: Vec<CommitWarning>,
}
149
150impl<'g> WriteTxn<'g> {
151    pub(crate) fn new(
152        guard: RwLockWriteGuard<'g, Arc<SeleneGraph>>,
153        committer: Committer,
154        allocator: MutexGuard<'g, IdAllocator>,
155        providers: Arc<[Arc<dyn IndexProvider>]>,
156    ) -> Self {
157        let pre_txn = Some(Arc::clone(&*guard));
158        Self {
159            guard,
160            committer,
161            pre_txn,
162            allocator,
163            providers,
164            changes: Vec::new(),
165            truncate_expansions: Vec::new(),
166            warnings: Vec::new(),
167        }
168    }
169
    /// Borrow a mutator tied to this transaction.
    ///
    /// The mutator holds the exclusive borrow of `self` for as long as it
    /// lives.
    #[must_use]
    pub fn mutator(&mut self) -> Mutator<'_, 'g> {
        Mutator::new(self)
    }
175
176    /// Borrow the transaction-local working graph.
177    #[must_use]
178    pub fn read(&self) -> &SeleneGraph {
179        self.guard.as_ref()
180    }
181
182    pub(crate) fn guard_mut(&mut self) -> &mut SeleneGraph {
183        Arc::make_mut(&mut *self.guard)
184    }
185
    /// Commit without caller principal bytes.
    ///
    /// Shorthand for [`Self::commit_with_principal`] with `None`.
    ///
    /// # Errors
    ///
    /// Propagates whatever [`Self::commit_with_principal`] returns.
    pub fn commit(self) -> GraphResult<CommitOutcome> {
        self.commit_with_principal(None)
    }
190
191    /// Commit with optional caller-owned principal bytes for D12 audit replay.
192    ///
193    /// Since v1.2 (BRIEF 1) commit is **seal-and-handover**: this method runs
194    /// `seal` on the calling thread (generation/meta bump + GG02
195    /// validation under the write lock, then **lock release**), then submits the
196    /// resulting `SealedCommit` to the per-graph single committer thread and
197    /// blocks until it is durable + visible. The public contract is unchanged —
198    /// "`commit()` returns ⇒ durable + visible" — only the internal threading
199    /// model differs.
200    ///
201    /// GG02 closed-graph violations still abort here, on the calling thread,
202    /// before any handoff, so error timing is identical to v1.0/v1.1.
203    ///
204    /// Registered index providers are notified by the committer after the new
205    /// snapshot is published; the same-thread re-entrancy guard now protects the
206    /// committer thread (one committer ⇒ still sound). Same-thread re-entrant
207    /// provider calls into `SharedGraph::begin_write()` are detected via the
208    /// thread-local fanout counter and panic with a clear message; the
209    /// committer's fan-out boundary catches those panics (along with
210    /// callback-internal panics and returned errors) so a single misbehaving
211    /// provider can never crash the committer thread.
212    ///
213    /// # Errors
214    ///
215    /// Returns the GG02 / validation error from `seal`, or a
216    /// [`GraphError::Durable`] if the WAL append failed or the committer thread
217    /// is no longer running.
218    #[tracing::instrument(
219        name = "selene.graph.commit",
220        skip(self, principal),
221        fields(change_count = self.change_count())
222    )]
223    pub fn commit_with_principal(self, principal: Option<Arc<[u8]>>) -> GraphResult<CommitOutcome> {
224        // Clone the submit handle BEFORE sealing — `seal()` consumes `self`
225        // (dropping the write lock + allocator guards as it returns), so the
226        // session releases the lock strictly before it enqueues and blocks on
227        // `recv()`. The committer never takes the write lock (compaction builds
228        // on the caller thread too, since v1.2 BRIEF 1 P0 fix), so a session is
229        // never simultaneously lock-holding and recv-blocked.
230        let committer = self.committer.clone();
231        let sealed = self.seal(principal, None)?;
232        committer.submit_commit(sealed)
233    }
234
    /// Run the under-lock half of commit and hand back an owned, `Send`
    /// [`SealedCommit`] for the committer thread (v1.2 multi-writer, BRIEF 1).
    ///
    /// Runs, on the calling thread under the held write lock: schema-change
    /// detection, the generation/meta bump (`generation += 1` + write the next
    /// ids into `GraphMeta`), and GG02 closed-graph validation — which **still
    /// aborts synchronously here** (the `?` propagates and `Drop` rolls back the
    /// generation bump, exactly as in v1.0/v1.1). It then samples the optional
    /// BRIEF-117 cancellation token **while the lock is still held and before
    /// `Drop` is disarmed** (see below), allocates the strictly-monotonic
    /// `seal_seq` under the lock, disarms `Drop` (`pre_txn = None`), clones the
    /// now-frozen next snapshot, `mem::take`s the change list / truncate
    /// expansions / warnings, builds the truncate-expanded fan-out view, and
    /// returns. The write lock + allocator guards drop as this method returns
    /// (it consumes `self`).
    ///
    /// # Cancellation cut-line (BRIEF-117, P0 fix)
    ///
    /// The cancellation token is sampled **here, under the write lock, before
    /// disarming `Drop`** — not on the committer before the WAL append. In the
    /// seal-and-handover model multiple commits can be sealed-but-unpublished at
    /// once, each forking `*shared` off the previous; a commit's mutation is
    /// therefore already woven into `*shared` (and possibly built upon by a
    /// later seal) by the time the committer would run a pre-WAL check, so a
    /// committer-side cancel could not surgically remove it without poisoning
    /// the engine. Sampling under the lock means a cancelled commit is rolled
    /// back by `Drop` exactly like a GG02 abort — `*shared` is restored, no
    /// `seal_seq` is consumed, nothing is enqueued — so the cut-line's
    /// guarantee ("no append, no publish, exactly as an aborted transaction
    /// would leave it") is *literally* true. A cancel observed after `seal`
    /// returns is too late: the commit is already in flight and irrevocable.
    ///
    /// The HLC timestamp is **not** stamped here (see [`SealedCommit`]); the
    /// committer stamps it per bundle in seal-sequence drain order.
    ///
    /// # Errors
    ///
    /// Validation runs first: a GG02 / closed-graph validation error
    /// ([`GraphError::TypeViolation`]) is returned when a change violates the
    /// bound type, regardless of the cancel flag. Otherwise returns
    /// [`GraphError::Cancelled`] when `cancel` is observed set at the sampling
    /// point, which comes *after* validation and before `seal_seq` allocation.
    /// Both error paths are rolled back via `Drop`.
    ///
    /// # Panics
    ///
    /// Panics with "graph generation exhausted" if incrementing the graph
    /// generation would overflow `u64`. In debug builds it also asserts that
    /// `pre_txn` is still `Some` on entry.
    pub(crate) fn seal(
        mut self,
        principal: Option<Arc<[u8]>>,
        cancel: Option<&AtomicBool>,
    ) -> GraphResult<SealedCommit> {
        debug_assert!(
            self.pre_txn.is_some(),
            "pre_txn must be present at seal entry"
        );

        let schema_changed = self
            .changes
            .iter()
            .any(|change| matches!(change, Change::SchemaChanged { .. }));
        // Peek (not allocate) the next ids so they can be recorded in the
        // graph meta and echoed in the sealed bundle.
        let next_node_id = self.allocator.peek_next_node();
        let next_edge_id = self.allocator.peek_next_edge();
        {
            // Scoped so the `&mut SeleneGraph` borrow ends before the shared
            // reads below.
            let graph = self.guard_mut();
            graph.meta.generation = graph
                .meta
                .generation
                .checked_add(1)
                .expect("graph generation exhausted");
            graph.meta.next_node_id = next_node_id;
            graph.meta.next_edge_id = next_edge_id;
        }

        let generation = self.read().meta.generation;

        // Validation only runs when the graph has a bound type.
        let mut validation_warnings = Vec::new();
        if let Some(type_def) = self.read().meta.bound_type.as_deref() {
            for change in &self.changes {
                validation_warnings.extend(
                    // NB: `?` here returns Err with the generation bump still
                    // applied to the guard-Arc; `Drop` then restores `pre_txn`
                    // and undoes the bump — error timing + rollback unchanged.
                    crate::type_validator::validate_change(change, self.read(), type_def)?
                        .into_iter()
                        .map(|warning| CommitWarning { warning }),
                );
            }
            if schema_changed {
                // A schema change triggers the entity-state validation pass
                // instead of the unique-property check on the change list.
                validation_warnings.extend(
                    crate::type_validator::validate_entity_state(self.read(), type_def)?
                        .into_iter()
                        .map(|warning| CommitWarning { warning }),
                );
            } else {
                crate::type_validator::validate_unique_property_changes(
                    &self.changes,
                    self.read(),
                    type_def,
                )?;
            }
        }
        // Merge into the transaction's warnings, skipping any warning that is
        // already present.
        for warning in validation_warnings {
            if !self.warnings.contains(&warning) {
                self.warnings.push(warning);
            }
        }

        // BRIEF-117 cut-line: sample the cancellation token while the lock is
        // still held and `Drop` is still armed. A cancel here returns Err with
        // the generation bump + staged mutations still on the guard-Arc; `Drop`
        // then restores `pre_txn`, rolling everything back exactly as a GG02
        // abort or an aborted transaction would. Nothing is enqueued or
        // published, and no `seal_seq` is consumed.
        if let Some(flag) = cancel
            && flag.load(Ordering::Acquire)
        {
            return Err(GraphError::Cancelled);
        }

        // Allocate the publish-order key under the lock so seal-seq order equals
        // lock-acquisition order (the intended total order). Done after every
        // fallible step so an aborted seal consumes no sequence number, keeping
        // the committer's reorder sequence gap-free.
        let seal_seq = self.committer.next_seal_seq();

        // Disarm Drop-rollback: from here the commit is handed to the committer
        // and the in-place mutations become the published state.
        self.pre_txn = None;
        // Freeze the next snapshot under the lock. The committer publishes this
        // exact Arc and never rebuilds it.
        let next_snapshot = Arc::clone(&*self.guard);

        // `WriteTxn` implements `Drop`, so its fields cannot be moved out
        // directly; `mem::take` swaps each list for an empty one instead.
        let changes = std::mem::take(&mut self.changes);
        let truncate_expansions = std::mem::take(&mut self.truncate_expansions);
        let warnings = std::mem::take(&mut self.warnings);

        // BRIEF-150 / audit Item 11: build the fan-out-only truncate-expanded
        // view on the session thread so the committer holds a fully-owned
        // bundle. `None` on the common (non-truncate) path → zero allocation.
        let fanout_changes = pipeline::expand_truncates_for_fanout(&changes, &truncate_expansions);

        Ok(SealedCommit {
            seal_seq,
            next_snapshot,
            changes,
            fanout_changes,
            principal,
            schema_changed,
            generation,
            next_node_id,
            next_edge_id,
            warnings,
        })
    }
383
384    /// Roll back graph changes via `Drop` and release the write lock.
385    pub fn rollback(self) {}
386
    /// Number of changes accumulated since this transaction opened.
    ///
    /// This is the length of the pending change list; it is also what the
    /// `selene.graph.commit` tracing span records as `change_count`.
    #[must_use]
    pub fn change_count(&self) -> usize {
        self.changes.len()
    }
392
393    /// Whether this transaction has accumulated schema-changing work.
394    #[must_use]
395    pub fn has_schema_changes(&self) -> bool {
396        self.changes
397            .iter()
398            .any(|change| matches!(change, Change::SchemaChanged { .. }))
399    }
400}
401
402impl Drop for WriteTxn<'_> {
403    fn drop(&mut self) {
404        if let Some(prior) = self.pre_txn.take() {
405            *self.guard = prior;
406        }
407    }
408}
409
410#[cfg(test)]
411mod tests;