1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
//! Write transaction RAII handle per spec 03 sections 4 and 6.
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use parking_lot::{MutexGuard, RwLockWriteGuard};
use selene_core::Change;
use crate::committer::Committer;
use crate::error::{GraphError, GraphResult};
use crate::graph::SeleneGraph;
use crate::id_allocator::IdAllocator;
use crate::index_provider::IndexProvider;
use crate::mutator::Mutator;
use crate::type_validator::TypeWarning;
mod pipeline;
pub(crate) use pipeline::{AppendedCommit, append_sealed, flush_durables, publish_appended};
#[cfg(test)]
pub(crate) use pipeline::publish_panic_inject;
/// Non-fatal graph commit warning.
///
/// Wraps a [`TypeWarning`] returned by closed-graph validation while a
/// transaction is being sealed. Warnings are collected next to — never instead
/// of — validation errors: an error aborts the seal, a warning is only recorded
/// and carried in [`CommitOutcome::warnings`].
///
/// The `PartialEq` derive is load-bearing: sealing uses `Vec::contains` to skip
/// a validation warning that is already recorded on the transaction.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CommitWarning {
/// Closed-graph validation warning.
pub warning: TypeWarning,
}
/// Result metadata returned after a successful commit.
///
/// This is the `Ok` payload of [`WriteTxn::commit`] and
/// [`WriteTxn::commit_with_principal`]. The value itself is produced by the
/// committer's submit path, which is not defined in this file; the field notes
/// below describe the data as it is prepared when the transaction is sealed.
#[derive(Clone, Debug, PartialEq)]
pub struct CommitOutcome {
/// Published graph generation.
pub generation: u64,
/// Changes produced by the mutation funnel.
pub changes: Vec<Change>,
/// Opaque caller-supplied principal bytes for future WAL headers.
// NOTE(review): the sealed bundle documents the same bytes as going into the
// WAL entry header (D12), so "future" here may be stale — confirm.
pub principal: Option<Arc<[u8]>>,
/// Highest durable sequence assigned by commit-critical providers.
// NOTE(review): presumably `None` when no provider assigned a sequence —
// confirm against the committer pipeline.
pub durable_at: Option<u64>,
/// Next node ID after commit.
pub next_node_id: u64,
/// Next edge ID after commit.
pub next_edge_id: u64,
/// Non-fatal warnings produced during commit validation.
pub warnings: Vec<CommitWarning>,
}
/// A frozen, owned, `Send + 'static` commit bundle handed from a session thread
/// to the single committer thread (v1.2 multi-writer, BRIEF 1).
///
/// Produced by [`WriteTxn::seal`] **after** the generation/meta bump + GG02
/// validation have run under the write lock on the session thread (so error
/// timing is unchanged), and after the lock + allocator guards have been
/// released. It contains the fully-built next snapshot plus everything the
/// committer needs to run the durable+publish tail — **no guards, no graph
/// reference, no borrow**. The committer never re-validates, re-allocates ids,
/// or re-applies a change list; it just stamps the HLC, appends to the WAL,
/// publishes the frozen snapshot, and bumps the schema epoch.
///
/// The HLC timestamp is deliberately **not** stamped here: the committer stamps
/// it per bundle in **seal-sequence** drain order so HLC is monotonic in commit
/// order (== publish order == seal order). Stamping it on the session thread
/// would break that monotonicity once seal-order and stamp-order diverge.
///
/// # Why a `seal_seq` (publish-order correctness, P0 fix)
///
/// `seal()` consumes the [`WriteTxn`], so the write lock + allocator guards drop
/// as it returns — **before** the caller enqueues the bundle. Two sessions can
/// therefore seal in lock order (A then B) yet `send()` in the opposite order
/// (B before A) if A is preempted between lock-release and send. A naive FIFO
/// committer would then publish B's gen-`N+1` snapshot before A's gen-`N`
/// snapshot, regressing the published snapshot and losing A's older view under
/// B — a D10 serializability violation. To prevent it, `seal()` stamps a
/// strictly-monotonic `seal_seq` **while still holding the write lock** (so
/// seal-seq order == lock-acquisition order == the intended total order), and
/// the committer publishes strictly in `seal_seq` order via a reorder buffer,
/// regardless of channel arrival order. Compaction takes a `seal_seq` from the
/// same counter under the same lock, so a compact can never be reordered ahead
/// of an earlier-sealed commit.
pub(crate) struct SealedCommit {
/// Strictly-monotonic publish-order key, allocated under the write lock in
/// [`WriteTxn::seal`]. The committer publishes in ascending `seal_seq`.
///
/// `seal` allocates it only after validation and the cancellation check have
/// passed, so a seal that fails on either does not consume a sequence number.
pub(crate) seal_seq: u64,
/// Fully-built next snapshot, frozen under the session's write lock.
///
/// This is a clone of the `Arc` behind the write guard at seal time, so it
/// shares its allocation with the graph the next writer starts from.
pub(crate) next_snapshot: Arc<SeleneGraph>,
/// Persisted change list (the WAL/changeset payload).
///
/// Moved out of the transaction with `mem::take`; truncates stay in their
/// declarative form here.
pub(crate) changes: Vec<Change>,
/// Truncate-expanded fan-out view, built on the session thread, or `None`
/// when no truncate/reset expansion is staged (the common path).
pub(crate) fanout_changes: Option<Vec<Change>>,
/// Opaque caller-supplied principal bytes for the WAL entry header (D12).
///
/// Passed through `seal` untouched.
pub(crate) principal: Option<Arc<[u8]>>,
/// Whether the change list bumps the schema epoch.
///
/// `seal` sets this when at least one staged change is
/// `Change::SchemaChanged`.
pub(crate) schema_changed: bool,
/// Already-bumped graph generation.
///
/// Read back from the working graph's meta after `seal` incremented it.
pub(crate) generation: u64,
/// Next node id after this commit (peeked under the lock).
///
/// The same value is written into the snapshot's graph meta by `seal`.
pub(crate) next_node_id: u64,
/// Next edge id after this commit (peeked under the lock).
///
/// The same value is written into the snapshot's graph meta by `seal`.
pub(crate) next_edge_id: u64,
/// Non-fatal validation warnings collected during seal.
///
/// Holds the warnings already recorded on the transaction followed by any
/// validation warnings that were not already present.
pub(crate) warnings: Vec<CommitWarning>,
}
/// Static guarantee that [`SealedCommit`] can cross to the committer thread.
///
/// The closure below is never called; it only has to type-check. If a lock
/// guard, a borrow, or any other non-`Send` / non-`'static` value is ever added
/// to the struct, the bound on the helper stops being satisfiable and the crate
/// fails to build.
const _: fn() = || {
    fn require_send_and_static<T>()
    where
        T: Send + 'static,
    {
    }
    require_send_and_static::<SealedCommit>();
};
/// RAII owner of the single graph write lock.
///
/// Since v1.2 (BRIEF 1) the transaction no longer holds the snapshot cell,
/// schema-version, or provider handles — those moved to the single committer
/// thread, which the transaction reaches via the cheap `Committer` submit
/// handle. The transaction still owns the write lock + allocator guards for the
/// duration of execution and releases them when `seal` consumes it.
///
/// Dropping a transaction that was never sealed restores the pre-transaction
/// graph (see the `Drop` impl). Field declaration order matters: Rust drops
/// fields in declaration order after `Drop::drop` has run, so the rollback
/// write goes through `guard` while the lock is still held, and `guard` is
/// released before `allocator`.
pub struct WriteTxn<'g> {
/// Write guard over the shared graph `Arc`. Reads go straight through it;
/// mutation goes through `guard_mut`, which uses `Arc::make_mut`.
pub(crate) guard: RwLockWriteGuard<'g, Arc<SeleneGraph>>,
/// Submit handle to the committer. Also the source of `seal_seq` values.
pub(crate) committer: Committer,
/// Rollback point: the graph `Arc` as it was when the transaction opened.
/// `Some` while the transaction is live; `seal` sets it to `None` on success
/// to disarm the rollback in `Drop`.
pub(crate) pre_txn: Option<Arc<SeleneGraph>>,
/// Guard over the id allocator, held for the whole transaction. `seal` peeks
/// the next node/edge ids from it.
pub(crate) allocator: MutexGuard<'g, IdAllocator>,
/// Index-provider registry, retained so `Mutator::index_provider_by_tag`
/// can resolve a provider during execution. Shares the one frozen
/// registry allocation with `SharedGraph` and the committer — handing it
/// to a transaction is a refcount bump, not a `Vec` clone.
pub(crate) providers: Arc<[Arc<dyn IndexProvider>]>,
/// Changes accumulated since the transaction opened, in the order they were
/// staged. `seal` moves the list into the sealed bundle.
pub(crate) changes: Vec<Change>,
/// Per-truncate per-row tombstone expansions, keyed by the index of the
/// declarative truncate change in [`Self::changes`] that produced them.
///
/// BRIEF-150 / deletion-reclamation audit Item 11. The WAL/changeset carries
/// only the O(1) declarative `NodesOfTypeTruncated`/`EdgesOfTypeTruncated`
/// change, but live index-provider fan-out must observe the same per-row
/// `NodeDeleted`/`EdgeDeleted` multiset a `MATCH (n:L) DETACH DELETE n` would
/// emit (so derived state is reclaimed without leaks). The mutator
/// snapshots the matched ids while it still holds the store and stages their
/// tombstones here; commit substitutes each truncate change with its staged
/// expansion before fan-out.
pub(crate) truncate_expansions: Vec<(usize, Vec<Change>)>,
/// Non-fatal warnings recorded so far. `seal` appends validation warnings
/// that are not already present and moves the list into the sealed bundle.
pub(crate) warnings: Vec<CommitWarning>,
}
impl<'g> WriteTxn<'g> {
/// Open a write transaction over an already-acquired write lock.
///
/// The current graph `Arc` is remembered as the rollback point before any
/// mutation can happen. The change list, truncate expansions, and warnings
/// all start empty. The guards and handles are stored exactly as passed in.
pub(crate) fn new(
    guard: RwLockWriteGuard<'g, Arc<SeleneGraph>>,
    committer: Committer,
    allocator: MutexGuard<'g, IdAllocator>,
    providers: Arc<[Arc<dyn IndexProvider>]>,
) -> Self {
    // Refcount bump only. `Drop` writes this back through the guard when the
    // transaction ends without being sealed.
    let rollback_point = Arc::clone(&*guard);
    Self {
        guard,
        committer,
        pre_txn: Some(rollback_point),
        allocator,
        providers,
        changes: Vec::new(),
        truncate_expansions: Vec::new(),
        warnings: Vec::new(),
    }
}
/// Borrow a mutator tied to this transaction.
#[must_use]
pub fn mutator(&mut self) -> Mutator<'_, 'g> {
Mutator::new(self)
}
/// Read-only view of the graph as this transaction currently sees it.
///
/// This is whatever sits behind the write guard, so it includes every
/// mutation already applied through this transaction.
#[must_use]
pub fn read(&self) -> &SeleneGraph {
    // Two derefs: write guard -> `Arc<SeleneGraph>` -> `SeleneGraph`.
    &**self.guard
}
/// Mutable access to the transaction-local working graph.
///
/// Goes through `Arc::make_mut`, which clones the graph when the `Arc` behind
/// the guard is shared and mutates in place when it is not. While `pre_txn`
/// holds the rollback point the `Arc` is shared, so the first call in a
/// transaction clones and leaves the pre-transaction graph untouched.
pub(crate) fn guard_mut(&mut self) -> &mut SeleneGraph {
    let shared: &mut Arc<SeleneGraph> = &mut self.guard;
    Arc::make_mut(shared)
}
/// Commit without caller principal bytes.
pub fn commit(self) -> GraphResult<CommitOutcome> {
self.commit_with_principal(None)
}
/// Commit, optionally tagging the commit with caller-owned principal bytes
/// (D12 audit replay).
///
/// Commit is a two-step **seal-and-handover** (v1.2, BRIEF 1):
///
/// 1. `seal` runs on the calling thread while the write lock is held. It
///    bumps generation/meta, runs GG02 closed-graph validation, and then
///    releases the lock by consuming the transaction.
/// 2. The resulting `SealedCommit` is submitted to the per-graph single
///    committer thread, and this call blocks until the commit is durable and
///    visible.
///
/// The public contract is the same as before the threading change:
/// "`commit()` returns ⇒ durable + visible". GG02 closed-graph violations
/// abort in step 1, on the calling thread and before any handoff, so error
/// timing is identical to v1.0/v1.1.
///
/// Registered index providers are notified by the committer after the new
/// snapshot is published. The same-thread re-entrancy guard therefore
/// protects the committer thread (one committer ⇒ still sound). A provider
/// that re-enters `SharedGraph::begin_write()` on that thread is detected via
/// the thread-local fanout counter and panics with a clear message. The
/// committer's fan-out boundary catches those panics, along with
/// callback-internal panics and returned errors, so a single misbehaving
/// provider can never crash the committer thread.
///
/// # Errors
///
/// Returns the GG02 / validation error from `seal`, or a
/// [`GraphError::Durable`] if the WAL append failed or the committer thread
/// is no longer running.
#[tracing::instrument(
    name = "selene.graph.commit",
    skip(self, principal),
    fields(change_count = self.change_count())
)]
pub fn commit_with_principal(self, principal: Option<Arc<[u8]>>) -> GraphResult<CommitOutcome> {
    // Take the submit handle first. `seal` consumes `self`, dropping the write
    // lock and allocator guards as it returns, so the lock is always released
    // strictly before this thread enqueues and blocks on `recv()`. The
    // committer never takes the write lock (compaction builds on the caller
    // thread too, since the v1.2 BRIEF 1 P0 fix), so a session is never
    // lock-holding and recv-blocked at the same time.
    let submit_handle = self.committer.clone();
    // This path carries no cancellation token.
    match self.seal(principal, None) {
        Ok(sealed) => submit_handle.submit_commit(sealed),
        Err(abort) => Err(abort),
    }
}
/// Run the under-lock half of commit and hand back an owned, `Send`
/// [`SealedCommit`] for the committer thread (v1.2 multi-writer, BRIEF 1).
///
/// Runs, on the calling thread under the held write lock: schema-change
/// detection, the generation/meta bump (`generation += 1` + write the next
/// ids into `GraphMeta`), and GG02 closed-graph validation — which **still
/// aborts synchronously here** (the `?` propagates and `Drop` rolls back the
/// generation bump, exactly as in v1.0/v1.1). It then samples the optional
/// BRIEF-117 cancellation token **while the lock is still held and before
/// `Drop` is disarmed** (see below), allocates the strictly-monotonic
/// `seal_seq` under the lock, disarms `Drop` (`pre_txn = None`), clones the
/// now-frozen next snapshot, `mem::take`s the change list / truncate
/// expansions / warnings, builds the truncate-expanded fan-out view, and
/// returns. The write lock + allocator guards drop as this method returns
/// (it consumes `self`).
///
/// # Parameters
///
/// * `principal` — opaque caller bytes, moved into the returned bundle
///   unchanged.
/// * `cancel` — optional cancellation flag. `None` disables the check; when
///   `Some`, the flag is loaded once with `Ordering::Acquire`.
///
/// # Cancellation cut-line (BRIEF-117, P0 fix)
///
/// The cancellation token is sampled **here, under the write lock, before
/// disarming `Drop`** — not on the committer before the WAL append. In the
/// seal-and-handover model multiple commits can be sealed-but-unpublished at
/// once, each forking `*shared` off the previous; a commit's mutation is
/// therefore already woven into `*shared` (and possibly built upon by a
/// later seal) by the time the committer would run a pre-WAL check, so a
/// committer-side cancel could not surgically remove it without poisoning
/// the engine. Sampling under the lock means a cancelled commit is rolled
/// back by `Drop` exactly like a GG02 abort — `*shared` is restored, no
/// `seal_seq` is consumed, nothing is enqueued — so the cut-line's
/// guarantee ("no append, no publish, exactly as an aborted transaction
/// would leave it") is *literally* true. A cancel observed after `seal`
/// returns is too late: the commit is already in flight and irrevocable.
///
/// The HLC timestamp is **not** stamped here (see [`SealedCommit`]); the
/// committer stamps it per bundle in seal-sequence drain order.
///
/// # Errors
///
/// Returns the GG02 / closed-graph validation error
/// ([`GraphError::TypeViolation`]) when a change violates the bound type.
/// Validation runs first, so a validation error takes precedence over a
/// pending cancellation.
///
/// Returns [`GraphError::Cancelled`] when `cancel` is `Some` and the flag
/// reads `true` at the sampling point, which is after validation has
/// succeeded — not at function entry.
///
/// In both cases the transaction is rolled back via `Drop`.
///
/// # Panics
///
/// Panics with "graph generation exhausted" if the generation counter would
/// overflow `u64`. In debug builds, also panics if `pre_txn` is already
/// `None` on entry.
pub(crate) fn seal(
mut self,
principal: Option<Arc<[u8]>>,
cancel: Option<&AtomicBool>,
) -> GraphResult<SealedCommit> {
debug_assert!(
self.pre_txn.is_some(),
"pre_txn must be present at seal entry"
);
// Same predicate as `has_schema_changes`; computed once and reused below to
// pick the validation path and to fill the bundle.
let schema_changed = self
.changes
.iter()
.any(|change| matches!(change, Change::SchemaChanged { .. }));
// Peek (not allocate): the ids are only recorded, the allocator is untouched.
let next_node_id = self.allocator.peek_next_node();
let next_edge_id = self.allocator.peek_next_edge();
{
// Scoped so the mutable borrow of the working graph ends before the
// read-only validation pass below.
let graph = self.guard_mut();
graph.meta.generation = graph
.meta
.generation
.checked_add(1)
.expect("graph generation exhausted");
graph.meta.next_node_id = next_node_id;
graph.meta.next_edge_id = next_edge_id;
}
// Read the bumped value back so the bundle reports what the snapshot holds.
let generation = self.read().meta.generation;
let mut validation_warnings = Vec::new();
// Validation only applies when the graph is bound to a type.
if let Some(type_def) = self.read().meta.bound_type.as_deref() {
for change in &self.changes {
validation_warnings.extend(
// NB: `?` here returns Err with the generation bump still
// applied to the guard-Arc; `Drop` then restores `pre_txn`
// and undoes the bump — error timing + rollback unchanged.
crate::type_validator::validate_change(change, self.read(), type_def)?
.into_iter()
.map(|warning| CommitWarning { warning }),
);
}
if schema_changed {
// Schema changed: validate the whole entity state against the type.
validation_warnings.extend(
crate::type_validator::validate_entity_state(self.read(), type_def)?
.into_iter()
.map(|warning| CommitWarning { warning }),
);
} else {
// Schema unchanged: only the unique-property check over the staged
// changes runs; it contributes errors, not warnings.
crate::type_validator::validate_unique_property_changes(
&self.changes,
self.read(),
type_def,
)?;
}
}
// Merge validation warnings into the transaction's list, skipping any that
// are already recorded (compared with `PartialEq`).
for warning in validation_warnings {
if !self.warnings.contains(&warning) {
self.warnings.push(warning);
}
}
// BRIEF-117 cut-line: sample the cancellation token while the lock is
// still held and `Drop` is still armed. A cancel here returns Err with
// the generation bump + staged mutations still on the guard-Arc; `Drop`
// then restores `pre_txn`, rolling everything back exactly as a GG02
// abort or an aborted transaction would. Nothing is enqueued or
// published, and no `seal_seq` is consumed.
if let Some(flag) = cancel
&& flag.load(Ordering::Acquire)
{
return Err(GraphError::Cancelled);
}
// Allocate the publish-order key under the lock so seal-seq order equals
// lock-acquisition order (the intended total order). Done after every
// fallible step so an aborted seal consumes no sequence number, keeping
// the committer's reorder sequence gap-free.
let seal_seq = self.committer.next_seal_seq();
// Disarm Drop-rollback: from here the commit is handed to the committer
// and the in-place mutations become the published state.
self.pre_txn = None;
// Freeze the next snapshot under the lock. The committer publishes this
// exact Arc and never rebuilds it.
let next_snapshot = Arc::clone(&*self.guard);
let changes = std::mem::take(&mut self.changes);
let truncate_expansions = std::mem::take(&mut self.truncate_expansions);
let warnings = std::mem::take(&mut self.warnings);
// BRIEF-150 / audit Item 11: build the fan-out-only truncate-expanded
// view on the session thread so the committer holds a fully-owned
// bundle. `None` on the common (non-truncate) path → zero allocation.
// NOTE(review): this runs after `seal_seq` was allocated and `Drop` was
// disarmed. If the expansion can panic, the sequence number is consumed but
// never enqueued and the mutations stay on the guard-Arc — confirm the helper
// is infallible, or consider building the view before `next_seal_seq()`.
let fanout_changes = pipeline::expand_truncates_for_fanout(&changes, &truncate_expansions);
Ok(SealedCommit {
seal_seq,
next_snapshot,
changes,
fanout_changes,
principal,
schema_changed,
generation,
next_node_id,
next_edge_id,
warnings,
})
}
/// Abandon the transaction: undo its graph changes and release the write lock.
///
/// All the work happens in `Drop`. The transaction has not been sealed, so
/// dropping it restores the pre-transaction graph and then releases the
/// guards it holds.
pub fn rollback(self) {
    drop(self);
}
/// Number of changes accumulated since this transaction opened.
#[must_use]
pub fn change_count(&self) -> usize {
self.changes.len()
}
/// Whether this transaction has accumulated schema-changing work.
#[must_use]
pub fn has_schema_changes(&self) -> bool {
self.changes
.iter()
.any(|change| matches!(change, Change::SchemaChanged { .. }))
}
}
/// Rollback on drop.
///
/// While `pre_txn` is still `Some` the transaction was never sealed
/// successfully, so the pre-transaction graph is written back through the
/// write guard, discarding every in-place mutation. A successful seal sets
/// `pre_txn` to `None`, which turns this into a no-op.
impl Drop for WriteTxn<'_> {
    fn drop(&mut self) {
        let Some(rollback_point) = self.pre_txn.take() else {
            // Disarmed by a successful seal: nothing to undo.
            return;
        };
        // Runs before the fields are dropped, so the write lock is still held.
        *self.guard = rollback_point;
    }
}
#[cfg(test)]
mod tests;