Skip to main content

selene_graph/
shared.rs

1//! Shared graph wrapper implementing lock-free reads and serialized writes.
2
3use std::path::Path;
4use std::sync::{
5    Arc,
6    atomic::{AtomicU64, Ordering},
7};
8
9use arc_swap::ArcSwap;
10use parking_lot::{Mutex, RwLock};
11
12use selene_core::{Change, DbString, GraphId, HnswIndexConfig, SchemaChange};
13use selene_persist::{AuditLog, SyncPolicy, WalConfig, WalWriter};
14
15use crate::adjacency::AdjacencyEdge;
16use crate::committer_batch::CommitBatching;
17use crate::core_provider::{CoreProvider, DurableState};
18use crate::durable_provider::DurableProvider;
19use crate::error::{GraphError, GraphResult};
20use crate::graph::{PropertyIndexEntry, SeleneGraph};
21use crate::graph_types::GraphTypeDef;
22use crate::id_allocator::IdAllocator;
23use crate::index_provider::{IndexProvider, ProviderError, ProviderTag};
24use crate::schema_index_kind::schema_kind_from;
25use crate::store::{EdgeStore, RowIndex};
26use crate::typed_index::TypedIndexKind;
27use crate::vector_index::{
28    VectorIndexConfig, VectorIndexKind, VectorIndexMaintenancePolicy, VectorIndexRebuildReport,
29};
30use crate::write_txn::WriteTxn;
31
/// Per-graph shared runtime state.
///
/// Since v1.2 (BRIEF 1) every snapshot publish is funneled through a single
/// per-graph committer thread (`CommitterThread`), which is
/// the **sole writer** of the `snapshot` [`ArcSwap`] cell. `begin_write` hands
/// each [`WriteTxn`] a cheap submit handle; `commit`/`compact` seal-and-submit
/// to the committer and block until it publishes. This single-committer +
/// sole-publisher discipline is what preserves D10 strict-serializability once
/// `seal()` drops the write lock early — it is load-bearing and NOT
/// type-enforced (a second committer or ArcSwap writer would silently break it).
///
/// Field order matters: Rust drops struct fields in declaration order, and
/// `committer` is declared last on purpose (see its field comment).
pub struct SharedGraph {
    /// The live graph behind the single writer lock. `begin_write`, `compact`,
    /// and the vector-index maintenance methods all take `shared.write()`.
    shared: Arc<RwLock<Arc<SeleneGraph>>>,
    /// The published snapshot cell; [`SharedGraph::read`] loads it without
    /// touching the writer lock.
    snapshot: Arc<ArcSwap<SeleneGraph>>,
    /// Schema epoch reported by [`SharedGraph::schema_version`]; initialised to
    /// zero at construction and shared with the committer.
    schema_version: Arc<AtomicU64>,
    /// ID allocator; `begin_write` locks it and hands the guard to the
    /// transaction.
    allocator: Arc<Mutex<IdAllocator>>,
    /// Fixed provider registry, frozen at construction. Shared as one
    /// allocation so `begin_write` hands the registry to each transaction
    /// with a single refcount bump instead of a per-transaction `Vec` clone.
    providers: Arc<[Arc<dyn IndexProvider>]>,
    /// Fixed commit-critical durable provider registry, exposed through
    /// [`SharedGraph::durable_providers`]; a clone is given to the committer.
    durable_providers: Vec<Arc<dyn DurableProvider>>,
    /// The single per-graph committer thread; sole publisher of `snapshot`.
    /// Dropped last via [`SharedGraph`]'s implicit drop order, which joins the
    /// thread once every outstanding [`WriteTxn`] submit handle is gone.
    committer: crate::committer::CommitterThread,
}
57
/// Builder for a [`SharedGraph`] and its fixed provider registry.
///
/// Created by [`SharedGraph::builder`], which starts from an empty graph, no
/// providers, no WAL, no audit log, and [`CommitBatching::Off`].
pub struct SharedGraphBuilder {
    /// The graph the built [`SharedGraph`] will start from.
    graph: SeleneGraph,
    /// Index providers collected so far (empty by default).
    providers: Vec<Arc<dyn IndexProvider>>,
    /// Optional WAL writer (`None` by default).
    wal_writer: Option<WalWriter>,
    /// Optional audit log (`None` by default).
    // NOTE(review): construction rejects an audit log without a WAL — see
    // `from_graph_with_core_and_durables`; presumably the builder's build step
    // routes through it — confirm.
    audit_log: Option<AuditLog>,
    /// Commit batching mode handed to the committer (`Off` by default).
    commit_batching: CommitBatching,
}
66
67impl SharedGraph {
68    /// Construct an empty shared graph.
69    #[must_use]
70    pub fn new(graph_id: GraphId) -> Self {
71        Self::from_graph(SeleneGraph::new(graph_id))
72    }
73
74    /// Start building an empty shared graph with optional providers.
75    #[must_use]
76    pub fn builder(graph_id: GraphId) -> SharedGraphBuilder {
77        SharedGraphBuilder {
78            graph: SeleneGraph::new(graph_id),
79            providers: Vec::new(),
80            wal_writer: None,
81            audit_log: None,
82            commit_batching: CommitBatching::Off,
83        }
84    }
85
86    /// Construct shared state from a pre-built graph snapshot.
87    ///
88    /// The allocator floors are derived from storage length so that stale
89    /// `GraphMeta.next_*_id` values cannot allow ID reuse over rows that
90    /// already exist (recovery hardening — spec 02 §4 forbids ID reuse).
91    ///
92    /// # Panics
93    ///
94    /// Panics if the supplied graph contains more than `u32::MAX` rows in
95    /// either store. Selene-graph's row index is `u32` by construction;
96    /// `SeleneGraph::new()` always satisfies this, and any caller-built
97    /// fixture must too. Use [`SharedGraph::try_from_graph`] for the
98    /// fallible variant when validating untrusted snapshots.
99    #[must_use]
100    pub fn from_graph(graph: SeleneGraph) -> Self {
101        Self::try_from_graph(graph).expect("graph store row count exceeds u32::MAX")
102    }
103
104    /// Fallible variant of [`SharedGraph::from_graph`]. Returns
105    /// [`GraphError::Inconsistent`] when the graph's stores exceed the
106    /// `u32` row capacity.
107    pub fn try_from_graph(graph: SeleneGraph) -> GraphResult<Self> {
108        Self::from_graph_with_core(graph, Vec::new())
109    }
110
111    /// Construct shared state from a graph snapshot and fixed provider list.
112    ///
113    /// # Errors
114    ///
115    /// Returns [`GraphError::Provider`] when two providers declare the same
116    /// [`ProviderTag`], and [`GraphError::Inconsistent`] when the graph's
117    /// stores exceed the `u32` row capacity.
118    pub fn from_graph_with_providers(
119        graph: SeleneGraph,
120        providers: Vec<Arc<dyn IndexProvider>>,
121    ) -> GraphResult<Self> {
122        Self::from_graph_with_core(graph, providers)
123    }
124
125    /// Construct shared state from a graph snapshot and commit-critical WAL file.
126    ///
127    /// Since v1.2 (BRIEF 2) the committer is the sole fsync caller, so the WAL is
128    /// **always** opened in [`SyncPolicy::OnFlushOnly`] regardless of the
129    /// `config.sync_policy` passed (it is overwritten before
130    /// [`WalWriter::open`]). This non-builder constructor uses
131    /// [`CommitBatching::Off`], so the committer still fsyncs once per commit —
132    /// behaviorally identical to BRIEF 1's `EveryN(1)`.
133    ///
134    /// # Errors
135    ///
136    /// Returns [`GraphError::Persist`] when the WAL cannot be opened, plus the
137    /// same consistency and provider-registration errors as [`Self::try_from_graph`].
138    pub fn from_graph_with_wal(
139        graph: SeleneGraph,
140        path: impl AsRef<Path>,
141        mut config: WalConfig,
142    ) -> GraphResult<Self> {
143        // BRIEF 2: the committer owns fsync via flush_durables(); force the
144        // committer-managed WAL into OnFlushOnly before opening it (overwriting
145        // any caller policy), keeping open-error timing unchanged.
146        config.sync_policy = SyncPolicy::OnFlushOnly;
147        let writer = WalWriter::open(path.as_ref(), config)?;
148        Self::from_graph_with_core_and_durables(
149            graph,
150            Vec::new(),
151            Vec::new(),
152            Some(writer),
153            None,
154            CommitBatching::Off,
155        )
156    }
157
158    fn from_graph_with_core(
159        graph: SeleneGraph,
160        providers: Vec<Arc<dyn IndexProvider>>,
161    ) -> GraphResult<Self> {
162        Self::from_graph_with_core_and_durables(
163            graph,
164            providers,
165            Vec::new(),
166            None,
167            None,
168            CommitBatching::Off,
169        )
170    }
171
172    pub(crate) fn from_graph_with_core_and_durables(
173        graph: SeleneGraph,
174        providers: Vec<Arc<dyn IndexProvider>>,
175        mut durable_providers: Vec<Arc<dyn DurableProvider>>,
176        wal_writer: Option<WalWriter>,
177        audit_log: Option<AuditLog>,
178        batching: CommitBatching,
179    ) -> GraphResult<Self> {
180        if audit_log.is_some() && wal_writer.is_none() {
181            return Err(GraphError::Inconsistent {
182                reason: "audit log configured without a WAL; audit mirroring requires durable WAL \
183                         state"
184                    .to_owned(),
185            });
186        }
187        let snapshot = Arc::new(ArcSwap::from_pointee(graph.clone()));
188        let has_wal = wal_writer.is_some();
189        let durable = wal_writer
190            .map(DurableState::new)
191            .map(|durable| match audit_log {
192                Some(audit) => durable.with_audit_log(audit),
193                None => durable,
194            });
195        let core = CoreProvider::new_for_live_with_wal(Arc::clone(&snapshot), durable);
196        let mut all_providers = Vec::with_capacity(providers.len() + 1);
197        all_providers.push(core.clone() as Arc<dyn IndexProvider>);
198        all_providers.extend(providers);
199        if has_wal {
200            durable_providers.push(core as Arc<dyn DurableProvider>);
201        }
202        validate_unique_provider_tags(&all_providers)?;
203        Self::from_graph_parts_and_snapshot(
204            graph,
205            all_providers,
206            durable_providers,
207            snapshot,
208            batching,
209        )
210    }
211
212    pub(crate) fn from_graph_parts_and_snapshot(
213        graph: SeleneGraph,
214        providers: Vec<Arc<dyn IndexProvider>>,
215        durable_providers: Vec<Arc<dyn DurableProvider>>,
216        snapshot: Arc<ArcSwap<SeleneGraph>>,
217        batching: CommitBatching,
218    ) -> GraphResult<Self> {
219        validate_unique_provider_tags(&providers)?;
220        // Freeze the registry into one shared allocation: the committer and
221        // every `begin_write` transaction clone the `Arc`, not the `Vec`.
222        let providers: Arc<[Arc<dyn IndexProvider>]> = providers.into();
223        let mut graph = graph;
224        rebuild_derived_state(&mut graph)?;
225        crate::property_index::rebuild_property_indexes(&mut graph)?;
226        crate::composite_property_index::rebuild_composite_property_indexes(&mut graph)?;
227        crate::vector_index::rebuild_vector_indexes(&mut graph)?;
228        crate::text_index::rebuild_text_indexes(&mut graph)?;
229        if let Some(type_def) = graph.meta.bound_type.as_deref() {
230            // Why: GraphMeta is publicly constructible, so SharedGraph::from_graph
231            // can land a malformed bound_type that bypassed builder().bound_to()'s
232            // validate(). Re-check self-consistency here so every constructor
233            // arrives at the same closed-graph admissibility contract.
234            type_def.validate_ref()?;
235            crate::type_validator::validate_entity_state(&graph, type_def)?;
236        }
237
238        let node_floor = (graph.node_store.labels.len() as u64).saturating_add(1);
239        let edge_floor = (graph.edge_store.label.len() as u64).saturating_add(1);
240        let allocator = IdAllocator::from_meta_with_floors(&graph.meta, node_floor, edge_floor);
241
242        // Debug-only structural net on the snapshot-load / recovery path: the
243        // rebuild_* helpers above re-derive all indexes from columns, so a
244        // rebuild bug would otherwise surface only as silent query
245        // corruption. Highest-value placement — verify the rebuilt snapshot
246        // before it is ever published. Compiled out in release builds.
247        #[cfg(debug_assertions)]
248        if let Err(reason) = graph.assert_indexes_consistent() {
249            return Err(GraphError::Inconsistent {
250                reason: format!("rebuilt snapshot failed index consistency check: {reason}"),
251            });
252        }
253
254        let graph = Arc::new(graph);
255        snapshot.store(Arc::clone(&graph));
256        let shared = Arc::new(RwLock::new(graph));
257        let schema_version = Arc::new(AtomicU64::new(0));
258        let allocator = Arc::new(Mutex::new(allocator));
259        // Spawn the single per-graph committer thread. It captures clones of
260        // every handle it needs to publish + compact; it is the sole writer of
261        // `snapshot`. All commit/compact/index-DDL publishes route through it.
262        let committer =
263            crate::committer::CommitterThread::spawn(crate::committer::CommitterHandles {
264                snapshot: Arc::clone(&snapshot),
265                schema_version: Arc::clone(&schema_version),
266                providers: Arc::clone(&providers),
267                durable_providers: durable_providers.clone(),
268                batching,
269            });
270        Ok(Self {
271            shared,
272            snapshot,
273            schema_version,
274            allocator,
275            providers,
276            durable_providers,
277            committer,
278        })
279    }
280
281    /// Load the current immutable snapshot without taking the write lock.
282    #[must_use]
283    pub fn read(&self) -> Arc<SeleneGraph> {
284        self.snapshot.load_full()
285    }
286
287    /// Return compaction pressure for the current published snapshot.
288    ///
289    /// This is a lock-free read of row counts and liveness counters. It does not
290    /// compact, rebuild indexes, or take the writer lock.
291    #[must_use]
292    pub fn compaction_stats(&self) -> crate::compaction::CompactionStats {
293        self.read().compaction_stats()
294    }
295
296    /// Compact the live graph in place: reclaim every dead / hole row, renumber
297    /// rows dense, and atomically republish the result so the RAM held by deleted
298    /// rows is reclaimed immediately (BRIEF-Item-4c — the live-densify half of
299    /// snapshot-time compaction).
300    ///
301    /// This is pure space reclamation: it changes only the internal row layout,
302    /// never external `NodeId`/`EdgeId`, properties, or labels, so it emits **no**
303    /// [`Change`] and writes **no** WAL entry. Durability
304    /// comes from the next snapshot, which encodes the now-dense live graph (the
305    /// CORE provider reads the same `snapshot` cell this method publishes into). A
306    /// crash before that snapshot simply reloads the pre-compaction state and
307    /// recompacts later — compaction can never lose data.
308    ///
309    /// The dense graph is built under the write lock on the calling thread
310    /// (seal-and-handover, exactly like a commit), and is allocated a publish
311    /// `seal_seq` under that same lock; the single committer then swaps it into
312    /// the published `snapshot` cell strictly in `seal_seq` order. So compaction
313    /// serializes with writers exactly like a commit and can never be reordered
314    /// ahead of an earlier-sealed commit (which would let that commit's stale,
315    /// non-dense frozen snapshot clobber the dense one). Lock-free readers keep
316    /// observing the old snapshot until the dense graph is published. The
317    /// monotonic allocator high-water marks are preserved (the live allocator is
318    /// untouched, and [`compact_core`](crate::compact_core) carries `GraphMeta`
319    /// verbatim — and the allocator is kept in sync with `GraphMeta` on every
320    /// commit), so no external id is ever reused after a later recovery.
321    ///
322    /// # Errors
323    ///
324    /// Returns [`GraphError`] if the graph's id↔row mapping is corrupt or the
325    /// recompacted graph fails its consistency check (see
326    /// [`compact_core`](crate::compact_core)).
327    pub fn compact(&self) -> GraphResult<crate::CompactionReport> {
328        // Seal-and-handover for compaction (v1.2 BRIEF 1, P1 fix): build the
329        // dense graph HERE, on the caller thread, under the write lock — exactly
330        // like a commit seals under the lock — then hand the committer a
331        // pre-built dense snapshot to publish in seal_seq order. This keeps the
332        // committer off the write lock entirely (no deadlock surface) and, more
333        // importantly, ties compaction's publish position to a seal_seq taken
334        // under the same lock as commits, so a compact can never be reordered
335        // ahead of an earlier-sealed commit (which would otherwise let an
336        // earlier commit's stale, non-dense frozen snapshot clobber the dense
337        // one in the published cell).
338        //
339        // Ordering under the lock is load-bearing for the reorder buffer's
340        // gap-free invariant: densify FIRST (the only fallible step), and only
341        // THEN allocate the seal_seq, so a failed compaction consumes no
342        // sequence number (which would otherwise wedge the committer waiting for
343        // a seq that never arrives).
344        let committer = self.committer.handle();
345        let (seal_seq, dense, report) = {
346            let mut guard = self.shared.write();
347            let compacted = crate::compaction::compact_core(&guard)?;
348            let dense = Arc::new(compacted.graph);
349            // Allocate the publish-order key under the lock, after the fallible
350            // densify, so seal_seq order == lock-acquisition order and no seq is
351            // ever burned by a failed compaction.
352            let seal_seq = committer.next_seal_seq();
353            *guard = Arc::clone(&dense);
354            (seal_seq, dense, compacted.report)
355            // Lock released here, before the (blocking) enqueue + recv — the
356            // committer never needs the write lock, but releasing here also
357            // means a compactor never holds the lock while blocked on the
358            // committer.
359        };
360        committer.submit_compact(seal_seq, dense, report)
361    }
362
363    /// Rebuild every registered vector index from primary node values.
364    ///
365    /// HNSW indexes retain stale deleted entries after vector update/delete so
366    /// in-flight search can still traverse the neighbor graph safely. This
367    /// maintenance path reclaims those stale entries by rebuilding only the
368    /// derived vector-index state; it does not change graph data, emit
369    /// [`Change`], write a WAL entry, bump schema epoch, or
370    /// notify providers. The HNSW graph is derived, not durable: snapshots and
371    /// recovery persist only vector-index registrations plus primary values, so
372    /// a reopen rebuilds the index from that authoritative state.
373    ///
374    /// The rebuild is strict on live data: if an indexed row no longer satisfies
375    /// the registered vector dimension/metric invariant, this method returns an
376    /// error instead of silently dropping the row from the index.
377    pub fn rebuild_vector_indexes(&self) -> GraphResult<VectorIndexRebuildReport> {
378        let committer = self.committer.handle();
379        let (seal_seq, rebuilt, report) = {
380            let mut guard = self.shared.write();
381            let mut rebuilt = guard.as_ref().clone();
382            let report = crate::vector_index::rebuild_vector_indexes_strict(&mut rebuilt)?;
383            let rebuilt = Arc::new(rebuilt);
384            let seal_seq = committer.next_seal_seq();
385            *guard = Arc::clone(&rebuilt);
386            (seal_seq, rebuilt, report)
387        };
388        committer.submit_vector_index_rebuild(seal_seq, rebuilt, report)
389    }
390
391    /// Rebuild only vector indexes whose diagnostics recommend maintenance.
392    ///
393    /// This is the bounded maintenance variant for IVF drift: it uses each index's current
394    /// [`ivf_rebuild_recommended`](crate::vector_index::VectorIndexMemoryUsage::ivf_rebuild_recommended)
395    /// value to decide whether to rebuild that derived index. Indexes that do not recommend rebuild
396    /// are left untouched, and a no-op call returns an empty report without publishing a maintenance
397    /// item.
398    ///
399    /// The rebuild is strict on live data for selected indexes, matching
400    /// [`Self::rebuild_vector_indexes`].
401    pub fn rebuild_recommended_vector_indexes(&self) -> GraphResult<VectorIndexRebuildReport> {
402        self.maintain_vector_indexes(VectorIndexMaintenancePolicy::recommended())
403    }
404
405    /// Maintain recommended vector indexes under a caller-supplied policy.
406    ///
407    /// This is the explicit orchestration API for amortized vector-index maintenance. It rebuilds
408    /// only indexes whose diagnostics currently recommend maintenance and applies the policy cap
409    /// after ordering recommended indexes by pending IVF retrain pressure. It remains a
410    /// maintenance-tier operation: reads never trigger it, and a no-op call returns an empty report
411    /// without publishing a derived-state replacement.
412    ///
413    /// The rebuild is strict on live data for selected indexes, matching
414    /// [`Self::rebuild_vector_indexes`].
415    pub fn maintain_vector_indexes(
416        &self,
417        policy: VectorIndexMaintenancePolicy,
418    ) -> GraphResult<VectorIndexRebuildReport> {
419        let committer = self.committer.handle();
420        let (seal_seq, rebuilt, report) = {
421            let mut guard = self.shared.write();
422            let mut rebuilt = guard.as_ref().clone();
423            let report = crate::vector_index::maintain_vector_indexes_strict(&mut rebuilt, policy)?;
424            if report.entries.is_empty() {
425                return Ok(report);
426            }
427            let rebuilt = Arc::new(rebuilt);
428            let seal_seq = committer.next_seal_seq();
429            *guard = Arc::clone(&rebuilt);
430            (seal_seq, rebuilt, report)
431        };
432        committer.submit_vector_index_rebuild(seal_seq, rebuilt, report)
433    }
434
435    /// Return the runtime schema-version epoch used for plan-cache invalidation.
436    ///
437    /// The epoch starts at zero for each [`SharedGraph`] instance and advances
438    /// only after a successful commit whose change set contains
439    /// [`Change::SchemaChanged`].
440    #[must_use]
441    pub fn schema_version(&self) -> u64 {
442        self.schema_version.load(Ordering::Acquire)
443    }
444
445    /// Return the bound graph type, if this is a closed graph.
446    #[must_use]
447    pub fn graph_type(&self) -> Option<Arc<GraphTypeDef>> {
448        self.read().meta.bound_type.as_ref().map(Arc::clone)
449    }
450
451    /// Return true when this graph is bound to a closed graph type.
452    #[must_use]
453    pub fn is_closed(&self) -> bool {
454        self.read().meta.bound_type.is_some()
455    }
456
457    /// Look up a registered provider by tag.
458    #[must_use]
459    pub fn index_provider_by_tag(&self, tag: ProviderTag) -> Option<Arc<dyn IndexProvider>> {
460        self.providers
461            .iter()
462            .find_map(|provider| (provider.provider_tag() == tag).then(|| Arc::clone(provider)))
463    }
464
465    /// Borrow the fixed provider registry for executor procedure contexts.
466    #[must_use]
467    pub fn index_providers(&self) -> &[Arc<dyn IndexProvider>] {
468        &self.providers
469    }
470
471    /// Borrow the fixed commit-critical durable provider registry.
472    #[must_use]
473    pub fn durable_providers(&self) -> &[Arc<dyn DurableProvider>] {
474        &self.durable_providers
475    }
476
477    /// Register a built-in node property index for `(label, property)`.
478    ///
479    /// The current node columns are scanned under the write lock and the
480    /// published snapshot is updated in one transaction.
481    ///
482    /// # Errors
483    ///
484    /// Returns [`GraphError::PropertyIndexAlreadyExists`] if the pair is
485    /// already registered, or [`GraphError::IndexValueRejected`] if any
486    /// existing node with `label` has a non-null value that does not match
487    /// `kind`.
488    pub fn create_property_index(
489        &self,
490        label: DbString,
491        property: DbString,
492        kind: TypedIndexKind,
493    ) -> GraphResult<()> {
494        self.create_property_index_named(label, property, kind, None)
495    }
496
497    /// Register a built-in node property index with optional catalog name.
498    pub fn create_property_index_named(
499        &self,
500        label: DbString,
501        property: DbString,
502        kind: TypedIndexKind,
503        name: Option<DbString>,
504    ) -> GraphResult<()> {
505        let mut txn = self.begin_write();
506        if txn
507            .read()
508            .property_index
509            .contains_key(&(label.clone(), property.clone()))
510        {
511            return Err(GraphError::PropertyIndexAlreadyExists { label, property });
512        }
513        let index = crate::property_index::build_property_index(
514            txn.read(),
515            label.clone(),
516            property.clone(),
517            kind,
518        )?;
519        txn.guard_mut().property_index.insert(
520            (label.clone(), property.clone()),
521            PropertyIndexEntry::new(index, name.clone()),
522        );
523        let graph = txn.read().graph_id();
524        txn.changes.push(Change::SchemaChanged {
525            graph,
526            change: SchemaChange::PropertyIndexCreatedNamed {
527                label,
528                property,
529                kind: schema_kind_from(kind),
530                name,
531            },
532        });
533        txn.commit()?;
534        Ok(())
535    }
536
537    /// Drop a built-in node property index.
538    ///
539    /// The operation is idempotent; dropping an absent index succeeds without
540    /// publishing a new snapshot.
541    pub fn drop_property_index(&self, label: DbString, property: DbString) -> GraphResult<()> {
542        let mut txn = self.begin_write();
543        if !txn
544            .read()
545            .property_index
546            .contains_key(&(label.clone(), property.clone()))
547        {
548            return Ok(());
549        }
550        txn.guard_mut()
551            .property_index
552            .remove(&(label.clone(), property.clone()));
553        let graph = txn.read().graph_id();
554        txn.changes.push(Change::SchemaChanged {
555            graph,
556            change: SchemaChange::PropertyIndexDropped { label, property },
557        });
558        txn.commit()?;
559        Ok(())
560    }
561
562    /// Register a built-in node vector index for `(label, property)`.
563    ///
564    /// The current node columns are scanned under the write lock and the
565    /// published snapshot is updated in one transaction.
566    ///
567    /// # Errors
568    ///
569    /// Returns [`GraphError::VectorIndexAlreadyExists`] if the pair is already
570    /// registered, [`GraphError::VectorIndexInvalidDimension`] when `dimension`
571    /// is zero, or [`GraphError::VectorIndexValueRejected`] if any existing
572    /// node with `label` has a non-null value for `property` that is not a
573    /// vector with the declared dimension.
574    pub fn create_vector_index(
575        &self,
576        label: DbString,
577        property: DbString,
578        kind: VectorIndexKind,
579        dimension: u32,
580    ) -> GraphResult<()> {
581        self.create_vector_index_named(label, property, kind, dimension, None)
582    }
583
584    /// Register a built-in node vector index with optional catalog name.
585    pub fn create_vector_index_named(
586        &self,
587        label: DbString,
588        property: DbString,
589        kind: VectorIndexKind,
590        dimension: u32,
591        name: Option<DbString>,
592    ) -> GraphResult<()> {
593        self.create_vector_index_named_with_config(label, property, kind, dimension, name, None)
594    }
595
596    /// Register a built-in node vector index with optional HNSW construction config.
597    pub fn create_vector_index_named_with_config(
598        &self,
599        label: DbString,
600        property: DbString,
601        kind: VectorIndexKind,
602        dimension: u32,
603        name: Option<DbString>,
604        hnsw_config: Option<HnswIndexConfig>,
605    ) -> GraphResult<()> {
606        self.create_vector_index_named_with_configs(
607            label,
608            property,
609            kind,
610            dimension,
611            name,
612            VectorIndexConfig::new(hnsw_config, None),
613        )
614    }
615
616    /// Register a built-in node vector index with optional ANN construction config.
617    pub fn create_vector_index_named_with_configs(
618        &self,
619        label: DbString,
620        property: DbString,
621        kind: VectorIndexKind,
622        dimension: u32,
623        name: Option<DbString>,
624        config: VectorIndexConfig,
625    ) -> GraphResult<()> {
626        let mut txn = self.begin_write();
627        txn.mutator().create_vector_index_named_with_configs(
628            label, property, kind, dimension, name, config,
629        )?;
630        txn.commit()?;
631        Ok(())
632    }
633
634    /// Drop a built-in node vector index.
635    ///
636    /// The operation is idempotent; dropping an absent index succeeds without
637    /// publishing a new snapshot.
638    pub fn drop_vector_index(&self, label: DbString, property: DbString) -> GraphResult<()> {
639        let mut txn = self.begin_write();
640        txn.mutator().drop_vector_index(label, property)?;
641        txn.commit()?;
642        Ok(())
643    }
644
645    /// Register a built-in node text index for `(label, property)`.
646    ///
647    /// The current node columns are scanned under the write lock and the
648    /// published snapshot is updated in one transaction.
649    ///
650    /// # Errors
651    ///
652    /// Returns [`GraphError::TextIndexAlreadyExists`] if the pair is already
653    /// registered, or [`GraphError::Inconsistent`] if index construction
654    /// observes corrupt graph columns.
655    pub fn create_text_index(&self, label: DbString, property: DbString) -> GraphResult<()> {
656        self.create_text_index_named(label, property, None)
657    }
658
659    /// Register a built-in node text index with optional catalog name.
660    pub fn create_text_index_named(
661        &self,
662        label: DbString,
663        property: DbString,
664        name: Option<DbString>,
665    ) -> GraphResult<()> {
666        let mut txn = self.begin_write();
667        txn.mutator()
668            .create_text_index_named(label, property, name)?;
669        txn.commit()?;
670        Ok(())
671    }
672
673    /// Drop a built-in node text index.
674    ///
675    /// The operation is idempotent; dropping an absent index succeeds without
676    /// publishing a new snapshot.
677    pub fn drop_text_index(&self, label: DbString, property: DbString) -> GraphResult<()> {
678        let mut txn = self.begin_write();
679        txn.mutator().drop_text_index(label, property)?;
680        txn.commit()?;
681        Ok(())
682    }
683
684    /// Begin a write transaction by acquiring the single graph write lock.
685    ///
686    /// Concurrent writers from other threads queue normally on the write
687    /// lock; the engine does **not** panic legitimate concurrent writes
688    /// during another commit's provider fanout.
689    ///
690    /// Since v1.2 (BRIEF 1) the actual snapshot publish happens on the single
691    /// committer thread, not here: `WriteTxn::commit` seals under this lock,
692    /// releases it, and hands the frozen bundle to the committer. Provider
693    /// fan-out therefore now runs on the **committer thread**, so the
694    /// re-entrancy guard below protects the committer thread (sound with exactly
695    /// one committer — see `reentry.rs` and the v1.2 design §7.7).
696    ///
697    /// # Panics
698    ///
699    /// Panics when called from inside an [`IndexProvider`] callback **on
700    /// the committer thread** as the active fanout. Re-entrant writes from a
701    /// provider callback are unsupported; the committer is publishing, so a
702    /// nested write would recurse indefinitely. The panic is caught by the
703    /// committer's `notify_providers` boundary; provider state may drift, but
704    /// the commit still completes.
705    ///
706    /// Cross-thread re-entry — a provider spawning a worker thread that
707    /// calls `begin_write` and waiting for it — is **documented misuse**
708    /// rather than a detectable footgun (the engine cannot trace causal
709    /// thread ancestry). See the module docs in `reentry.rs` and the
710    /// `IndexProvider` rustdoc for the contract.
711    #[must_use]
712    #[tracing::instrument(name = "selene.graph.begin_write", skip(self))]
713    pub fn begin_write(&self) -> WriteTxn<'_> {
714        if crate::reentry::in_fanout() {
715            panic!(
716                "selene-graph: SharedGraph::begin_write() called from within \
717                 a provider fan-out callback on the committer thread; \
718                 re-entrant writes from a provider callback are not supported. \
719                 The committer's fan-out boundary will catch this panic; \
720                 the commit succeeds, but the offending provider's \
721                 chained mutation does not."
722            );
723        }
724        WriteTxn::new(
725            self.shared.write(),
726            self.committer.handle(),
727            self.allocator.lock(),
728            Arc::clone(&self.providers),
729        )
730    }
731
732    #[cfg(test)]
733    pub(crate) fn locked_arc_ptr_for_test(&self) -> *const SeleneGraph {
734        let guard = self.shared.read();
735        Arc::as_ptr(&*guard)
736    }
737
738    /// Read the generation of the **live RwLock graph** (`*shared`), as opposed
739    /// to the published `ArcSwap` snapshot. Used by divergence tests to assert
740    /// the two never disagree after a failed / cancelled commit (the P0
741    /// WAL-failure + cancel rollback invariants).
742    #[cfg(test)]
743    pub(crate) fn locked_generation_for_test(&self) -> u64 {
744        self.shared.read().meta.generation
745    }
746
747    /// Submit an already-[`seal`](crate::WriteTxn::seal)ed commit straight to the
748    /// committer, blocking until it is durable + visible. Test-only seam for
749    /// exercising the BRIEF-117 cancellation cut-line (which has no production
750    /// producer yet) without re-entering `commit_with_principal`.
751    #[cfg(test)]
752    pub(crate) fn submit_sealed_for_test(
753        &self,
754        sealed: crate::write_txn::SealedCommit,
755    ) -> GraphResult<crate::CommitOutcome> {
756        self.committer.handle().submit_commit(sealed)
757    }
758
759    /// Enqueue a sealed commit and return its reply receiver without waiting.
760    #[cfg(test)]
761    pub(crate) fn submit_sealed_async_for_test(
762        &self,
763        sealed: crate::write_txn::SealedCommit,
764    ) -> GraphResult<std::sync::mpsc::Receiver<GraphResult<crate::CommitOutcome>>> {
765        self.committer.handle().submit_commit_async_for_test(sealed)
766    }
767}
768
769impl SharedGraphBuilder {
770    /// Register an index provider.
771    ///
772    /// Providers are retained in registration order, which is the order used
773    /// for committed mutation delivery.
774    #[must_use]
775    pub fn with_provider(mut self, provider: Arc<dyn IndexProvider>) -> Self {
776        self.providers.push(provider);
777        self
778    }
779
780    /// Open a WAL file and route commits through the CORE durable provider.
781    ///
782    /// The path is the WAL file path, not a directory. Callers using the
783    /// conventional layout should pass `dir.join(selene_persist::DEFAULT_WAL_FILE_NAME)`.
784    ///
785    /// # SyncPolicy is OVERRIDDEN (v1.2 BRIEF 2 — read this)
786    ///
787    /// The single per-graph committer thread is the **sole fsync caller** for the
788    /// committer-managed WAL: it appends a contiguous run of commits with fsync
789    /// deferred, then issues exactly one [`WalWriter::flush`] per run (the R1
790    /// fsync-before-publish barrier). To make that the *only* fsync path, this
791    /// method **forces `config.sync_policy` to [`SyncPolicy::OnFlushOnly`]**
792    /// before opening the WAL — **whatever policy you pass is discarded.** The
793    /// fsync cadence is instead controlled by [`Self::with_commit_batching`]:
794    /// [`CommitBatching::Off`] (the default) fsyncs once per commit (behaviorally
795    /// identical to the old `EveryN(1)`), and [`CommitBatching::On`] coalesces a
796    /// contiguous run into one fsync. `config.snapshot_seq` is passed through
797    /// verbatim. Durability is unchanged: the committer always flushes before it
798    /// publishes or acks, so a commit is durable before it is ever visible.
799    ///
800    /// # Errors
801    ///
802    /// Returns [`GraphError::Persist`] when the WAL cannot be opened, including
803    /// when another writer already holds the file lock.
804    pub fn with_wal(mut self, path: impl AsRef<Path>, mut config: WalConfig) -> GraphResult<Self> {
805        // BRIEF 2: the committer owns fsync. Force OnFlushOnly before opening so
806        // the committer's group flush is the single durability barrier. Done
807        // before WalWriter::open so open-error timing (e.g. WriterLockHeld) is
808        // unchanged for existing .unwrap() call sites.
809        config.sync_policy = SyncPolicy::OnFlushOnly;
810        self.wal_writer = Some(WalWriter::open(path.as_ref(), config)?);
811        Ok(self)
812    }
813
814    /// Set the group-commit batching policy for the committer-managed WAL
815    /// (v1.2 BRIEF 2). Default [`CommitBatching::Off`].
816    ///
817    /// With [`CommitBatching::Off`] the committer fsyncs once per commit
818    /// (behaviorally identical to BRIEF 1). With [`CommitBatching::On`] it
819    /// coalesces up to `max_commits` (capped by aggregate `max_bytes`) contiguous
820    /// commits into one fsync — higher throughput + lower tail latency under
821    /// fan-in, at the cost of grouping several commits behind one barrier (all
822    /// still durable before any of them is acked or published). Has no effect
823    /// without [`Self::with_wal`] (no durable provider to flush).
824    #[must_use]
825    pub fn with_commit_batching(mut self, batching: CommitBatching) -> Self {
826        self.commit_batching = batching;
827        self
828    }
829
830    /// Attach a durable audit log at `path` (conventionally
831    /// `dir.join(selene_persist::DEFAULT_AUDIT_FILE_NAME)`).
832    ///
833    /// Engine-owned audit events committed through this graph are mirrored to
834    /// the audit log so they survive WAL-archive pruning (Item 7 / Seam D, D24).
835    /// Requires [`Self::with_wal`]: audit mirroring is part of the durable
836    /// commit path, so [`Self::build`] errors if an audit log is configured
837    /// without a WAL.
838    ///
839    /// # Errors
840    ///
841    /// Returns [`GraphError::Persist`] when the audit log cannot be opened.
842    pub fn with_audit_log(mut self, path: impl AsRef<Path>) -> GraphResult<Self> {
843        self.audit_log = Some(AuditLog::open(path.as_ref()).map_err(GraphError::Persist)?);
844        Ok(self)
845    }
846
847    /// Bind this graph to `type_def` at construction time.
848    ///
849    /// # Errors
850    ///
851    /// Returns [`GraphError::Inconsistent`] when the builder is already bound
852    /// or when `type_def` fails self-consistency validation.
853    pub fn bound_to(mut self, type_def: GraphTypeDef) -> GraphResult<Self> {
854        if self.graph.meta.bound_type.is_some() {
855            return Err(GraphError::Inconsistent {
856                reason: "graph builder is already bound to a graph type".to_owned(),
857            });
858        }
859        self.graph.meta.bound_type = Some(Arc::new(type_def.validate()?));
860        Ok(self)
861    }
862
863    /// Build shared graph state and validate provider registration.
864    ///
865    /// # Errors
866    ///
867    /// Returns [`GraphError::Provider`] when provider tags are duplicated.
868    pub fn build(self) -> GraphResult<SharedGraph> {
869        SharedGraph::from_graph_with_core_and_durables(
870            self.graph,
871            self.providers,
872            Vec::new(),
873            self.wal_writer,
874            self.audit_log,
875            self.commit_batching,
876        )
877    }
878}
879
880mod rebuild;
881pub(crate) use rebuild::{rebuild_derived_state, validate_unique_provider_tags};
882
883#[cfg(test)]
884mod compaction_tests;
885#[cfg(test)]
886#[path = "shared_property_tests.rs"]
887mod property_tests;
888#[cfg(test)]
889mod tests;