1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
//! Shared graph wrapper implementing lock-free reads and serialized writes.
use std::path::Path;
use std::sync::{
Arc,
atomic::{AtomicU64, Ordering},
};
use arc_swap::ArcSwap;
use parking_lot::{Mutex, RwLock};
use selene_core::{Change, DbString, GraphId, HnswIndexConfig, SchemaChange};
use selene_persist::{AuditLog, SyncPolicy, WalConfig, WalWriter};
use crate::adjacency::AdjacencyEdge;
use crate::committer_batch::CommitBatching;
use crate::core_provider::{CoreProvider, DurableState};
use crate::durable_provider::DurableProvider;
use crate::error::{GraphError, GraphResult};
use crate::graph::{PropertyIndexEntry, SeleneGraph};
use crate::graph_types::GraphTypeDef;
use crate::id_allocator::IdAllocator;
use crate::index_provider::{IndexProvider, ProviderError, ProviderTag};
use crate::schema_index_kind::schema_kind_from;
use crate::store::{EdgeStore, RowIndex};
use crate::typed_index::TypedIndexKind;
use crate::vector_index::{
VectorIndexConfig, VectorIndexKind, VectorIndexMaintenancePolicy, VectorIndexRebuildReport,
};
use crate::write_txn::WriteTxn;
/// Per-graph shared runtime state.
///
/// Since v1.2 (BRIEF 1) every snapshot publish is funneled through a single
/// per-graph committer thread (`CommitterThread`), which is
/// the **sole writer** of the `snapshot` [`ArcSwap`] cell. `begin_write` hands
/// each [`WriteTxn`] a cheap submit handle; `commit`/`compact` seal-and-submit
/// to the committer and block until it publishes. This single-committer +
/// sole-publisher discipline is what preserves D10 strict-serializability once
/// `seal()` drops the write lock early — it is load-bearing and NOT
/// type-enforced (a second committer or ArcSwap writer would silently break it).
pub struct SharedGraph {
/// Live graph behind the single write lock. `begin_write`, `compact`, and
/// the vector-index maintenance methods take this lock for writing.
shared: Arc<RwLock<Arc<SeleneGraph>>>,
/// Published immutable snapshot cell; `read` loads from it without
/// touching the write lock.
snapshot: Arc<ArcSwap<SeleneGraph>>,
/// Runtime schema epoch reported by `schema_version()`. A clone of this
/// `Arc` is handed to the committer at construction.
schema_version: Arc<AtomicU64>,
/// Id allocator; locked by `begin_write` for the lifetime of each
/// write transaction.
allocator: Arc<Mutex<IdAllocator>>,
/// Fixed provider registry, frozen at construction. Shared as one
/// allocation so `begin_write` hands the registry to each transaction
/// with a single refcount bump instead of a per-transaction `Vec` clone.
providers: Arc<[Arc<dyn IndexProvider>]>,
/// Commit-critical durable providers. Fixed at construction; a clone of
/// this list is also handed to the committer.
durable_providers: Vec<Arc<dyn DurableProvider>>,
/// The single per-graph committer thread; sole publisher of `snapshot`.
/// Dropped last via [`SharedGraph`]'s implicit drop order, which joins the
/// thread once every outstanding [`WriteTxn`] submit handle is gone.
committer: crate::committer::CommitterThread,
}
/// Builder for a [`SharedGraph`] and its fixed provider registry.
pub struct SharedGraphBuilder {
/// Graph the built [`SharedGraph`] starts from; `bound_to` records the
/// bound graph type in its metadata.
graph: SeleneGraph,
/// Caller-registered index providers, kept in registration order.
providers: Vec<Arc<dyn IndexProvider>>,
/// WAL writer opened by `with_wal`, if one was configured.
wal_writer: Option<WalWriter>,
/// Audit log opened by `with_audit_log`, if one was configured.
audit_log: Option<AuditLog>,
/// Group-commit policy passed to the committer; `SharedGraph::builder`
/// initializes it to `CommitBatching::Off`.
commit_batching: CommitBatching,
}
impl SharedGraph {
/// Create a shared graph that starts out empty under `graph_id`.
///
/// Delegates to [`SharedGraph::from_graph`] with a freshly created
/// [`SeleneGraph`].
#[must_use]
pub fn new(graph_id: GraphId) -> Self {
    let empty = SeleneGraph::new(graph_id);
    Self::from_graph(empty)
}
/// Begin configuring an empty shared graph.
///
/// The returned builder starts with no extra providers, no WAL, no audit
/// log, and [`CommitBatching::Off`]; each is opt-in through the builder's
/// methods.
#[must_use]
pub fn builder(graph_id: GraphId) -> SharedGraphBuilder {
    let graph = SeleneGraph::new(graph_id);
    SharedGraphBuilder {
        graph,
        providers: Vec::new(),
        wal_writer: None,
        audit_log: None,
        commit_batching: CommitBatching::Off,
    }
}
/// Wrap a pre-built graph snapshot in shared runtime state.
///
/// Allocator floors are derived from storage length, so stale
/// `GraphMeta.next_*_id` values cannot cause ID reuse over rows that
/// already exist (recovery hardening — spec 02 §4 forbids ID reuse).
///
/// # Panics
///
/// Panics whenever [`SharedGraph::try_from_graph`] returns an error. The
/// panic message names the `u32::MAX` row-count limit (selene-graph's row
/// index is `u32` by construction, which `SeleneGraph::new()` always
/// satisfies), but any other construction error surfaces through the same
/// panic. Call [`SharedGraph::try_from_graph`] instead when the snapshot is
/// untrusted and the failure should be handled.
#[must_use]
pub fn from_graph(graph: SeleneGraph) -> Self {
    let built = Self::try_from_graph(graph);
    built.expect("graph store row count exceeds u32::MAX")
}
/// Fallible counterpart of [`SharedGraph::from_graph`].
///
/// Registers no caller-supplied providers. Returns
/// [`GraphError::Inconsistent`] when the graph's stores exceed the `u32`
/// row capacity.
pub fn try_from_graph(graph: SeleneGraph) -> GraphResult<Self> {
    let no_extra_providers = Vec::new();
    Self::from_graph_with_core(graph, no_extra_providers)
}
/// Construct shared state from a graph snapshot and fixed provider list.
///
/// `providers` are appended, in the order given, after the built-in core
/// provider that every construction path registers first. No WAL or audit
/// log is attached.
///
/// # Errors
///
/// Returns [`GraphError::Provider`] when two providers declare the same
/// [`ProviderTag`], and [`GraphError::Inconsistent`] when the graph's
/// stores exceed the `u32` row capacity.
pub fn from_graph_with_providers(
graph: SeleneGraph,
providers: Vec<Arc<dyn IndexProvider>>,
) -> GraphResult<Self> {
Self::from_graph_with_core(graph, providers)
}
/// Build shared state from a graph snapshot backed by a commit-critical WAL file.
///
/// Since v1.2 (BRIEF 2) the committer is the sole fsync caller, so the WAL
/// is **always** opened in [`SyncPolicy::OnFlushOnly`]: whatever
/// `config.sync_policy` the caller passes is overwritten before
/// [`WalWriter::open`] runs. This non-builder constructor registers no extra
/// providers, attaches no audit log, and uses [`CommitBatching::Off`], so
/// the committer still fsyncs once per commit — behaviorally identical to
/// BRIEF 1's `EveryN(1)`.
///
/// # Errors
///
/// Returns [`GraphError::Persist`] when the WAL cannot be opened, plus the
/// same consistency and provider-registration errors as [`Self::try_from_graph`].
pub fn from_graph_with_wal(
    graph: SeleneGraph,
    path: impl AsRef<Path>,
    mut config: WalConfig,
) -> GraphResult<Self> {
    // BRIEF 2: fsync belongs to the committer (via flush_durables()), so the
    // caller's policy is replaced with OnFlushOnly. The override happens
    // before the open call so open-error timing stays unchanged.
    config.sync_policy = SyncPolicy::OnFlushOnly;
    let wal_path = path.as_ref();
    let wal = WalWriter::open(wal_path, config)?;
    let extra_providers = Vec::new();
    let extra_durables = Vec::new();
    Self::from_graph_with_core_and_durables(
        graph,
        extra_providers,
        extra_durables,
        Some(wal),
        None,
        CommitBatching::Off,
    )
}
/// Build shared state with the given index providers and nothing durable:
/// no extra durable providers, no WAL, no audit log, and
/// [`CommitBatching::Off`].
fn from_graph_with_core(
    graph: SeleneGraph,
    providers: Vec<Arc<dyn IndexProvider>>,
) -> GraphResult<Self> {
    let extra_durables = Vec::new();
    Self::from_graph_with_core_and_durables(
        graph,
        providers,
        extra_durables,
        None,
        None,
        CommitBatching::Off,
    )
}
/// Assemble the provider registries around the built-in core provider and
/// finish construction.
///
/// The core provider is created over a fresh snapshot cell and registered
/// ahead of the caller's `providers`. When a WAL writer is supplied it is
/// wrapped in a [`DurableState`] (with the audit log attached, if any) and
/// the core provider is additionally appended to `durable_providers`.
///
/// # Errors
///
/// Returns [`GraphError::Inconsistent`] when an audit log is supplied
/// without a WAL, and otherwise whatever
/// [`Self::from_graph_parts_and_snapshot`] returns (including duplicate
/// provider-tag errors).
pub(crate) fn from_graph_with_core_and_durables(
    graph: SeleneGraph,
    providers: Vec<Arc<dyn IndexProvider>>,
    mut durable_providers: Vec<Arc<dyn DurableProvider>>,
    wal_writer: Option<WalWriter>,
    audit_log: Option<AuditLog>,
    batching: CommitBatching,
) -> GraphResult<Self> {
    if audit_log.is_some() && wal_writer.is_none() {
        return Err(GraphError::Inconsistent {
            reason: "audit log configured without a WAL; audit mirroring requires durable WAL \
                     state"
                .to_owned(),
        });
    }
    // The core provider needs the snapshot cell before the graph has been
    // rebuilt, so the cell is seeded with a clone; the rebuilt graph is
    // stored into the same cell by `from_graph_parts_and_snapshot`.
    let snapshot = Arc::new(ArcSwap::from_pointee(graph.clone()));
    let has_wal = wal_writer.is_some();
    let durable = wal_writer
        .map(DurableState::new)
        .map(|durable| match audit_log {
            Some(audit) => durable.with_audit_log(audit),
            None => durable,
        });
    let core = CoreProvider::new_for_live_with_wal(Arc::clone(&snapshot), durable);
    let mut all_providers = Vec::with_capacity(providers.len() + 1);
    all_providers.push(core.clone() as Arc<dyn IndexProvider>);
    all_providers.extend(providers);
    if has_wal {
        durable_providers.push(core as Arc<dyn DurableProvider>);
    }
    // No tag validation here: `from_graph_parts_and_snapshot` validates
    // provider-tag uniqueness as its very first step, so checking the same
    // list immediately before the call would only repeat the work.
    Self::from_graph_parts_and_snapshot(
        graph,
        all_providers,
        durable_providers,
        snapshot,
        batching,
    )
}
/// Finish construction from an assembled provider registry and an existing
/// snapshot cell.
///
/// In order: validates provider-tag uniqueness; rebuilds derived state and
/// the property, composite-property, vector, and text indexes; re-validates
/// the bound graph type (if any) against the graph; derives allocator floors
/// from the store column lengths; stores the rebuilt graph into `snapshot`;
/// and spawns the committer thread with clones of the shared handles.
///
/// # Errors
///
/// Propagates the first error from tag validation, any rebuild step, or
/// bound-type validation. In debug builds it also returns
/// [`GraphError::Inconsistent`] when the rebuilt graph fails its index
/// consistency check.
pub(crate) fn from_graph_parts_and_snapshot(
graph: SeleneGraph,
providers: Vec<Arc<dyn IndexProvider>>,
durable_providers: Vec<Arc<dyn DurableProvider>>,
snapshot: Arc<ArcSwap<SeleneGraph>>,
batching: CommitBatching,
) -> GraphResult<Self> {
validate_unique_provider_tags(&providers)?;
// Freeze the registry into one shared allocation: the committer and
// every `begin_write` transaction clone the `Arc`, not the `Vec`.
let providers: Arc<[Arc<dyn IndexProvider>]> = providers.into();
let mut graph = graph;
rebuild_derived_state(&mut graph)?;
crate::property_index::rebuild_property_indexes(&mut graph)?;
crate::composite_property_index::rebuild_composite_property_indexes(&mut graph)?;
crate::vector_index::rebuild_vector_indexes(&mut graph)?;
crate::text_index::rebuild_text_indexes(&mut graph)?;
if let Some(type_def) = graph.meta.bound_type.as_deref() {
// Why: GraphMeta is publicly constructible, so SharedGraph::from_graph
// can land a malformed bound_type that bypassed builder().bound_to()'s
// validate(). Re-check self-consistency here so every constructor
// arrives at the same closed-graph admissibility contract.
type_def.validate_ref()?;
crate::type_validator::validate_entity_state(&graph, type_def)?;
}
// Floors are one past the number of rows in each store, so the allocator
// can never hand out an id at or below an existing row count.
let node_floor = (graph.node_store.labels.len() as u64).saturating_add(1);
let edge_floor = (graph.edge_store.label.len() as u64).saturating_add(1);
let allocator = IdAllocator::from_meta_with_floors(&graph.meta, node_floor, edge_floor);
// Debug-only structural net on the snapshot-load / recovery path: the
// rebuild_* helpers above re-derive all indexes from columns, so a
// rebuild bug would otherwise surface only as silent query
// corruption. Highest-value placement — verify the rebuilt snapshot
// before it is ever published. Compiled out in release builds.
#[cfg(debug_assertions)]
if let Err(reason) = graph.assert_indexes_consistent() {
return Err(GraphError::Inconsistent {
reason: format!("rebuilt snapshot failed index consistency check: {reason}"),
});
}
// The same `Arc` backs both the published snapshot cell and the live
// graph behind the write lock.
let graph = Arc::new(graph);
snapshot.store(Arc::clone(&graph));
let shared = Arc::new(RwLock::new(graph));
let schema_version = Arc::new(AtomicU64::new(0));
let allocator = Arc::new(Mutex::new(allocator));
// Spawn the single per-graph committer thread. It captures clones of
// every handle it needs to publish + compact; it is the sole writer of
// `snapshot`. All commit/compact/index-DDL publishes route through it.
let committer =
crate::committer::CommitterThread::spawn(crate::committer::CommitterHandles {
snapshot: Arc::clone(&snapshot),
schema_version: Arc::clone(&schema_version),
providers: Arc::clone(&providers),
durable_providers: durable_providers.clone(),
batching,
});
Ok(Self {
shared,
snapshot,
schema_version,
allocator,
providers,
durable_providers,
committer,
})
}
/// Load the current immutable snapshot without taking the write lock.
///
/// Returns an owned `Arc` to whatever graph the published `snapshot` cell
/// holds at the moment of the call; later publishes do not change the
/// returned value.
#[must_use]
pub fn read(&self) -> Arc<SeleneGraph> {
self.snapshot.load_full()
}
/// Report compaction pressure for the currently published snapshot.
///
/// The statistics come from a lock-free snapshot read; nothing is
/// compacted, no index is rebuilt, and the writer lock is not taken.
#[must_use]
pub fn compaction_stats(&self) -> crate::compaction::CompactionStats {
    let published = self.read();
    published.compaction_stats()
}
/// Compact the live graph in place: reclaim every dead / hole row, renumber
/// rows dense, and atomically republish the result so the RAM held by deleted
/// rows is reclaimed immediately (BRIEF-Item-4c — the live-densify half of
/// snapshot-time compaction).
///
/// This is pure space reclamation: it changes only the internal row layout,
/// never external `NodeId`/`EdgeId`, properties, or labels, so it emits **no**
/// [`Change`] and writes **no** WAL entry. Durability
/// comes from the next snapshot, which encodes the now-dense live graph (the
/// CORE provider reads the same `snapshot` cell this method publishes into). A
/// crash before that snapshot simply reloads the pre-compaction state and
/// recompacts later — compaction can never lose data.
///
/// The dense graph is built under the write lock on the calling thread
/// (seal-and-handover, exactly like a commit), and is allocated a publish
/// `seal_seq` under that same lock; the single committer then swaps it into
/// the published `snapshot` cell strictly in `seal_seq` order. So compaction
/// serializes with writers exactly like a commit and can never be reordered
/// ahead of an earlier-sealed commit (which would let that commit's stale,
/// non-dense frozen snapshot clobber the dense one). Lock-free readers keep
/// observing the old snapshot until the dense graph is published. The
/// monotonic allocator high-water marks are preserved (the live allocator is
/// untouched, and [`compact_core`](crate::compact_core) carries `GraphMeta`
/// verbatim — and the allocator is kept in sync with `GraphMeta` on every
/// commit), so no external id is ever reused after a later recovery.
///
/// # Errors
///
/// Returns [`GraphError`] if the graph's id↔row mapping is corrupt or the
/// recompacted graph fails its consistency check (see
/// [`compact_core`](crate::compact_core)).
pub fn compact(&self) -> GraphResult<crate::CompactionReport> {
// Seal-and-handover for compaction (v1.2 BRIEF 1, P1 fix): build the
// dense graph HERE, on the caller thread, under the write lock — exactly
// like a commit seals under the lock — then hand the committer a
// pre-built dense snapshot to publish in seal_seq order. This keeps the
// committer off the write lock entirely (no deadlock surface) and, more
// importantly, ties compaction's publish position to a seal_seq taken
// under the same lock as commits, so a compact can never be reordered
// ahead of an earlier-sealed commit (which would otherwise let an
// earlier commit's stale, non-dense frozen snapshot clobber the dense
// one in the published cell).
//
// Ordering under the lock is load-bearing for the reorder buffer's
// gap-free invariant: densify FIRST (the only fallible step), and only
// THEN allocate the seal_seq, so a failed compaction consumes no
// sequence number (which would otherwise wedge the committer waiting for
// a seq that never arrives).
let committer = self.committer.handle();
let (seal_seq, dense, report) = {
let mut guard = self.shared.write();
let compacted = crate::compaction::compact_core(&guard)?;
let dense = Arc::new(compacted.graph);
// Allocate the publish-order key under the lock, after the fallible
// densify, so seal_seq order == lock-acquisition order and no seq is
// ever burned by a failed compaction.
let seal_seq = committer.next_seal_seq();
// Replace the live graph behind the lock with the dense one, so the
// next holder of the write lock starts from the compacted layout.
*guard = Arc::clone(&dense);
(seal_seq, dense, compacted.report)
// Lock released here, before the (blocking) enqueue + recv — the
// committer never needs the write lock, but releasing here also
// means a compactor never holds the lock while blocked on the
// committer.
};
// Hand the pre-built dense snapshot to the committer; its reply is this
// method's return value.
committer.submit_compact(seal_seq, dense, report)
}
/// Rebuild every registered vector index from primary node values.
///
/// HNSW indexes retain stale deleted entries after vector update/delete so
/// in-flight search can still traverse the neighbor graph safely. This
/// maintenance path reclaims those stale entries by rebuilding only the
/// derived vector-index state; it does not change graph data, emit
/// [`Change`], write a WAL entry, bump schema epoch, or
/// notify providers. The HNSW graph is derived, not durable: snapshots and
/// recovery persist only vector-index registrations plus primary values, so
/// a reopen rebuilds the index from that authoritative state.
///
/// The rebuild is strict on live data: if an indexed row no longer satisfies
/// the registered vector dimension/metric invariant, this method returns an
/// error instead of silently dropping the row from the index.
///
/// # Errors
///
/// Propagates the error from the strict rebuild, in which case the live
/// graph is left as it was and no sequence number is allocated. Otherwise
/// returns whatever the committer reports for the submitted rebuild.
pub fn rebuild_vector_indexes(&self) -> GraphResult<VectorIndexRebuildReport> {
let committer = self.committer.handle();
let (seal_seq, rebuilt, report) = {
let mut guard = self.shared.write();
// Rebuild on a private clone so the live graph is only replaced once
// the fallible strict rebuild has succeeded.
let mut rebuilt = guard.as_ref().clone();
let report = crate::vector_index::rebuild_vector_indexes_strict(&mut rebuilt)?;
let rebuilt = Arc::new(rebuilt);
// Sequence number is taken under the lock and only after the fallible
// step, mirroring the ordering used by `compact`.
let seal_seq = committer.next_seal_seq();
*guard = Arc::clone(&rebuilt);
(seal_seq, rebuilt, report)
// Write lock is released at the end of this block, before submitting.
};
committer.submit_vector_index_rebuild(seal_seq, rebuilt, report)
}
/// Rebuild just the vector indexes whose diagnostics ask for maintenance.
///
/// This is the bounded maintenance variant for IVF drift: each index's current
/// [`ivf_rebuild_recommended`](crate::vector_index::VectorIndexMemoryUsage::ivf_rebuild_recommended)
/// value decides whether that derived index is rebuilt. Indexes that do not
/// recommend a rebuild are left untouched, and a call that selects nothing
/// returns an empty report without publishing a maintenance item.
///
/// Equivalent to [`Self::maintain_vector_indexes`] with
/// [`VectorIndexMaintenancePolicy::recommended`]. For the selected indexes
/// the rebuild is strict on live data, matching
/// [`Self::rebuild_vector_indexes`].
pub fn rebuild_recommended_vector_indexes(&self) -> GraphResult<VectorIndexRebuildReport> {
    let policy = VectorIndexMaintenancePolicy::recommended();
    self.maintain_vector_indexes(policy)
}
/// Maintain recommended vector indexes under a caller-supplied policy.
///
/// This is the explicit orchestration API for amortized vector-index maintenance. It rebuilds
/// only indexes whose diagnostics currently recommend maintenance and applies the policy cap
/// after ordering recommended indexes by pending IVF retrain pressure. It remains a
/// maintenance-tier operation: reads never trigger it, and a no-op call returns an empty report
/// without publishing a derived-state replacement.
///
/// The rebuild is strict on live data for selected indexes, matching
/// [`Self::rebuild_vector_indexes`].
///
/// # Errors
///
/// Propagates the error from the strict maintenance pass, in which case the
/// live graph is left as it was. Otherwise returns whatever the committer
/// reports for the submitted rebuild.
pub fn maintain_vector_indexes(
&self,
policy: VectorIndexMaintenancePolicy,
) -> GraphResult<VectorIndexRebuildReport> {
let committer = self.committer.handle();
let (seal_seq, rebuilt, report) = {
let mut guard = self.shared.write();
// Maintain a private clone; the live graph is replaced only on success.
let mut rebuilt = guard.as_ref().clone();
let report = crate::vector_index::maintain_vector_indexes_strict(&mut rebuilt, policy)?;
// Nothing was selected: return before allocating a seal_seq or
// replacing the live graph, so the clone is simply discarded.
if report.entries.is_empty() {
return Ok(report);
}
let rebuilt = Arc::new(rebuilt);
// Sequence number is taken under the lock, after every early exit.
let seal_seq = committer.next_seal_seq();
*guard = Arc::clone(&rebuilt);
(seal_seq, rebuilt, report)
// Write lock is released at the end of this block, before submitting.
};
committer.submit_vector_index_rebuild(seal_seq, rebuilt, report)
}
/// Return the runtime schema-version epoch used for plan-cache invalidation.
///
/// The epoch starts at zero for each [`SharedGraph`] instance and advances
/// only after a successful commit whose change set contains
/// [`Change::SchemaChanged`].
#[must_use]
pub fn schema_version(&self) -> u64 {
// Acquire load of the counter shared with the committer.
self.schema_version.load(Ordering::Acquire)
}
/// Return the graph type this graph is bound to, or `None` for an open graph.
///
/// The value is read from the currently published snapshot's metadata.
#[must_use]
pub fn graph_type(&self) -> Option<Arc<GraphTypeDef>> {
    let published = self.read();
    published.meta.bound_type.clone()
}
/// Report whether the published snapshot is bound to a closed graph type.
#[must_use]
pub fn is_closed(&self) -> bool {
    let published = self.read();
    published.meta.bound_type.is_some()
}
/// Find the registered provider whose tag equals `tag`.
///
/// Scans the fixed registry in registration order and returns a cloned
/// handle to the first match, or `None` when no provider declares `tag`.
#[must_use]
pub fn index_provider_by_tag(&self, tag: ProviderTag) -> Option<Arc<dyn IndexProvider>> {
    self.providers
        .iter()
        .find(|provider| provider.provider_tag() == tag)
        .cloned()
}
/// Borrow the fixed provider registry for executor procedure contexts.
///
/// The slice is the registry frozen at construction, in registration order.
#[must_use]
pub fn index_providers(&self) -> &[Arc<dyn IndexProvider>] {
&self.providers
}
/// Borrow the fixed commit-critical durable provider registry.
///
/// The list is fixed at construction and never modified afterwards by this
/// type.
#[must_use]
pub fn durable_providers(&self) -> &[Arc<dyn DurableProvider>] {
&self.durable_providers
}
/// Register an unnamed built-in node property index for `(label, property)`.
///
/// The current node columns are scanned under the write lock and the
/// published snapshot is updated in one transaction. Equivalent to
/// [`Self::create_property_index_named`] with no catalog name.
///
/// # Errors
///
/// Returns [`GraphError::PropertyIndexAlreadyExists`] if the pair is
/// already registered, or [`GraphError::IndexValueRejected`] if any
/// existing node with `label` has a non-null value that does not match
/// `kind`.
pub fn create_property_index(
    &self,
    label: DbString,
    property: DbString,
    kind: TypedIndexKind,
) -> GraphResult<()> {
    let unnamed: Option<DbString> = None;
    self.create_property_index_named(label, property, kind, unnamed)
}
/// Register a built-in node property index, optionally recording a catalog name.
///
/// Runs as one write transaction: reject a duplicate registration, build the
/// index from the transaction's view of the graph, insert it, record a
/// [`Change::SchemaChanged`], and commit.
///
/// # Errors
///
/// Returns [`GraphError::PropertyIndexAlreadyExists`] when `(label, property)`
/// is already registered, and propagates errors from index construction and
/// from the commit.
pub fn create_property_index_named(
    &self,
    label: DbString,
    property: DbString,
    kind: TypedIndexKind,
    name: Option<DbString>,
) -> GraphResult<()> {
    let mut txn = self.begin_write();
    // One key tuple serves both the duplicate check and the insert.
    let key = (label.clone(), property.clone());
    if txn.read().property_index.contains_key(&key) {
        return Err(GraphError::PropertyIndexAlreadyExists { label, property });
    }
    let index = crate::property_index::build_property_index(
        txn.read(),
        label.clone(),
        property.clone(),
        kind,
    )?;
    let entry = PropertyIndexEntry::new(index, name.clone());
    txn.guard_mut().property_index.insert(key, entry);
    let graph = txn.read().graph_id();
    let change = SchemaChange::PropertyIndexCreatedNamed {
        label,
        property,
        kind: schema_kind_from(kind),
        name,
    };
    txn.changes.push(Change::SchemaChanged { graph, change });
    txn.commit()?;
    Ok(())
}
/// Remove the built-in node property index for `(label, property)`.
///
/// Idempotent: when no such index is registered the call returns `Ok(())`
/// straight away, without mutating the graph or committing anything.
/// Otherwise the index is removed, a [`Change::SchemaChanged`] is recorded,
/// and the transaction is committed.
pub fn drop_property_index(&self, label: DbString, property: DbString) -> GraphResult<()> {
    let mut txn = self.begin_write();
    // Build the lookup key once; it is taken apart again for the change record.
    let key = (label, property);
    if !txn.read().property_index.contains_key(&key) {
        return Ok(());
    }
    txn.guard_mut().property_index.remove(&key);
    let (label, property) = key;
    let graph = txn.read().graph_id();
    let change = SchemaChange::PropertyIndexDropped { label, property };
    txn.changes.push(Change::SchemaChanged { graph, change });
    txn.commit()?;
    Ok(())
}
/// Register an unnamed built-in node vector index for `(label, property)`.
///
/// The current node columns are scanned under the write lock and the
/// published snapshot is updated in one transaction. Equivalent to
/// [`Self::create_vector_index_named`] with no catalog name.
///
/// # Errors
///
/// Returns [`GraphError::VectorIndexAlreadyExists`] if the pair is already
/// registered, [`GraphError::VectorIndexInvalidDimension`] when `dimension`
/// is zero, or [`GraphError::VectorIndexValueRejected`] if any existing
/// node with `label` has a non-null value for `property` that is not a
/// vector with the declared dimension.
pub fn create_vector_index(
    &self,
    label: DbString,
    property: DbString,
    kind: VectorIndexKind,
    dimension: u32,
) -> GraphResult<()> {
    let unnamed: Option<DbString> = None;
    self.create_vector_index_named(label, property, kind, dimension, unnamed)
}
/// Register a built-in node vector index with optional catalog name.
pub fn create_vector_index_named(
&self,
label: DbString,
property: DbString,
kind: VectorIndexKind,
dimension: u32,
name: Option<DbString>,
) -> GraphResult<()> {
self.create_vector_index_named_with_config(label, property, kind, dimension, name, None)
}
/// Register a built-in node vector index with an optional HNSW construction config.
///
/// Wraps `hnsw_config` in a [`VectorIndexConfig`] whose second argument is
/// `None` and forwards to [`Self::create_vector_index_named_with_configs`].
pub fn create_vector_index_named_with_config(
    &self,
    label: DbString,
    property: DbString,
    kind: VectorIndexKind,
    dimension: u32,
    name: Option<DbString>,
    hnsw_config: Option<HnswIndexConfig>,
) -> GraphResult<()> {
    let config = VectorIndexConfig::new(hnsw_config, None);
    self.create_vector_index_named_with_configs(label, property, kind, dimension, name, config)
}
/// Register a built-in node vector index with an optional ANN construction config.
///
/// Opens a write transaction, performs the registration through the
/// transaction's mutator, and commits. Errors from the registration or from
/// the commit are returned to the caller.
pub fn create_vector_index_named_with_configs(
    &self,
    label: DbString,
    property: DbString,
    kind: VectorIndexKind,
    dimension: u32,
    name: Option<DbString>,
    config: VectorIndexConfig,
) -> GraphResult<()> {
    let mut write_txn = self.begin_write();
    write_txn
        .mutator()
        .create_vector_index_named_with_configs(label, property, kind, dimension, name, config)?;
    write_txn.commit()?;
    Ok(())
}
/// Remove the built-in node vector index for `(label, property)`.
///
/// The operation is idempotent; dropping an absent index succeeds without
/// publishing a new snapshot. The removal runs through the transaction's
/// mutator inside a single write transaction.
pub fn drop_vector_index(&self, label: DbString, property: DbString) -> GraphResult<()> {
    let mut write_txn = self.begin_write();
    write_txn.mutator().drop_vector_index(label, property)?;
    write_txn.commit()?;
    Ok(())
}
/// Register an unnamed built-in node text index for `(label, property)`.
///
/// The current node columns are scanned under the write lock and the
/// published snapshot is updated in one transaction. Equivalent to
/// [`Self::create_text_index_named`] with no catalog name.
///
/// # Errors
///
/// Returns [`GraphError::TextIndexAlreadyExists`] if the pair is already
/// registered, or [`GraphError::Inconsistent`] if index construction
/// observes corrupt graph columns.
pub fn create_text_index(&self, label: DbString, property: DbString) -> GraphResult<()> {
    let unnamed: Option<DbString> = None;
    self.create_text_index_named(label, property, unnamed)
}
/// Register a built-in node text index, optionally recording a catalog name.
///
/// Opens a write transaction, performs the registration through the
/// transaction's mutator, and commits. Errors from the registration or from
/// the commit are returned to the caller.
pub fn create_text_index_named(
    &self,
    label: DbString,
    property: DbString,
    name: Option<DbString>,
) -> GraphResult<()> {
    let mut write_txn = self.begin_write();
    write_txn
        .mutator()
        .create_text_index_named(label, property, name)?;
    write_txn.commit()?;
    Ok(())
}
/// Remove the built-in node text index for `(label, property)`.
///
/// The operation is idempotent; dropping an absent index succeeds without
/// publishing a new snapshot. The removal runs through the transaction's
/// mutator inside a single write transaction.
pub fn drop_text_index(&self, label: DbString, property: DbString) -> GraphResult<()> {
    let mut write_txn = self.begin_write();
    write_txn.mutator().drop_text_index(label, property)?;
    write_txn.commit()?;
    Ok(())
}
/// Open a write transaction by taking the single graph write lock.
///
/// Writers on other threads simply queue on the write lock; the engine does
/// **not** panic legitimate concurrent writes that arrive while another
/// commit's provider fan-out is running.
///
/// Since v1.2 (BRIEF 1) the snapshot publish itself happens on the single
/// committer thread rather than here: `WriteTxn::commit` seals under this
/// lock, releases it, and hands the frozen bundle to the committer. Provider
/// fan-out therefore runs on the **committer thread**, and the re-entrancy
/// guard below protects that thread (sound with exactly one committer — see
/// `reentry.rs` and the v1.2 design §7.7).
///
/// # Panics
///
/// Panics when called from inside an [`IndexProvider`] callback **on the
/// committer thread** while it is the active fan-out. Re-entrant writes from
/// a provider callback are unsupported: the committer is publishing, so a
/// nested write would recurse indefinitely. The committer's
/// `notify_providers` boundary catches the panic; provider state may drift,
/// but the commit still completes.
///
/// Cross-thread re-entry — a provider spawning a worker thread that calls
/// `begin_write` and then waiting on it — is **documented misuse** rather
/// than something the engine can detect (it cannot trace causal thread
/// ancestry). See the module docs in `reentry.rs` and the `IndexProvider`
/// rustdoc for the contract.
#[must_use]
#[tracing::instrument(name = "selene.graph.begin_write", skip(self))]
pub fn begin_write(&self) -> WriteTxn<'_> {
    assert!(
        !crate::reentry::in_fanout(),
        "selene-graph: SharedGraph::begin_write() called from within \
         a provider fan-out callback on the committer thread; \
         re-entrant writes from a provider callback are not supported. \
         The committer's fan-out boundary will catch this panic; \
         the commit succeeds, but the offending provider's \
         chained mutation does not."
    );
    // Acquisition order matters and matches the argument order of
    // `WriteTxn::new`: graph write lock first, allocator lock afterwards.
    let graph_guard = self.shared.write();
    let submit_handle = self.committer.handle();
    let id_allocator = self.allocator.lock();
    let registry = Arc::clone(&self.providers);
    WriteTxn::new(graph_guard, submit_handle, id_allocator, registry)
}
/// Return the address of the graph currently held behind the `shared`
/// lock (not the published snapshot). Test-only; takes the read lock for the
/// duration of the call.
#[cfg(test)]
pub(crate) fn locked_arc_ptr_for_test(&self) -> *const SeleneGraph {
    let live = self.shared.read();
    let live_arc: &Arc<SeleneGraph> = &live;
    Arc::as_ptr(live_arc)
}
/// Report the generation of the **live graph behind the `shared` lock**,
/// as opposed to the published `ArcSwap` snapshot. Divergence tests use it
/// to assert the two never disagree after a failed / cancelled commit (the
/// P0 WAL-failure + cancel rollback invariants).
#[cfg(test)]
pub(crate) fn locked_generation_for_test(&self) -> u64 {
    let live = self.shared.read();
    live.meta.generation
}
/// Push an already-[`seal`](crate::WriteTxn::seal)ed commit directly to the
/// committer and block until it is durable + visible. Test-only seam for
/// exercising the BRIEF-117 cancellation cut-line (which has no production
/// producer yet) without going back through `commit_with_principal`.
#[cfg(test)]
pub(crate) fn submit_sealed_for_test(
    &self,
    sealed: crate::write_txn::SealedCommit,
) -> GraphResult<crate::CommitOutcome> {
    let handle = self.committer.handle();
    handle.submit_commit(sealed)
}
/// Queue a sealed commit on the committer and hand back the receiver for
/// its reply instead of waiting for it. Test-only.
#[cfg(test)]
pub(crate) fn submit_sealed_async_for_test(
    &self,
    sealed: crate::write_txn::SealedCommit,
) -> GraphResult<std::sync::mpsc::Receiver<GraphResult<crate::CommitOutcome>>> {
    let handle = self.committer.handle();
    handle.submit_commit_async_for_test(sealed)
}
}
impl SharedGraphBuilder {
/// Register an index provider.
///
/// Providers are retained in registration order, which is the order used
/// for committed mutation delivery.
#[must_use]
pub fn with_provider(mut self, provider: Arc<dyn IndexProvider>) -> Self {
self.providers.push(provider);
self
}
/// Open a WAL file so commits are routed through the CORE durable provider.
///
/// `path` names the WAL file itself, not a directory. With the conventional
/// layout, pass `dir.join(selene_persist::DEFAULT_WAL_FILE_NAME)`.
///
/// # SyncPolicy is OVERRIDDEN (v1.2 BRIEF 2 — read this)
///
/// The single per-graph committer thread is the **sole fsync caller** for
/// the committer-managed WAL: it appends a contiguous run of commits with
/// fsync deferred, then issues exactly one [`WalWriter::flush`] per run (the
/// R1 fsync-before-publish barrier). To keep that the *only* fsync path,
/// this method **sets `config.sync_policy` to [`SyncPolicy::OnFlushOnly`]**
/// before opening the WAL — **the policy you pass is discarded.** Fsync
/// cadence is governed by [`Self::with_commit_batching`] instead:
/// [`CommitBatching::Off`] (the default) fsyncs once per commit
/// (behaviorally identical to the old `EveryN(1)`), while
/// [`CommitBatching::On`] coalesces a contiguous run into one fsync.
/// `config.snapshot_seq` is passed through verbatim. Durability is
/// unchanged: the committer always flushes before it publishes or acks, so a
/// commit is durable before it is ever visible.
///
/// # Errors
///
/// Returns [`GraphError::Persist`] when the WAL cannot be opened, including
/// when another writer already holds the file lock.
pub fn with_wal(mut self, path: impl AsRef<Path>, mut config: WalConfig) -> GraphResult<Self> {
    // BRIEF 2: fsync belongs to the committer, whose group flush must be the
    // single durability barrier. The policy is overridden before the open
    // call so open-error timing (e.g. WriterLockHeld) is unchanged for
    // existing .unwrap() call sites.
    config.sync_policy = SyncPolicy::OnFlushOnly;
    let writer = WalWriter::open(path.as_ref(), config)?;
    self.wal_writer = Some(writer);
    Ok(self)
}
/// Set the group-commit batching policy for the committer-managed WAL
/// (v1.2 BRIEF 2). Default [`CommitBatching::Off`].
///
/// With [`CommitBatching::Off`] the committer fsyncs once per commit
/// (behaviorally identical to BRIEF 1). With [`CommitBatching::On`] it
/// coalesces up to `max_commits` (capped by aggregate `max_bytes`) contiguous
/// commits into one fsync — higher throughput + lower tail latency under
/// fan-in, at the cost of grouping several commits behind one barrier (all
/// still durable before any of them is acked or published). Has no effect
/// without [`Self::with_wal`] (no durable provider to flush).
#[must_use]
pub fn with_commit_batching(mut self, batching: CommitBatching) -> Self {
self.commit_batching = batching;
self
}
/// Open and attach a durable audit log at `path` (conventionally
/// `dir.join(selene_persist::DEFAULT_AUDIT_FILE_NAME)`).
///
/// Engine-owned audit events committed through this graph are mirrored to
/// the audit log so they survive WAL-archive pruning (Item 7 / Seam D, D24).
/// Audit mirroring is part of the durable commit path and therefore needs
/// [`Self::with_wal`]: [`Self::build`] fails if an audit log is configured
/// without a WAL.
///
/// # Errors
///
/// Returns [`GraphError::Persist`] when the audit log cannot be opened.
pub fn with_audit_log(mut self, path: impl AsRef<Path>) -> GraphResult<Self> {
    let log_path = path.as_ref();
    let log = AuditLog::open(log_path).map_err(GraphError::Persist)?;
    self.audit_log = Some(log);
    Ok(self)
}
/// Bind the graph under construction to `type_def`.
///
/// The type definition is validated first and only the validated result is
/// stored in the graph's metadata.
///
/// # Errors
///
/// Returns [`GraphError::Inconsistent`] when the builder is already bound
/// or when `type_def` fails self-consistency validation.
pub fn bound_to(mut self, type_def: GraphTypeDef) -> GraphResult<Self> {
    let already_bound = self.graph.meta.bound_type.is_some();
    if already_bound {
        return Err(GraphError::Inconsistent {
            reason: "graph builder is already bound to a graph type".to_owned(),
        });
    }
    let validated = type_def.validate()?;
    self.graph.meta.bound_type = Some(Arc::new(validated));
    Ok(self)
}
/// Build shared graph state and validate provider registration.
///
/// # Errors
///
/// Returns [`GraphError::Provider`] when provider tags are duplicated.
pub fn build(self) -> GraphResult<SharedGraph> {
SharedGraph::from_graph_with_core_and_durables(
self.graph,
self.providers,
Vec::new(),
self.wal_writer,
self.audit_log,
self.commit_batching,
)
}
}
mod rebuild;
pub(crate) use rebuild::{rebuild_derived_state, validate_unique_provider_tags};
#[cfg(test)]
mod compaction_tests;
#[cfg(test)]
#[path = "shared_property_tests.rs"]
mod property_tests;
#[cfg(test)]
mod tests;