selene_graph/shared.rs
1//! Shared graph wrapper implementing lock-free reads and serialized writes.
2
3use std::path::Path;
4use std::sync::{
5 Arc,
6 atomic::{AtomicU64, Ordering},
7};
8
9use arc_swap::ArcSwap;
10use parking_lot::{Mutex, RwLock};
11
12use selene_core::{Change, DbString, GraphId, HnswIndexConfig, SchemaChange};
13use selene_persist::{AuditLog, SyncPolicy, WalConfig, WalWriter};
14
15use crate::adjacency::AdjacencyEdge;
16use crate::committer_batch::CommitBatching;
17use crate::core_provider::{CoreProvider, DurableState};
18use crate::durable_provider::DurableProvider;
19use crate::error::{GraphError, GraphResult};
20use crate::graph::{PropertyIndexEntry, SeleneGraph};
21use crate::graph_types::GraphTypeDef;
22use crate::id_allocator::IdAllocator;
23use crate::index_provider::{IndexProvider, ProviderError, ProviderTag};
24use crate::schema_index_kind::schema_kind_from;
25use crate::store::{EdgeStore, RowIndex};
26use crate::typed_index::TypedIndexKind;
27use crate::vector_index::{
28 VectorIndexConfig, VectorIndexKind, VectorIndexMaintenancePolicy, VectorIndexRebuildReport,
29};
30use crate::write_txn::WriteTxn;
31
/// Per-graph shared runtime state.
///
/// Since v1.2 (BRIEF 1) every snapshot publish is funneled through a single
/// per-graph committer thread (`CommitterThread`), which is
/// the **sole writer** of the `snapshot` [`ArcSwap`] cell. `begin_write` hands
/// each [`WriteTxn`] a cheap submit handle; `commit`/`compact` seal-and-submit
/// to the committer and block until it publishes. This single-committer +
/// sole-publisher discipline is what preserves D10 strict-serializability once
/// `seal()` drops the write lock early — it is load-bearing and NOT
/// type-enforced (a second committer or ArcSwap writer would silently break it).
///
/// Field declaration order is significant: Rust drops struct fields in
/// declaration order, and `committer` relies on being declared — and therefore
/// dropped — last. Do not reorder the fields.
pub struct SharedGraph {
    /// Live graph behind the single write lock. `begin_write`, compaction, and
    /// vector-index maintenance all take `shared.write()` and replace the inner
    /// `Arc` while holding it.
    shared: Arc<RwLock<Arc<SeleneGraph>>>,
    /// Published snapshot cell, read lock-free by `SharedGraph::read`. After
    /// construction the committer thread is its only writer.
    snapshot: Arc<ArcSwap<SeleneGraph>>,
    /// Plan-cache invalidation epoch. It starts at zero and is shared with the
    /// committer thread.
    schema_version: Arc<AtomicU64>,
    /// ID allocator. `begin_write` locks it only after acquiring the write lock.
    allocator: Arc<Mutex<IdAllocator>>,
    /// Fixed provider registry, frozen at construction. Shared as one
    /// allocation so `begin_write` hands the registry to each transaction
    /// with a single refcount bump instead of a per-transaction `Vec` clone.
    providers: Arc<[Arc<dyn IndexProvider>]>,
    /// Commit-critical durable providers, also cloned into the committer.
    /// The WAL-backed constructors in this file push the core provider here;
    /// the non-WAL constructors do not.
    durable_providers: Vec<Arc<dyn DurableProvider>>,
    /// The single per-graph committer thread; sole publisher of `snapshot`.
    /// Dropped last via [`SharedGraph`]'s implicit drop order, which joins the
    /// thread once every outstanding [`WriteTxn`] submit handle is gone.
    committer: crate::committer::CommitterThread,
}
57
/// Builder for a [`SharedGraph`] and its fixed provider registry.
///
/// Obtain one from `SharedGraph::builder`, configure it with the `with_*` /
/// `bound_to` methods, and finish with `build`.
pub struct SharedGraphBuilder {
    /// Graph under construction; `bound_to` sets its `meta.bound_type`.
    graph: SeleneGraph,
    /// User providers in registration order; `build` places them after the
    /// built-in core provider.
    providers: Vec<Arc<dyn IndexProvider>>,
    /// WAL opened by `with_wal`, if any.
    wal_writer: Option<WalWriter>,
    /// Audit log opened by `with_audit_log`, if any; `build` rejects it
    /// without a WAL.
    audit_log: Option<AuditLog>,
    /// Group-commit policy; `SharedGraph::builder` initializes it to `Off`.
    commit_batching: CommitBatching,
}
66
67impl SharedGraph {
68 /// Construct an empty shared graph.
69 #[must_use]
70 pub fn new(graph_id: GraphId) -> Self {
71 Self::from_graph(SeleneGraph::new(graph_id))
72 }
73
74 /// Start building an empty shared graph with optional providers.
75 #[must_use]
76 pub fn builder(graph_id: GraphId) -> SharedGraphBuilder {
77 SharedGraphBuilder {
78 graph: SeleneGraph::new(graph_id),
79 providers: Vec::new(),
80 wal_writer: None,
81 audit_log: None,
82 commit_batching: CommitBatching::Off,
83 }
84 }
85
86 /// Construct shared state from a pre-built graph snapshot.
87 ///
88 /// The allocator floors are derived from storage length so that stale
89 /// `GraphMeta.next_*_id` values cannot allow ID reuse over rows that
90 /// already exist (recovery hardening — spec 02 §4 forbids ID reuse).
91 ///
92 /// # Panics
93 ///
94 /// Panics if the supplied graph contains more than `u32::MAX` rows in
95 /// either store. Selene-graph's row index is `u32` by construction;
96 /// `SeleneGraph::new()` always satisfies this, and any caller-built
97 /// fixture must too. Use [`SharedGraph::try_from_graph`] for the
98 /// fallible variant when validating untrusted snapshots.
99 #[must_use]
100 pub fn from_graph(graph: SeleneGraph) -> Self {
101 Self::try_from_graph(graph).expect("graph store row count exceeds u32::MAX")
102 }
103
104 /// Fallible variant of [`SharedGraph::from_graph`]. Returns
105 /// [`GraphError::Inconsistent`] when the graph's stores exceed the
106 /// `u32` row capacity.
107 pub fn try_from_graph(graph: SeleneGraph) -> GraphResult<Self> {
108 Self::from_graph_with_core(graph, Vec::new())
109 }
110
111 /// Construct shared state from a graph snapshot and fixed provider list.
112 ///
113 /// # Errors
114 ///
115 /// Returns [`GraphError::Provider`] when two providers declare the same
116 /// [`ProviderTag`], and [`GraphError::Inconsistent`] when the graph's
117 /// stores exceed the `u32` row capacity.
118 pub fn from_graph_with_providers(
119 graph: SeleneGraph,
120 providers: Vec<Arc<dyn IndexProvider>>,
121 ) -> GraphResult<Self> {
122 Self::from_graph_with_core(graph, providers)
123 }
124
125 /// Construct shared state from a graph snapshot and commit-critical WAL file.
126 ///
127 /// Since v1.2 (BRIEF 2) the committer is the sole fsync caller, so the WAL is
128 /// **always** opened in [`SyncPolicy::OnFlushOnly`] regardless of the
129 /// `config.sync_policy` passed (it is overwritten before
130 /// [`WalWriter::open`]). This non-builder constructor uses
131 /// [`CommitBatching::Off`], so the committer still fsyncs once per commit —
132 /// behaviorally identical to BRIEF 1's `EveryN(1)`.
133 ///
134 /// # Errors
135 ///
136 /// Returns [`GraphError::Persist`] when the WAL cannot be opened, plus the
137 /// same consistency and provider-registration errors as [`Self::try_from_graph`].
138 pub fn from_graph_with_wal(
139 graph: SeleneGraph,
140 path: impl AsRef<Path>,
141 mut config: WalConfig,
142 ) -> GraphResult<Self> {
143 // BRIEF 2: the committer owns fsync via flush_durables(); force the
144 // committer-managed WAL into OnFlushOnly before opening it (overwriting
145 // any caller policy), keeping open-error timing unchanged.
146 config.sync_policy = SyncPolicy::OnFlushOnly;
147 let writer = WalWriter::open(path.as_ref(), config)?;
148 Self::from_graph_with_core_and_durables(
149 graph,
150 Vec::new(),
151 Vec::new(),
152 Some(writer),
153 None,
154 CommitBatching::Off,
155 )
156 }
157
158 fn from_graph_with_core(
159 graph: SeleneGraph,
160 providers: Vec<Arc<dyn IndexProvider>>,
161 ) -> GraphResult<Self> {
162 Self::from_graph_with_core_and_durables(
163 graph,
164 providers,
165 Vec::new(),
166 None,
167 None,
168 CommitBatching::Off,
169 )
170 }
171
172 pub(crate) fn from_graph_with_core_and_durables(
173 graph: SeleneGraph,
174 providers: Vec<Arc<dyn IndexProvider>>,
175 mut durable_providers: Vec<Arc<dyn DurableProvider>>,
176 wal_writer: Option<WalWriter>,
177 audit_log: Option<AuditLog>,
178 batching: CommitBatching,
179 ) -> GraphResult<Self> {
180 if audit_log.is_some() && wal_writer.is_none() {
181 return Err(GraphError::Inconsistent {
182 reason: "audit log configured without a WAL; audit mirroring requires durable WAL \
183 state"
184 .to_owned(),
185 });
186 }
187 let snapshot = Arc::new(ArcSwap::from_pointee(graph.clone()));
188 let has_wal = wal_writer.is_some();
189 let durable = wal_writer
190 .map(DurableState::new)
191 .map(|durable| match audit_log {
192 Some(audit) => durable.with_audit_log(audit),
193 None => durable,
194 });
195 let core = CoreProvider::new_for_live_with_wal(Arc::clone(&snapshot), durable);
196 let mut all_providers = Vec::with_capacity(providers.len() + 1);
197 all_providers.push(core.clone() as Arc<dyn IndexProvider>);
198 all_providers.extend(providers);
199 if has_wal {
200 durable_providers.push(core as Arc<dyn DurableProvider>);
201 }
202 validate_unique_provider_tags(&all_providers)?;
203 Self::from_graph_parts_and_snapshot(
204 graph,
205 all_providers,
206 durable_providers,
207 snapshot,
208 batching,
209 )
210 }
211
212 pub(crate) fn from_graph_parts_and_snapshot(
213 graph: SeleneGraph,
214 providers: Vec<Arc<dyn IndexProvider>>,
215 durable_providers: Vec<Arc<dyn DurableProvider>>,
216 snapshot: Arc<ArcSwap<SeleneGraph>>,
217 batching: CommitBatching,
218 ) -> GraphResult<Self> {
219 validate_unique_provider_tags(&providers)?;
220 // Freeze the registry into one shared allocation: the committer and
221 // every `begin_write` transaction clone the `Arc`, not the `Vec`.
222 let providers: Arc<[Arc<dyn IndexProvider>]> = providers.into();
223 let mut graph = graph;
224 rebuild_derived_state(&mut graph)?;
225 crate::property_index::rebuild_property_indexes(&mut graph)?;
226 crate::composite_property_index::rebuild_composite_property_indexes(&mut graph)?;
227 crate::vector_index::rebuild_vector_indexes(&mut graph)?;
228 crate::text_index::rebuild_text_indexes(&mut graph)?;
229 if let Some(type_def) = graph.meta.bound_type.as_deref() {
230 // Why: GraphMeta is publicly constructible, so SharedGraph::from_graph
231 // can land a malformed bound_type that bypassed builder().bound_to()'s
232 // validate(). Re-check self-consistency here so every constructor
233 // arrives at the same closed-graph admissibility contract.
234 type_def.validate_ref()?;
235 crate::type_validator::validate_entity_state(&graph, type_def)?;
236 }
237
238 let node_floor = (graph.node_store.labels.len() as u64).saturating_add(1);
239 let edge_floor = (graph.edge_store.label.len() as u64).saturating_add(1);
240 let allocator = IdAllocator::from_meta_with_floors(&graph.meta, node_floor, edge_floor);
241
242 // Debug-only structural net on the snapshot-load / recovery path: the
243 // rebuild_* helpers above re-derive all indexes from columns, so a
244 // rebuild bug would otherwise surface only as silent query
245 // corruption. Highest-value placement — verify the rebuilt snapshot
246 // before it is ever published. Compiled out in release builds.
247 #[cfg(debug_assertions)]
248 if let Err(reason) = graph.assert_indexes_consistent() {
249 return Err(GraphError::Inconsistent {
250 reason: format!("rebuilt snapshot failed index consistency check: {reason}"),
251 });
252 }
253
254 let graph = Arc::new(graph);
255 snapshot.store(Arc::clone(&graph));
256 let shared = Arc::new(RwLock::new(graph));
257 let schema_version = Arc::new(AtomicU64::new(0));
258 let allocator = Arc::new(Mutex::new(allocator));
259 // Spawn the single per-graph committer thread. It captures clones of
260 // every handle it needs to publish + compact; it is the sole writer of
261 // `snapshot`. All commit/compact/index-DDL publishes route through it.
262 let committer =
263 crate::committer::CommitterThread::spawn(crate::committer::CommitterHandles {
264 snapshot: Arc::clone(&snapshot),
265 schema_version: Arc::clone(&schema_version),
266 providers: Arc::clone(&providers),
267 durable_providers: durable_providers.clone(),
268 batching,
269 });
270 Ok(Self {
271 shared,
272 snapshot,
273 schema_version,
274 allocator,
275 providers,
276 durable_providers,
277 committer,
278 })
279 }
280
281 /// Load the current immutable snapshot without taking the write lock.
282 #[must_use]
283 pub fn read(&self) -> Arc<SeleneGraph> {
284 self.snapshot.load_full()
285 }
286
287 /// Return compaction pressure for the current published snapshot.
288 ///
289 /// This is a lock-free read of row counts and liveness counters. It does not
290 /// compact, rebuild indexes, or take the writer lock.
291 #[must_use]
292 pub fn compaction_stats(&self) -> crate::compaction::CompactionStats {
293 self.read().compaction_stats()
294 }
295
296 /// Compact the live graph in place: reclaim every dead / hole row, renumber
297 /// rows dense, and atomically republish the result so the RAM held by deleted
298 /// rows is reclaimed immediately (BRIEF-Item-4c — the live-densify half of
299 /// snapshot-time compaction).
300 ///
301 /// This is pure space reclamation: it changes only the internal row layout,
302 /// never external `NodeId`/`EdgeId`, properties, or labels, so it emits **no**
303 /// [`Change`] and writes **no** WAL entry. Durability
304 /// comes from the next snapshot, which encodes the now-dense live graph (the
305 /// CORE provider reads the same `snapshot` cell this method publishes into). A
306 /// crash before that snapshot simply reloads the pre-compaction state and
307 /// recompacts later — compaction can never lose data.
308 ///
309 /// The dense graph is built under the write lock on the calling thread
310 /// (seal-and-handover, exactly like a commit), and is allocated a publish
311 /// `seal_seq` under that same lock; the single committer then swaps it into
312 /// the published `snapshot` cell strictly in `seal_seq` order. So compaction
313 /// serializes with writers exactly like a commit and can never be reordered
314 /// ahead of an earlier-sealed commit (which would let that commit's stale,
315 /// non-dense frozen snapshot clobber the dense one). Lock-free readers keep
316 /// observing the old snapshot until the dense graph is published. The
317 /// monotonic allocator high-water marks are preserved (the live allocator is
318 /// untouched, and [`compact_core`](crate::compact_core) carries `GraphMeta`
319 /// verbatim — and the allocator is kept in sync with `GraphMeta` on every
320 /// commit), so no external id is ever reused after a later recovery.
321 ///
322 /// # Errors
323 ///
324 /// Returns [`GraphError`] if the graph's id↔row mapping is corrupt or the
325 /// recompacted graph fails its consistency check (see
326 /// [`compact_core`](crate::compact_core)).
327 pub fn compact(&self) -> GraphResult<crate::CompactionReport> {
328 // Seal-and-handover for compaction (v1.2 BRIEF 1, P1 fix): build the
329 // dense graph HERE, on the caller thread, under the write lock — exactly
330 // like a commit seals under the lock — then hand the committer a
331 // pre-built dense snapshot to publish in seal_seq order. This keeps the
332 // committer off the write lock entirely (no deadlock surface) and, more
333 // importantly, ties compaction's publish position to a seal_seq taken
334 // under the same lock as commits, so a compact can never be reordered
335 // ahead of an earlier-sealed commit (which would otherwise let an
336 // earlier commit's stale, non-dense frozen snapshot clobber the dense
337 // one in the published cell).
338 //
339 // Ordering under the lock is load-bearing for the reorder buffer's
340 // gap-free invariant: densify FIRST (the only fallible step), and only
341 // THEN allocate the seal_seq, so a failed compaction consumes no
342 // sequence number (which would otherwise wedge the committer waiting for
343 // a seq that never arrives).
344 let committer = self.committer.handle();
345 let (seal_seq, dense, report) = {
346 let mut guard = self.shared.write();
347 let compacted = crate::compaction::compact_core(&guard)?;
348 let dense = Arc::new(compacted.graph);
349 // Allocate the publish-order key under the lock, after the fallible
350 // densify, so seal_seq order == lock-acquisition order and no seq is
351 // ever burned by a failed compaction.
352 let seal_seq = committer.next_seal_seq();
353 *guard = Arc::clone(&dense);
354 (seal_seq, dense, compacted.report)
355 // Lock released here, before the (blocking) enqueue + recv — the
356 // committer never needs the write lock, but releasing here also
357 // means a compactor never holds the lock while blocked on the
358 // committer.
359 };
360 committer.submit_compact(seal_seq, dense, report)
361 }
362
363 /// Rebuild every registered vector index from primary node values.
364 ///
365 /// HNSW indexes retain stale deleted entries after vector update/delete so
366 /// in-flight search can still traverse the neighbor graph safely. This
367 /// maintenance path reclaims those stale entries by rebuilding only the
368 /// derived vector-index state; it does not change graph data, emit
369 /// [`Change`], write a WAL entry, bump schema epoch, or
370 /// notify providers. The HNSW graph is derived, not durable: snapshots and
371 /// recovery persist only vector-index registrations plus primary values, so
372 /// a reopen rebuilds the index from that authoritative state.
373 ///
374 /// The rebuild is strict on live data: if an indexed row no longer satisfies
375 /// the registered vector dimension/metric invariant, this method returns an
376 /// error instead of silently dropping the row from the index.
377 pub fn rebuild_vector_indexes(&self) -> GraphResult<VectorIndexRebuildReport> {
378 let committer = self.committer.handle();
379 let (seal_seq, rebuilt, report) = {
380 let mut guard = self.shared.write();
381 let mut rebuilt = guard.as_ref().clone();
382 let report = crate::vector_index::rebuild_vector_indexes_strict(&mut rebuilt)?;
383 let rebuilt = Arc::new(rebuilt);
384 let seal_seq = committer.next_seal_seq();
385 *guard = Arc::clone(&rebuilt);
386 (seal_seq, rebuilt, report)
387 };
388 committer.submit_vector_index_rebuild(seal_seq, rebuilt, report)
389 }
390
391 /// Rebuild only vector indexes whose diagnostics recommend maintenance.
392 ///
393 /// This is the bounded maintenance variant for IVF drift: it uses each index's current
394 /// [`ivf_rebuild_recommended`](crate::vector_index::VectorIndexMemoryUsage::ivf_rebuild_recommended)
395 /// value to decide whether to rebuild that derived index. Indexes that do not recommend rebuild
396 /// are left untouched, and a no-op call returns an empty report without publishing a maintenance
397 /// item.
398 ///
399 /// The rebuild is strict on live data for selected indexes, matching
400 /// [`Self::rebuild_vector_indexes`].
401 pub fn rebuild_recommended_vector_indexes(&self) -> GraphResult<VectorIndexRebuildReport> {
402 self.maintain_vector_indexes(VectorIndexMaintenancePolicy::recommended())
403 }
404
405 /// Maintain recommended vector indexes under a caller-supplied policy.
406 ///
407 /// This is the explicit orchestration API for amortized vector-index maintenance. It rebuilds
408 /// only indexes whose diagnostics currently recommend maintenance and applies the policy cap
409 /// after ordering recommended indexes by pending IVF retrain pressure. It remains a
410 /// maintenance-tier operation: reads never trigger it, and a no-op call returns an empty report
411 /// without publishing a derived-state replacement.
412 ///
413 /// The rebuild is strict on live data for selected indexes, matching
414 /// [`Self::rebuild_vector_indexes`].
415 pub fn maintain_vector_indexes(
416 &self,
417 policy: VectorIndexMaintenancePolicy,
418 ) -> GraphResult<VectorIndexRebuildReport> {
419 let committer = self.committer.handle();
420 let (seal_seq, rebuilt, report) = {
421 let mut guard = self.shared.write();
422 let mut rebuilt = guard.as_ref().clone();
423 let report = crate::vector_index::maintain_vector_indexes_strict(&mut rebuilt, policy)?;
424 if report.entries.is_empty() {
425 return Ok(report);
426 }
427 let rebuilt = Arc::new(rebuilt);
428 let seal_seq = committer.next_seal_seq();
429 *guard = Arc::clone(&rebuilt);
430 (seal_seq, rebuilt, report)
431 };
432 committer.submit_vector_index_rebuild(seal_seq, rebuilt, report)
433 }
434
435 /// Return the runtime schema-version epoch used for plan-cache invalidation.
436 ///
437 /// The epoch starts at zero for each [`SharedGraph`] instance and advances
438 /// only after a successful commit whose change set contains
439 /// [`Change::SchemaChanged`].
440 #[must_use]
441 pub fn schema_version(&self) -> u64 {
442 self.schema_version.load(Ordering::Acquire)
443 }
444
445 /// Return the bound graph type, if this is a closed graph.
446 #[must_use]
447 pub fn graph_type(&self) -> Option<Arc<GraphTypeDef>> {
448 self.read().meta.bound_type.as_ref().map(Arc::clone)
449 }
450
451 /// Return true when this graph is bound to a closed graph type.
452 #[must_use]
453 pub fn is_closed(&self) -> bool {
454 self.read().meta.bound_type.is_some()
455 }
456
457 /// Look up a registered provider by tag.
458 #[must_use]
459 pub fn index_provider_by_tag(&self, tag: ProviderTag) -> Option<Arc<dyn IndexProvider>> {
460 self.providers
461 .iter()
462 .find_map(|provider| (provider.provider_tag() == tag).then(|| Arc::clone(provider)))
463 }
464
465 /// Borrow the fixed provider registry for executor procedure contexts.
466 #[must_use]
467 pub fn index_providers(&self) -> &[Arc<dyn IndexProvider>] {
468 &self.providers
469 }
470
471 /// Borrow the fixed commit-critical durable provider registry.
472 #[must_use]
473 pub fn durable_providers(&self) -> &[Arc<dyn DurableProvider>] {
474 &self.durable_providers
475 }
476
477 /// Register a built-in node property index for `(label, property)`.
478 ///
479 /// The current node columns are scanned under the write lock and the
480 /// published snapshot is updated in one transaction.
481 ///
482 /// # Errors
483 ///
484 /// Returns [`GraphError::PropertyIndexAlreadyExists`] if the pair is
485 /// already registered, or [`GraphError::IndexValueRejected`] if any
486 /// existing node with `label` has a non-null value that does not match
487 /// `kind`.
488 pub fn create_property_index(
489 &self,
490 label: DbString,
491 property: DbString,
492 kind: TypedIndexKind,
493 ) -> GraphResult<()> {
494 self.create_property_index_named(label, property, kind, None)
495 }
496
497 /// Register a built-in node property index with optional catalog name.
498 pub fn create_property_index_named(
499 &self,
500 label: DbString,
501 property: DbString,
502 kind: TypedIndexKind,
503 name: Option<DbString>,
504 ) -> GraphResult<()> {
505 let mut txn = self.begin_write();
506 if txn
507 .read()
508 .property_index
509 .contains_key(&(label.clone(), property.clone()))
510 {
511 return Err(GraphError::PropertyIndexAlreadyExists { label, property });
512 }
513 let index = crate::property_index::build_property_index(
514 txn.read(),
515 label.clone(),
516 property.clone(),
517 kind,
518 )?;
519 txn.guard_mut().property_index.insert(
520 (label.clone(), property.clone()),
521 PropertyIndexEntry::new(index, name.clone()),
522 );
523 let graph = txn.read().graph_id();
524 txn.changes.push(Change::SchemaChanged {
525 graph,
526 change: SchemaChange::PropertyIndexCreatedNamed {
527 label,
528 property,
529 kind: schema_kind_from(kind),
530 name,
531 },
532 });
533 txn.commit()?;
534 Ok(())
535 }
536
537 /// Drop a built-in node property index.
538 ///
539 /// The operation is idempotent; dropping an absent index succeeds without
540 /// publishing a new snapshot.
541 pub fn drop_property_index(&self, label: DbString, property: DbString) -> GraphResult<()> {
542 let mut txn = self.begin_write();
543 if !txn
544 .read()
545 .property_index
546 .contains_key(&(label.clone(), property.clone()))
547 {
548 return Ok(());
549 }
550 txn.guard_mut()
551 .property_index
552 .remove(&(label.clone(), property.clone()));
553 let graph = txn.read().graph_id();
554 txn.changes.push(Change::SchemaChanged {
555 graph,
556 change: SchemaChange::PropertyIndexDropped { label, property },
557 });
558 txn.commit()?;
559 Ok(())
560 }
561
562 /// Register a built-in node vector index for `(label, property)`.
563 ///
564 /// The current node columns are scanned under the write lock and the
565 /// published snapshot is updated in one transaction.
566 ///
567 /// # Errors
568 ///
569 /// Returns [`GraphError::VectorIndexAlreadyExists`] if the pair is already
570 /// registered, [`GraphError::VectorIndexInvalidDimension`] when `dimension`
571 /// is zero, or [`GraphError::VectorIndexValueRejected`] if any existing
572 /// node with `label` has a non-null value for `property` that is not a
573 /// vector with the declared dimension.
574 pub fn create_vector_index(
575 &self,
576 label: DbString,
577 property: DbString,
578 kind: VectorIndexKind,
579 dimension: u32,
580 ) -> GraphResult<()> {
581 self.create_vector_index_named(label, property, kind, dimension, None)
582 }
583
584 /// Register a built-in node vector index with optional catalog name.
585 pub fn create_vector_index_named(
586 &self,
587 label: DbString,
588 property: DbString,
589 kind: VectorIndexKind,
590 dimension: u32,
591 name: Option<DbString>,
592 ) -> GraphResult<()> {
593 self.create_vector_index_named_with_config(label, property, kind, dimension, name, None)
594 }
595
596 /// Register a built-in node vector index with optional HNSW construction config.
597 pub fn create_vector_index_named_with_config(
598 &self,
599 label: DbString,
600 property: DbString,
601 kind: VectorIndexKind,
602 dimension: u32,
603 name: Option<DbString>,
604 hnsw_config: Option<HnswIndexConfig>,
605 ) -> GraphResult<()> {
606 self.create_vector_index_named_with_configs(
607 label,
608 property,
609 kind,
610 dimension,
611 name,
612 VectorIndexConfig::new(hnsw_config, None),
613 )
614 }
615
616 /// Register a built-in node vector index with optional ANN construction config.
617 pub fn create_vector_index_named_with_configs(
618 &self,
619 label: DbString,
620 property: DbString,
621 kind: VectorIndexKind,
622 dimension: u32,
623 name: Option<DbString>,
624 config: VectorIndexConfig,
625 ) -> GraphResult<()> {
626 let mut txn = self.begin_write();
627 txn.mutator().create_vector_index_named_with_configs(
628 label, property, kind, dimension, name, config,
629 )?;
630 txn.commit()?;
631 Ok(())
632 }
633
634 /// Drop a built-in node vector index.
635 ///
636 /// The operation is idempotent; dropping an absent index succeeds without
637 /// publishing a new snapshot.
638 pub fn drop_vector_index(&self, label: DbString, property: DbString) -> GraphResult<()> {
639 let mut txn = self.begin_write();
640 txn.mutator().drop_vector_index(label, property)?;
641 txn.commit()?;
642 Ok(())
643 }
644
645 /// Register a built-in node text index for `(label, property)`.
646 ///
647 /// The current node columns are scanned under the write lock and the
648 /// published snapshot is updated in one transaction.
649 ///
650 /// # Errors
651 ///
652 /// Returns [`GraphError::TextIndexAlreadyExists`] if the pair is already
653 /// registered, or [`GraphError::Inconsistent`] if index construction
654 /// observes corrupt graph columns.
655 pub fn create_text_index(&self, label: DbString, property: DbString) -> GraphResult<()> {
656 self.create_text_index_named(label, property, None)
657 }
658
659 /// Register a built-in node text index with optional catalog name.
660 pub fn create_text_index_named(
661 &self,
662 label: DbString,
663 property: DbString,
664 name: Option<DbString>,
665 ) -> GraphResult<()> {
666 let mut txn = self.begin_write();
667 txn.mutator()
668 .create_text_index_named(label, property, name)?;
669 txn.commit()?;
670 Ok(())
671 }
672
673 /// Drop a built-in node text index.
674 ///
675 /// The operation is idempotent; dropping an absent index succeeds without
676 /// publishing a new snapshot.
677 pub fn drop_text_index(&self, label: DbString, property: DbString) -> GraphResult<()> {
678 let mut txn = self.begin_write();
679 txn.mutator().drop_text_index(label, property)?;
680 txn.commit()?;
681 Ok(())
682 }
683
684 /// Begin a write transaction by acquiring the single graph write lock.
685 ///
686 /// Concurrent writers from other threads queue normally on the write
687 /// lock; the engine does **not** panic legitimate concurrent writes
688 /// during another commit's provider fanout.
689 ///
690 /// Since v1.2 (BRIEF 1) the actual snapshot publish happens on the single
691 /// committer thread, not here: `WriteTxn::commit` seals under this lock,
692 /// releases it, and hands the frozen bundle to the committer. Provider
693 /// fan-out therefore now runs on the **committer thread**, so the
694 /// re-entrancy guard below protects the committer thread (sound with exactly
695 /// one committer — see `reentry.rs` and the v1.2 design §7.7).
696 ///
697 /// # Panics
698 ///
699 /// Panics when called from inside an [`IndexProvider`] callback **on
700 /// the committer thread** as the active fanout. Re-entrant writes from a
701 /// provider callback are unsupported; the committer is publishing, so a
702 /// nested write would recurse indefinitely. The panic is caught by the
703 /// committer's `notify_providers` boundary; provider state may drift, but
704 /// the commit still completes.
705 ///
706 /// Cross-thread re-entry — a provider spawning a worker thread that
707 /// calls `begin_write` and waiting for it — is **documented misuse**
708 /// rather than a detectable footgun (the engine cannot trace causal
709 /// thread ancestry). See the module docs in `reentry.rs` and the
710 /// `IndexProvider` rustdoc for the contract.
711 #[must_use]
712 #[tracing::instrument(name = "selene.graph.begin_write", skip(self))]
713 pub fn begin_write(&self) -> WriteTxn<'_> {
714 if crate::reentry::in_fanout() {
715 panic!(
716 "selene-graph: SharedGraph::begin_write() called from within \
717 a provider fan-out callback on the committer thread; \
718 re-entrant writes from a provider callback are not supported. \
719 The committer's fan-out boundary will catch this panic; \
720 the commit succeeds, but the offending provider's \
721 chained mutation does not."
722 );
723 }
724 WriteTxn::new(
725 self.shared.write(),
726 self.committer.handle(),
727 self.allocator.lock(),
728 Arc::clone(&self.providers),
729 )
730 }
731
732 #[cfg(test)]
733 pub(crate) fn locked_arc_ptr_for_test(&self) -> *const SeleneGraph {
734 let guard = self.shared.read();
735 Arc::as_ptr(&*guard)
736 }
737
738 /// Read the generation of the **live RwLock graph** (`*shared`), as opposed
739 /// to the published `ArcSwap` snapshot. Used by divergence tests to assert
740 /// the two never disagree after a failed / cancelled commit (the P0
741 /// WAL-failure + cancel rollback invariants).
742 #[cfg(test)]
743 pub(crate) fn locked_generation_for_test(&self) -> u64 {
744 self.shared.read().meta.generation
745 }
746
747 /// Submit an already-[`seal`](crate::WriteTxn::seal)ed commit straight to the
748 /// committer, blocking until it is durable + visible. Test-only seam for
749 /// exercising the BRIEF-117 cancellation cut-line (which has no production
750 /// producer yet) without re-entering `commit_with_principal`.
751 #[cfg(test)]
752 pub(crate) fn submit_sealed_for_test(
753 &self,
754 sealed: crate::write_txn::SealedCommit,
755 ) -> GraphResult<crate::CommitOutcome> {
756 self.committer.handle().submit_commit(sealed)
757 }
758
759 /// Enqueue a sealed commit and return its reply receiver without waiting.
760 #[cfg(test)]
761 pub(crate) fn submit_sealed_async_for_test(
762 &self,
763 sealed: crate::write_txn::SealedCommit,
764 ) -> GraphResult<std::sync::mpsc::Receiver<GraphResult<crate::CommitOutcome>>> {
765 self.committer.handle().submit_commit_async_for_test(sealed)
766 }
767}
768
769impl SharedGraphBuilder {
770 /// Register an index provider.
771 ///
772 /// Providers are retained in registration order, which is the order used
773 /// for committed mutation delivery.
774 #[must_use]
775 pub fn with_provider(mut self, provider: Arc<dyn IndexProvider>) -> Self {
776 self.providers.push(provider);
777 self
778 }
779
780 /// Open a WAL file and route commits through the CORE durable provider.
781 ///
782 /// The path is the WAL file path, not a directory. Callers using the
783 /// conventional layout should pass `dir.join(selene_persist::DEFAULT_WAL_FILE_NAME)`.
784 ///
785 /// # SyncPolicy is OVERRIDDEN (v1.2 BRIEF 2 — read this)
786 ///
787 /// The single per-graph committer thread is the **sole fsync caller** for the
788 /// committer-managed WAL: it appends a contiguous run of commits with fsync
789 /// deferred, then issues exactly one [`WalWriter::flush`] per run (the R1
790 /// fsync-before-publish barrier). To make that the *only* fsync path, this
791 /// method **forces `config.sync_policy` to [`SyncPolicy::OnFlushOnly`]**
792 /// before opening the WAL — **whatever policy you pass is discarded.** The
793 /// fsync cadence is instead controlled by [`Self::with_commit_batching`]:
794 /// [`CommitBatching::Off`] (the default) fsyncs once per commit (behaviorally
795 /// identical to the old `EveryN(1)`), and [`CommitBatching::On`] coalesces a
796 /// contiguous run into one fsync. `config.snapshot_seq` is passed through
797 /// verbatim. Durability is unchanged: the committer always flushes before it
798 /// publishes or acks, so a commit is durable before it is ever visible.
799 ///
800 /// # Errors
801 ///
802 /// Returns [`GraphError::Persist`] when the WAL cannot be opened, including
803 /// when another writer already holds the file lock.
804 pub fn with_wal(mut self, path: impl AsRef<Path>, mut config: WalConfig) -> GraphResult<Self> {
805 // BRIEF 2: the committer owns fsync. Force OnFlushOnly before opening so
806 // the committer's group flush is the single durability barrier. Done
807 // before WalWriter::open so open-error timing (e.g. WriterLockHeld) is
808 // unchanged for existing .unwrap() call sites.
809 config.sync_policy = SyncPolicy::OnFlushOnly;
810 self.wal_writer = Some(WalWriter::open(path.as_ref(), config)?);
811 Ok(self)
812 }
813
814 /// Set the group-commit batching policy for the committer-managed WAL
815 /// (v1.2 BRIEF 2). Default [`CommitBatching::Off`].
816 ///
817 /// With [`CommitBatching::Off`] the committer fsyncs once per commit
818 /// (behaviorally identical to BRIEF 1). With [`CommitBatching::On`] it
819 /// coalesces up to `max_commits` (capped by aggregate `max_bytes`) contiguous
820 /// commits into one fsync — higher throughput + lower tail latency under
821 /// fan-in, at the cost of grouping several commits behind one barrier (all
822 /// still durable before any of them is acked or published). Has no effect
823 /// without [`Self::with_wal`] (no durable provider to flush).
824 #[must_use]
825 pub fn with_commit_batching(mut self, batching: CommitBatching) -> Self {
826 self.commit_batching = batching;
827 self
828 }
829
830 /// Attach a durable audit log at `path` (conventionally
831 /// `dir.join(selene_persist::DEFAULT_AUDIT_FILE_NAME)`).
832 ///
833 /// Engine-owned audit events committed through this graph are mirrored to
834 /// the audit log so they survive WAL-archive pruning (Item 7 / Seam D, D24).
835 /// Requires [`Self::with_wal`]: audit mirroring is part of the durable
836 /// commit path, so [`Self::build`] errors if an audit log is configured
837 /// without a WAL.
838 ///
839 /// # Errors
840 ///
841 /// Returns [`GraphError::Persist`] when the audit log cannot be opened.
842 pub fn with_audit_log(mut self, path: impl AsRef<Path>) -> GraphResult<Self> {
843 self.audit_log = Some(AuditLog::open(path.as_ref()).map_err(GraphError::Persist)?);
844 Ok(self)
845 }
846
847 /// Bind this graph to `type_def` at construction time.
848 ///
849 /// # Errors
850 ///
851 /// Returns [`GraphError::Inconsistent`] when the builder is already bound
852 /// or when `type_def` fails self-consistency validation.
853 pub fn bound_to(mut self, type_def: GraphTypeDef) -> GraphResult<Self> {
854 if self.graph.meta.bound_type.is_some() {
855 return Err(GraphError::Inconsistent {
856 reason: "graph builder is already bound to a graph type".to_owned(),
857 });
858 }
859 self.graph.meta.bound_type = Some(Arc::new(type_def.validate()?));
860 Ok(self)
861 }
862
863 /// Build shared graph state and validate provider registration.
864 ///
865 /// # Errors
866 ///
867 /// Returns [`GraphError::Provider`] when provider tags are duplicated.
868 pub fn build(self) -> GraphResult<SharedGraph> {
869 SharedGraph::from_graph_with_core_and_durables(
870 self.graph,
871 self.providers,
872 Vec::new(),
873 self.wal_writer,
874 self.audit_log,
875 self.commit_batching,
876 )
877 }
878}
879
880mod rebuild;
881pub(crate) use rebuild::{rebuild_derived_state, validate_unique_provider_tags};
882
883#[cfg(test)]
884mod compaction_tests;
885#[cfg(test)]
886#[path = "shared_property_tests.rs"]
887mod property_tests;
888#[cfg(test)]
889mod tests;