Skip to main content

lora_store/memory/
graph.rs

1//! The [`InMemoryGraph`] data structure: slot-indexed node/relationship
2//! storage, adjacency lists, label/type indexes, and the inherent
3//! helpers that the trait impls in `super::impls` delegate to.
4
5use std::collections::{BTreeMap, BTreeSet};
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::{Arc, RwLock, RwLockWriteGuard};
8
9use lora_ast::Direction;
10
11use crate::{
12    LoraPoint, MutationEvent, MutationRecorder, NodeId, NodeRecord, Properties, PropertyValue,
13    RelationshipId, RelationshipRecord,
14};
15
16use super::constraint_catalog::{
17    ConstraintCatalog, ConstraintRequest, CreateConstraintError, CreateConstraintOutcome,
18    DropConstraintError, DropConstraintOutcome,
19};
20use super::entity_index_store::IndexBundle;
21use super::fulltext_index::FulltextRegistry;
22use super::hnsw::HnswParams;
23use super::index_catalog::{
24    CreateIndexError, CreateIndexOutcome, DropIndexError, DropIndexOutcome, IndexCatalog,
25    IndexDefinition, IndexRequest, StoredIndexEntity, StoredIndexKind, StoredIndexState,
26};
27use super::point_index::PointRegistry;
28#[cfg(test)]
29use super::property_index::PropertyIndexState;
30use super::property_index::{PropertyIndexKey, PropertyIndexRegistry};
31use super::secondary_index_maintenance::SecondaryIndexMutation;
32use super::sorted_property_index::SortedPropertyIndex;
33use super::stats::GraphStats;
34use super::text_index::TrigramRegistry;
35use super::vector_index::{VectorIndexProvider, VectorIndexRegistry, VectorSimilarity};
36
37#[derive(Default)]
38pub struct InMemoryGraph {
39    pub(super) next_node_id: NodeId,
40    pub(super) next_rel_id: RelationshipId,
41
42    /// Slot-indexed node storage: `nodes[id as usize]` is the record at `id`.
43    /// `None` slots are tombstones from deletes (we don't compact). Because
44    /// `next_node_id` is monotonic the slot at `id` is initialized exactly
45    /// when `id < next_node_id` — same identity guarantee the previous
46    /// `BTreeMap<NodeId, NodeRecord>` had, just with O(1) lookup and
47    /// cache-coherent layout.
48    ///
49    /// Records are wrapped in `Arc` so [`Self::clone`] (called on every
50    /// auto-commit write to build a working copy) is `O(N)` atomic
51    /// refcount bumps instead of `O(N)` deep record clones — for a
52    /// 100k-node graph the difference is microseconds vs. tens of
53    /// milliseconds. Mutating a record uses `Arc::make_mut`, which
54    /// clones in place when the refcount is 1 (no concurrent reader)
55    /// and falls back to a single-record clone-on-write when readers
56    /// still hold a snapshot.
57    pub(super) nodes: Vec<Option<Arc<NodeRecord>>>,
58    pub(super) relationships: Vec<Option<Arc<RelationshipRecord>>>,
59    /// Live (non-tombstoned) counts kept in sync with `put_*`/`take_*` so
60    /// `node_count` / `relationship_count` stay O(1) — without a counter
61    /// they'd have to scan the slab.
62    pub(super) live_node_count: usize,
63    pub(super) live_rel_count: usize,
64
65    /// Adjacency keyed by NodeId. `outgoing[id]` is the list of relationship
66    /// ids that leave `id`; mirrored on `incoming[id]`. Inner `Vec` instead
67    /// of `BTreeSet` because edges are inserted exactly once and traversal
68    /// only needs sequential iteration; the cache-friendly contiguous layout
69    /// shows up on every traversal hop.
70    pub(super) outgoing: Vec<Vec<RelationshipId>>,
71    pub(super) incoming: Vec<Vec<RelationshipId>>,
72
73    // secondary indexes
74    /// Label -> the (unique, monotonic) node ids that carry it. The inner
75    /// `Vec` instead of `BTreeSet` because every node id is inserted at most
76    /// once per label (no dedup needed) and every consumer iterates the
77    /// whole list anyway — contiguous storage iterates faster than a
78    /// tree-of-pointers, and removes via `swap_remove` stay O(degree-of-label).
79    pub(super) nodes_by_label: BTreeMap<String, Vec<NodeId>>,
80    pub(super) relationships_by_type: BTreeMap<String, Vec<RelationshipId>>,
81
82    /// All index machinery — the declared-index catalog, hash-bucket
83    /// property registry, and the per-entity-kind secondary index
84    /// registries (text, sorted, point, fulltext) plus their active
85    /// counters — collapsed into one bundle. See [`IndexBundle`] for
86    /// the rationale. The bundle is a packaging-only abstraction:
87    /// every field accessed through `self.indexes.<x>` lives at the
88    /// same address it would have as a top-level field.
89    pub(super) indexes: IndexBundle,
90
91    /// Catalog of explicitly-created constraints (CREATE CONSTRAINT).
92    /// Deliberately not part of [`IndexBundle`] — constraints describe
93    /// data invariants, not indexed access. The fact that uniqueness /
94    /// key constraints back range indexes is handled in the
95    /// constraint code path, not by the bundle's layout.
96    pub(super) constraint_catalog: RwLock<ConstraintCatalog>,
97    /// Fast-path counter for mutation-time constraint checks. Most
98    /// workloads have no constraints installed; this lets the executor
99    /// skip taking the catalog lock in that case.
100    pub(super) active_constraints: AtomicUsize,
101
102    /// Optional mutation observer. When `Some`, every committed mutation
103    /// fans out to this recorder *after* the in-memory state has been
104    /// updated. The recorder is not part of the graph's identity, so Clone
105    /// and snapshot restore both reset it to `None`.
106    pub(super) recorder: Option<Arc<dyn MutationRecorder>>,
107}
108
109impl std::fmt::Debug for InMemoryGraph {
110    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111        f.debug_struct("InMemoryGraph")
112            .field("next_node_id", &self.next_node_id)
113            .field("next_rel_id", &self.next_rel_id)
114            .field("nodes", &self.nodes)
115            .field("relationships", &self.relationships)
116            .field("outgoing", &self.outgoing)
117            .field("incoming", &self.incoming)
118            .field("nodes_by_label", &self.nodes_by_label)
119            .field("relationships_by_type", &self.relationships_by_type)
120            .field("indexes", &self.indexes)
121            .field(
122                "active_node_property_indexes",
123                &self.active_node_property_index_count(),
124            )
125            .field(
126                "active_relationship_property_indexes",
127                &self.active_relationship_property_index_count(),
128            )
129            .field(
130                "index_catalog_entries",
131                &self
132                    .indexes
133                    .catalog
134                    .read()
135                    .map(|c| c.list().len())
136                    .unwrap_or(0),
137            )
138            .field("active_constraints", &self.active_constraint_count())
139            .field(
140                "active_fulltext_indexes",
141                &self.active_fulltext_index_count(),
142            )
143            .field("recorder", &self.recorder.as_ref().map(|_| "installed"))
144            .finish()
145    }
146}
147
148impl Clone for InMemoryGraph {
149    fn clone(&self) -> Self {
150        // Deliberately drop the recorder on clone: a cloned store is a
151        // separate identity; it should not silently share the observer.
152        Self {
153            next_node_id: self.next_node_id,
154            next_rel_id: self.next_rel_id,
155            nodes: self.nodes.clone(),
156            relationships: self.relationships.clone(),
157            live_node_count: self.live_node_count,
158            live_rel_count: self.live_rel_count,
159            outgoing: self.outgoing.clone(),
160            incoming: self.incoming.clone(),
161            nodes_by_label: self.nodes_by_label.clone(),
162            relationships_by_type: self.relationships_by_type.clone(),
163            // IndexBundle::clone deep-copies every owned registry under
164            // its locks, mirroring what the old per-field clones did.
165            // The hash-bucket registry skip-on-empty optimisation is
166            // preserved: `PropertyIndexRegistry::clone` itself is cheap
167            // when no entries exist.
168            indexes: self.indexes.clone(),
169            constraint_catalog: RwLock::new(self.constraint_catalog_read().clone()),
170            active_constraints: AtomicUsize::new(self.active_constraint_count()),
171            recorder: None,
172        }
173    }
174}
175
176impl InMemoryGraph {
177    pub fn new() -> Self {
178        Self::default()
179    }
180
181    pub fn with_capacity_hint(nodes: usize, relationships: usize) -> Self {
182        Self {
183            nodes: Vec::with_capacity(nodes),
184            relationships: Vec::with_capacity(relationships),
185            outgoing: Vec::with_capacity(nodes),
186            incoming: Vec::with_capacity(nodes),
187            ..Self::default()
188        }
189    }
190
191    pub fn contains_node(&self, node_id: NodeId) -> bool {
192        self.node_at(node_id).is_some()
193    }
194
195    pub fn contains_relationship(&self, rel_id: RelationshipId) -> bool {
196        self.rel_at(rel_id).is_some()
197    }
198
199    /// Install (or clear) the mutation recorder. Passing `None` detaches any
200    /// currently-installed recorder. The recorder observes every committed
201    /// mutation *after* it has been applied.
202    pub fn set_mutation_recorder(&mut self, recorder: Option<Arc<dyn MutationRecorder>>) {
203        self.recorder = recorder;
204    }
205
206    /// Handle to the currently-installed recorder, if any.
207    pub fn mutation_recorder(&self) -> Option<&Arc<dyn MutationRecorder>> {
208        self.recorder.as_ref()
209    }
210
211    /// Emit a mutation event only if a recorder is installed. The event is
212    /// built lazily — callers pass a closure, so when no recorder is
213    /// attached we pay only a `None` check and the cost of constructing the
214    /// event (labels/properties clones) is avoided.
215    #[inline]
216    pub(super) fn emit<F: FnOnce() -> MutationEvent>(&self, build: F) {
217        if let Some(rec) = &self.recorder {
218            rec.record(build());
219        }
220    }
221
222    fn bump_next_node_id_past(&mut self, id: NodeId) -> Result<(), String> {
223        let next = id
224            .checked_add(1)
225            .ok_or_else(|| format!("node id {id} leaves no valid next node id"))?;
226        self.next_node_id = self.next_node_id.max(next);
227        Ok(())
228    }
229
230    fn bump_next_rel_id_past(&mut self, id: RelationshipId) -> Result<(), String> {
231        let next = id
232            .checked_add(1)
233            .ok_or_else(|| format!("relationship id {id} leaves no valid next relationship id"))?;
234        self.next_rel_id = self.next_rel_id.max(next);
235        Ok(())
236    }
237
238    pub(super) fn try_reserve_next_node_slot(&mut self) -> Option<(NodeId, usize)> {
239        let id = self.next_node_id;
240        let idx = self.ensure_node_slot_checked(id).ok()?;
241        self.bump_next_node_id_past(id).ok()?;
242        Some((id, idx))
243    }
244
245    pub(super) fn try_reserve_next_rel_slot(&mut self) -> Option<(RelationshipId, usize)> {
246        let id = self.next_rel_id;
247        let idx = self.ensure_rel_slot_checked(id).ok()?;
248        self.bump_next_rel_id_past(id).ok()?;
249        Some((id, idx))
250    }
251
252    // ---------- Slab access helpers ----------
253    //
254    // Stand-in for the BTreeMap API the previous storage used. They keep the
255    // call sites readable while the underlying layout is positional Vec.
256
257    #[inline]
258    pub(super) fn node_at(&self, id: NodeId) -> Option<&NodeRecord> {
259        self.nodes
260            .get(Self::slot_index(id)?)
261            .and_then(|s| s.as_ref())
262            .map(|arc| arc.as_ref())
263    }
264
265    /// Mutable handle to a node record, doing copy-on-write only when the
266    /// `Arc` is shared with a concurrent reader. With no readers (the
267    /// common case after a fresh write_store clone), `Arc::make_mut`
268    /// upgrades in place — no record clone.
269    #[inline]
270    pub(super) fn node_at_mut(&mut self, id: NodeId) -> Option<&mut NodeRecord> {
271        self.nodes
272            .get_mut(Self::slot_index(id)?)
273            .and_then(|s| s.as_mut())
274            .map(Arc::make_mut)
275    }
276
277    #[inline]
278    pub(super) fn rel_at(&self, id: RelationshipId) -> Option<&RelationshipRecord> {
279        self.relationships
280            .get(Self::slot_index(id)?)
281            .and_then(|s| s.as_ref())
282            .map(|arc| arc.as_ref())
283    }
284
285    #[inline]
286    pub(super) fn rel_at_mut(&mut self, id: RelationshipId) -> Option<&mut RelationshipRecord> {
287        self.relationships
288            .get_mut(Self::slot_index(id)?)
289            .and_then(|s| s.as_mut())
290            .map(Arc::make_mut)
291    }
292
293    /// Resize the node-keyed Vecs so `id as usize` is in range. Adjacency
294    /// lists are kept in lockstep with `nodes`, so a freshly-grown slot has
295    /// empty outgoing/incoming Vecs ready to receive edges.
296    fn slot_len_for_id(id: u64, kind: &str) -> Result<usize, String> {
297        let idx = usize::try_from(id)
298            .map_err(|_| format!("{kind} id {id} does not fit in usize on this platform"))?;
299        idx.checked_add(1)
300            .ok_or_else(|| format!("{kind} id {id} leaves no valid slab slot"))
301    }
302
303    #[inline]
304    fn slot_index(id: u64) -> Option<usize> {
305        usize::try_from(id).ok()
306    }
307
308    fn ensure_node_slot_checked(&mut self, id: NodeId) -> Result<usize, String> {
309        let target = Self::slot_len_for_id(id, "node")?;
310        if self.nodes.len() < target {
311            let additional = target - self.nodes.len();
312            self.nodes.try_reserve_exact(additional).map_err(|e| {
313                format!("node id {id} requires {target} slots, but allocation failed: {e}")
314            })?;
315            self.outgoing.try_reserve_exact(additional).map_err(|e| {
316                format!(
317                    "node id {id} requires {target} adjacency slots, but allocation failed: {e}"
318                )
319            })?;
320            self.incoming.try_reserve_exact(additional).map_err(|e| {
321                format!(
322                    "node id {id} requires {target} adjacency slots, but allocation failed: {e}"
323                )
324            })?;
325            self.nodes.resize_with(target, || None);
326            self.outgoing.resize_with(target, Vec::new);
327            self.incoming.resize_with(target, Vec::new);
328        }
329        Ok(target - 1)
330    }
331
332    fn ensure_rel_slot_checked(&mut self, id: RelationshipId) -> Result<usize, String> {
333        let target = Self::slot_len_for_id(id, "relationship")?;
334        if self.relationships.len() < target {
335            self.relationships
336                .try_reserve_exact(target - self.relationships.len())
337                .map_err(|e| {
338                    format!(
339                        "relationship id {id} requires {target} slots, but allocation failed: {e}"
340                    )
341                })?;
342            self.relationships.resize_with(target, || None);
343        }
344        Ok(target - 1)
345    }
346
347    pub(super) fn put_node_checked(&mut self, id: NodeId, node: NodeRecord) -> Result<(), String> {
348        let idx = self.ensure_node_slot_checked(id)?;
349        self.put_node_at_slot(idx, node);
350        Ok(())
351    }
352
353    pub(super) fn put_rel_checked(
354        &mut self,
355        id: RelationshipId,
356        rel: RelationshipRecord,
357    ) -> Result<(), String> {
358        let idx = self.ensure_rel_slot_checked(id)?;
359        self.put_rel_at_slot(idx, rel);
360        Ok(())
361    }
362
363    pub(super) fn put_node_at_slot(&mut self, idx: usize, node: NodeRecord) {
364        let was_present = self.nodes[idx].is_some();
365        self.nodes[idx] = Some(Arc::new(node));
366        if !was_present {
367            self.live_node_count += 1;
368        }
369    }
370
371    pub(super) fn put_rel_at_slot(&mut self, idx: usize, rel: RelationshipRecord) {
372        let was_present = self.relationships[idx].is_some();
373        self.relationships[idx] = Some(Arc::new(rel));
374        if !was_present {
375            self.live_rel_count += 1;
376        }
377    }
378
379    pub(super) fn take_node(&mut self, id: NodeId) -> Option<NodeRecord> {
380        let idx = Self::slot_index(id)?;
381        let removed = self.nodes.get_mut(idx).and_then(|s| s.take());
382        if removed.is_some() {
383            self.live_node_count -= 1;
384            // Also clear the per-id adjacency entries so the memory is reclaimed
385            // on the typical "delete every node" pattern. We deliberately do not
386            // shrink the outer Vec — leaving the slot lets new ids reuse the
387            // same index without growth churn (and `next_node_id` is monotonic
388            // anyway, so no immediate reuse).
389            if let Some(out) = self.outgoing.get_mut(idx) {
390                out.clear();
391            }
392            if let Some(inc) = self.incoming.get_mut(idx) {
393                inc.clear();
394            }
395        }
396        // Unwrap the Arc — `try_unwrap` returns the inner `NodeRecord`
397        // without cloning when our slab held the only reference, falling
398        // back to a clone only when concurrent readers still hold a
399        // snapshot Arc.
400        removed.map(|arc| Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone()))
401    }
402
403    pub(super) fn take_rel(&mut self, id: RelationshipId) -> Option<RelationshipRecord> {
404        let idx = Self::slot_index(id)?;
405        let removed = self.relationships.get_mut(idx).and_then(|s| s.take());
406        if removed.is_some() {
407            self.live_rel_count -= 1;
408        }
409        removed.map(|arc| Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone()))
410    }
411
412    #[inline]
413    pub(super) fn outgoing_at(&self, id: NodeId) -> Option<&[RelationshipId]> {
414        self.outgoing.get(Self::slot_index(id)?).map(Vec::as_slice)
415    }
416
417    #[inline]
418    pub(super) fn incoming_at(&self, id: NodeId) -> Option<&[RelationshipId]> {
419        self.incoming.get(Self::slot_index(id)?).map(Vec::as_slice)
420    }
421
422    #[inline]
423    fn try_for_each_adjacent_slice<F, E>(
424        &self,
425        node_id: NodeId,
426        types: &[String],
427        adj: &[RelationshipId],
428        skip_self_loops: bool,
429        visit: &mut F,
430    ) -> Result<(), E>
431    where
432        F: FnMut(RelationshipId, NodeId) -> Result<(), E>,
433    {
434        let single_type = match types {
435            [single] => Some(single.as_str()),
436            _ => None,
437        };
438        let has_type_filter = !types.is_empty();
439
440        for &rel_id in adj {
441            let Some(rel) = self.rel_at(rel_id) else {
442                continue;
443            };
444            if skip_self_loops && rel.src == node_id && rel.dst == node_id {
445                continue;
446            }
447            if let Some(single) = single_type {
448                if rel.rel_type != single {
449                    continue;
450                }
451            } else if has_type_filter && !types.iter().any(|t| t == &rel.rel_type) {
452                continue;
453            }
454            let Some(other_id) = Self::other_endpoint(rel, node_id) else {
455                continue;
456            };
457            visit(rel_id, other_id)?;
458        }
459        Ok(())
460    }
461
462    #[inline]
463    pub(super) fn try_for_each_adjacent_id_unchecked<F, E>(
464        &self,
465        node_id: NodeId,
466        direction: Direction,
467        types: &[String],
468        mut visit: F,
469    ) -> Result<(), E>
470    where
471        F: FnMut(RelationshipId, NodeId) -> Result<(), E>,
472    {
473        match direction {
474            Direction::Right => {
475                if let Some(adj) = self.outgoing_at(node_id) {
476                    self.try_for_each_adjacent_slice(node_id, types, adj, false, &mut visit)?;
477                }
478            }
479            Direction::Left => {
480                if let Some(adj) = self.incoming_at(node_id) {
481                    self.try_for_each_adjacent_slice(node_id, types, adj, false, &mut visit)?;
482                }
483            }
484            Direction::Undirected => {
485                if let Some(adj) = self.outgoing_at(node_id) {
486                    self.try_for_each_adjacent_slice(node_id, types, adj, false, &mut visit)?;
487                }
488                if let Some(adj) = self.incoming_at(node_id) {
489                    self.try_for_each_adjacent_slice(node_id, types, adj, true, &mut visit)?;
490                }
491            }
492        }
493
494        Ok(())
495    }
496
497    #[inline]
498    pub(super) fn try_for_each_adjacent_id<F, E>(
499        &self,
500        node_id: NodeId,
501        direction: Direction,
502        types: &[String],
503        visit: F,
504    ) -> Result<(), E>
505    where
506        F: FnMut(RelationshipId, NodeId) -> Result<(), E>,
507    {
508        if self.node_at(node_id).is_none() {
509            return Ok(());
510        }
511        self.try_for_each_adjacent_id_unchecked(node_id, direction, types, visit)
512    }
513
514    pub(super) fn iter_node_ids(&self) -> impl Iterator<Item = NodeId> + '_ {
515        self.nodes
516            .iter()
517            .enumerate()
518            .filter_map(|(i, slot)| slot.as_ref().map(|_| i as NodeId))
519    }
520
521    pub(super) fn iter_node_records(&self) -> impl Iterator<Item = &NodeRecord> + '_ {
522        self.nodes
523            .iter()
524            .filter_map(|s| s.as_ref())
525            .map(|arc| arc.as_ref())
526    }
527
528    pub(super) fn iter_rel_ids(&self) -> impl Iterator<Item = RelationshipId> + '_ {
529        self.relationships
530            .iter()
531            .enumerate()
532            .filter_map(|(i, slot)| slot.as_ref().map(|_| i as RelationshipId))
533    }
534
535    pub(super) fn iter_rel_records(&self) -> impl Iterator<Item = &RelationshipRecord> + '_ {
536        self.relationships
537            .iter()
538            .filter_map(|s| s.as_ref())
539            .map(|arc| arc.as_ref())
540    }
541
542    pub(super) fn iter_nodes(&self) -> impl Iterator<Item = (NodeId, &NodeRecord)> + '_ {
543        self.nodes
544            .iter()
545            .enumerate()
546            .filter_map(|(i, slot)| slot.as_ref().map(|n| (i as NodeId, n.as_ref())))
547    }
548
549    pub(super) fn iter_rels(
550        &self,
551    ) -> impl Iterator<Item = (RelationshipId, &RelationshipRecord)> + '_ {
552        self.relationships
553            .iter()
554            .enumerate()
555            .filter_map(|(i, slot)| slot.as_ref().map(|r| (i as RelationshipId, r.as_ref())))
556    }
557
558    /// Add `rel_id` to `node_id`'s outgoing list. Relies on the monotonic-id
559    /// invariant: relationship ids are allocated once and never re-used, so
560    /// the bucket can never see a duplicate.
561    fn outgoing_push(&mut self, node_id: NodeId, rel_id: RelationshipId) {
562        if let Ok(idx) = self.ensure_node_slot_checked(node_id) {
563            self.outgoing[idx].push(rel_id);
564        }
565    }
566
567    fn incoming_push(&mut self, node_id: NodeId, rel_id: RelationshipId) {
568        if let Ok(idx) = self.ensure_node_slot_checked(node_id) {
569            self.incoming[idx].push(rel_id);
570        }
571    }
572
573    /// Remove `rel_id` from `node_id`'s outgoing list. `swap_remove` keeps
574    /// the operation O(1) — adjacency order doesn't carry semantic meaning.
575    fn outgoing_remove(&mut self, node_id: NodeId, rel_id: RelationshipId) {
576        if let Some(v) = Self::slot_index(node_id).and_then(|idx| self.outgoing.get_mut(idx)) {
577            if let Some(pos) = v.iter().position(|&id| id == rel_id) {
578                v.swap_remove(pos);
579            }
580        }
581    }
582
583    fn incoming_remove(&mut self, node_id: NodeId, rel_id: RelationshipId) {
584        if let Some(v) = Self::slot_index(node_id).and_then(|idx| self.incoming.get_mut(idx)) {
585            if let Some(pos) = v.iter().position(|&id| id == rel_id) {
586                v.swap_remove(pos);
587            }
588        }
589    }
590
591    pub(super) fn normalize_labels(labels: Vec<String>) -> Vec<String> {
592        let mut seen = BTreeSet::new();
593
594        labels
595            .into_iter()
596            .map(|s| s.trim().to_string())
597            .filter(|s| !s.is_empty())
598            .filter(|s| seen.insert(s.clone()))
599            .collect()
600    }
601
602    pub(super) fn insert_node_label_index(&mut self, node_id: NodeId, label: &str) {
603        // Hot path: skip the `String` alloc when the label bucket already
604        // exists. The monotonic-id invariant on the create path guarantees
605        // `node_id` is unique, so we push unconditionally; the previous
606        // `contains` guard turned bulk CREATE into O(n²).
607        if let Some(bucket) = self.nodes_by_label.get_mut(label) {
608            bucket.push(node_id);
609        } else {
610            self.nodes_by_label.insert(label.to_string(), vec![node_id]);
611        }
612    }
613
614    fn remove_node_label_index(&mut self, node_id: NodeId, label: &str) {
615        if let Some(ids) = self.nodes_by_label.get_mut(label) {
616            if let Some(pos) = ids.iter().position(|&id| id == node_id) {
617                ids.swap_remove(pos);
618            }
619            if ids.is_empty() {
620                self.nodes_by_label.remove(label);
621            }
622        }
623    }
624
625    fn insert_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
626        // See `insert_node_label_index` for the same hot-path rationale.
627        if let Some(bucket) = self.relationships_by_type.get_mut(rel_type) {
628            bucket.push(rel_id);
629        } else {
630            self.relationships_by_type
631                .insert(rel_type.to_string(), vec![rel_id]);
632        }
633    }
634
635    fn remove_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
636        if let Some(ids) = self.relationships_by_type.get_mut(rel_type) {
637            if let Some(pos) = ids.iter().position(|&id| id == rel_id) {
638                ids.swap_remove(pos);
639            }
640            if ids.is_empty() {
641                self.relationships_by_type.remove(rel_type);
642            }
643        }
644    }
645
646    pub(super) fn indexes_read(&self) -> std::sync::RwLockReadGuard<'_, PropertyIndexRegistry> {
647        self.indexes
648            .properties
649            .read()
650            .unwrap_or_else(|poisoned| poisoned.into_inner())
651    }
652
653    pub(super) fn indexes_write(&self) -> RwLockWriteGuard<'_, PropertyIndexRegistry> {
654        self.indexes
655            .properties
656            .write()
657            .unwrap_or_else(|poisoned| poisoned.into_inner())
658    }
659
660    pub(super) fn indexes_mut(&mut self) -> &mut PropertyIndexRegistry {
661        self.indexes
662            .properties
663            .get_mut()
664            .unwrap_or_else(|poisoned| poisoned.into_inner())
665    }
666
667    #[inline]
668    pub(super) fn active_node_property_index_count(&self) -> usize {
669        self.indexes
670            .active_node_property_indexes
671            .load(Ordering::Relaxed)
672    }
673
674    #[inline]
675    pub(super) fn active_relationship_property_index_count(&self) -> usize {
676        self.indexes
677            .active_relationship_property_indexes
678            .load(Ordering::Relaxed)
679    }
680
681    #[inline]
682    pub(super) fn active_constraint_count(&self) -> usize {
683        self.active_constraints.load(Ordering::Relaxed)
684    }
685
686    #[inline]
687    pub(super) fn has_active_constraints(&self) -> bool {
688        self.active_constraint_count() != 0
689    }
690
691    #[inline]
692    pub(super) fn active_fulltext_index_count(&self) -> usize {
693        self.indexes.active_fulltext_indexes.load(Ordering::Relaxed)
694    }
695
696    #[inline]
697    pub(super) fn has_active_fulltext_indexes(&self) -> bool {
698        self.active_fulltext_index_count() != 0
699    }
700
701    pub(super) fn node_property_index_is_active(&mut self, key: &str) -> bool {
702        self.active_node_property_index_count() != 0
703            && self.indexes_mut().node_properties.is_active(key)
704    }
705
706    pub(super) fn relationship_property_index_is_active(&mut self, key: &str) -> bool {
707        self.active_relationship_property_index_count() != 0
708            && self.indexes_mut().relationship_properties.is_active(key)
709    }
710
711    pub(super) fn ensure_node_property_index(&self, key: &str) {
712        {
713            let indexes = self.indexes_read();
714            if indexes.node_properties.is_active(key) {
715                return;
716            }
717        }
718
719        let mut indexes = self.indexes_write();
720        if indexes.node_properties.is_active(key) {
721            return;
722        }
723
724        for (id, node) in self.iter_nodes() {
725            if let Some(value) = node.properties.get(key) {
726                indexes.node_properties.insert_with_scopes(
727                    id,
728                    node.labels.iter().map(String::as_str),
729                    key,
730                    value,
731                );
732            }
733        }
734        if indexes.node_properties.activate(key) {
735            self.indexes
736                .active_node_property_indexes
737                .fetch_add(1, Ordering::Relaxed);
738        }
739    }
740
741    pub(super) fn ensure_relationship_property_index(&self, key: &str) {
742        {
743            let indexes = self.indexes_read();
744            if indexes.relationship_properties.is_active(key) {
745                return;
746            }
747        }
748
749        let mut indexes = self.indexes_write();
750        if indexes.relationship_properties.is_active(key) {
751            return;
752        }
753
754        for (id, rel) in self.iter_rels() {
755            if let Some(value) = rel.properties.get(key) {
756                indexes.relationship_properties.insert_with_scopes(
757                    id,
758                    [rel.rel_type.as_str()],
759                    key,
760                    value,
761                );
762            }
763        }
764        if indexes.relationship_properties.activate(key) {
765            self.indexes
766                .active_relationship_property_indexes
767                .fetch_add(1, Ordering::Relaxed);
768        }
769    }
770
771    pub(super) fn index_catalog_read(&self) -> std::sync::RwLockReadGuard<'_, IndexCatalog> {
772        self.indexes
773            .catalog
774            .read()
775            .unwrap_or_else(|poisoned| poisoned.into_inner())
776    }
777
778    pub(super) fn index_catalog_write(&self) -> RwLockWriteGuard<'_, IndexCatalog> {
779        self.indexes
780            .catalog
781            .write()
782            .unwrap_or_else(|poisoned| poisoned.into_inner())
783    }
784
785    pub(super) fn constraint_catalog_read(
786        &self,
787    ) -> std::sync::RwLockReadGuard<'_, ConstraintCatalog> {
788        self.constraint_catalog
789            .read()
790            .unwrap_or_else(|poisoned| poisoned.into_inner())
791    }
792
793    pub(super) fn constraint_catalog_write(&self) -> RwLockWriteGuard<'_, ConstraintCatalog> {
794        self.constraint_catalog
795            .write()
796            .unwrap_or_else(|poisoned| poisoned.into_inner())
797    }
798
799    /// Register an explicitly-declared index in the catalog and, when
800    /// applicable, force the underlying property-index buckets to be
801    /// populated so equality lookups can use them immediately.
802    ///
803    /// Named with a `register_` prefix to avoid colliding with the
804    /// trait method `GraphStorageMut::create_index` — the trait impl
805    /// in `impls.rs` delegates here.
806    #[allow(clippy::result_large_err)]
807    pub(super) fn register_index(
808        &self,
809        request: IndexRequest,
810        if_not_exists: bool,
811    ) -> Result<CreateIndexOutcome, CreateIndexError> {
812        self.register_index_with_recording(request, if_not_exists, true)
813    }
814
815    #[allow(clippy::result_large_err)]
816    fn register_index_with_recording(
817        &self,
818        request: IndexRequest,
819        if_not_exists: bool,
820        record_event: bool,
821    ) -> Result<CreateIndexOutcome, CreateIndexError> {
822        let request_for_event = record_event.then(|| request.clone());
823        let outcome = {
824            let mut catalog = self.index_catalog_write();
825            catalog.try_create(request, if_not_exists)?
826        };
827
828        if let CreateIndexOutcome::Created(def) = &outcome {
829            self.populate_index_data(def);
830        }
831
832        // Both Created and NoOpExists are committed catalog states; we
833        // log only Created because NoOpExists implies a redundant DDL
834        // that adds nothing to durable state.
835        if matches!(outcome, CreateIndexOutcome::Created(_)) {
836            if let Some(request_for_event) = request_for_event {
837                self.emit(|| crate::MutationEvent::CreateIndex {
838                    request: request_for_event,
839                    if_not_exists,
840                });
841            }
842        }
843
844        Ok(outcome)
845    }
846
847    /// Replay a CreateIndex event against an empty graph. Mirrors the
848    /// `replay_create_node` shape: callers must invoke before installing
849    /// a recorder so we don't re-emit during recovery.
850    #[doc(hidden)]
851    pub fn replay_create_index(
852        &mut self,
853        request: IndexRequest,
854        if_not_exists: bool,
855    ) -> Result<(), String> {
856        if self.recorder.is_some() {
857            return Err("cannot replay create_index while a mutation recorder is installed".into());
858        }
859        self.register_index(request, if_not_exists)
860            .map(|_| ())
861            .map_err(|e| e.to_string())
862    }
863
864    /// Replay a DropIndex event.
865    #[doc(hidden)]
866    pub fn replay_drop_index(&mut self, name: &str, if_exists: bool) -> Result<(), String> {
867        if self.recorder.is_some() {
868            return Err("cannot replay drop_index while a mutation recorder is installed".into());
869        }
870        self.drop_named_index(name, if_exists)
871            .map(|_| ())
872            .map_err(|e| e.to_string())
873    }
874
875    /// Register a constraint. For uniqueness/key kinds this also
876    /// registers a backing RANGE index in the index catalog under the
877    /// same name. Validation of existing data is the caller's
878    /// responsibility (the enforcement layer runs a pre-create scan
879    /// before this method commits).
880    pub(super) fn register_constraint(
881        &self,
882        request: ConstraintRequest,
883        if_not_exists: bool,
884    ) -> Result<CreateConstraintOutcome, CreateConstraintError> {
885        // Constraint-level conflicts (22N65/66/67) take precedence over
886        // index-catalog conflicts: if the request collides with an
887        // existing *constraint* shape or name, we never get to the
888        // backing-index step.
889        {
890            let constraint_catalog = self.constraint_catalog_read();
891            if let Some(existing) = constraint_catalog.find_equivalent(&request) {
892                let cloned = existing.clone();
893                drop(constraint_catalog);
894                if if_not_exists {
895                    return Ok(CreateConstraintOutcome::NoOpExists(cloned));
896                }
897                return Err(CreateConstraintError::EquivalentConstraintExists(
898                    cloned.name,
899                ));
900            }
901            if let Some(existing) = constraint_catalog.get(&request.name) {
902                let cloned = existing.clone();
903                drop(constraint_catalog);
904                if if_not_exists {
905                    return Ok(CreateConstraintOutcome::NoOpExists(cloned));
906                }
907                return Err(CreateConstraintError::DuplicateName(cloned.name));
908            }
909            if let Some(existing) = constraint_catalog.find_same_schema(&request) {
910                let cloned = existing.clone();
911                drop(constraint_catalog);
912                if super::constraint_catalog::kinds_conflict_for_validation(
913                    &cloned.kind,
914                    &request.kind,
915                ) {
916                    if if_not_exists {
917                        return Ok(CreateConstraintOutcome::NoOpExists(cloned));
918                    }
919                    return Err(CreateConstraintError::ConflictingConstraint(cloned.name));
920                }
921            }
922        }
923
924        // Index-catalog conflicts only matter for constraints that need
925        // a backing range index. The catalog won't yet own one for this
926        // request — that registration happens below — so any existing
927        // entry under the same name or schema is from a foreign index.
928        if request.kind.requires_backing_index() {
929            let idx_catalog = self.index_catalog_read();
930            if idx_catalog.get(&request.name).is_some() {
931                return Err(CreateConstraintError::DuplicateIndexName(
932                    request.name.clone(),
933                ));
934            }
935            let conflict = idx_catalog.list().into_iter().find(|def| {
936                def.kind == StoredIndexKind::Range
937                    && def.entity == request.entity
938                    && def.label.as_deref() == Some(request.label.as_str())
939                    && def.properties == request.properties
940                    && def.name != request.name
941            });
942            drop(idx_catalog);
943            if let Some(def) = conflict {
944                return Err(CreateConstraintError::BackingIndexConflict(format!(
945                    "(:{} {{{}}}) already covered by index `{}`",
946                    request.label,
947                    request.properties.join(", "),
948                    def.name,
949                )));
950            }
951        }
952
953        let owns_backing = request.kind.requires_backing_index();
954        let request_for_event = request.clone();
955        let outcome = {
956            let mut catalog = self.constraint_catalog_write();
957            catalog.try_create(request, if_not_exists)?
958        };
959
960        if let CreateConstraintOutcome::Created(def) = &outcome {
961            // Pre-create data scan: if the live graph already violates
962            // the constraint, fail and roll back the catalog write.
963            // Creating constraints when conflicting data exists fails
964            // before the catalog change is retained (22N77/79/80).
965            if let Err(violation) = self.validate_existing_data_for_constraint(def) {
966                let mut catalog = self.constraint_catalog_write();
967                let _ = catalog.try_drop(&def.name, true);
968                return Err(CreateConstraintError::DataViolation(violation.to_string()));
969            }
970        }
971
972        if let CreateConstraintOutcome::Created(def) = &outcome {
973            if owns_backing {
974                // Register a backing RANGE index under the same name. This
975                // is an implementation detail of the constraint, so WAL and
976                // snapshot replay record only the constraint mutation.
977                let idx_request = IndexRequest {
978                    explicit_name: Some(def.name.clone()),
979                    kind: StoredIndexKind::Range,
980                    entity: def.entity,
981                    label: Some(def.label.clone()),
982                    additional_labels: Vec::new(),
983                    properties: def.properties.clone(),
984                    options: Default::default(),
985                };
986                // Errors from the backing-index registration unwind the
987                // constraint registration to keep the two catalogs in
988                // step.
989                if let Err(err) = self.register_index_with_recording(idx_request, true, false) {
990                    let mut catalog = self.constraint_catalog_write();
991                    let _ = catalog.try_drop(&def.name, true);
992                    return Err(CreateConstraintError::BackingIndexConflict(err.to_string()));
993                }
994            }
995            self.emit(|| crate::MutationEvent::CreateConstraint {
996                request: request_for_event,
997                if_not_exists,
998            });
999            self.active_constraints.fetch_add(1, Ordering::Relaxed);
1000        }
1001
1002        Ok(outcome)
1003    }
1004
1005    /// Replay a CreateConstraint event against a recorder-detached graph.
1006    #[doc(hidden)]
1007    pub fn replay_create_constraint(
1008        &mut self,
1009        request: ConstraintRequest,
1010        if_not_exists: bool,
1011    ) -> Result<(), String> {
1012        if self.recorder.is_some() {
1013            return Err(
1014                "cannot replay create_constraint while a mutation recorder is installed".into(),
1015            );
1016        }
1017        self.register_constraint(request, if_not_exists)
1018            .map(|_| ())
1019            .map_err(|e| e.to_string())
1020    }
1021
1022    /// Replay a DropConstraint event.
1023    #[doc(hidden)]
1024    pub fn replay_drop_constraint(&mut self, name: &str, if_exists: bool) -> Result<(), String> {
1025        if self.recorder.is_some() {
1026            return Err(
1027                "cannot replay drop_constraint while a mutation recorder is installed".into(),
1028            );
1029        }
1030        self.drop_named_constraint(name, if_exists)
1031            .map(|_| ())
1032            .map_err(|e| e.to_string())
1033    }
1034
1035    /// Inverse of [`Self::register_constraint`]. Cascades to the backing
1036    /// range index when one is owned.
1037    pub(super) fn drop_named_constraint(
1038        &self,
1039        name: &str,
1040        if_exists: bool,
1041    ) -> Result<DropConstraintOutcome, DropConstraintError> {
1042        let outcome = {
1043            let mut catalog = self.constraint_catalog_write();
1044            catalog.try_drop(name, if_exists)?
1045        };
1046        if let DropConstraintOutcome::Dropped(def) = &outcome {
1047            if let Some(index_name) = def.owned_index.as_deref() {
1048                // The backing index is owned exclusively by the
1049                // constraint, so dropping it is unconditional.
1050                let _ = self.drop_named_index_inner(index_name, true, false);
1051            }
1052            self.active_constraints.fetch_sub(1, Ordering::Relaxed);
1053            self.emit(|| crate::MutationEvent::DropConstraint {
1054                name: name.to_string(),
1055                if_exists,
1056            });
1057        }
1058        Ok(outcome)
1059    }
1060
1061    /// Inverse of [`Self::register_index`]. Removes the catalog entry
1062    /// and (for RANGE) leaves the underlying property-index buckets in
1063    /// place — they may still be needed for lazy-activation lookups
1064    /// even after the explicit DDL declaration is gone.
1065    pub(super) fn drop_named_index(
1066        &self,
1067        name: &str,
1068        if_exists: bool,
1069    ) -> Result<DropIndexOutcome, DropIndexError> {
1070        self.drop_named_index_inner(name, if_exists, true)
1071    }
1072
1073    fn drop_named_index_inner(
1074        &self,
1075        name: &str,
1076        if_exists: bool,
1077        emit_event: bool,
1078    ) -> Result<DropIndexOutcome, DropIndexError> {
1079        if let Some(owner) = self
1080            .constraint_catalog_read()
1081            .constraint_owning_index(name)
1082            .cloned()
1083        {
1084            return Err(DropIndexError::ConstraintOwned {
1085                index: name.to_string(),
1086                constraint: owner.name,
1087            });
1088        }
1089
1090        let outcome = {
1091            let mut catalog = self.index_catalog_write();
1092            catalog.try_drop(name, if_exists)?
1093        };
1094        if let DropIndexOutcome::Dropped(def) = &outcome {
1095            // Release backing structures keyed off the dropped def.
1096            match def.kind {
1097                StoredIndexKind::Text => {
1098                    if let Some(label) = def.label.as_deref() {
1099                        for prop in &def.properties {
1100                            self.deactivate_text_scope(def.entity, label, prop);
1101                        }
1102                    }
1103                }
1104                StoredIndexKind::Range => {
1105                    if let Some(label) = def.label.as_deref() {
1106                        for prop in &def.properties {
1107                            self.deactivate_sorted_scope(def.entity, label, prop);
1108                        }
1109                    }
1110                }
1111                StoredIndexKind::Point => {
1112                    if let Some(label) = def.label.as_deref() {
1113                        for prop in &def.properties {
1114                            self.deactivate_point_scope(def.entity, label, prop);
1115                        }
1116                    }
1117                }
1118                StoredIndexKind::Lookup => {
1119                    // Lookup rides on the eagerly-maintained label/type
1120                    // indexes; nothing to release.
1121                }
1122                StoredIndexKind::Vector => {
1123                    self.deactivate_vector_index(def.entity, &def.name);
1124                }
1125                StoredIndexKind::Fulltext => {
1126                    self.deactivate_fulltext_index(def.entity, &def.name);
1127                }
1128            }
1129            if emit_event {
1130                self.emit(|| crate::MutationEvent::DropIndex {
1131                    name: name.to_string(),
1132                    if_exists,
1133                });
1134            }
1135        }
1136        Ok(outcome)
1137    }
1138
1139    fn populate_index_data(&self, def: &IndexDefinition) {
1140        // RANGE: piggy-back on the existing lazy property-index buckets.
1141        // TEXT: build a trigram inverted index over the existing entity
1142        //       data for the (label, property) tuple.
1143        // POINT: build a grid-bucket spatial index over the existing
1144        //        entity data.
1145        // LOOKUP: catalog-only; existing label/type indexes already
1146        //         answer the predicates.
1147        match def.kind {
1148            StoredIndexKind::Range => {
1149                for key in &def.properties {
1150                    match def.entity {
1151                        StoredIndexEntity::Node => self.ensure_node_property_index(key),
1152                        StoredIndexEntity::Relationship => {
1153                            self.ensure_relationship_property_index(key)
1154                        }
1155                    }
1156                    if let Some(label) = def.label.as_deref() {
1157                        self.activate_sorted_scope(def.entity, label, key);
1158                    }
1159                }
1160            }
1161            StoredIndexKind::Text => {
1162                let label = match def.label.as_deref() {
1163                    Some(l) => l,
1164                    None => return,
1165                };
1166                for property in &def.properties {
1167                    self.activate_text_scope(def.entity, label, property);
1168                }
1169            }
1170            StoredIndexKind::Point => {
1171                let label = match def.label.as_deref() {
1172                    Some(l) => l,
1173                    None => return,
1174                };
1175                let cell_size = PointRegistry::cell_size_from_options(&def.options);
1176                for property in &def.properties {
1177                    self.activate_point_scope(def.entity, label, property, cell_size);
1178                }
1179            }
1180            StoredIndexKind::Fulltext => {
1181                let labels: Vec<String> = def.all_labels().map(String::from).collect();
1182                if labels.is_empty() {
1183                    return;
1184                }
1185                self.activate_fulltext_index(def.entity, &def.name, &labels, &def.properties);
1186            }
1187            StoredIndexKind::Vector => {
1188                let label = match def.label.as_deref() {
1189                    Some(l) => l,
1190                    None => return,
1191                };
1192                let property = match def.properties.first() {
1193                    Some(p) => p.as_str(),
1194                    None => return,
1195                };
1196                let similarity = VectorSimilarity::from_options(&def.options)
1197                    .unwrap_or(VectorSimilarity::Cosine);
1198                let provider = VectorIndexProvider::from_options(&def.options)
1199                    .unwrap_or(VectorIndexProvider::Flat);
1200                let hnsw = HnswParams::from_options(&def.options);
1201                let lazy = matches!(
1202                    def.options.get("vector.populate.async"),
1203                    Some(super::index_catalog::IndexConfigValue::Bool(true))
1204                );
1205                self.activate_vector_index(
1206                    def.entity, &def.name, label, property, similarity, provider, hnsw, lazy,
1207                );
1208                if lazy {
1209                    self.index_catalog_write()
1210                        .set_state(&def.name, StoredIndexState::Populating);
1211                }
1212            }
1213            // LOOKUP rides on the label/type indexes maintained eagerly.
1214            StoredIndexKind::Lookup => {}
1215        }
1216    }
1217
1218    pub(super) fn text_indexes_read(
1219        &self,
1220        entity: StoredIndexEntity,
1221    ) -> std::sync::RwLockReadGuard<'_, TrigramRegistry> {
1222        self.indexes.text.read(entity)
1223    }
1224
1225    pub(super) fn text_indexes_write(
1226        &self,
1227        entity: StoredIndexEntity,
1228    ) -> RwLockWriteGuard<'_, TrigramRegistry> {
1229        self.indexes.text.write(entity)
1230    }
1231
1232    pub(super) fn fulltext_indexes_read(
1233        &self,
1234        entity: StoredIndexEntity,
1235    ) -> std::sync::RwLockReadGuard<'_, FulltextRegistry> {
1236        self.indexes.fulltext.read(entity)
1237    }
1238
1239    #[allow(dead_code)]
1240    pub(super) fn fulltext_indexes_write(
1241        &self,
1242        entity: StoredIndexEntity,
1243    ) -> RwLockWriteGuard<'_, FulltextRegistry> {
1244        self.indexes.fulltext.write(entity)
1245    }
1246
1247    fn activate_text_scope(&self, entity: StoredIndexEntity, label: &str, property: &str) {
1248        if !self.text_indexes_write(entity).add_scope(label, property) {
1249            return;
1250        }
1251
1252        let backfill: Vec<(u64, String)> = match entity {
1253            StoredIndexEntity::Node => self
1254                .iter_nodes()
1255                .filter(|(_, node)| node.labels.iter().any(|l| l == label))
1256                .filter_map(|(id, node)| match node.properties.get(property) {
1257                    Some(PropertyValue::String(value)) => Some((id, value.clone())),
1258                    _ => None,
1259                })
1260                .collect(),
1261            StoredIndexEntity::Relationship => self
1262                .iter_rels()
1263                .filter(|(_, rel)| rel.rel_type == label)
1264                .filter_map(|(id, rel)| match rel.properties.get(property) {
1265                    Some(PropertyValue::String(value)) => Some((id, value.clone())),
1266                    _ => None,
1267                })
1268                .collect(),
1269        };
1270
1271        let mut registry = self.text_indexes_write(entity);
1272        for (id, value) in backfill {
1273            registry.insert(label, property, id, &value);
1274        }
1275    }
1276
1277    /// Drop a (label, property) text scope, decrementing the refcount.
1278    pub(super) fn deactivate_text_scope(
1279        &self,
1280        entity: StoredIndexEntity,
1281        label: &str,
1282        property: &str,
1283    ) {
1284        self.text_indexes_write(entity)
1285            .remove_scope(label, property);
1286    }
1287
1288    fn activate_fulltext_index(
1289        &self,
1290        entity: StoredIndexEntity,
1291        name: &str,
1292        labels: &[String],
1293        properties: &[String],
1294    ) {
1295        use super::fulltext_index::{term_counts_for_properties, TermCounts};
1296
1297        {
1298            let mut registry = self.fulltext_indexes_write(entity);
1299            registry.register(name.to_string(), labels.to_vec(), properties.to_vec());
1300        }
1301        self.indexes
1302            .active_fulltext_indexes
1303            .fetch_add(1, Ordering::Relaxed);
1304
1305        // Backfill: walk every entity matching any label, tokenise covered
1306        // string properties, install one posting batch per entity.
1307        let backfill: Vec<(u64, TermCounts)> = match entity {
1308            StoredIndexEntity::Node => self
1309                .iter_nodes()
1310                .filter(|(_, node)| {
1311                    labels
1312                        .iter()
1313                        .any(|wanted| node.labels.iter().any(|l| l == wanted))
1314                })
1315                .map(|(id, node)| {
1316                    let counts = term_counts_for_properties(&node.properties, properties);
1317                    (id, counts)
1318                })
1319                .filter(|(_, c)| !c.is_empty())
1320                .collect(),
1321            StoredIndexEntity::Relationship => self
1322                .iter_rels()
1323                .filter(|(_, rel)| labels.iter().any(|wanted| wanted == &rel.rel_type))
1324                .map(|(id, rel)| {
1325                    let counts = term_counts_for_properties(&rel.properties, properties);
1326                    (id, counts)
1327                })
1328                .filter(|(_, c)| !c.is_empty())
1329                .collect(),
1330        };
1331
1332        let mut registry = self.fulltext_indexes_write(entity);
1333        if let Some(index) = registry.get_mut(name) {
1334            for (id, counts) in backfill {
1335                index.reindex_entity(id, counts);
1336            }
1337        }
1338    }
1339
1340    pub(super) fn deactivate_fulltext_index(&self, entity: StoredIndexEntity, name: &str) {
1341        self.fulltext_indexes_write(entity).deregister(name);
1342        self.indexes
1343            .active_fulltext_indexes
1344            .fetch_sub(1, Ordering::Relaxed);
1345    }
1346
1347    pub(super) fn sorted_indexes_read(
1348        &self,
1349        entity: StoredIndexEntity,
1350    ) -> std::sync::RwLockReadGuard<'_, SortedPropertyIndex> {
1351        self.indexes.sorted.read(entity)
1352    }
1353
1354    pub(super) fn sorted_indexes_write(
1355        &self,
1356        entity: StoredIndexEntity,
1357    ) -> RwLockWriteGuard<'_, SortedPropertyIndex> {
1358        self.indexes.sorted.write(entity)
1359    }
1360
1361    fn activate_sorted_scope(&self, entity: StoredIndexEntity, label: &str, property: &str) {
1362        if !self.sorted_indexes_write(entity).add_scope(label, property) {
1363            return;
1364        }
1365
1366        let backfill: Vec<(u64, PropertyValue)> = match entity {
1367            StoredIndexEntity::Node => self
1368                .iter_nodes()
1369                .filter(|(_, node)| node.labels.iter().any(|l| l == label))
1370                .filter_map(|(id, node)| {
1371                    node.properties
1372                        .get(property)
1373                        .map(|value| (id, value.clone()))
1374                })
1375                .collect(),
1376            StoredIndexEntity::Relationship => self
1377                .iter_rels()
1378                .filter(|(_, rel)| rel.rel_type == label)
1379                .filter_map(|(id, rel)| {
1380                    rel.properties
1381                        .get(property)
1382                        .map(|value| (id, value.clone()))
1383                })
1384                .collect(),
1385        };
1386
1387        let mut registry = self.sorted_indexes_write(entity);
1388        for (id, value) in backfill {
1389            registry.insert(label, property, id, &value);
1390        }
1391    }
1392
1393    pub(super) fn deactivate_sorted_scope(
1394        &self,
1395        entity: StoredIndexEntity,
1396        label: &str,
1397        property: &str,
1398    ) {
1399        self.sorted_indexes_write(entity)
1400            .remove_scope(label, property);
1401    }
1402
1403    pub(super) fn point_indexes_read(
1404        &self,
1405        entity: StoredIndexEntity,
1406    ) -> std::sync::RwLockReadGuard<'_, PointRegistry> {
1407        self.indexes.point.read(entity)
1408    }
1409
1410    pub(super) fn point_indexes_write(
1411        &self,
1412        entity: StoredIndexEntity,
1413    ) -> RwLockWriteGuard<'_, PointRegistry> {
1414        self.indexes.point.write(entity)
1415    }
1416
1417    fn activate_point_scope(
1418        &self,
1419        entity: StoredIndexEntity,
1420        label: &str,
1421        property: &str,
1422        cell_size: Option<f64>,
1423    ) {
1424        if !self
1425            .point_indexes_write(entity)
1426            .add_scope(label, property, cell_size)
1427        {
1428            return;
1429        }
1430
1431        let backfill: Vec<(u64, LoraPoint)> = match entity {
1432            StoredIndexEntity::Node => self
1433                .iter_nodes()
1434                .filter(|(_, node)| node.labels.iter().any(|l| l == label))
1435                .filter_map(|(id, node)| match node.properties.get(property) {
1436                    Some(PropertyValue::Point(point)) => Some((id, point.clone())),
1437                    _ => None,
1438                })
1439                .collect(),
1440            StoredIndexEntity::Relationship => self
1441                .iter_rels()
1442                .filter(|(_, rel)| rel.rel_type == label)
1443                .filter_map(|(id, rel)| match rel.properties.get(property) {
1444                    Some(PropertyValue::Point(point)) => Some((id, point.clone())),
1445                    _ => None,
1446                })
1447                .collect(),
1448        };
1449
1450        let mut registry = self.point_indexes_write(entity);
1451        for (id, point) in backfill {
1452            registry.insert(label, property, id, point);
1453        }
1454    }
1455
1456    pub(super) fn deactivate_point_scope(
1457        &self,
1458        entity: StoredIndexEntity,
1459        label: &str,
1460        property: &str,
1461    ) {
1462        self.point_indexes_write(entity)
1463            .remove_scope(label, property);
1464    }
1465
1466    pub(super) fn vector_indexes_read(
1467        &self,
1468        entity: StoredIndexEntity,
1469    ) -> std::sync::RwLockReadGuard<'_, VectorIndexRegistry> {
1470        self.indexes.vector.read(entity)
1471    }
1472
1473    pub(super) fn vector_indexes_write(
1474        &self,
1475        entity: StoredIndexEntity,
1476    ) -> RwLockWriteGuard<'_, VectorIndexRegistry> {
1477        self.indexes.vector.write(entity)
1478    }
1479
1480    #[allow(clippy::too_many_arguments)]
1481    fn activate_vector_index(
1482        &self,
1483        entity: StoredIndexEntity,
1484        name: &str,
1485        label: &str,
1486        property: &str,
1487        similarity: VectorSimilarity,
1488        provider: VectorIndexProvider,
1489        hnsw: HnswParams,
1490        lazy: bool,
1491    ) {
1492        {
1493            let mut registry = self.vector_indexes_write(entity);
1494            registry.register(
1495                name.to_string(),
1496                label.to_string(),
1497                property.to_string(),
1498                similarity,
1499                provider,
1500                hnsw,
1501            );
1502        }
1503
1504        if lazy {
1505            // Skip the initial backfill — the catalog state is flipped
1506            // to Populating by the caller and the first query routed
1507            // to this index triggers `lazy_populate_vector_index`.
1508            // Mutations between CREATE and first query still feed the
1509            // registry via the maintenance hook, so the lazy phase
1510            // only handles vectors that existed before CREATE.
1511            return;
1512        }
1513
1514        self.backfill_vector_index(entity, label, property);
1515    }
1516
1517    /// Walk the property store for vectors matching this index's
1518    /// `(label, property)` scope and replay them into the registry.
1519    /// Shared by sync CREATE and lazy-populate flows.
1520    fn backfill_vector_index(&self, entity: StoredIndexEntity, label: &str, property: &str) {
1521        let backfill: Vec<(u64, crate::LoraVector)> = match entity {
1522            StoredIndexEntity::Node => self
1523                .iter_nodes()
1524                .filter(|(_, node)| node.labels.iter().any(|l| l == label))
1525                .filter_map(|(id, node)| match node.properties.get(property) {
1526                    Some(PropertyValue::Vector(v)) => Some((id, v.clone())),
1527                    _ => None,
1528                })
1529                .collect(),
1530            StoredIndexEntity::Relationship => self
1531                .iter_rels()
1532                .filter(|(_, rel)| rel.rel_type == label)
1533                .filter_map(|(id, rel)| match rel.properties.get(property) {
1534                    Some(PropertyValue::Vector(v)) => Some((id, v.clone())),
1535                    _ => None,
1536                })
1537                .collect(),
1538        };
1539
1540        let mut registry = self.vector_indexes_write(entity);
1541        for (id, vector) in backfill {
1542            registry.insert_for(label, property, id, &vector);
1543        }
1544    }
1545
1546    /// Lazy-populate a `Populating` vector index: backfill from the
1547    /// property store, then flip catalog state to `Online`. Called
1548    /// from `GraphStorage::vector_search` on the first request that
1549    /// hits a still-populating index. Idempotent: a second concurrent
1550    /// caller finds the state already `Online` and does no work.
1551    pub(super) fn lazy_populate_vector_index(&self, name: &str) {
1552        let def = match self.index_catalog_read().get(name).cloned() {
1553            Some(d) => d,
1554            None => return,
1555        };
1556        if def.state != StoredIndexState::Populating || def.kind != StoredIndexKind::Vector {
1557            return;
1558        }
1559        let label = match def.label.as_deref() {
1560            Some(l) => l,
1561            None => return,
1562        };
1563        let property = match def.properties.first() {
1564            Some(p) => p.as_str(),
1565            None => return,
1566        };
1567        self.backfill_vector_index(def.entity, label, property);
1568        self.index_catalog_write()
1569            .set_state(name, StoredIndexState::Online);
1570    }
1571
1572    pub(super) fn deactivate_vector_index(&self, entity: StoredIndexEntity, name: &str) {
1573        self.vector_indexes_write(entity).deregister(name);
1574    }
1575
1576    /// Snapshot of cardinality stats. Cheap: derived from already-tracked
1577    /// `nodes_by_label` / `relationships_by_type` lengths and the active
1578    /// property-index buckets. The cost model uses this to populate
1579    /// `estimated_rows` on plan-tree nodes.
1580    pub fn graph_stats(&self) -> GraphStats {
1581        let mut stats = GraphStats {
1582            node_count: self.live_node_count,
1583            relationship_count: self.live_rel_count,
1584            ..Default::default()
1585        };
1586        for (label, ids) in &self.nodes_by_label {
1587            stats.nodes_by_label.insert(label.clone(), ids.len());
1588        }
1589        for (rel_type, ids) in &self.relationships_by_type {
1590            stats
1591                .relationships_by_type
1592                .insert(rel_type.clone(), ids.len());
1593        }
1594        // Distinct values per (label, property): pulled from the
1595        // property-index scoped buckets, where we already track the
1596        // per-scope value distribution. Empty for properties without
1597        // an active hash-index — the cost model falls back to a
1598        // conservative estimate in that case.
1599        let prop_indexes = self.indexes_read();
1600        for (scope, props) in &prop_indexes.node_properties.scoped_values {
1601            for (key, values) in props {
1602                stats
1603                    .node_distinct_values
1604                    .insert((scope.clone(), key.clone()), values.len());
1605            }
1606        }
1607        for (scope, props) in &prop_indexes.relationship_properties.scoped_values {
1608            for (key, values) in props {
1609                stats
1610                    .relationship_distinct_values
1611                    .insert((scope.clone(), key.clone()), values.len());
1612            }
1613        }
1614
1615        for def in self.index_catalog_read().list() {
1616            if def.state != StoredIndexState::Online {
1617                continue;
1618            }
1619            let Some(label) = def.label else {
1620                continue;
1621            };
1622            for property in def.properties {
1623                let scope = (label.clone(), property);
1624                match (def.entity, def.kind) {
1625                    (StoredIndexEntity::Node, StoredIndexKind::Range) => {
1626                        stats.node_range_indexes.insert(scope);
1627                    }
1628                    (StoredIndexEntity::Node, StoredIndexKind::Text) => {
1629                        stats.node_text_indexes.insert(scope);
1630                    }
1631                    (StoredIndexEntity::Node, StoredIndexKind::Point) => {
1632                        stats.node_point_indexes.insert(scope);
1633                    }
1634                    (StoredIndexEntity::Relationship, StoredIndexKind::Range) => {
1635                        stats.relationship_range_indexes.insert(scope);
1636                    }
1637                    (StoredIndexEntity::Relationship, StoredIndexKind::Text) => {
1638                        stats.relationship_text_indexes.insert(scope);
1639                    }
1640                    (StoredIndexEntity::Relationship, StoredIndexKind::Point) => {
1641                        stats.relationship_point_indexes.insert(scope);
1642                    }
1643                    (StoredIndexEntity::Node, StoredIndexKind::Vector) => {
1644                        stats.node_vector_indexes.insert(scope);
1645                    }
1646                    (StoredIndexEntity::Relationship, StoredIndexKind::Vector) => {
1647                        stats.relationship_vector_indexes.insert(scope);
1648                    }
1649                    (_, StoredIndexKind::Lookup | StoredIndexKind::Fulltext) => {}
1650                }
1651            }
1652        }
1653        stats
1654    }
1655
1656    /// Approximate retained-heap breakdown of this graph. See
1657    /// [`super::MemoryReport`] for the methodology and per-component
1658    /// fields. Intended for benches and the `mem_probe*` examples;
1659    /// not on a hot path.
1660    pub fn memory_estimate(&self) -> super::MemoryReport {
1661        super::mem_report::estimate(self)
1662    }
1663
1664    pub(super) fn rebuild_property_indexes(&mut self) {
1665        let mut indexes = PropertyIndexRegistry::default();
1666
1667        for (id, node) in self.iter_nodes() {
1668            for (key, value) in &node.properties {
1669                if PropertyIndexKey::from_value(value).is_some() {
1670                    indexes.node_properties.activate(key);
1671                    indexes.node_properties.insert_with_scopes(
1672                        id,
1673                        node.labels.iter().map(String::as_str),
1674                        key,
1675                        value,
1676                    );
1677                }
1678            }
1679        }
1680
1681        for (id, rel) in self.iter_rels() {
1682            for (key, value) in &rel.properties {
1683                if PropertyIndexKey::from_value(value).is_some() {
1684                    indexes.relationship_properties.activate(key);
1685                    indexes.relationship_properties.insert_with_scopes(
1686                        id,
1687                        [rel.rel_type.as_str()],
1688                        key,
1689                        value,
1690                    );
1691                }
1692            }
1693        }
1694
1695        let node_index_count = indexes.node_properties.active_keys.len();
1696        let relationship_index_count = indexes.relationship_properties.active_keys.len();
1697        *self.indexes_mut() = indexes;
1698        self.indexes
1699            .active_node_property_indexes
1700            .store(node_index_count, Ordering::Relaxed);
1701        self.indexes
1702            .active_relationship_property_indexes
1703            .store(relationship_index_count, Ordering::Relaxed);
1704    }
1705
1706    pub(super) fn on_node_created(&mut self, node: &NodeRecord) {
1707        for label in &node.labels {
1708            self.insert_node_label_index(node.id, label);
1709        }
1710        self.index_node_properties_if_active(
1711            node.id,
1712            node.labels.iter().map(String::as_str),
1713            &node.properties,
1714        );
1715        self.maintain_node_secondary_indexes(node, SecondaryIndexMutation::Insert);
1716    }
1717
1718    pub(super) fn on_node_replayed(&mut self, node: &NodeRecord) {
1719        for label in &node.labels {
1720            self.insert_node_label_index(node.id, label);
1721        }
1722        self.index_node_properties_eager(
1723            node.id,
1724            node.labels.iter().map(String::as_str),
1725            &node.properties,
1726        );
1727        self.maintain_node_secondary_indexes(node, SecondaryIndexMutation::Insert);
1728    }
1729
1730    pub(super) fn on_node_property_set(
1731        &mut self,
1732        node_id: NodeId,
1733        key: &str,
1734        old: Option<&PropertyValue>,
1735        new: &PropertyValue,
1736    ) {
1737        let Some(labels) = self.node_at(node_id).map(|node| node.labels.clone()) else {
1738            return;
1739        };
1740
1741        if self.node_property_index_is_active(key) {
1742            if let Some(old) = old {
1743                self.unindex_node_property_if_active(
1744                    node_id,
1745                    labels.iter().map(String::as_str),
1746                    key,
1747                    old,
1748                );
1749            }
1750            self.index_node_property_if_active(
1751                node_id,
1752                labels.iter().map(String::as_str),
1753                key,
1754                new,
1755            );
1756        }
1757
1758        self.update_secondary_property(
1759            StoredIndexEntity::Node,
1760            labels.iter().map(String::as_str),
1761            node_id,
1762            key,
1763            old,
1764            Some(new),
1765        );
1766    }
1767
1768    pub(super) fn on_node_property_removed(
1769        &mut self,
1770        node_id: NodeId,
1771        key: &str,
1772        old: &PropertyValue,
1773    ) {
1774        let Some(labels) = self.node_at(node_id).map(|node| node.labels.clone()) else {
1775            return;
1776        };
1777        if self.node_property_index_is_active(key) {
1778            self.unindex_node_property_if_active(
1779                node_id,
1780                labels.iter().map(String::as_str),
1781                key,
1782                old,
1783            );
1784        }
1785        self.update_secondary_property(
1786            StoredIndexEntity::Node,
1787            labels.iter().map(String::as_str),
1788            node_id,
1789            key,
1790            Some(old),
1791            None,
1792        );
1793    }
1794
1795    pub(super) fn on_node_label_added(&mut self, node_id: NodeId, label: &str) {
1796        self.insert_node_label_index(node_id, label);
1797
1798        let Some(properties) = self.node_at(node_id).map(|node| node.properties.clone()) else {
1799            return;
1800        };
1801        if self.active_node_property_index_count() != 0 {
1802            self.index_node_scope_properties_if_active(node_id, label, &properties);
1803        }
1804        for (key, value) in &properties {
1805            self.update_secondary_property(
1806                StoredIndexEntity::Node,
1807                [label],
1808                node_id,
1809                key,
1810                None,
1811                Some(value),
1812            );
1813        }
1814    }
1815
1816    pub(super) fn on_node_label_removed(&mut self, node_id: NodeId, label: &str) {
1817        self.remove_node_label_index(node_id, label);
1818
1819        let Some(properties) = self.node_at(node_id).map(|node| node.properties.clone()) else {
1820            return;
1821        };
1822        if self.active_node_property_index_count() != 0 {
1823            self.unindex_node_scope_properties_if_active(node_id, label, &properties);
1824        }
1825        for (key, value) in &properties {
1826            self.update_secondary_property(
1827                StoredIndexEntity::Node,
1828                [label],
1829                node_id,
1830                key,
1831                Some(value),
1832                None,
1833            );
1834        }
1835    }
1836
1837    pub(super) fn on_node_deleted(&mut self, node: &NodeRecord) {
1838        for label in &node.labels {
1839            self.remove_node_label_index(node.id, label);
1840        }
1841        self.unindex_active_node_properties(
1842            node.id,
1843            node.labels.iter().map(String::as_str),
1844            &node.properties,
1845        );
1846        self.maintain_node_secondary_indexes(node, SecondaryIndexMutation::Remove);
1847    }
1848
1849    pub(super) fn on_relationship_created(&mut self, rel: &RelationshipRecord) {
1850        self.attach_relationship(rel);
1851        self.index_relationship_properties_if_active(
1852            rel.id,
1853            [rel.rel_type.as_str()],
1854            &rel.properties,
1855        );
1856        self.maintain_relationship_secondary_indexes(rel, SecondaryIndexMutation::Insert);
1857    }
1858
1859    pub(super) fn on_relationship_replayed(&mut self, rel: &RelationshipRecord) {
1860        self.attach_relationship(rel);
1861        self.index_relationship_properties_eager(rel.id, [rel.rel_type.as_str()], &rel.properties);
1862        self.maintain_relationship_secondary_indexes(rel, SecondaryIndexMutation::Insert);
1863    }
1864
1865    pub(super) fn on_relationship_property_set(
1866        &mut self,
1867        rel_id: RelationshipId,
1868        key: &str,
1869        old: Option<&PropertyValue>,
1870        new: &PropertyValue,
1871    ) {
1872        let Some(rel_type) = self.rel_at(rel_id).map(|rel| rel.rel_type.clone()) else {
1873            return;
1874        };
1875
1876        if self.relationship_property_index_is_active(key) {
1877            if let Some(old) = old {
1878                self.unindex_relationship_property_if_active(rel_id, [rel_type.as_str()], key, old);
1879            }
1880            self.index_relationship_property_if_active(rel_id, [rel_type.as_str()], key, new);
1881        }
1882
1883        self.update_secondary_property(
1884            StoredIndexEntity::Relationship,
1885            [rel_type.as_str()],
1886            rel_id,
1887            key,
1888            old,
1889            Some(new),
1890        );
1891    }
1892
1893    pub(super) fn on_relationship_property_removed(
1894        &mut self,
1895        rel_id: RelationshipId,
1896        key: &str,
1897        old: &PropertyValue,
1898    ) {
1899        let Some(rel_type) = self.rel_at(rel_id).map(|rel| rel.rel_type.clone()) else {
1900            return;
1901        };
1902        if self.relationship_property_index_is_active(key) {
1903            self.unindex_relationship_property_if_active(rel_id, [rel_type.as_str()], key, old);
1904        }
1905        self.update_secondary_property(
1906            StoredIndexEntity::Relationship,
1907            [rel_type.as_str()],
1908            rel_id,
1909            key,
1910            Some(old),
1911            None,
1912        );
1913    }
1914
1915    pub(super) fn on_relationship_deleted(&mut self, rel: &RelationshipRecord) {
1916        self.detach_relationship_indexes(rel);
1917        self.unindex_active_relationship_properties(
1918            rel.id,
1919            [rel.rel_type.as_str()],
1920            &rel.properties,
1921        );
1922        self.maintain_relationship_secondary_indexes(rel, SecondaryIndexMutation::Remove);
1923    }
1924
1925    fn index_node_property_eager<'a>(
1926        &mut self,
1927        node_id: NodeId,
1928        labels: impl IntoIterator<Item = &'a str>,
1929        key: &str,
1930        value: &PropertyValue,
1931    ) {
1932        if PropertyIndexKey::from_value(value).is_none() {
1933            return;
1934        }
1935
1936        let activated = {
1937            let indexes = self.indexes_mut();
1938            let activated = indexes.node_properties.activate(key);
1939            indexes
1940                .node_properties
1941                .insert_with_scopes(node_id, labels, key, value);
1942            activated
1943        };
1944        if activated {
1945            self.indexes
1946                .active_node_property_indexes
1947                .fetch_add(1, Ordering::Relaxed);
1948        }
1949    }
1950
1951    fn index_relationship_property_eager<'a>(
1952        &mut self,
1953        rel_id: RelationshipId,
1954        scopes: impl IntoIterator<Item = &'a str>,
1955        key: &str,
1956        value: &PropertyValue,
1957    ) {
1958        if PropertyIndexKey::from_value(value).is_none() {
1959            return;
1960        }
1961
1962        let activated = {
1963            let indexes = self.indexes_mut();
1964            let activated = indexes.relationship_properties.activate(key);
1965            indexes
1966                .relationship_properties
1967                .insert_with_scopes(rel_id, scopes, key, value);
1968            activated
1969        };
1970        if activated {
1971            self.indexes
1972                .active_relationship_property_indexes
1973                .fetch_add(1, Ordering::Relaxed);
1974        }
1975    }
1976
1977    fn index_node_properties_eager<'a>(
1978        &mut self,
1979        node_id: NodeId,
1980        labels: impl IntoIterator<Item = &'a str> + Clone,
1981        properties: &Properties,
1982    ) {
1983        for (key, value) in properties {
1984            self.index_node_property_eager(node_id, labels.clone(), key, value);
1985        }
1986    }
1987
1988    fn index_relationship_properties_eager<'a>(
1989        &mut self,
1990        rel_id: RelationshipId,
1991        scopes: impl IntoIterator<Item = &'a str> + Clone,
1992        properties: &Properties,
1993    ) {
1994        for (key, value) in properties {
1995            self.index_relationship_property_eager(rel_id, scopes.clone(), key, value);
1996        }
1997    }
1998
1999    fn index_node_property_if_active<'a>(
2000        &mut self,
2001        node_id: NodeId,
2002        labels: impl IntoIterator<Item = &'a str>,
2003        key: &str,
2004        value: &PropertyValue,
2005    ) {
2006        if self.active_node_property_index_count() == 0 {
2007            return;
2008        }
2009        let indexes = self.indexes_mut();
2010        if indexes.node_properties.is_active(key) {
2011            indexes
2012                .node_properties
2013                .insert_with_scopes(node_id, labels, key, value);
2014        }
2015    }
2016
2017    fn index_node_properties_if_active<'a>(
2018        &mut self,
2019        node_id: NodeId,
2020        labels: impl IntoIterator<Item = &'a str> + Clone,
2021        properties: &Properties,
2022    ) {
2023        if self.active_node_property_index_count() == 0 {
2024            return;
2025        }
2026        let indexes = self.indexes_mut();
2027        for (key, value) in properties {
2028            if indexes.node_properties.is_active(key) {
2029                indexes
2030                    .node_properties
2031                    .insert_with_scopes(node_id, labels.clone(), key, value);
2032            }
2033        }
2034    }
2035
2036    fn unindex_node_property_if_active<'a>(
2037        &mut self,
2038        node_id: NodeId,
2039        labels: impl IntoIterator<Item = &'a str>,
2040        key: &str,
2041        value: &PropertyValue,
2042    ) {
2043        if self.active_node_property_index_count() == 0 {
2044            return;
2045        }
2046        let indexes = self.indexes_mut();
2047        if indexes.node_properties.is_active(key) {
2048            indexes
2049                .node_properties
2050                .remove_with_scopes(node_id, labels, key, value);
2051        }
2052    }
2053
2054    fn index_node_scope_properties_if_active(
2055        &mut self,
2056        node_id: NodeId,
2057        scope: &str,
2058        properties: &Properties,
2059    ) {
2060        if self.active_node_property_index_count() == 0 {
2061            return;
2062        }
2063        let indexes = self.indexes_mut();
2064        for (key, value) in properties {
2065            if indexes.node_properties.is_active(key) {
2066                indexes
2067                    .node_properties
2068                    .insert_scoped(node_id, scope, key, value);
2069            }
2070        }
2071    }
2072
2073    fn unindex_node_scope_properties_if_active(
2074        &mut self,
2075        node_id: NodeId,
2076        scope: &str,
2077        properties: &Properties,
2078    ) {
2079        if self.active_node_property_index_count() == 0 {
2080            return;
2081        }
2082        let indexes = self.indexes_mut();
2083        for (key, value) in properties {
2084            if indexes.node_properties.is_active(key) {
2085                indexes
2086                    .node_properties
2087                    .remove_scoped(node_id, scope, key, value);
2088            }
2089        }
2090    }
2091
2092    fn unindex_active_node_properties<'a>(
2093        &mut self,
2094        node_id: NodeId,
2095        labels: impl IntoIterator<Item = &'a str> + Clone,
2096        properties: &Properties,
2097    ) {
2098        if self.active_node_property_index_count() == 0 {
2099            return;
2100        }
2101        let indexes = self.indexes_mut();
2102        for (key, value) in properties {
2103            if indexes.node_properties.is_active(key) {
2104                indexes
2105                    .node_properties
2106                    .remove_with_scopes(node_id, labels.clone(), key, value);
2107            }
2108        }
2109    }
2110
2111    fn index_relationship_property_if_active<'a>(
2112        &mut self,
2113        rel_id: RelationshipId,
2114        scopes: impl IntoIterator<Item = &'a str>,
2115        key: &str,
2116        value: &PropertyValue,
2117    ) {
2118        if self.active_relationship_property_index_count() == 0 {
2119            return;
2120        }
2121        let indexes = self.indexes_mut();
2122        if indexes.relationship_properties.is_active(key) {
2123            indexes
2124                .relationship_properties
2125                .insert_with_scopes(rel_id, scopes, key, value);
2126        }
2127    }
2128
2129    fn index_relationship_properties_if_active<'a>(
2130        &mut self,
2131        rel_id: RelationshipId,
2132        scopes: impl IntoIterator<Item = &'a str> + Clone,
2133        properties: &Properties,
2134    ) {
2135        if self.active_relationship_property_index_count() == 0 {
2136            return;
2137        }
2138        let indexes = self.indexes_mut();
2139        for (key, value) in properties {
2140            if indexes.relationship_properties.is_active(key) {
2141                indexes.relationship_properties.insert_with_scopes(
2142                    rel_id,
2143                    scopes.clone(),
2144                    key,
2145                    value,
2146                );
2147            }
2148        }
2149    }
2150
2151    fn unindex_relationship_property_if_active<'a>(
2152        &mut self,
2153        rel_id: RelationshipId,
2154        scopes: impl IntoIterator<Item = &'a str>,
2155        key: &str,
2156        value: &PropertyValue,
2157    ) {
2158        if self.active_relationship_property_index_count() == 0 {
2159            return;
2160        }
2161        let indexes = self.indexes_mut();
2162        if indexes.relationship_properties.is_active(key) {
2163            indexes
2164                .relationship_properties
2165                .remove_with_scopes(rel_id, scopes, key, value);
2166        }
2167    }
2168
2169    fn unindex_active_relationship_properties<'a>(
2170        &mut self,
2171        rel_id: RelationshipId,
2172        scopes: impl IntoIterator<Item = &'a str> + Clone,
2173        properties: &Properties,
2174    ) {
2175        if self.active_relationship_property_index_count() == 0 {
2176            return;
2177        }
2178        let indexes = self.indexes_mut();
2179        for (key, value) in properties {
2180            if indexes.relationship_properties.is_active(key) {
2181                indexes.relationship_properties.remove_with_scopes(
2182                    rel_id,
2183                    scopes.clone(),
2184                    key,
2185                    value,
2186                );
2187            }
2188        }
2189    }
2190
2191    pub(super) fn scan_nodes_by_property(
2192        &self,
2193        label: Option<&str>,
2194        key: &str,
2195        value: &PropertyValue,
2196    ) -> Vec<NodeRecord> {
2197        match label {
2198            Some(label) => self
2199                .nodes_by_label
2200                .get(label)
2201                .into_iter()
2202                .flat_map(|ids| ids.iter())
2203                .filter_map(|&id| self.node_at(id))
2204                .filter(|node| node.properties.get(key) == Some(value))
2205                .cloned()
2206                .collect(),
2207            None => self
2208                .iter_node_records()
2209                .filter(|node| node.properties.get(key) == Some(value))
2210                .cloned()
2211                .collect(),
2212        }
2213    }
2214
2215    pub(super) fn scan_node_ids_by_property(
2216        &self,
2217        label: Option<&str>,
2218        key: &str,
2219        value: &PropertyValue,
2220    ) -> Vec<NodeId> {
2221        match label {
2222            Some(label) => self
2223                .nodes_by_label
2224                .get(label)
2225                .into_iter()
2226                .flat_map(|ids| ids.iter())
2227                .filter_map(|&id| {
2228                    (self.node_at(id)?.properties.get(key) == Some(value)).then_some(id)
2229                })
2230                .collect(),
2231            None => self
2232                .iter_nodes()
2233                .filter_map(|(id, node)| (node.properties.get(key) == Some(value)).then_some(id))
2234                .collect(),
2235        }
2236    }
2237
2238    pub(super) fn any_node_by_property(
2239        &self,
2240        label: &str,
2241        key: &str,
2242        value: &PropertyValue,
2243    ) -> bool {
2244        self.nodes_by_label
2245            .get(label)
2246            .into_iter()
2247            .flat_map(|ids| ids.iter())
2248            .filter_map(|&id| self.node_at(id))
2249            .any(|node| node.properties.get(key) == Some(value))
2250    }
2251
2252    pub(super) fn scan_relationships_by_property(
2253        &self,
2254        rel_type: Option<&str>,
2255        key: &str,
2256        value: &PropertyValue,
2257    ) -> Vec<RelationshipRecord> {
2258        match rel_type {
2259            Some(rel_type) => self
2260                .relationships_by_type
2261                .get(rel_type)
2262                .into_iter()
2263                .flat_map(|ids| ids.iter())
2264                .filter_map(|&id| self.rel_at(id))
2265                .filter(|rel| rel.properties.get(key) == Some(value))
2266                .cloned()
2267                .collect(),
2268            None => self
2269                .iter_rel_records()
2270                .filter(|rel| rel.properties.get(key) == Some(value))
2271                .cloned()
2272                .collect(),
2273        }
2274    }
2275
2276    pub(super) fn scan_relationship_ids_by_property(
2277        &self,
2278        rel_type: Option<&str>,
2279        key: &str,
2280        value: &PropertyValue,
2281    ) -> Vec<RelationshipId> {
2282        match rel_type {
2283            Some(rel_type) => self
2284                .relationships_by_type
2285                .get(rel_type)
2286                .into_iter()
2287                .flat_map(|ids| ids.iter())
2288                .filter_map(|&id| {
2289                    (self.rel_at(id)?.properties.get(key) == Some(value)).then_some(id)
2290                })
2291                .collect(),
2292            None => self
2293                .iter_rels()
2294                .filter_map(|(id, rel)| (rel.properties.get(key) == Some(value)).then_some(id))
2295                .collect(),
2296        }
2297    }
2298
2299    pub(super) fn any_relationship_by_property(
2300        &self,
2301        rel_type: &str,
2302        key: &str,
2303        value: &PropertyValue,
2304    ) -> bool {
2305        self.relationships_by_type
2306            .get(rel_type)
2307            .into_iter()
2308            .flat_map(|ids| ids.iter())
2309            .filter_map(|&id| self.rel_at(id))
2310            .any(|rel| rel.properties.get(key) == Some(value))
2311    }
2312
2313    pub(super) fn attach_relationship(&mut self, rel: &RelationshipRecord) {
2314        self.outgoing_push(rel.src, rel.id);
2315        self.incoming_push(rel.dst, rel.id);
2316        self.insert_relationship_type_index(rel.id, &rel.rel_type);
2317    }
2318
2319    fn detach_relationship_indexes(&mut self, rel: &RelationshipRecord) {
2320        // Adjacency is now positional `Vec<Vec<RelationshipId>>` — clearing
2321        // the inner Vec leaves the slot in place (the slot is sized for the
2322        // node's lifetime, not the edge's).
2323        self.outgoing_remove(rel.src, rel.id);
2324        self.incoming_remove(rel.dst, rel.id);
2325
2326        self.remove_relationship_type_index(rel.id, &rel.rel_type);
2327    }
2328
2329    pub(super) fn relationship_ids_for_direction(
2330        &self,
2331        node_id: NodeId,
2332        direction: Direction,
2333    ) -> Vec<RelationshipId> {
2334        match direction {
2335            Direction::Left => self
2336                .incoming_at(node_id)
2337                .map(<[_]>::to_vec)
2338                .unwrap_or_default(),
2339
2340            Direction::Right => self
2341                .outgoing_at(node_id)
2342                .map(<[_]>::to_vec)
2343                .unwrap_or_default(),
2344
2345            Direction::Undirected => {
2346                let out = self.outgoing_at(node_id);
2347                let inc = self.incoming_at(node_id);
2348                let mut ids = Vec::with_capacity(
2349                    out.map(<[_]>::len).unwrap_or(0) + inc.map(<[_]>::len).unwrap_or(0),
2350                );
2351
2352                if let Some(out) = out {
2353                    ids.extend(out.iter().copied());
2354                }
2355                if let Some(inc) = inc {
2356                    for &rel_id in inc {
2357                        let Some(rel) = self.rel_at(rel_id) else {
2358                            continue;
2359                        };
2360                        if rel.src == node_id && rel.dst == node_id {
2361                            continue;
2362                        }
2363                        ids.push(rel_id);
2364                    }
2365                }
2366
2367                ids
2368            }
2369        }
2370    }
2371
2372    pub(super) fn other_endpoint(rel: &RelationshipRecord, node_id: NodeId) -> Option<NodeId> {
2373        if rel.src == node_id {
2374            Some(rel.dst)
2375        } else if rel.dst == node_id {
2376            Some(rel.src)
2377        } else {
2378            None
2379        }
2380    }
2381
2382    pub(super) fn has_incident_relationships(&self, node_id: NodeId) -> bool {
2383        self.outgoing_at(node_id)
2384            .map(|ids| !ids.is_empty())
2385            .unwrap_or(false)
2386            || self
2387                .incoming_at(node_id)
2388                .map(|ids| !ids.is_empty())
2389                .unwrap_or(false)
2390    }
2391
2392    pub(super) fn incident_relationship_ids(&self, node_id: NodeId) -> Vec<RelationshipId> {
2393        let out = self.outgoing_at(node_id);
2394        let inc = self.incoming_at(node_id);
2395        let mut rel_ids =
2396            Vec::with_capacity(out.map(<[_]>::len).unwrap_or(0) + inc.map(<[_]>::len).unwrap_or(0));
2397
2398        if let Some(ids) = out {
2399            rel_ids.extend(ids.iter().copied());
2400        }
2401        if let Some(ids) = inc {
2402            for &rel_id in ids {
2403                let Some(rel) = self.rel_at(rel_id) else {
2404                    continue;
2405                };
2406                if rel.src == node_id && rel.dst == node_id {
2407                    continue;
2408                }
2409                rel_ids.push(rel_id);
2410            }
2411        }
2412
2413        rel_ids
2414    }
2415
2416    /// Replay a node creation using the id captured in a durable mutation
2417    /// event. This intentionally does not emit a new mutation event: callers
2418    /// must invoke it before installing a recorder on the graph.
2419    #[doc(hidden)]
2420    pub fn replay_create_node(
2421        &mut self,
2422        id: NodeId,
2423        labels: Vec<String>,
2424        properties: Properties,
2425    ) -> Result<NodeRecord, String> {
2426        if self.recorder.is_some() {
2427            return Err(
2428                "cannot replay node creation while a mutation recorder is installed".into(),
2429            );
2430        }
2431        if self.node_at(id).is_some() {
2432            return Err(format!("node id {id} already exists"));
2433        }
2434        let idx = self.ensure_node_slot_checked(id)?;
2435        self.bump_next_node_id_past(id)?;
2436
2437        let labels = Self::normalize_labels(labels);
2438        let node = NodeRecord {
2439            id,
2440            labels: labels.clone(),
2441            properties,
2442        };
2443
2444        self.put_node_at_slot(idx, node.clone());
2445        self.on_node_replayed(&node);
2446
2447        Ok(node)
2448    }
2449
2450    /// Replay a relationship creation using the id captured in a durable
2451    /// mutation event. This intentionally does not emit a new mutation event:
2452    /// callers must invoke it before installing a recorder on the graph.
2453    #[doc(hidden)]
2454    pub fn replay_create_relationship(
2455        &mut self,
2456        id: RelationshipId,
2457        src: NodeId,
2458        dst: NodeId,
2459        rel_type: &str,
2460        properties: Properties,
2461    ) -> Result<RelationshipRecord, String> {
2462        if self.recorder.is_some() {
2463            return Err(
2464                "cannot replay relationship creation while a mutation recorder is installed".into(),
2465            );
2466        }
2467        if self.rel_at(id).is_some() {
2468            return Err(format!("relationship id {id} already exists"));
2469        }
2470        if self.node_at(src).is_none() {
2471            return Err(format!(
2472                "relationship {id} references missing source node {src}"
2473            ));
2474        }
2475        if self.node_at(dst).is_none() {
2476            return Err(format!(
2477                "relationship {id} references missing target node {dst}"
2478            ));
2479        }
2480
2481        let trimmed = rel_type.trim();
2482        if trimmed.is_empty() {
2483            return Err(format!("relationship {id} has an empty type"));
2484        }
2485        let idx = self.ensure_rel_slot_checked(id)?;
2486        self.bump_next_rel_id_past(id)?;
2487
2488        let rel = RelationshipRecord {
2489            id,
2490            src,
2491            dst,
2492            rel_type: trimmed.to_string(),
2493            properties,
2494        };
2495
2496        self.put_rel_at_slot(idx, rel.clone());
2497        self.on_relationship_replayed(&rel);
2498
2499        Ok(rel)
2500    }
2501
2502    #[cfg(test)]
2503    pub(super) fn assert_property_indexes_match_scan(&self) {
2504        let indexes = self.indexes_read();
2505        assert_eq!(
2506            indexes.node_properties.active_keys.len(),
2507            self.active_node_property_index_count(),
2508            "node property index counter diverged from active key set"
2509        );
2510        assert_eq!(
2511            indexes.relationship_properties.active_keys.len(),
2512            self.active_relationship_property_index_count(),
2513            "relationship property index counter diverged from active key set"
2514        );
2515
2516        let mut expected_nodes = PropertyIndexState {
2517            active_keys: indexes.node_properties.active_keys.clone(),
2518            ..PropertyIndexState::default()
2519        };
2520        for (id, node) in self.iter_nodes() {
2521            for (key, value) in &node.properties {
2522                if expected_nodes.is_active(key) {
2523                    expected_nodes.insert_with_scopes(
2524                        id,
2525                        node.labels.iter().map(String::as_str),
2526                        key,
2527                        value,
2528                    );
2529                }
2530            }
2531        }
2532        assert_eq!(
2533            indexes.node_properties.values, expected_nodes.values,
2534            "node property index values diverged from scan"
2535        );
2536        assert_eq!(
2537            indexes.node_properties.scoped_values, expected_nodes.scoped_values,
2538            "node property scoped index values diverged from scan"
2539        );
2540
2541        let mut expected_relationships = PropertyIndexState {
2542            active_keys: indexes.relationship_properties.active_keys.clone(),
2543            ..PropertyIndexState::default()
2544        };
2545        for (id, rel) in self.iter_rels() {
2546            for (key, value) in &rel.properties {
2547                if expected_relationships.is_active(key) {
2548                    expected_relationships.insert_with_scopes(
2549                        id,
2550                        [rel.rel_type.as_str()],
2551                        key,
2552                        value,
2553                    );
2554                }
2555            }
2556        }
2557        assert_eq!(
2558            indexes.relationship_properties.values, expected_relationships.values,
2559            "relationship property index values diverged from scan"
2560        );
2561        assert_eq!(
2562            indexes.relationship_properties.scoped_values, expected_relationships.scoped_values,
2563            "relationship property scoped index values diverged from scan"
2564        );
2565    }
2566}