selene_graph/write_txn.rs
1//! Write transaction RAII handle per spec 03 sections 4 and 6.
2
3use std::sync::{
4 Arc,
5 atomic::{AtomicBool, Ordering},
6};
7
8use parking_lot::{MutexGuard, RwLockWriteGuard};
9use selene_core::Change;
10
11use crate::committer::Committer;
12use crate::error::{GraphError, GraphResult};
13use crate::graph::SeleneGraph;
14use crate::id_allocator::IdAllocator;
15use crate::index_provider::IndexProvider;
16use crate::mutator::Mutator;
17use crate::type_validator::TypeWarning;
18
19mod pipeline;
20
21pub(crate) use pipeline::{AppendedCommit, append_sealed, flush_durables, publish_appended};
22
23#[cfg(test)]
24pub(crate) use pipeline::publish_panic_inject;
25
26/// Non-fatal graph commit warning.
27#[derive(Clone, Debug, Eq, PartialEq)]
28pub struct CommitWarning {
29 /// Closed-graph validation warning.
30 pub warning: TypeWarning,
31}
32
33/// Result metadata returned after a successful commit.
34#[derive(Clone, Debug, PartialEq)]
35pub struct CommitOutcome {
36 /// Published graph generation.
37 pub generation: u64,
38 /// Changes produced by the mutation funnel.
39 pub changes: Vec<Change>,
40 /// Opaque caller-supplied principal bytes for future WAL headers.
41 pub principal: Option<Arc<[u8]>>,
42 /// Highest durable sequence assigned by commit-critical providers.
43 pub durable_at: Option<u64>,
44 /// Next node ID after commit.
45 pub next_node_id: u64,
46 /// Next edge ID after commit.
47 pub next_edge_id: u64,
48 /// Non-fatal warnings produced during commit validation.
49 pub warnings: Vec<CommitWarning>,
50}
51
/// A frozen, owned, `Send + 'static` commit bundle handed from a session thread
/// to the single committer thread (v1.2 multi-writer, BRIEF 1).
///
/// Produced by [`WriteTxn::seal`] **after** the generation/meta bump + GG02
/// validation have run under the write lock on the session thread (so error
/// timing is unchanged). The lock + allocator guards are released when `seal`
/// returns, because `seal` consumes the transaction. The bundle contains the
/// fully-built next snapshot plus everything the committer needs to run the
/// durable+publish tail — **no guards, no graph reference, no borrow**. The
/// committer never re-validates, re-allocates ids, or re-applies a change
/// list; it just stamps the HLC, appends to the WAL, publishes the frozen
/// snapshot, and bumps the schema epoch.
///
/// The HLC timestamp is deliberately **not** stamped here: the committer stamps
/// it per bundle in **seal-sequence** drain order so HLC is monotonic in commit
/// order (== publish order == seal order). Stamping it on the session thread
/// would break that monotonicity once seal-order and stamp-order diverge.
///
/// # Why a `seal_seq` (publish-order correctness, P0 fix)
///
/// `seal()` consumes the [`WriteTxn`], so the write lock + allocator guards drop
/// as it returns — **before** the caller enqueues the bundle. Two sessions can
/// therefore seal in lock order (A then B) yet `send()` in the opposite order
/// (B before A) if A is preempted between lock-release and send. A naive FIFO
/// committer would then publish B's gen-`N+1` snapshot before A's gen-`N`
/// snapshot, regressing the published snapshot and losing A's older view under
/// B — a D10 serializability violation. To prevent it, `seal()` stamps a
/// strictly-monotonic `seal_seq` **while still holding the write lock** (so
/// seal-seq order == lock-acquisition order == the intended total order), and
/// the committer publishes strictly in `seal_seq` order via a reorder buffer,
/// regardless of channel arrival order. Compaction takes a `seal_seq` from the
/// same counter under the same lock, so a compact can never be reordered ahead
/// of an earlier-sealed commit.
pub(crate) struct SealedCommit {
    /// Strictly-monotonic publish-order key, allocated under the write lock in
    /// [`WriteTxn::seal`] (only after every fallible step has passed). The
    /// committer publishes in ascending `seal_seq`.
    pub(crate) seal_seq: u64,
    /// Fully-built next snapshot, frozen under the session's write lock: a
    /// clone of the guarded `Arc` taken by `seal` after rollback is disarmed.
    pub(crate) next_snapshot: Arc<SeleneGraph>,
    /// Persisted change list (the WAL/changeset payload), moved out of the
    /// transaction by `seal`.
    pub(crate) changes: Vec<Change>,
    /// Truncate-expanded fan-out view, built on the session thread, or `None`
    /// when no truncate/reset expansion is staged (the common path).
    pub(crate) fanout_changes: Option<Vec<Change>>,
    /// Opaque caller-supplied principal bytes for the WAL entry header (D12).
    pub(crate) principal: Option<Arc<[u8]>>,
    /// Whether the change list bumps the schema epoch, i.e. whether it
    /// contains at least one `Change::SchemaChanged`.
    pub(crate) schema_changed: bool,
    /// Already-bumped graph generation (read back from the working graph's
    /// meta after `seal` incremented it).
    pub(crate) generation: u64,
    /// Next node id after this commit (peeked under the lock).
    pub(crate) next_node_id: u64,
    /// Next edge id after this commit (peeked under the lock).
    pub(crate) next_edge_id: u64,
    /// Non-fatal validation warnings collected during seal, together with any
    /// warnings the transaction had already accumulated.
    pub(crate) warnings: Vec<CommitWarning>,
}
108
109/// Compile-time proof that [`SealedCommit`] is `Send + 'static`, so it can be
110/// moved to the committer thread. A guard or borrow leaking into the struct
111/// would fail this assertion at compile time.
112const _: fn() = || {
113 fn assert_send_static<T: Send + 'static>() {}
114 assert_send_static::<SealedCommit>();
115};
116
/// RAII owner of the single graph write lock.
///
/// Since v1.2 (BRIEF 1) the transaction no longer holds the snapshot cell,
/// schema-version, or provider handles — those moved to the single committer
/// thread, which the transaction reaches via the cheap `Committer` submit
/// handle. The transaction still owns the write lock + allocator guards for the
/// duration of execution and releases them when `seal` consumes it.
///
/// Dropping the transaction while `pre_txn` is still `Some` writes the
/// pre-transaction `Arc` back into the guard, which is how rollback works.
pub struct WriteTxn<'g> {
    /// Write guard over the graph `Arc`. Mutations go through
    /// `Arc::make_mut` on this slot (see `guard_mut`), so the guarded `Arc`
    /// is the transaction-local working graph.
    pub(crate) guard: RwLockWriteGuard<'g, Arc<SeleneGraph>>,
    /// Submit handle for the committer; `seal` draws the `seal_seq` from it
    /// and `commit_with_principal` submits the sealed bundle through it.
    pub(crate) committer: Committer,
    /// Clone of the graph `Arc` taken when the transaction opened. `Some`
    /// while rollback is armed; `seal` sets it to `None` once the commit can
    /// no longer fail, and `Drop` restores it into `guard` otherwise.
    pub(crate) pre_txn: Option<Arc<SeleneGraph>>,
    /// Guard over the id allocator, held for the life of the transaction.
    /// `seal` peeks the next node/edge ids from it to record in the graph meta.
    pub(crate) allocator: MutexGuard<'g, IdAllocator>,
    /// Index-provider registry, retained so `Mutator::index_provider_by_tag`
    /// can resolve a provider during execution. Shares the one frozen
    /// registry allocation with `SharedGraph` and the committer — handing it
    /// to a transaction is a refcount bump, not a `Vec` clone.
    pub(crate) providers: Arc<[Arc<dyn IndexProvider>]>,
    /// Changes accumulated so far in this transaction; moved into the
    /// `SealedCommit` by `seal`.
    pub(crate) changes: Vec<Change>,
    /// Per-truncate per-row tombstone expansions, keyed by the index of the
    /// declarative truncate change in [`Self::changes`] that produced them.
    ///
    /// BRIEF-150 / deletion-reclamation audit Item 11. The WAL/changeset carries
    /// only the O(1) declarative `NodesOfTypeTruncated`/`EdgesOfTypeTruncated`
    /// change, but live index-provider fan-out must observe the same per-row
    /// `NodeDeleted`/`EdgeDeleted` multiset a `MATCH (n:L) DETACH DELETE n` would
    /// emit (so derived state is reclaimed without leaks). The mutator
    /// snapshots the matched ids while it still holds the store and stages their
    /// tombstones here; commit substitutes each truncate change with its staged
    /// expansion before fan-out.
    pub(crate) truncate_expansions: Vec<(usize, Vec<Change>)>,
    /// Non-fatal warnings accumulated so far. `seal` appends validation
    /// warnings here, skipping any that are already present.
    pub(crate) warnings: Vec<CommitWarning>,
}
149
150impl<'g> WriteTxn<'g> {
151 pub(crate) fn new(
152 guard: RwLockWriteGuard<'g, Arc<SeleneGraph>>,
153 committer: Committer,
154 allocator: MutexGuard<'g, IdAllocator>,
155 providers: Arc<[Arc<dyn IndexProvider>]>,
156 ) -> Self {
157 let pre_txn = Some(Arc::clone(&*guard));
158 Self {
159 guard,
160 committer,
161 pre_txn,
162 allocator,
163 providers,
164 changes: Vec::new(),
165 truncate_expansions: Vec::new(),
166 warnings: Vec::new(),
167 }
168 }
169
170 /// Borrow a mutator tied to this transaction.
171 #[must_use]
172 pub fn mutator(&mut self) -> Mutator<'_, 'g> {
173 Mutator::new(self)
174 }
175
176 /// Borrow the transaction-local working graph.
177 #[must_use]
178 pub fn read(&self) -> &SeleneGraph {
179 self.guard.as_ref()
180 }
181
182 pub(crate) fn guard_mut(&mut self) -> &mut SeleneGraph {
183 Arc::make_mut(&mut *self.guard)
184 }
185
186 /// Commit without caller principal bytes.
187 pub fn commit(self) -> GraphResult<CommitOutcome> {
188 self.commit_with_principal(None)
189 }
190
191 /// Commit with optional caller-owned principal bytes for D12 audit replay.
192 ///
193 /// Since v1.2 (BRIEF 1) commit is **seal-and-handover**: this method runs
194 /// `seal` on the calling thread (generation/meta bump + GG02
195 /// validation under the write lock, then **lock release**), then submits the
196 /// resulting `SealedCommit` to the per-graph single committer thread and
197 /// blocks until it is durable + visible. The public contract is unchanged —
198 /// "`commit()` returns ⇒ durable + visible" — only the internal threading
199 /// model differs.
200 ///
201 /// GG02 closed-graph violations still abort here, on the calling thread,
202 /// before any handoff, so error timing is identical to v1.0/v1.1.
203 ///
204 /// Registered index providers are notified by the committer after the new
205 /// snapshot is published; the same-thread re-entrancy guard now protects the
206 /// committer thread (one committer ⇒ still sound). Same-thread re-entrant
207 /// provider calls into `SharedGraph::begin_write()` are detected via the
208 /// thread-local fanout counter and panic with a clear message; the
209 /// committer's fan-out boundary catches those panics (along with
210 /// callback-internal panics and returned errors) so a single misbehaving
211 /// provider can never crash the committer thread.
212 ///
213 /// # Errors
214 ///
215 /// Returns the GG02 / validation error from `seal`, or a
216 /// [`GraphError::Durable`] if the WAL append failed or the committer thread
217 /// is no longer running.
218 #[tracing::instrument(
219 name = "selene.graph.commit",
220 skip(self, principal),
221 fields(change_count = self.change_count())
222 )]
223 pub fn commit_with_principal(self, principal: Option<Arc<[u8]>>) -> GraphResult<CommitOutcome> {
224 // Clone the submit handle BEFORE sealing — `seal()` consumes `self`
225 // (dropping the write lock + allocator guards as it returns), so the
226 // session releases the lock strictly before it enqueues and blocks on
227 // `recv()`. The committer never takes the write lock (compaction builds
228 // on the caller thread too, since v1.2 BRIEF 1 P0 fix), so a session is
229 // never simultaneously lock-holding and recv-blocked.
230 let committer = self.committer.clone();
231 let sealed = self.seal(principal, None)?;
232 committer.submit_commit(sealed)
233 }
234
235 /// Run the under-lock half of commit and hand back an owned, `Send`
236 /// [`SealedCommit`] for the committer thread (v1.2 multi-writer, BRIEF 1).
237 ///
238 /// Runs, on the calling thread under the held write lock: schema-change
239 /// detection, the generation/meta bump (`generation += 1` + write the next
240 /// ids into `GraphMeta`), and GG02 closed-graph validation — which **still
241 /// aborts synchronously here** (the `?` propagates and `Drop` rolls back the
242 /// generation bump, exactly as in v1.0/v1.1). It then samples the optional
243 /// BRIEF-117 cancellation token **while the lock is still held and before
244 /// `Drop` is disarmed** (see below), allocates the strictly-monotonic
245 /// `seal_seq` under the lock, disarms `Drop` (`pre_txn = None`), clones the
246 /// now-frozen next snapshot, `mem::take`s the change list / truncate
247 /// expansions / warnings, builds the truncate-expanded fan-out view, and
248 /// returns. The write lock + allocator guards drop as this method returns
249 /// (it consumes `self`).
250 ///
251 /// # Cancellation cut-line (BRIEF-117, P0 fix)
252 ///
253 /// The cancellation token is sampled **here, under the write lock, before
254 /// disarming `Drop`** — not on the committer before the WAL append. In the
255 /// seal-and-handover model multiple commits can be sealed-but-unpublished at
256 /// once, each forking `*shared` off the previous; a commit's mutation is
257 /// therefore already woven into `*shared` (and possibly built upon by a
258 /// later seal) by the time the committer would run a pre-WAL check, so a
259 /// committer-side cancel could not surgically remove it without poisoning
260 /// the engine. Sampling under the lock means a cancelled commit is rolled
261 /// back by `Drop` exactly like a GG02 abort — `*shared` is restored, no
262 /// `seal_seq` is consumed, nothing is enqueued — so the cut-line's
263 /// guarantee ("no append, no publish, exactly as an aborted transaction
264 /// would leave it") is *literally* true. A cancel observed after `seal`
265 /// returns is too late: the commit is already in flight and irrevocable.
266 ///
267 /// The HLC timestamp is **not** stamped here (see [`SealedCommit`]); the
268 /// committer stamps it per bundle in seal-sequence drain order.
269 ///
270 /// # Errors
271 ///
272 /// Returns [`GraphError::Cancelled`] when `cancel` is set at entry (rolled
273 /// back via `Drop`), or the GG02 / closed-graph validation error
274 /// ([`GraphError::TypeViolation`]) when a change violates the bound type.
275 pub(crate) fn seal(
276 mut self,
277 principal: Option<Arc<[u8]>>,
278 cancel: Option<&AtomicBool>,
279 ) -> GraphResult<SealedCommit> {
280 debug_assert!(
281 self.pre_txn.is_some(),
282 "pre_txn must be present at seal entry"
283 );
284
285 let schema_changed = self
286 .changes
287 .iter()
288 .any(|change| matches!(change, Change::SchemaChanged { .. }));
289 let next_node_id = self.allocator.peek_next_node();
290 let next_edge_id = self.allocator.peek_next_edge();
291 {
292 let graph = self.guard_mut();
293 graph.meta.generation = graph
294 .meta
295 .generation
296 .checked_add(1)
297 .expect("graph generation exhausted");
298 graph.meta.next_node_id = next_node_id;
299 graph.meta.next_edge_id = next_edge_id;
300 }
301
302 let generation = self.read().meta.generation;
303
304 let mut validation_warnings = Vec::new();
305 if let Some(type_def) = self.read().meta.bound_type.as_deref() {
306 for change in &self.changes {
307 validation_warnings.extend(
308 // NB: `?` here returns Err with the generation bump still
309 // applied to the guard-Arc; `Drop` then restores `pre_txn`
310 // and undoes the bump — error timing + rollback unchanged.
311 crate::type_validator::validate_change(change, self.read(), type_def)?
312 .into_iter()
313 .map(|warning| CommitWarning { warning }),
314 );
315 }
316 if schema_changed {
317 validation_warnings.extend(
318 crate::type_validator::validate_entity_state(self.read(), type_def)?
319 .into_iter()
320 .map(|warning| CommitWarning { warning }),
321 );
322 } else {
323 crate::type_validator::validate_unique_property_changes(
324 &self.changes,
325 self.read(),
326 type_def,
327 )?;
328 }
329 }
330 for warning in validation_warnings {
331 if !self.warnings.contains(&warning) {
332 self.warnings.push(warning);
333 }
334 }
335
336 // BRIEF-117 cut-line: sample the cancellation token while the lock is
337 // still held and `Drop` is still armed. A cancel here returns Err with
338 // the generation bump + staged mutations still on the guard-Arc; `Drop`
339 // then restores `pre_txn`, rolling everything back exactly as a GG02
340 // abort or an aborted transaction would. Nothing is enqueued or
341 // published, and no `seal_seq` is consumed.
342 if let Some(flag) = cancel
343 && flag.load(Ordering::Acquire)
344 {
345 return Err(GraphError::Cancelled);
346 }
347
348 // Allocate the publish-order key under the lock so seal-seq order equals
349 // lock-acquisition order (the intended total order). Done after every
350 // fallible step so an aborted seal consumes no sequence number, keeping
351 // the committer's reorder sequence gap-free.
352 let seal_seq = self.committer.next_seal_seq();
353
354 // Disarm Drop-rollback: from here the commit is handed to the committer
355 // and the in-place mutations become the published state.
356 self.pre_txn = None;
357 // Freeze the next snapshot under the lock. The committer publishes this
358 // exact Arc and never rebuilds it.
359 let next_snapshot = Arc::clone(&*self.guard);
360
361 let changes = std::mem::take(&mut self.changes);
362 let truncate_expansions = std::mem::take(&mut self.truncate_expansions);
363 let warnings = std::mem::take(&mut self.warnings);
364
365 // BRIEF-150 / audit Item 11: build the fan-out-only truncate-expanded
366 // view on the session thread so the committer holds a fully-owned
367 // bundle. `None` on the common (non-truncate) path → zero allocation.
368 let fanout_changes = pipeline::expand_truncates_for_fanout(&changes, &truncate_expansions);
369
370 Ok(SealedCommit {
371 seal_seq,
372 next_snapshot,
373 changes,
374 fanout_changes,
375 principal,
376 schema_changed,
377 generation,
378 next_node_id,
379 next_edge_id,
380 warnings,
381 })
382 }
383
384 /// Roll back graph changes via `Drop` and release the write lock.
385 pub fn rollback(self) {}
386
387 /// Number of changes accumulated since this transaction opened.
388 #[must_use]
389 pub fn change_count(&self) -> usize {
390 self.changes.len()
391 }
392
393 /// Whether this transaction has accumulated schema-changing work.
394 #[must_use]
395 pub fn has_schema_changes(&self) -> bool {
396 self.changes
397 .iter()
398 .any(|change| matches!(change, Change::SchemaChanged { .. }))
399 }
400}
401
402impl Drop for WriteTxn<'_> {
403 fn drop(&mut self) {
404 if let Some(prior) = self.pre_txn.take() {
405 *self.guard = prior;
406 }
407 }
408}
409
410#[cfg(test)]
411mod tests;