Skip to main content

lora_store/memory/
graph.rs

1//! The [`InMemoryGraph`] data structure: slot-indexed node/relationship
2//! storage, adjacency lists, label/type indexes, and the inherent
3//! helpers that the trait impls in `super::impls` delegate to.
4
5use std::collections::{BTreeMap, BTreeSet};
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::{Arc, RwLock, RwLockWriteGuard};
8
9use lora_ast::Direction;
10
11use crate::{
12    LoraPoint, MutationEvent, MutationRecorder, NodeId, NodeRecord, Properties, PropertyValue,
13    RelationshipId, RelationshipRecord,
14};
15
16use super::constraint_catalog::{
17    ConstraintCatalog, ConstraintRequest, CreateConstraintError, CreateConstraintOutcome,
18    DropConstraintError, DropConstraintOutcome,
19};
20use super::entity_index_store::IndexBundle;
21use super::fulltext_index::FulltextRegistry;
22use super::hnsw::HnswParams;
23use super::index_catalog::{
24    CreateIndexError, CreateIndexOutcome, DropIndexError, DropIndexOutcome, IndexCatalog,
25    IndexDefinition, IndexRequest, StoredIndexEntity, StoredIndexKind, StoredIndexState,
26};
27use super::point_index::PointRegistry;
28#[cfg(test)]
29use super::property_index::PropertyIndexState;
30use super::property_index::{PropertyIndexKey, PropertyIndexRegistry};
31use super::secondary_index_maintenance::SecondaryIndexMutation;
32use super::sorted_property_index::SortedPropertyIndex;
33use super::stats::GraphStats;
34use super::text_index::TrigramRegistry;
35use super::vector_index::{VectorIndexProvider, VectorIndexRegistry, VectorSimilarity};
36
/// In-memory property graph: slot-indexed node/relationship slabs, per-node
/// adjacency lists, label/type membership lists, the index bundle, the
/// constraint catalog, and an optional mutation recorder.
#[derive(Default)]
pub struct InMemoryGraph {
    /// Id that the next node reservation hands out. Only ever raised, never
    /// lowered, so node ids are not re-used.
    pub(super) next_node_id: NodeId,
    /// Id that the next relationship reservation hands out; same monotonic
    /// rule as `next_node_id`.
    pub(super) next_rel_id: RelationshipId,

    /// Slot-indexed node storage: `nodes[id as usize]` is the record at `id`.
    /// `None` slots are either never-filled padding or tombstones left by
    /// deletes (the slab is not compacted). Because `next_node_id` is
    /// monotonic, a slot's position is a stable identity for the node — the
    /// same guarantee the previous `BTreeMap<NodeId, NodeRecord>` gave, with
    /// O(1) lookup and a contiguous layout.
    ///
    /// Records are wrapped in `Arc` so [`Self::clone`] (used to build a
    /// working copy for writes) costs `O(N)` atomic refcount bumps instead
    /// of `O(N)` deep record clones — for a 100k-node graph the difference
    /// is microseconds vs. tens of milliseconds. Mutating a record goes
    /// through `Arc::make_mut`, which mutates in place when the refcount
    /// is 1 (nobody else holds the record) and otherwise clones just that
    /// one record before mutating (copy-on-write), leaving holders of the
    /// old `Arc` untouched.
    pub(super) nodes: Vec<Option<Arc<NodeRecord>>>,
    /// Slot-indexed relationship storage; same layout and `Arc` rationale
    /// as `nodes`.
    pub(super) relationships: Vec<Option<Arc<RelationshipRecord>>>,
    /// Live (non-tombstoned) counts kept in sync with `put_*`/`take_*` so
    /// `node_count` / `relationship_count` stay O(1) — without a counter
    /// they'd have to scan the slab.
    pub(super) live_node_count: usize,
    pub(super) live_rel_count: usize,

    /// Adjacency keyed by node slot. `outgoing[id]` is the list of
    /// relationship ids that leave `id`. Inner `Vec` instead of `BTreeSet`
    /// because edges are inserted exactly once and traversal only needs
    /// sequential iteration; the cache-friendly contiguous layout pays off
    /// on every traversal hop.
    pub(super) outgoing: Vec<Vec<RelationshipId>>,
    /// Mirror of `outgoing`: `incoming[id]` lists the relationship ids that
    /// arrive at `id`. Grown in lockstep with `nodes` and `outgoing`.
    pub(super) incoming: Vec<Vec<RelationshipId>>,

    // secondary indexes
    /// Label -> the (unique, monotonic) node ids that carry it. The inner
    /// container is a `Vec` rather than a `BTreeSet` because every node id
    /// is inserted at most once per label (no dedup needed) and every
    /// consumer iterates the whole list anyway — contiguous storage
    /// iterates faster than a tree of pointers. Removal is a linear scan
    /// for the id followed by `swap_remove`, i.e. O(size of the label's
    /// bucket), and does not preserve order.
    pub(super) nodes_by_label: BTreeMap<String, Vec<NodeId>>,
    /// Relationship type -> relationship ids of that type; same shape and
    /// trade-offs as `nodes_by_label`.
    pub(super) relationships_by_type: BTreeMap<String, Vec<RelationshipId>>,

    /// All index machinery — the declared-index catalog, hash-bucket
    /// property registry, and the per-entity-kind secondary index
    /// registries (text, sorted, point, fulltext) plus their active
    /// counters — collapsed into one bundle. See [`IndexBundle`] for
    /// the rationale. The bundle is a packaging-only abstraction: it
    /// groups the fields without changing how they behave.
    pub(super) indexes: IndexBundle,

    /// Catalog of explicitly-created constraints (CREATE CONSTRAINT).
    /// Deliberately not part of [`IndexBundle`] — constraints describe
    /// data invariants, not indexed access. The fact that uniqueness /
    /// key constraints back range indexes is handled in the
    /// constraint code path, not by the bundle's layout.
    pub(super) constraint_catalog: RwLock<ConstraintCatalog>,
    /// Fast-path counter for mutation-time constraint checks. Most
    /// workloads have no constraints installed; this lets the executor
    /// skip taking the catalog lock in that case.
    pub(super) active_constraints: AtomicUsize,

    /// Optional mutation observer. When `Some`, every committed mutation
    /// fans out to this recorder *after* the in-memory state has been
    /// updated. The recorder is not part of the graph's identity, so
    /// `Clone` resets it to `None` (snapshot restore is expected to do
    /// the same).
    pub(super) recorder: Option<Arc<dyn MutationRecorder>>,
}
108
109impl std::fmt::Debug for InMemoryGraph {
110    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111        f.debug_struct("InMemoryGraph")
112            .field("next_node_id", &self.next_node_id)
113            .field("next_rel_id", &self.next_rel_id)
114            .field("nodes", &self.nodes)
115            .field("relationships", &self.relationships)
116            .field("outgoing", &self.outgoing)
117            .field("incoming", &self.incoming)
118            .field("nodes_by_label", &self.nodes_by_label)
119            .field("relationships_by_type", &self.relationships_by_type)
120            .field("indexes", &self.indexes)
121            .field(
122                "active_node_property_indexes",
123                &self.active_node_property_index_count(),
124            )
125            .field(
126                "active_relationship_property_indexes",
127                &self.active_relationship_property_index_count(),
128            )
129            .field(
130                "index_catalog_entries",
131                &self
132                    .indexes
133                    .catalog
134                    .read()
135                    .map(|c| c.list().len())
136                    .unwrap_or(0),
137            )
138            .field("active_constraints", &self.active_constraint_count())
139            .field(
140                "active_fulltext_indexes",
141                &self.active_fulltext_index_count(),
142            )
143            .field("recorder", &self.recorder.as_ref().map(|_| "installed"))
144            .finish()
145    }
146}
147
148impl Clone for InMemoryGraph {
149    fn clone(&self) -> Self {
150        // Deliberately drop the recorder on clone: a cloned store is a
151        // separate identity; it should not silently share the observer.
152        Self {
153            next_node_id: self.next_node_id,
154            next_rel_id: self.next_rel_id,
155            nodes: self.nodes.clone(),
156            relationships: self.relationships.clone(),
157            live_node_count: self.live_node_count,
158            live_rel_count: self.live_rel_count,
159            outgoing: self.outgoing.clone(),
160            incoming: self.incoming.clone(),
161            nodes_by_label: self.nodes_by_label.clone(),
162            relationships_by_type: self.relationships_by_type.clone(),
163            // IndexBundle::clone deep-copies every owned registry under
164            // its locks, mirroring what the old per-field clones did.
165            // The hash-bucket registry skip-on-empty optimisation is
166            // preserved: `PropertyIndexRegistry::clone` itself is cheap
167            // when no entries exist.
168            indexes: self.indexes.clone(),
169            constraint_catalog: RwLock::new(self.constraint_catalog_read().clone()),
170            active_constraints: AtomicUsize::new(self.active_constraint_count()),
171            recorder: None,
172        }
173    }
174}
175
176impl InMemoryGraph {
177    pub fn new() -> Self {
178        Self::default()
179    }
180
181    pub fn with_capacity_hint(nodes: usize, relationships: usize) -> Self {
182        Self {
183            nodes: Vec::with_capacity(nodes),
184            relationships: Vec::with_capacity(relationships),
185            outgoing: Vec::with_capacity(nodes),
186            incoming: Vec::with_capacity(nodes),
187            ..Self::default()
188        }
189    }
190
191    pub fn contains_node(&self, node_id: NodeId) -> bool {
192        self.node_at(node_id).is_some()
193    }
194
195    pub fn contains_relationship(&self, rel_id: RelationshipId) -> bool {
196        self.rel_at(rel_id).is_some()
197    }
198
199    /// Install (or clear) the mutation recorder. Passing `None` detaches any
200    /// currently-installed recorder. The recorder observes every committed
201    /// mutation *after* it has been applied.
202    pub fn set_mutation_recorder(&mut self, recorder: Option<Arc<dyn MutationRecorder>>) {
203        self.recorder = recorder;
204    }
205
206    /// Handle to the currently-installed recorder, if any.
207    pub fn mutation_recorder(&self) -> Option<&Arc<dyn MutationRecorder>> {
208        self.recorder.as_ref()
209    }
210
211    /// Emit a mutation event only if a recorder is installed. The event is
212    /// built lazily — callers pass a closure, so when no recorder is
213    /// attached we pay only a `None` check and the cost of constructing the
214    /// event (labels/properties clones) is avoided.
215    #[inline]
216    pub(super) fn emit<F: FnOnce() -> MutationEvent>(&self, build: F) {
217        if let Some(rec) = &self.recorder {
218            rec.record(build());
219        }
220    }
221
222    fn bump_next_node_id_past(&mut self, id: NodeId) -> Result<(), String> {
223        let next = id
224            .checked_add(1)
225            .ok_or_else(|| format!("node id {id} leaves no valid next node id"))?;
226        self.next_node_id = self.next_node_id.max(next);
227        Ok(())
228    }
229
230    fn bump_next_rel_id_past(&mut self, id: RelationshipId) -> Result<(), String> {
231        let next = id
232            .checked_add(1)
233            .ok_or_else(|| format!("relationship id {id} leaves no valid next relationship id"))?;
234        self.next_rel_id = self.next_rel_id.max(next);
235        Ok(())
236    }
237
238    pub(super) fn reserve_next_node_slot(&mut self) -> (NodeId, usize) {
239        let id = self.next_node_id;
240        let idx = self
241            .ensure_node_slot_checked(id)
242            .expect("next node id should fit in memory-backed slab");
243        self.bump_next_node_id_past(id)
244            .expect("next node id should leave a valid successor");
245        (id, idx)
246    }
247
248    pub(super) fn try_reserve_next_rel_slot(&mut self) -> Option<(RelationshipId, usize)> {
249        let id = self.next_rel_id;
250        let idx = self.ensure_rel_slot_checked(id).ok()?;
251        self.bump_next_rel_id_past(id).ok()?;
252        Some((id, idx))
253    }
254
255    // ---------- Slab access helpers ----------
256    //
257    // Stand-in for the BTreeMap API the previous storage used. They keep the
258    // call sites readable while the underlying layout is positional Vec.
259
260    #[inline]
261    pub(super) fn node_at(&self, id: NodeId) -> Option<&NodeRecord> {
262        self.nodes
263            .get(Self::slot_index(id)?)
264            .and_then(|s| s.as_ref())
265            .map(|arc| arc.as_ref())
266    }
267
268    /// Mutable handle to a node record, doing copy-on-write only when the
269    /// `Arc` is shared with a concurrent reader. With no readers (the
270    /// common case after a fresh write_store clone), `Arc::make_mut`
271    /// upgrades in place — no record clone.
272    #[inline]
273    pub(super) fn node_at_mut(&mut self, id: NodeId) -> Option<&mut NodeRecord> {
274        self.nodes
275            .get_mut(Self::slot_index(id)?)
276            .and_then(|s| s.as_mut())
277            .map(Arc::make_mut)
278    }
279
280    #[inline]
281    pub(super) fn rel_at(&self, id: RelationshipId) -> Option<&RelationshipRecord> {
282        self.relationships
283            .get(Self::slot_index(id)?)
284            .and_then(|s| s.as_ref())
285            .map(|arc| arc.as_ref())
286    }
287
288    #[inline]
289    pub(super) fn rel_at_mut(&mut self, id: RelationshipId) -> Option<&mut RelationshipRecord> {
290        self.relationships
291            .get_mut(Self::slot_index(id)?)
292            .and_then(|s| s.as_mut())
293            .map(Arc::make_mut)
294    }
295
296    /// Resize the node-keyed Vecs so `id as usize` is in range. Adjacency
297    /// lists are kept in lockstep with `nodes`, so a freshly-grown slot has
298    /// empty outgoing/incoming Vecs ready to receive edges.
299    fn slot_len_for_id(id: u64, kind: &str) -> Result<usize, String> {
300        let idx = usize::try_from(id)
301            .map_err(|_| format!("{kind} id {id} does not fit in usize on this platform"))?;
302        idx.checked_add(1)
303            .ok_or_else(|| format!("{kind} id {id} leaves no valid slab slot"))
304    }
305
306    #[inline]
307    fn slot_index(id: u64) -> Option<usize> {
308        usize::try_from(id).ok()
309    }
310
311    fn ensure_node_slot_checked(&mut self, id: NodeId) -> Result<usize, String> {
312        let target = Self::slot_len_for_id(id, "node")?;
313        if self.nodes.len() < target {
314            let additional = target - self.nodes.len();
315            self.nodes.try_reserve_exact(additional).map_err(|e| {
316                format!("node id {id} requires {target} slots, but allocation failed: {e}")
317            })?;
318            self.outgoing.try_reserve_exact(additional).map_err(|e| {
319                format!(
320                    "node id {id} requires {target} adjacency slots, but allocation failed: {e}"
321                )
322            })?;
323            self.incoming.try_reserve_exact(additional).map_err(|e| {
324                format!(
325                    "node id {id} requires {target} adjacency slots, but allocation failed: {e}"
326                )
327            })?;
328            self.nodes.resize_with(target, || None);
329            self.outgoing.resize_with(target, Vec::new);
330            self.incoming.resize_with(target, Vec::new);
331        }
332        Ok(target - 1)
333    }
334
335    fn ensure_rel_slot_checked(&mut self, id: RelationshipId) -> Result<usize, String> {
336        let target = Self::slot_len_for_id(id, "relationship")?;
337        if self.relationships.len() < target {
338            self.relationships
339                .try_reserve_exact(target - self.relationships.len())
340                .map_err(|e| {
341                    format!(
342                        "relationship id {id} requires {target} slots, but allocation failed: {e}"
343                    )
344                })?;
345            self.relationships.resize_with(target, || None);
346        }
347        Ok(target - 1)
348    }
349
350    fn ensure_node_slot(&mut self, id: NodeId) -> usize {
351        self.ensure_node_slot_checked(id)
352            .expect("node id should fit in memory-backed slab")
353    }
354
355    pub(super) fn put_node_checked(&mut self, id: NodeId, node: NodeRecord) -> Result<(), String> {
356        let idx = self.ensure_node_slot_checked(id)?;
357        self.put_node_at_slot(idx, node);
358        Ok(())
359    }
360
361    pub(super) fn put_rel_checked(
362        &mut self,
363        id: RelationshipId,
364        rel: RelationshipRecord,
365    ) -> Result<(), String> {
366        let idx = self.ensure_rel_slot_checked(id)?;
367        self.put_rel_at_slot(idx, rel);
368        Ok(())
369    }
370
371    pub(super) fn put_node_at_slot(&mut self, idx: usize, node: NodeRecord) {
372        let was_present = self.nodes[idx].is_some();
373        self.nodes[idx] = Some(Arc::new(node));
374        if !was_present {
375            self.live_node_count += 1;
376        }
377    }
378
379    pub(super) fn put_rel_at_slot(&mut self, idx: usize, rel: RelationshipRecord) {
380        let was_present = self.relationships[idx].is_some();
381        self.relationships[idx] = Some(Arc::new(rel));
382        if !was_present {
383            self.live_rel_count += 1;
384        }
385    }
386
387    pub(super) fn take_node(&mut self, id: NodeId) -> Option<NodeRecord> {
388        let idx = Self::slot_index(id)?;
389        let removed = self.nodes.get_mut(idx).and_then(|s| s.take());
390        if removed.is_some() {
391            self.live_node_count -= 1;
392            // Also clear the per-id adjacency entries so the memory is reclaimed
393            // on the typical "delete every node" pattern. We deliberately do not
394            // shrink the outer Vec — leaving the slot lets new ids reuse the
395            // same index without growth churn (and `next_node_id` is monotonic
396            // anyway, so no immediate reuse).
397            if let Some(out) = self.outgoing.get_mut(idx) {
398                out.clear();
399            }
400            if let Some(inc) = self.incoming.get_mut(idx) {
401                inc.clear();
402            }
403        }
404        // Unwrap the Arc — `try_unwrap` returns the inner `NodeRecord`
405        // without cloning when our slab held the only reference, falling
406        // back to a clone only when concurrent readers still hold a
407        // snapshot Arc.
408        removed.map(|arc| Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone()))
409    }
410
411    pub(super) fn take_rel(&mut self, id: RelationshipId) -> Option<RelationshipRecord> {
412        let idx = Self::slot_index(id)?;
413        let removed = self.relationships.get_mut(idx).and_then(|s| s.take());
414        if removed.is_some() {
415            self.live_rel_count -= 1;
416        }
417        removed.map(|arc| Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone()))
418    }
419
420    #[inline]
421    pub(super) fn outgoing_at(&self, id: NodeId) -> Option<&[RelationshipId]> {
422        self.outgoing.get(Self::slot_index(id)?).map(Vec::as_slice)
423    }
424
425    #[inline]
426    pub(super) fn incoming_at(&self, id: NodeId) -> Option<&[RelationshipId]> {
427        self.incoming.get(Self::slot_index(id)?).map(Vec::as_slice)
428    }
429
430    #[inline]
431    fn try_for_each_adjacent_slice<F, E>(
432        &self,
433        node_id: NodeId,
434        types: &[String],
435        adj: &[RelationshipId],
436        skip_self_loops: bool,
437        visit: &mut F,
438    ) -> Result<(), E>
439    where
440        F: FnMut(RelationshipId, NodeId) -> Result<(), E>,
441    {
442        let single_type = match types {
443            [single] => Some(single.as_str()),
444            _ => None,
445        };
446        let has_type_filter = !types.is_empty();
447
448        for &rel_id in adj {
449            let Some(rel) = self.rel_at(rel_id) else {
450                continue;
451            };
452            if skip_self_loops && rel.src == node_id && rel.dst == node_id {
453                continue;
454            }
455            if let Some(single) = single_type {
456                if rel.rel_type != single {
457                    continue;
458                }
459            } else if has_type_filter && !types.iter().any(|t| t == &rel.rel_type) {
460                continue;
461            }
462            let Some(other_id) = Self::other_endpoint(rel, node_id) else {
463                continue;
464            };
465            visit(rel_id, other_id)?;
466        }
467        Ok(())
468    }
469
470    #[inline]
471    pub(super) fn try_for_each_adjacent_id_unchecked<F, E>(
472        &self,
473        node_id: NodeId,
474        direction: Direction,
475        types: &[String],
476        mut visit: F,
477    ) -> Result<(), E>
478    where
479        F: FnMut(RelationshipId, NodeId) -> Result<(), E>,
480    {
481        match direction {
482            Direction::Right => {
483                if let Some(adj) = self.outgoing_at(node_id) {
484                    self.try_for_each_adjacent_slice(node_id, types, adj, false, &mut visit)?;
485                }
486            }
487            Direction::Left => {
488                if let Some(adj) = self.incoming_at(node_id) {
489                    self.try_for_each_adjacent_slice(node_id, types, adj, false, &mut visit)?;
490                }
491            }
492            Direction::Undirected => {
493                if let Some(adj) = self.outgoing_at(node_id) {
494                    self.try_for_each_adjacent_slice(node_id, types, adj, false, &mut visit)?;
495                }
496                if let Some(adj) = self.incoming_at(node_id) {
497                    self.try_for_each_adjacent_slice(node_id, types, adj, true, &mut visit)?;
498                }
499            }
500        }
501
502        Ok(())
503    }
504
505    #[inline]
506    pub(super) fn try_for_each_adjacent_id<F, E>(
507        &self,
508        node_id: NodeId,
509        direction: Direction,
510        types: &[String],
511        visit: F,
512    ) -> Result<(), E>
513    where
514        F: FnMut(RelationshipId, NodeId) -> Result<(), E>,
515    {
516        if self.node_at(node_id).is_none() {
517            return Ok(());
518        }
519        self.try_for_each_adjacent_id_unchecked(node_id, direction, types, visit)
520    }
521
522    pub(super) fn iter_node_ids(&self) -> impl Iterator<Item = NodeId> + '_ {
523        self.nodes
524            .iter()
525            .enumerate()
526            .filter_map(|(i, slot)| slot.as_ref().map(|_| i as NodeId))
527    }
528
529    pub(super) fn iter_node_records(&self) -> impl Iterator<Item = &NodeRecord> + '_ {
530        self.nodes
531            .iter()
532            .filter_map(|s| s.as_ref())
533            .map(|arc| arc.as_ref())
534    }
535
536    pub(super) fn iter_rel_ids(&self) -> impl Iterator<Item = RelationshipId> + '_ {
537        self.relationships
538            .iter()
539            .enumerate()
540            .filter_map(|(i, slot)| slot.as_ref().map(|_| i as RelationshipId))
541    }
542
543    pub(super) fn iter_rel_records(&self) -> impl Iterator<Item = &RelationshipRecord> + '_ {
544        self.relationships
545            .iter()
546            .filter_map(|s| s.as_ref())
547            .map(|arc| arc.as_ref())
548    }
549
550    pub(super) fn iter_nodes(&self) -> impl Iterator<Item = (NodeId, &NodeRecord)> + '_ {
551        self.nodes
552            .iter()
553            .enumerate()
554            .filter_map(|(i, slot)| slot.as_ref().map(|n| (i as NodeId, n.as_ref())))
555    }
556
557    pub(super) fn iter_rels(
558        &self,
559    ) -> impl Iterator<Item = (RelationshipId, &RelationshipRecord)> + '_ {
560        self.relationships
561            .iter()
562            .enumerate()
563            .filter_map(|(i, slot)| slot.as_ref().map(|r| (i as RelationshipId, r.as_ref())))
564    }
565
566    /// Add `rel_id` to `node_id`'s outgoing list. Relies on the monotonic-id
567    /// invariant: relationship ids are allocated once and never re-used, so
568    /// the bucket can never see a duplicate.
569    fn outgoing_push(&mut self, node_id: NodeId, rel_id: RelationshipId) {
570        let idx = self.ensure_node_slot(node_id);
571        self.outgoing[idx].push(rel_id);
572    }
573
574    fn incoming_push(&mut self, node_id: NodeId, rel_id: RelationshipId) {
575        let idx = self.ensure_node_slot(node_id);
576        self.incoming[idx].push(rel_id);
577    }
578
579    /// Remove `rel_id` from `node_id`'s outgoing list. `swap_remove` keeps
580    /// the operation O(1) — adjacency order doesn't carry semantic meaning.
581    fn outgoing_remove(&mut self, node_id: NodeId, rel_id: RelationshipId) {
582        if let Some(v) = Self::slot_index(node_id).and_then(|idx| self.outgoing.get_mut(idx)) {
583            if let Some(pos) = v.iter().position(|&id| id == rel_id) {
584                v.swap_remove(pos);
585            }
586        }
587    }
588
589    fn incoming_remove(&mut self, node_id: NodeId, rel_id: RelationshipId) {
590        if let Some(v) = Self::slot_index(node_id).and_then(|idx| self.incoming.get_mut(idx)) {
591            if let Some(pos) = v.iter().position(|&id| id == rel_id) {
592                v.swap_remove(pos);
593            }
594        }
595    }
596
597    pub(super) fn normalize_labels(labels: Vec<String>) -> Vec<String> {
598        let mut seen = BTreeSet::new();
599
600        labels
601            .into_iter()
602            .map(|s| s.trim().to_string())
603            .filter(|s| !s.is_empty())
604            .filter(|s| seen.insert(s.clone()))
605            .collect()
606    }
607
608    pub(super) fn insert_node_label_index(&mut self, node_id: NodeId, label: &str) {
609        // Hot path: skip the `String` alloc when the label bucket already
610        // exists. The monotonic-id invariant on the create path guarantees
611        // `node_id` is unique, so we push unconditionally; the previous
612        // `contains` guard turned bulk CREATE into O(n²).
613        if let Some(bucket) = self.nodes_by_label.get_mut(label) {
614            bucket.push(node_id);
615        } else {
616            self.nodes_by_label.insert(label.to_string(), vec![node_id]);
617        }
618    }
619
620    fn remove_node_label_index(&mut self, node_id: NodeId, label: &str) {
621        if let Some(ids) = self.nodes_by_label.get_mut(label) {
622            if let Some(pos) = ids.iter().position(|&id| id == node_id) {
623                ids.swap_remove(pos);
624            }
625            if ids.is_empty() {
626                self.nodes_by_label.remove(label);
627            }
628        }
629    }
630
631    fn insert_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
632        // See `insert_node_label_index` for the same hot-path rationale.
633        if let Some(bucket) = self.relationships_by_type.get_mut(rel_type) {
634            bucket.push(rel_id);
635        } else {
636            self.relationships_by_type
637                .insert(rel_type.to_string(), vec![rel_id]);
638        }
639    }
640
641    fn remove_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
642        if let Some(ids) = self.relationships_by_type.get_mut(rel_type) {
643            if let Some(pos) = ids.iter().position(|&id| id == rel_id) {
644                ids.swap_remove(pos);
645            }
646            if ids.is_empty() {
647                self.relationships_by_type.remove(rel_type);
648            }
649        }
650    }
651
652    pub(super) fn indexes_read(&self) -> std::sync::RwLockReadGuard<'_, PropertyIndexRegistry> {
653        self.indexes
654            .properties
655            .read()
656            .unwrap_or_else(|poisoned| poisoned.into_inner())
657    }
658
659    pub(super) fn indexes_write(&self) -> RwLockWriteGuard<'_, PropertyIndexRegistry> {
660        self.indexes
661            .properties
662            .write()
663            .unwrap_or_else(|poisoned| poisoned.into_inner())
664    }
665
666    pub(super) fn indexes_mut(&mut self) -> &mut PropertyIndexRegistry {
667        self.indexes
668            .properties
669            .get_mut()
670            .unwrap_or_else(|poisoned| poisoned.into_inner())
671    }
672
673    #[inline]
674    pub(super) fn active_node_property_index_count(&self) -> usize {
675        self.indexes
676            .active_node_property_indexes
677            .load(Ordering::Relaxed)
678    }
679
680    #[inline]
681    pub(super) fn active_relationship_property_index_count(&self) -> usize {
682        self.indexes
683            .active_relationship_property_indexes
684            .load(Ordering::Relaxed)
685    }
686
687    #[inline]
688    pub(super) fn active_constraint_count(&self) -> usize {
689        self.active_constraints.load(Ordering::Relaxed)
690    }
691
692    #[inline]
693    pub(super) fn has_active_constraints(&self) -> bool {
694        self.active_constraint_count() != 0
695    }
696
697    #[inline]
698    pub(super) fn active_fulltext_index_count(&self) -> usize {
699        self.indexes.active_fulltext_indexes.load(Ordering::Relaxed)
700    }
701
702    #[inline]
703    pub(super) fn has_active_fulltext_indexes(&self) -> bool {
704        self.active_fulltext_index_count() != 0
705    }
706
707    pub(super) fn node_property_index_is_active(&mut self, key: &str) -> bool {
708        self.active_node_property_index_count() != 0
709            && self.indexes_mut().node_properties.is_active(key)
710    }
711
712    pub(super) fn relationship_property_index_is_active(&mut self, key: &str) -> bool {
713        self.active_relationship_property_index_count() != 0
714            && self.indexes_mut().relationship_properties.is_active(key)
715    }
716
717    pub(super) fn ensure_node_property_index(&self, key: &str) {
718        {
719            let indexes = self.indexes_read();
720            if indexes.node_properties.is_active(key) {
721                return;
722            }
723        }
724
725        let mut indexes = self.indexes_write();
726        if indexes.node_properties.is_active(key) {
727            return;
728        }
729
730        for (id, node) in self.iter_nodes() {
731            if let Some(value) = node.properties.get(key) {
732                indexes.node_properties.insert_with_scopes(
733                    id,
734                    node.labels.iter().map(String::as_str),
735                    key,
736                    value,
737                );
738            }
739        }
740        if indexes.node_properties.activate(key) {
741            self.indexes
742                .active_node_property_indexes
743                .fetch_add(1, Ordering::Relaxed);
744        }
745    }
746
747    pub(super) fn ensure_relationship_property_index(&self, key: &str) {
748        {
749            let indexes = self.indexes_read();
750            if indexes.relationship_properties.is_active(key) {
751                return;
752            }
753        }
754
755        let mut indexes = self.indexes_write();
756        if indexes.relationship_properties.is_active(key) {
757            return;
758        }
759
760        for (id, rel) in self.iter_rels() {
761            if let Some(value) = rel.properties.get(key) {
762                indexes.relationship_properties.insert_with_scopes(
763                    id,
764                    [rel.rel_type.as_str()],
765                    key,
766                    value,
767                );
768            }
769        }
770        if indexes.relationship_properties.activate(key) {
771            self.indexes
772                .active_relationship_property_indexes
773                .fetch_add(1, Ordering::Relaxed);
774        }
775    }
776
777    pub(super) fn index_catalog_read(&self) -> std::sync::RwLockReadGuard<'_, IndexCatalog> {
778        self.indexes
779            .catalog
780            .read()
781            .unwrap_or_else(|poisoned| poisoned.into_inner())
782    }
783
784    pub(super) fn index_catalog_write(&self) -> RwLockWriteGuard<'_, IndexCatalog> {
785        self.indexes
786            .catalog
787            .write()
788            .unwrap_or_else(|poisoned| poisoned.into_inner())
789    }
790
791    pub(super) fn constraint_catalog_read(
792        &self,
793    ) -> std::sync::RwLockReadGuard<'_, ConstraintCatalog> {
794        self.constraint_catalog
795            .read()
796            .unwrap_or_else(|poisoned| poisoned.into_inner())
797    }
798
799    pub(super) fn constraint_catalog_write(&self) -> RwLockWriteGuard<'_, ConstraintCatalog> {
800        self.constraint_catalog
801            .write()
802            .unwrap_or_else(|poisoned| poisoned.into_inner())
803    }
804
805    /// Register an explicitly-declared index in the catalog and, when
806    /// applicable, force the underlying property-index buckets to be
807    /// populated so equality lookups can use them immediately.
808    ///
809    /// Named with a `register_` prefix to avoid colliding with the
810    /// trait method `GraphStorageMut::create_index` — the trait impl
811    /// in `impls.rs` delegates here.
812    #[allow(clippy::result_large_err)]
813    pub(super) fn register_index(
814        &self,
815        request: IndexRequest,
816        if_not_exists: bool,
817    ) -> Result<CreateIndexOutcome, CreateIndexError> {
818        self.register_index_with_recording(request, if_not_exists, true)
819    }
820
821    #[allow(clippy::result_large_err)]
822    fn register_index_with_recording(
823        &self,
824        request: IndexRequest,
825        if_not_exists: bool,
826        record_event: bool,
827    ) -> Result<CreateIndexOutcome, CreateIndexError> {
828        let request_for_event = record_event.then(|| request.clone());
829        let outcome = {
830            let mut catalog = self.index_catalog_write();
831            catalog.try_create(request, if_not_exists)?
832        };
833
834        if let CreateIndexOutcome::Created(def) = &outcome {
835            self.populate_index_data(def);
836        }
837
838        // Both Created and NoOpExists are committed catalog states; we
839        // log only Created because NoOpExists implies a redundant DDL
840        // that adds nothing to durable state.
841        if matches!(outcome, CreateIndexOutcome::Created(_)) {
842            if let Some(request_for_event) = request_for_event {
843                self.emit(|| crate::MutationEvent::CreateIndex {
844                    request: request_for_event,
845                    if_not_exists,
846                });
847            }
848        }
849
850        Ok(outcome)
851    }
852
853    /// Replay a CreateIndex event against an empty graph. Mirrors the
854    /// `replay_create_node` shape: callers must invoke before installing
855    /// a recorder so we don't re-emit during recovery.
856    #[doc(hidden)]
857    pub fn replay_create_index(
858        &mut self,
859        request: IndexRequest,
860        if_not_exists: bool,
861    ) -> Result<(), String> {
862        if self.recorder.is_some() {
863            return Err("cannot replay create_index while a mutation recorder is installed".into());
864        }
865        self.register_index(request, if_not_exists)
866            .map(|_| ())
867            .map_err(|e| e.to_string())
868    }
869
870    /// Replay a DropIndex event.
871    #[doc(hidden)]
872    pub fn replay_drop_index(&mut self, name: &str, if_exists: bool) -> Result<(), String> {
873        if self.recorder.is_some() {
874            return Err("cannot replay drop_index while a mutation recorder is installed".into());
875        }
876        self.drop_named_index(name, if_exists)
877            .map(|_| ())
878            .map_err(|e| e.to_string())
879    }
880
881    /// Register a constraint. For uniqueness/key kinds this also
882    /// registers a backing RANGE index in the index catalog under the
883    /// same name. Validation of existing data is the caller's
884    /// responsibility (the enforcement layer runs a pre-create scan
885    /// before this method commits).
886    pub(super) fn register_constraint(
887        &self,
888        request: ConstraintRequest,
889        if_not_exists: bool,
890    ) -> Result<CreateConstraintOutcome, CreateConstraintError> {
891        // Constraint-level conflicts (22N65/66/67) take precedence over
892        // index-catalog conflicts: if the request collides with an
893        // existing *constraint* shape or name, we never get to the
894        // backing-index step.
895        {
896            let constraint_catalog = self.constraint_catalog_read();
897            if let Some(existing) = constraint_catalog.find_equivalent(&request) {
898                let cloned = existing.clone();
899                drop(constraint_catalog);
900                if if_not_exists {
901                    return Ok(CreateConstraintOutcome::NoOpExists(cloned));
902                }
903                return Err(CreateConstraintError::EquivalentConstraintExists(
904                    cloned.name,
905                ));
906            }
907            if let Some(existing) = constraint_catalog.get(&request.name) {
908                let cloned = existing.clone();
909                drop(constraint_catalog);
910                if if_not_exists {
911                    return Ok(CreateConstraintOutcome::NoOpExists(cloned));
912                }
913                return Err(CreateConstraintError::DuplicateName(cloned.name));
914            }
915            if let Some(existing) = constraint_catalog.find_same_schema(&request) {
916                let cloned = existing.clone();
917                drop(constraint_catalog);
918                if super::constraint_catalog::kinds_conflict_for_validation(
919                    &cloned.kind,
920                    &request.kind,
921                ) {
922                    if if_not_exists {
923                        return Ok(CreateConstraintOutcome::NoOpExists(cloned));
924                    }
925                    return Err(CreateConstraintError::ConflictingConstraint(cloned.name));
926                }
927            }
928        }
929
930        // Index-catalog conflicts only matter for constraints that need
931        // a backing range index. The catalog won't yet own one for this
932        // request — that registration happens below — so any existing
933        // entry under the same name or schema is from a foreign index.
934        if request.kind.requires_backing_index() {
935            let idx_catalog = self.index_catalog_read();
936            if idx_catalog.get(&request.name).is_some() {
937                return Err(CreateConstraintError::DuplicateIndexName(
938                    request.name.clone(),
939                ));
940            }
941            let conflict = idx_catalog.list().into_iter().find(|def| {
942                def.kind == StoredIndexKind::Range
943                    && def.entity == request.entity
944                    && def.label.as_deref() == Some(request.label.as_str())
945                    && def.properties == request.properties
946                    && def.name != request.name
947            });
948            drop(idx_catalog);
949            if let Some(def) = conflict {
950                return Err(CreateConstraintError::BackingIndexConflict(format!(
951                    "(:{} {{{}}}) already covered by index `{}`",
952                    request.label,
953                    request.properties.join(", "),
954                    def.name,
955                )));
956            }
957        }
958
959        let owns_backing = request.kind.requires_backing_index();
960        let request_for_event = request.clone();
961        let outcome = {
962            let mut catalog = self.constraint_catalog_write();
963            catalog.try_create(request, if_not_exists)?
964        };
965
966        if let CreateConstraintOutcome::Created(def) = &outcome {
967            // Pre-create data scan: if the live graph already violates
968            // the constraint, fail and roll back the catalog write.
969            // Creating constraints when conflicting data exists fails
970            // before the catalog change is retained (22N77/79/80).
971            if let Err(violation) = self.validate_existing_data_for_constraint(def) {
972                let mut catalog = self.constraint_catalog_write();
973                let _ = catalog.try_drop(&def.name, true);
974                return Err(CreateConstraintError::DataViolation(violation.to_string()));
975            }
976        }
977
978        if let CreateConstraintOutcome::Created(def) = &outcome {
979            if owns_backing {
980                // Register a backing RANGE index under the same name. This
981                // is an implementation detail of the constraint, so WAL and
982                // snapshot replay record only the constraint mutation.
983                let idx_request = IndexRequest {
984                    explicit_name: Some(def.name.clone()),
985                    kind: StoredIndexKind::Range,
986                    entity: def.entity,
987                    label: Some(def.label.clone()),
988                    additional_labels: Vec::new(),
989                    properties: def.properties.clone(),
990                    options: Default::default(),
991                };
992                // Errors from the backing-index registration unwind the
993                // constraint registration to keep the two catalogs in
994                // step.
995                if let Err(err) = self.register_index_with_recording(idx_request, true, false) {
996                    let mut catalog = self.constraint_catalog_write();
997                    let _ = catalog.try_drop(&def.name, true);
998                    return Err(CreateConstraintError::BackingIndexConflict(err.to_string()));
999                }
1000            }
1001            self.emit(|| crate::MutationEvent::CreateConstraint {
1002                request: request_for_event,
1003                if_not_exists,
1004            });
1005            self.active_constraints.fetch_add(1, Ordering::Relaxed);
1006        }
1007
1008        Ok(outcome)
1009    }
1010
1011    /// Replay a CreateConstraint event against a recorder-detached graph.
1012    #[doc(hidden)]
1013    pub fn replay_create_constraint(
1014        &mut self,
1015        request: ConstraintRequest,
1016        if_not_exists: bool,
1017    ) -> Result<(), String> {
1018        if self.recorder.is_some() {
1019            return Err(
1020                "cannot replay create_constraint while a mutation recorder is installed".into(),
1021            );
1022        }
1023        self.register_constraint(request, if_not_exists)
1024            .map(|_| ())
1025            .map_err(|e| e.to_string())
1026    }
1027
1028    /// Replay a DropConstraint event.
1029    #[doc(hidden)]
1030    pub fn replay_drop_constraint(&mut self, name: &str, if_exists: bool) -> Result<(), String> {
1031        if self.recorder.is_some() {
1032            return Err(
1033                "cannot replay drop_constraint while a mutation recorder is installed".into(),
1034            );
1035        }
1036        self.drop_named_constraint(name, if_exists)
1037            .map(|_| ())
1038            .map_err(|e| e.to_string())
1039    }
1040
1041    /// Inverse of [`Self::register_constraint`]. Cascades to the backing
1042    /// range index when one is owned.
1043    pub(super) fn drop_named_constraint(
1044        &self,
1045        name: &str,
1046        if_exists: bool,
1047    ) -> Result<DropConstraintOutcome, DropConstraintError> {
1048        let outcome = {
1049            let mut catalog = self.constraint_catalog_write();
1050            catalog.try_drop(name, if_exists)?
1051        };
1052        if let DropConstraintOutcome::Dropped(def) = &outcome {
1053            if let Some(index_name) = def.owned_index.as_deref() {
1054                // The backing index is owned exclusively by the
1055                // constraint, so dropping it is unconditional.
1056                let _ = self.drop_named_index_inner(index_name, true, false);
1057            }
1058            self.active_constraints.fetch_sub(1, Ordering::Relaxed);
1059            self.emit(|| crate::MutationEvent::DropConstraint {
1060                name: name.to_string(),
1061                if_exists,
1062            });
1063        }
1064        Ok(outcome)
1065    }
1066
1067    /// Inverse of [`Self::register_index`]. Removes the catalog entry
1068    /// and (for RANGE) leaves the underlying property-index buckets in
1069    /// place — they may still be needed for lazy-activation lookups
1070    /// even after the explicit DDL declaration is gone.
1071    pub(super) fn drop_named_index(
1072        &self,
1073        name: &str,
1074        if_exists: bool,
1075    ) -> Result<DropIndexOutcome, DropIndexError> {
1076        self.drop_named_index_inner(name, if_exists, true)
1077    }
1078
1079    fn drop_named_index_inner(
1080        &self,
1081        name: &str,
1082        if_exists: bool,
1083        emit_event: bool,
1084    ) -> Result<DropIndexOutcome, DropIndexError> {
1085        if let Some(owner) = self
1086            .constraint_catalog_read()
1087            .constraint_owning_index(name)
1088            .cloned()
1089        {
1090            return Err(DropIndexError::ConstraintOwned {
1091                index: name.to_string(),
1092                constraint: owner.name,
1093            });
1094        }
1095
1096        let outcome = {
1097            let mut catalog = self.index_catalog_write();
1098            catalog.try_drop(name, if_exists)?
1099        };
1100        if let DropIndexOutcome::Dropped(def) = &outcome {
1101            // Release backing structures keyed off the dropped def.
1102            match def.kind {
1103                StoredIndexKind::Text => {
1104                    if let Some(label) = def.label.as_deref() {
1105                        for prop in &def.properties {
1106                            self.deactivate_text_scope(def.entity, label, prop);
1107                        }
1108                    }
1109                }
1110                StoredIndexKind::Range => {
1111                    if let Some(label) = def.label.as_deref() {
1112                        for prop in &def.properties {
1113                            self.deactivate_sorted_scope(def.entity, label, prop);
1114                        }
1115                    }
1116                }
1117                StoredIndexKind::Point => {
1118                    if let Some(label) = def.label.as_deref() {
1119                        for prop in &def.properties {
1120                            self.deactivate_point_scope(def.entity, label, prop);
1121                        }
1122                    }
1123                }
1124                StoredIndexKind::Lookup => {
1125                    // Lookup rides on the eagerly-maintained label/type
1126                    // indexes; nothing to release.
1127                }
1128                StoredIndexKind::Vector => {
1129                    self.deactivate_vector_index(def.entity, &def.name);
1130                }
1131                StoredIndexKind::Fulltext => {
1132                    self.deactivate_fulltext_index(def.entity, &def.name);
1133                }
1134            }
1135            if emit_event {
1136                self.emit(|| crate::MutationEvent::DropIndex {
1137                    name: name.to_string(),
1138                    if_exists,
1139                });
1140            }
1141        }
1142        Ok(outcome)
1143    }
1144
1145    fn populate_index_data(&self, def: &IndexDefinition) {
1146        // RANGE: piggy-back on the existing lazy property-index buckets.
1147        // TEXT: build a trigram inverted index over the existing entity
1148        //       data for the (label, property) tuple.
1149        // POINT: build a grid-bucket spatial index over the existing
1150        //        entity data.
1151        // LOOKUP: catalog-only; existing label/type indexes already
1152        //         answer the predicates.
1153        match def.kind {
1154            StoredIndexKind::Range => {
1155                for key in &def.properties {
1156                    match def.entity {
1157                        StoredIndexEntity::Node => self.ensure_node_property_index(key),
1158                        StoredIndexEntity::Relationship => {
1159                            self.ensure_relationship_property_index(key)
1160                        }
1161                    }
1162                    if let Some(label) = def.label.as_deref() {
1163                        self.activate_sorted_scope(def.entity, label, key);
1164                    }
1165                }
1166            }
1167            StoredIndexKind::Text => {
1168                let label = match def.label.as_deref() {
1169                    Some(l) => l,
1170                    None => return,
1171                };
1172                for property in &def.properties {
1173                    self.activate_text_scope(def.entity, label, property);
1174                }
1175            }
1176            StoredIndexKind::Point => {
1177                let label = match def.label.as_deref() {
1178                    Some(l) => l,
1179                    None => return,
1180                };
1181                let cell_size = PointRegistry::cell_size_from_options(&def.options);
1182                for property in &def.properties {
1183                    self.activate_point_scope(def.entity, label, property, cell_size);
1184                }
1185            }
1186            StoredIndexKind::Fulltext => {
1187                let labels: Vec<String> = def.all_labels().map(String::from).collect();
1188                if labels.is_empty() {
1189                    return;
1190                }
1191                self.activate_fulltext_index(def.entity, &def.name, &labels, &def.properties);
1192            }
1193            StoredIndexKind::Vector => {
1194                let label = match def.label.as_deref() {
1195                    Some(l) => l,
1196                    None => return,
1197                };
1198                let property = match def.properties.first() {
1199                    Some(p) => p.as_str(),
1200                    None => return,
1201                };
1202                let similarity = VectorSimilarity::from_options(&def.options)
1203                    .unwrap_or(VectorSimilarity::Cosine);
1204                let provider = VectorIndexProvider::from_options(&def.options)
1205                    .unwrap_or(VectorIndexProvider::Flat);
1206                let hnsw = HnswParams::from_options(&def.options);
1207                let lazy = matches!(
1208                    def.options.get("vector.populate.async"),
1209                    Some(super::index_catalog::IndexConfigValue::Bool(true))
1210                );
1211                self.activate_vector_index(
1212                    def.entity, &def.name, label, property, similarity, provider, hnsw, lazy,
1213                );
1214                if lazy {
1215                    self.index_catalog_write()
1216                        .set_state(&def.name, StoredIndexState::Populating);
1217                }
1218            }
1219            // LOOKUP rides on the label/type indexes maintained eagerly.
1220            StoredIndexKind::Lookup => {}
1221        }
1222    }
1223
1224    pub(super) fn text_indexes_read(
1225        &self,
1226        entity: StoredIndexEntity,
1227    ) -> std::sync::RwLockReadGuard<'_, TrigramRegistry> {
1228        self.indexes.text.read(entity)
1229    }
1230
1231    pub(super) fn text_indexes_write(
1232        &self,
1233        entity: StoredIndexEntity,
1234    ) -> RwLockWriteGuard<'_, TrigramRegistry> {
1235        self.indexes.text.write(entity)
1236    }
1237
1238    pub(super) fn fulltext_indexes_read(
1239        &self,
1240        entity: StoredIndexEntity,
1241    ) -> std::sync::RwLockReadGuard<'_, FulltextRegistry> {
1242        self.indexes.fulltext.read(entity)
1243    }
1244
1245    #[allow(dead_code)]
1246    pub(super) fn fulltext_indexes_write(
1247        &self,
1248        entity: StoredIndexEntity,
1249    ) -> RwLockWriteGuard<'_, FulltextRegistry> {
1250        self.indexes.fulltext.write(entity)
1251    }
1252
1253    fn activate_text_scope(&self, entity: StoredIndexEntity, label: &str, property: &str) {
1254        if !self.text_indexes_write(entity).add_scope(label, property) {
1255            return;
1256        }
1257
1258        let backfill: Vec<(u64, String)> = match entity {
1259            StoredIndexEntity::Node => self
1260                .iter_nodes()
1261                .filter(|(_, node)| node.labels.iter().any(|l| l == label))
1262                .filter_map(|(id, node)| match node.properties.get(property) {
1263                    Some(PropertyValue::String(value)) => Some((id, value.clone())),
1264                    _ => None,
1265                })
1266                .collect(),
1267            StoredIndexEntity::Relationship => self
1268                .iter_rels()
1269                .filter(|(_, rel)| rel.rel_type == label)
1270                .filter_map(|(id, rel)| match rel.properties.get(property) {
1271                    Some(PropertyValue::String(value)) => Some((id, value.clone())),
1272                    _ => None,
1273                })
1274                .collect(),
1275        };
1276
1277        let mut registry = self.text_indexes_write(entity);
1278        for (id, value) in backfill {
1279            registry.insert(label, property, id, &value);
1280        }
1281    }
1282
1283    /// Drop a (label, property) text scope, decrementing the refcount.
1284    pub(super) fn deactivate_text_scope(
1285        &self,
1286        entity: StoredIndexEntity,
1287        label: &str,
1288        property: &str,
1289    ) {
1290        self.text_indexes_write(entity)
1291            .remove_scope(label, property);
1292    }
1293
1294    fn activate_fulltext_index(
1295        &self,
1296        entity: StoredIndexEntity,
1297        name: &str,
1298        labels: &[String],
1299        properties: &[String],
1300    ) {
1301        use super::fulltext_index::{term_counts_for_properties, TermCounts};
1302
1303        {
1304            let mut registry = self.fulltext_indexes_write(entity);
1305            registry.register(name.to_string(), labels.to_vec(), properties.to_vec());
1306        }
1307        self.indexes
1308            .active_fulltext_indexes
1309            .fetch_add(1, Ordering::Relaxed);
1310
1311        // Backfill: walk every entity matching any label, tokenise covered
1312        // string properties, install one posting batch per entity.
1313        let backfill: Vec<(u64, TermCounts)> = match entity {
1314            StoredIndexEntity::Node => self
1315                .iter_nodes()
1316                .filter(|(_, node)| {
1317                    labels
1318                        .iter()
1319                        .any(|wanted| node.labels.iter().any(|l| l == wanted))
1320                })
1321                .map(|(id, node)| {
1322                    let counts = term_counts_for_properties(&node.properties, properties);
1323                    (id, counts)
1324                })
1325                .filter(|(_, c)| !c.is_empty())
1326                .collect(),
1327            StoredIndexEntity::Relationship => self
1328                .iter_rels()
1329                .filter(|(_, rel)| labels.iter().any(|wanted| wanted == &rel.rel_type))
1330                .map(|(id, rel)| {
1331                    let counts = term_counts_for_properties(&rel.properties, properties);
1332                    (id, counts)
1333                })
1334                .filter(|(_, c)| !c.is_empty())
1335                .collect(),
1336        };
1337
1338        let mut registry = self.fulltext_indexes_write(entity);
1339        if let Some(index) = registry.get_mut(name) {
1340            for (id, counts) in backfill {
1341                index.reindex_entity(id, counts);
1342            }
1343        }
1344    }
1345
1346    pub(super) fn deactivate_fulltext_index(&self, entity: StoredIndexEntity, name: &str) {
1347        self.fulltext_indexes_write(entity).deregister(name);
1348        self.indexes
1349            .active_fulltext_indexes
1350            .fetch_sub(1, Ordering::Relaxed);
1351    }
1352
1353    pub(super) fn sorted_indexes_read(
1354        &self,
1355        entity: StoredIndexEntity,
1356    ) -> std::sync::RwLockReadGuard<'_, SortedPropertyIndex> {
1357        self.indexes.sorted.read(entity)
1358    }
1359
1360    pub(super) fn sorted_indexes_write(
1361        &self,
1362        entity: StoredIndexEntity,
1363    ) -> RwLockWriteGuard<'_, SortedPropertyIndex> {
1364        self.indexes.sorted.write(entity)
1365    }
1366
1367    fn activate_sorted_scope(&self, entity: StoredIndexEntity, label: &str, property: &str) {
1368        if !self.sorted_indexes_write(entity).add_scope(label, property) {
1369            return;
1370        }
1371
1372        let backfill: Vec<(u64, PropertyValue)> = match entity {
1373            StoredIndexEntity::Node => self
1374                .iter_nodes()
1375                .filter(|(_, node)| node.labels.iter().any(|l| l == label))
1376                .filter_map(|(id, node)| {
1377                    node.properties
1378                        .get(property)
1379                        .map(|value| (id, value.clone()))
1380                })
1381                .collect(),
1382            StoredIndexEntity::Relationship => self
1383                .iter_rels()
1384                .filter(|(_, rel)| rel.rel_type == label)
1385                .filter_map(|(id, rel)| {
1386                    rel.properties
1387                        .get(property)
1388                        .map(|value| (id, value.clone()))
1389                })
1390                .collect(),
1391        };
1392
1393        let mut registry = self.sorted_indexes_write(entity);
1394        for (id, value) in backfill {
1395            registry.insert(label, property, id, &value);
1396        }
1397    }
1398
1399    pub(super) fn deactivate_sorted_scope(
1400        &self,
1401        entity: StoredIndexEntity,
1402        label: &str,
1403        property: &str,
1404    ) {
1405        self.sorted_indexes_write(entity)
1406            .remove_scope(label, property);
1407    }
1408
1409    pub(super) fn point_indexes_read(
1410        &self,
1411        entity: StoredIndexEntity,
1412    ) -> std::sync::RwLockReadGuard<'_, PointRegistry> {
1413        self.indexes.point.read(entity)
1414    }
1415
1416    pub(super) fn point_indexes_write(
1417        &self,
1418        entity: StoredIndexEntity,
1419    ) -> RwLockWriteGuard<'_, PointRegistry> {
1420        self.indexes.point.write(entity)
1421    }
1422
1423    fn activate_point_scope(
1424        &self,
1425        entity: StoredIndexEntity,
1426        label: &str,
1427        property: &str,
1428        cell_size: Option<f64>,
1429    ) {
1430        if !self
1431            .point_indexes_write(entity)
1432            .add_scope(label, property, cell_size)
1433        {
1434            return;
1435        }
1436
1437        let backfill: Vec<(u64, LoraPoint)> = match entity {
1438            StoredIndexEntity::Node => self
1439                .iter_nodes()
1440                .filter(|(_, node)| node.labels.iter().any(|l| l == label))
1441                .filter_map(|(id, node)| match node.properties.get(property) {
1442                    Some(PropertyValue::Point(point)) => Some((id, point.clone())),
1443                    _ => None,
1444                })
1445                .collect(),
1446            StoredIndexEntity::Relationship => self
1447                .iter_rels()
1448                .filter(|(_, rel)| rel.rel_type == label)
1449                .filter_map(|(id, rel)| match rel.properties.get(property) {
1450                    Some(PropertyValue::Point(point)) => Some((id, point.clone())),
1451                    _ => None,
1452                })
1453                .collect(),
1454        };
1455
1456        let mut registry = self.point_indexes_write(entity);
1457        for (id, point) in backfill {
1458            registry.insert(label, property, id, point);
1459        }
1460    }
1461
1462    pub(super) fn deactivate_point_scope(
1463        &self,
1464        entity: StoredIndexEntity,
1465        label: &str,
1466        property: &str,
1467    ) {
1468        self.point_indexes_write(entity)
1469            .remove_scope(label, property);
1470    }
1471
1472    pub(super) fn vector_indexes_read(
1473        &self,
1474        entity: StoredIndexEntity,
1475    ) -> std::sync::RwLockReadGuard<'_, VectorIndexRegistry> {
1476        self.indexes.vector.read(entity)
1477    }
1478
1479    pub(super) fn vector_indexes_write(
1480        &self,
1481        entity: StoredIndexEntity,
1482    ) -> RwLockWriteGuard<'_, VectorIndexRegistry> {
1483        self.indexes.vector.write(entity)
1484    }
1485
1486    #[allow(clippy::too_many_arguments)]
1487    fn activate_vector_index(
1488        &self,
1489        entity: StoredIndexEntity,
1490        name: &str,
1491        label: &str,
1492        property: &str,
1493        similarity: VectorSimilarity,
1494        provider: VectorIndexProvider,
1495        hnsw: HnswParams,
1496        lazy: bool,
1497    ) {
1498        {
1499            let mut registry = self.vector_indexes_write(entity);
1500            registry.register(
1501                name.to_string(),
1502                label.to_string(),
1503                property.to_string(),
1504                similarity,
1505                provider,
1506                hnsw,
1507            );
1508        }
1509
1510        if lazy {
1511            // Skip the initial backfill — the catalog state is flipped
1512            // to Populating by the caller and the first query routed
1513            // to this index triggers `lazy_populate_vector_index`.
1514            // Mutations between CREATE and first query still feed the
1515            // registry via the maintenance hook, so the lazy phase
1516            // only handles vectors that existed before CREATE.
1517            return;
1518        }
1519
1520        self.backfill_vector_index(entity, label, property);
1521    }
1522
1523    /// Walk the property store for vectors matching this index's
1524    /// `(label, property)` scope and replay them into the registry.
1525    /// Shared by sync CREATE and lazy-populate flows.
1526    fn backfill_vector_index(&self, entity: StoredIndexEntity, label: &str, property: &str) {
1527        let backfill: Vec<(u64, crate::LoraVector)> = match entity {
1528            StoredIndexEntity::Node => self
1529                .iter_nodes()
1530                .filter(|(_, node)| node.labels.iter().any(|l| l == label))
1531                .filter_map(|(id, node)| match node.properties.get(property) {
1532                    Some(PropertyValue::Vector(v)) => Some((id, v.clone())),
1533                    _ => None,
1534                })
1535                .collect(),
1536            StoredIndexEntity::Relationship => self
1537                .iter_rels()
1538                .filter(|(_, rel)| rel.rel_type == label)
1539                .filter_map(|(id, rel)| match rel.properties.get(property) {
1540                    Some(PropertyValue::Vector(v)) => Some((id, v.clone())),
1541                    _ => None,
1542                })
1543                .collect(),
1544        };
1545
1546        let mut registry = self.vector_indexes_write(entity);
1547        for (id, vector) in backfill {
1548            registry.insert_for(label, property, id, &vector);
1549        }
1550    }
1551
1552    /// Lazy-populate a `Populating` vector index: backfill from the
1553    /// property store, then flip catalog state to `Online`. Called
1554    /// from `GraphStorage::vector_search` on the first request that
1555    /// hits a still-populating index. Idempotent: a second concurrent
1556    /// caller finds the state already `Online` and does no work.
1557    pub(super) fn lazy_populate_vector_index(&self, name: &str) {
1558        let def = match self.index_catalog_read().get(name).cloned() {
1559            Some(d) => d,
1560            None => return,
1561        };
1562        if def.state != StoredIndexState::Populating || def.kind != StoredIndexKind::Vector {
1563            return;
1564        }
1565        let label = match def.label.as_deref() {
1566            Some(l) => l,
1567            None => return,
1568        };
1569        let property = match def.properties.first() {
1570            Some(p) => p.as_str(),
1571            None => return,
1572        };
1573        self.backfill_vector_index(def.entity, label, property);
1574        self.index_catalog_write()
1575            .set_state(name, StoredIndexState::Online);
1576    }
1577
1578    pub(super) fn deactivate_vector_index(&self, entity: StoredIndexEntity, name: &str) {
1579        self.vector_indexes_write(entity).deregister(name);
1580    }
1581
1582    /// Snapshot of cardinality stats. Cheap: derived from already-tracked
1583    /// `nodes_by_label` / `relationships_by_type` lengths and the active
1584    /// property-index buckets. The cost model uses this to populate
1585    /// `estimated_rows` on plan-tree nodes.
1586    pub fn graph_stats(&self) -> GraphStats {
1587        let mut stats = GraphStats {
1588            node_count: self.live_node_count,
1589            relationship_count: self.live_rel_count,
1590            ..Default::default()
1591        };
1592        for (label, ids) in &self.nodes_by_label {
1593            stats.nodes_by_label.insert(label.clone(), ids.len());
1594        }
1595        for (rel_type, ids) in &self.relationships_by_type {
1596            stats
1597                .relationships_by_type
1598                .insert(rel_type.clone(), ids.len());
1599        }
1600        // Distinct values per (label, property): pulled from the
1601        // property-index scoped buckets, where we already track the
1602        // per-scope value distribution. Empty for properties without
1603        // an active hash-index — the cost model falls back to a
1604        // conservative estimate in that case.
1605        let prop_indexes = self.indexes_read();
1606        for (scope, props) in &prop_indexes.node_properties.scoped_values {
1607            for (key, values) in props {
1608                stats
1609                    .node_distinct_values
1610                    .insert((scope.clone(), key.clone()), values.len());
1611            }
1612        }
1613        for (scope, props) in &prop_indexes.relationship_properties.scoped_values {
1614            for (key, values) in props {
1615                stats
1616                    .relationship_distinct_values
1617                    .insert((scope.clone(), key.clone()), values.len());
1618            }
1619        }
1620
1621        for def in self.index_catalog_read().list() {
1622            if def.state != StoredIndexState::Online {
1623                continue;
1624            }
1625            let Some(label) = def.label else {
1626                continue;
1627            };
1628            for property in def.properties {
1629                let scope = (label.clone(), property);
1630                match (def.entity, def.kind) {
1631                    (StoredIndexEntity::Node, StoredIndexKind::Range) => {
1632                        stats.node_range_indexes.insert(scope);
1633                    }
1634                    (StoredIndexEntity::Node, StoredIndexKind::Text) => {
1635                        stats.node_text_indexes.insert(scope);
1636                    }
1637                    (StoredIndexEntity::Node, StoredIndexKind::Point) => {
1638                        stats.node_point_indexes.insert(scope);
1639                    }
1640                    (StoredIndexEntity::Relationship, StoredIndexKind::Range) => {
1641                        stats.relationship_range_indexes.insert(scope);
1642                    }
1643                    (StoredIndexEntity::Relationship, StoredIndexKind::Text) => {
1644                        stats.relationship_text_indexes.insert(scope);
1645                    }
1646                    (StoredIndexEntity::Relationship, StoredIndexKind::Point) => {
1647                        stats.relationship_point_indexes.insert(scope);
1648                    }
1649                    (StoredIndexEntity::Node, StoredIndexKind::Vector) => {
1650                        stats.node_vector_indexes.insert(scope);
1651                    }
1652                    (StoredIndexEntity::Relationship, StoredIndexKind::Vector) => {
1653                        stats.relationship_vector_indexes.insert(scope);
1654                    }
1655                    (_, StoredIndexKind::Lookup | StoredIndexKind::Fulltext) => {}
1656                }
1657            }
1658        }
1659        stats
1660    }
1661
1662    pub(super) fn rebuild_property_indexes(&mut self) {
1663        let mut indexes = PropertyIndexRegistry::default();
1664
1665        for (id, node) in self.iter_nodes() {
1666            for (key, value) in &node.properties {
1667                if PropertyIndexKey::from_value(value).is_some() {
1668                    indexes.node_properties.activate(key);
1669                    indexes.node_properties.insert_with_scopes(
1670                        id,
1671                        node.labels.iter().map(String::as_str),
1672                        key,
1673                        value,
1674                    );
1675                }
1676            }
1677        }
1678
1679        for (id, rel) in self.iter_rels() {
1680            for (key, value) in &rel.properties {
1681                if PropertyIndexKey::from_value(value).is_some() {
1682                    indexes.relationship_properties.activate(key);
1683                    indexes.relationship_properties.insert_with_scopes(
1684                        id,
1685                        [rel.rel_type.as_str()],
1686                        key,
1687                        value,
1688                    );
1689                }
1690            }
1691        }
1692
1693        let node_index_count = indexes.node_properties.active_keys.len();
1694        let relationship_index_count = indexes.relationship_properties.active_keys.len();
1695        *self.indexes_mut() = indexes;
1696        self.indexes
1697            .active_node_property_indexes
1698            .store(node_index_count, Ordering::Relaxed);
1699        self.indexes
1700            .active_relationship_property_indexes
1701            .store(relationship_index_count, Ordering::Relaxed);
1702    }
1703
1704    pub(super) fn on_node_created(&mut self, node: &NodeRecord) {
1705        for label in &node.labels {
1706            self.insert_node_label_index(node.id, label);
1707        }
1708        self.index_node_properties_if_active(
1709            node.id,
1710            node.labels.iter().map(String::as_str),
1711            &node.properties,
1712        );
1713        self.maintain_node_secondary_indexes(node, SecondaryIndexMutation::Insert);
1714    }
1715
1716    pub(super) fn on_node_replayed(&mut self, node: &NodeRecord) {
1717        for label in &node.labels {
1718            self.insert_node_label_index(node.id, label);
1719        }
1720        self.index_node_properties_eager(
1721            node.id,
1722            node.labels.iter().map(String::as_str),
1723            &node.properties,
1724        );
1725        self.maintain_node_secondary_indexes(node, SecondaryIndexMutation::Insert);
1726    }
1727
1728    pub(super) fn on_node_property_set(
1729        &mut self,
1730        node_id: NodeId,
1731        key: &str,
1732        old: Option<&PropertyValue>,
1733        new: &PropertyValue,
1734    ) {
1735        let Some(labels) = self.node_at(node_id).map(|node| node.labels.clone()) else {
1736            return;
1737        };
1738
1739        if self.node_property_index_is_active(key) {
1740            if let Some(old) = old {
1741                self.unindex_node_property_if_active(
1742                    node_id,
1743                    labels.iter().map(String::as_str),
1744                    key,
1745                    old,
1746                );
1747            }
1748            self.index_node_property_if_active(
1749                node_id,
1750                labels.iter().map(String::as_str),
1751                key,
1752                new,
1753            );
1754        }
1755
1756        self.update_secondary_property(
1757            StoredIndexEntity::Node,
1758            labels.iter().map(String::as_str),
1759            node_id,
1760            key,
1761            old,
1762            Some(new),
1763        );
1764    }
1765
1766    pub(super) fn on_node_property_removed(
1767        &mut self,
1768        node_id: NodeId,
1769        key: &str,
1770        old: &PropertyValue,
1771    ) {
1772        let Some(labels) = self.node_at(node_id).map(|node| node.labels.clone()) else {
1773            return;
1774        };
1775        if self.node_property_index_is_active(key) {
1776            self.unindex_node_property_if_active(
1777                node_id,
1778                labels.iter().map(String::as_str),
1779                key,
1780                old,
1781            );
1782        }
1783        self.update_secondary_property(
1784            StoredIndexEntity::Node,
1785            labels.iter().map(String::as_str),
1786            node_id,
1787            key,
1788            Some(old),
1789            None,
1790        );
1791    }
1792
1793    pub(super) fn on_node_label_added(&mut self, node_id: NodeId, label: &str) {
1794        self.insert_node_label_index(node_id, label);
1795
1796        let Some(properties) = self.node_at(node_id).map(|node| node.properties.clone()) else {
1797            return;
1798        };
1799        if self.active_node_property_index_count() != 0 {
1800            self.index_node_scope_properties_if_active(node_id, label, &properties);
1801        }
1802        for (key, value) in &properties {
1803            self.update_secondary_property(
1804                StoredIndexEntity::Node,
1805                [label],
1806                node_id,
1807                key,
1808                None,
1809                Some(value),
1810            );
1811        }
1812    }
1813
1814    pub(super) fn on_node_label_removed(&mut self, node_id: NodeId, label: &str) {
1815        self.remove_node_label_index(node_id, label);
1816
1817        let Some(properties) = self.node_at(node_id).map(|node| node.properties.clone()) else {
1818            return;
1819        };
1820        if self.active_node_property_index_count() != 0 {
1821            self.unindex_node_scope_properties_if_active(node_id, label, &properties);
1822        }
1823        for (key, value) in &properties {
1824            self.update_secondary_property(
1825                StoredIndexEntity::Node,
1826                [label],
1827                node_id,
1828                key,
1829                Some(value),
1830                None,
1831            );
1832        }
1833    }
1834
1835    pub(super) fn on_node_deleted(&mut self, node: &NodeRecord) {
1836        for label in &node.labels {
1837            self.remove_node_label_index(node.id, label);
1838        }
1839        self.unindex_active_node_properties(
1840            node.id,
1841            node.labels.iter().map(String::as_str),
1842            &node.properties,
1843        );
1844        self.maintain_node_secondary_indexes(node, SecondaryIndexMutation::Remove);
1845    }
1846
1847    pub(super) fn on_relationship_created(&mut self, rel: &RelationshipRecord) {
1848        self.attach_relationship(rel);
1849        self.index_relationship_properties_if_active(
1850            rel.id,
1851            [rel.rel_type.as_str()],
1852            &rel.properties,
1853        );
1854        self.maintain_relationship_secondary_indexes(rel, SecondaryIndexMutation::Insert);
1855    }
1856
1857    pub(super) fn on_relationship_replayed(&mut self, rel: &RelationshipRecord) {
1858        self.attach_relationship(rel);
1859        self.index_relationship_properties_eager(rel.id, [rel.rel_type.as_str()], &rel.properties);
1860        self.maintain_relationship_secondary_indexes(rel, SecondaryIndexMutation::Insert);
1861    }
1862
1863    pub(super) fn on_relationship_property_set(
1864        &mut self,
1865        rel_id: RelationshipId,
1866        key: &str,
1867        old: Option<&PropertyValue>,
1868        new: &PropertyValue,
1869    ) {
1870        let Some(rel_type) = self.rel_at(rel_id).map(|rel| rel.rel_type.clone()) else {
1871            return;
1872        };
1873
1874        if self.relationship_property_index_is_active(key) {
1875            if let Some(old) = old {
1876                self.unindex_relationship_property_if_active(rel_id, [rel_type.as_str()], key, old);
1877            }
1878            self.index_relationship_property_if_active(rel_id, [rel_type.as_str()], key, new);
1879        }
1880
1881        self.update_secondary_property(
1882            StoredIndexEntity::Relationship,
1883            [rel_type.as_str()],
1884            rel_id,
1885            key,
1886            old,
1887            Some(new),
1888        );
1889    }
1890
1891    pub(super) fn on_relationship_property_removed(
1892        &mut self,
1893        rel_id: RelationshipId,
1894        key: &str,
1895        old: &PropertyValue,
1896    ) {
1897        let Some(rel_type) = self.rel_at(rel_id).map(|rel| rel.rel_type.clone()) else {
1898            return;
1899        };
1900        if self.relationship_property_index_is_active(key) {
1901            self.unindex_relationship_property_if_active(rel_id, [rel_type.as_str()], key, old);
1902        }
1903        self.update_secondary_property(
1904            StoredIndexEntity::Relationship,
1905            [rel_type.as_str()],
1906            rel_id,
1907            key,
1908            Some(old),
1909            None,
1910        );
1911    }
1912
1913    pub(super) fn on_relationship_deleted(&mut self, rel: &RelationshipRecord) {
1914        self.detach_relationship_indexes(rel);
1915        self.unindex_active_relationship_properties(
1916            rel.id,
1917            [rel.rel_type.as_str()],
1918            &rel.properties,
1919        );
1920        self.maintain_relationship_secondary_indexes(rel, SecondaryIndexMutation::Remove);
1921    }
1922
1923    fn index_node_property_eager<'a>(
1924        &mut self,
1925        node_id: NodeId,
1926        labels: impl IntoIterator<Item = &'a str>,
1927        key: &str,
1928        value: &PropertyValue,
1929    ) {
1930        if PropertyIndexKey::from_value(value).is_none() {
1931            return;
1932        }
1933
1934        let activated = {
1935            let indexes = self.indexes_mut();
1936            let activated = indexes.node_properties.activate(key);
1937            indexes
1938                .node_properties
1939                .insert_with_scopes(node_id, labels, key, value);
1940            activated
1941        };
1942        if activated {
1943            self.indexes
1944                .active_node_property_indexes
1945                .fetch_add(1, Ordering::Relaxed);
1946        }
1947    }
1948
1949    fn index_relationship_property_eager<'a>(
1950        &mut self,
1951        rel_id: RelationshipId,
1952        scopes: impl IntoIterator<Item = &'a str>,
1953        key: &str,
1954        value: &PropertyValue,
1955    ) {
1956        if PropertyIndexKey::from_value(value).is_none() {
1957            return;
1958        }
1959
1960        let activated = {
1961            let indexes = self.indexes_mut();
1962            let activated = indexes.relationship_properties.activate(key);
1963            indexes
1964                .relationship_properties
1965                .insert_with_scopes(rel_id, scopes, key, value);
1966            activated
1967        };
1968        if activated {
1969            self.indexes
1970                .active_relationship_property_indexes
1971                .fetch_add(1, Ordering::Relaxed);
1972        }
1973    }
1974
1975    fn index_node_properties_eager<'a>(
1976        &mut self,
1977        node_id: NodeId,
1978        labels: impl IntoIterator<Item = &'a str> + Clone,
1979        properties: &Properties,
1980    ) {
1981        for (key, value) in properties {
1982            self.index_node_property_eager(node_id, labels.clone(), key, value);
1983        }
1984    }
1985
1986    fn index_relationship_properties_eager<'a>(
1987        &mut self,
1988        rel_id: RelationshipId,
1989        scopes: impl IntoIterator<Item = &'a str> + Clone,
1990        properties: &Properties,
1991    ) {
1992        for (key, value) in properties {
1993            self.index_relationship_property_eager(rel_id, scopes.clone(), key, value);
1994        }
1995    }
1996
1997    fn index_node_property_if_active<'a>(
1998        &mut self,
1999        node_id: NodeId,
2000        labels: impl IntoIterator<Item = &'a str>,
2001        key: &str,
2002        value: &PropertyValue,
2003    ) {
2004        if self.active_node_property_index_count() == 0 {
2005            return;
2006        }
2007        let indexes = self.indexes_mut();
2008        if indexes.node_properties.is_active(key) {
2009            indexes
2010                .node_properties
2011                .insert_with_scopes(node_id, labels, key, value);
2012        }
2013    }
2014
2015    fn index_node_properties_if_active<'a>(
2016        &mut self,
2017        node_id: NodeId,
2018        labels: impl IntoIterator<Item = &'a str> + Clone,
2019        properties: &Properties,
2020    ) {
2021        if self.active_node_property_index_count() == 0 {
2022            return;
2023        }
2024        let indexes = self.indexes_mut();
2025        for (key, value) in properties {
2026            if indexes.node_properties.is_active(key) {
2027                indexes
2028                    .node_properties
2029                    .insert_with_scopes(node_id, labels.clone(), key, value);
2030            }
2031        }
2032    }
2033
2034    fn unindex_node_property_if_active<'a>(
2035        &mut self,
2036        node_id: NodeId,
2037        labels: impl IntoIterator<Item = &'a str>,
2038        key: &str,
2039        value: &PropertyValue,
2040    ) {
2041        if self.active_node_property_index_count() == 0 {
2042            return;
2043        }
2044        let indexes = self.indexes_mut();
2045        if indexes.node_properties.is_active(key) {
2046            indexes
2047                .node_properties
2048                .remove_with_scopes(node_id, labels, key, value);
2049        }
2050    }
2051
2052    fn index_node_scope_properties_if_active(
2053        &mut self,
2054        node_id: NodeId,
2055        scope: &str,
2056        properties: &Properties,
2057    ) {
2058        if self.active_node_property_index_count() == 0 {
2059            return;
2060        }
2061        let indexes = self.indexes_mut();
2062        for (key, value) in properties {
2063            if indexes.node_properties.is_active(key) {
2064                indexes
2065                    .node_properties
2066                    .insert_scoped(node_id, scope, key, value);
2067            }
2068        }
2069    }
2070
2071    fn unindex_node_scope_properties_if_active(
2072        &mut self,
2073        node_id: NodeId,
2074        scope: &str,
2075        properties: &Properties,
2076    ) {
2077        if self.active_node_property_index_count() == 0 {
2078            return;
2079        }
2080        let indexes = self.indexes_mut();
2081        for (key, value) in properties {
2082            if indexes.node_properties.is_active(key) {
2083                indexes
2084                    .node_properties
2085                    .remove_scoped(node_id, scope, key, value);
2086            }
2087        }
2088    }
2089
2090    fn unindex_active_node_properties<'a>(
2091        &mut self,
2092        node_id: NodeId,
2093        labels: impl IntoIterator<Item = &'a str> + Clone,
2094        properties: &Properties,
2095    ) {
2096        if self.active_node_property_index_count() == 0 {
2097            return;
2098        }
2099        let indexes = self.indexes_mut();
2100        for (key, value) in properties {
2101            if indexes.node_properties.is_active(key) {
2102                indexes
2103                    .node_properties
2104                    .remove_with_scopes(node_id, labels.clone(), key, value);
2105            }
2106        }
2107    }
2108
2109    fn index_relationship_property_if_active<'a>(
2110        &mut self,
2111        rel_id: RelationshipId,
2112        scopes: impl IntoIterator<Item = &'a str>,
2113        key: &str,
2114        value: &PropertyValue,
2115    ) {
2116        if self.active_relationship_property_index_count() == 0 {
2117            return;
2118        }
2119        let indexes = self.indexes_mut();
2120        if indexes.relationship_properties.is_active(key) {
2121            indexes
2122                .relationship_properties
2123                .insert_with_scopes(rel_id, scopes, key, value);
2124        }
2125    }
2126
2127    fn index_relationship_properties_if_active<'a>(
2128        &mut self,
2129        rel_id: RelationshipId,
2130        scopes: impl IntoIterator<Item = &'a str> + Clone,
2131        properties: &Properties,
2132    ) {
2133        if self.active_relationship_property_index_count() == 0 {
2134            return;
2135        }
2136        let indexes = self.indexes_mut();
2137        for (key, value) in properties {
2138            if indexes.relationship_properties.is_active(key) {
2139                indexes.relationship_properties.insert_with_scopes(
2140                    rel_id,
2141                    scopes.clone(),
2142                    key,
2143                    value,
2144                );
2145            }
2146        }
2147    }
2148
2149    fn unindex_relationship_property_if_active<'a>(
2150        &mut self,
2151        rel_id: RelationshipId,
2152        scopes: impl IntoIterator<Item = &'a str>,
2153        key: &str,
2154        value: &PropertyValue,
2155    ) {
2156        if self.active_relationship_property_index_count() == 0 {
2157            return;
2158        }
2159        let indexes = self.indexes_mut();
2160        if indexes.relationship_properties.is_active(key) {
2161            indexes
2162                .relationship_properties
2163                .remove_with_scopes(rel_id, scopes, key, value);
2164        }
2165    }
2166
2167    fn unindex_active_relationship_properties<'a>(
2168        &mut self,
2169        rel_id: RelationshipId,
2170        scopes: impl IntoIterator<Item = &'a str> + Clone,
2171        properties: &Properties,
2172    ) {
2173        if self.active_relationship_property_index_count() == 0 {
2174            return;
2175        }
2176        let indexes = self.indexes_mut();
2177        for (key, value) in properties {
2178            if indexes.relationship_properties.is_active(key) {
2179                indexes.relationship_properties.remove_with_scopes(
2180                    rel_id,
2181                    scopes.clone(),
2182                    key,
2183                    value,
2184                );
2185            }
2186        }
2187    }
2188
2189    pub(super) fn scan_nodes_by_property(
2190        &self,
2191        label: Option<&str>,
2192        key: &str,
2193        value: &PropertyValue,
2194    ) -> Vec<NodeRecord> {
2195        match label {
2196            Some(label) => self
2197                .nodes_by_label
2198                .get(label)
2199                .into_iter()
2200                .flat_map(|ids| ids.iter())
2201                .filter_map(|&id| self.node_at(id))
2202                .filter(|node| node.properties.get(key) == Some(value))
2203                .cloned()
2204                .collect(),
2205            None => self
2206                .iter_node_records()
2207                .filter(|node| node.properties.get(key) == Some(value))
2208                .cloned()
2209                .collect(),
2210        }
2211    }
2212
2213    pub(super) fn scan_node_ids_by_property(
2214        &self,
2215        label: Option<&str>,
2216        key: &str,
2217        value: &PropertyValue,
2218    ) -> Vec<NodeId> {
2219        match label {
2220            Some(label) => self
2221                .nodes_by_label
2222                .get(label)
2223                .into_iter()
2224                .flat_map(|ids| ids.iter())
2225                .filter_map(|&id| {
2226                    (self.node_at(id)?.properties.get(key) == Some(value)).then_some(id)
2227                })
2228                .collect(),
2229            None => self
2230                .iter_nodes()
2231                .filter_map(|(id, node)| (node.properties.get(key) == Some(value)).then_some(id))
2232                .collect(),
2233        }
2234    }
2235
2236    pub(super) fn any_node_by_property(
2237        &self,
2238        label: &str,
2239        key: &str,
2240        value: &PropertyValue,
2241    ) -> bool {
2242        self.nodes_by_label
2243            .get(label)
2244            .into_iter()
2245            .flat_map(|ids| ids.iter())
2246            .filter_map(|&id| self.node_at(id))
2247            .any(|node| node.properties.get(key) == Some(value))
2248    }
2249
2250    pub(super) fn scan_relationships_by_property(
2251        &self,
2252        rel_type: Option<&str>,
2253        key: &str,
2254        value: &PropertyValue,
2255    ) -> Vec<RelationshipRecord> {
2256        match rel_type {
2257            Some(rel_type) => self
2258                .relationships_by_type
2259                .get(rel_type)
2260                .into_iter()
2261                .flat_map(|ids| ids.iter())
2262                .filter_map(|&id| self.rel_at(id))
2263                .filter(|rel| rel.properties.get(key) == Some(value))
2264                .cloned()
2265                .collect(),
2266            None => self
2267                .iter_rel_records()
2268                .filter(|rel| rel.properties.get(key) == Some(value))
2269                .cloned()
2270                .collect(),
2271        }
2272    }
2273
2274    pub(super) fn scan_relationship_ids_by_property(
2275        &self,
2276        rel_type: Option<&str>,
2277        key: &str,
2278        value: &PropertyValue,
2279    ) -> Vec<RelationshipId> {
2280        match rel_type {
2281            Some(rel_type) => self
2282                .relationships_by_type
2283                .get(rel_type)
2284                .into_iter()
2285                .flat_map(|ids| ids.iter())
2286                .filter_map(|&id| {
2287                    (self.rel_at(id)?.properties.get(key) == Some(value)).then_some(id)
2288                })
2289                .collect(),
2290            None => self
2291                .iter_rels()
2292                .filter_map(|(id, rel)| (rel.properties.get(key) == Some(value)).then_some(id))
2293                .collect(),
2294        }
2295    }
2296
2297    pub(super) fn any_relationship_by_property(
2298        &self,
2299        rel_type: &str,
2300        key: &str,
2301        value: &PropertyValue,
2302    ) -> bool {
2303        self.relationships_by_type
2304            .get(rel_type)
2305            .into_iter()
2306            .flat_map(|ids| ids.iter())
2307            .filter_map(|&id| self.rel_at(id))
2308            .any(|rel| rel.properties.get(key) == Some(value))
2309    }
2310
2311    pub(super) fn attach_relationship(&mut self, rel: &RelationshipRecord) {
2312        self.outgoing_push(rel.src, rel.id);
2313        self.incoming_push(rel.dst, rel.id);
2314        self.insert_relationship_type_index(rel.id, &rel.rel_type);
2315    }
2316
2317    fn detach_relationship_indexes(&mut self, rel: &RelationshipRecord) {
2318        // Adjacency is now positional `Vec<Vec<RelationshipId>>` — clearing
2319        // the inner Vec leaves the slot in place (the slot is sized for the
2320        // node's lifetime, not the edge's).
2321        self.outgoing_remove(rel.src, rel.id);
2322        self.incoming_remove(rel.dst, rel.id);
2323
2324        self.remove_relationship_type_index(rel.id, &rel.rel_type);
2325    }
2326
2327    pub(super) fn relationship_ids_for_direction(
2328        &self,
2329        node_id: NodeId,
2330        direction: Direction,
2331    ) -> Vec<RelationshipId> {
2332        match direction {
2333            Direction::Left => self
2334                .incoming_at(node_id)
2335                .map(<[_]>::to_vec)
2336                .unwrap_or_default(),
2337
2338            Direction::Right => self
2339                .outgoing_at(node_id)
2340                .map(<[_]>::to_vec)
2341                .unwrap_or_default(),
2342
2343            Direction::Undirected => {
2344                let out = self.outgoing_at(node_id);
2345                let inc = self.incoming_at(node_id);
2346                let mut ids = Vec::with_capacity(
2347                    out.map(<[_]>::len).unwrap_or(0) + inc.map(<[_]>::len).unwrap_or(0),
2348                );
2349
2350                if let Some(out) = out {
2351                    ids.extend(out.iter().copied());
2352                }
2353                if let Some(inc) = inc {
2354                    for &rel_id in inc {
2355                        let Some(rel) = self.rel_at(rel_id) else {
2356                            continue;
2357                        };
2358                        if rel.src == node_id && rel.dst == node_id {
2359                            continue;
2360                        }
2361                        ids.push(rel_id);
2362                    }
2363                }
2364
2365                ids
2366            }
2367        }
2368    }
2369
2370    pub(super) fn other_endpoint(rel: &RelationshipRecord, node_id: NodeId) -> Option<NodeId> {
2371        if rel.src == node_id {
2372            Some(rel.dst)
2373        } else if rel.dst == node_id {
2374            Some(rel.src)
2375        } else {
2376            None
2377        }
2378    }
2379
2380    pub(super) fn has_incident_relationships(&self, node_id: NodeId) -> bool {
2381        self.outgoing_at(node_id)
2382            .map(|ids| !ids.is_empty())
2383            .unwrap_or(false)
2384            || self
2385                .incoming_at(node_id)
2386                .map(|ids| !ids.is_empty())
2387                .unwrap_or(false)
2388    }
2389
2390    pub(super) fn incident_relationship_ids(&self, node_id: NodeId) -> Vec<RelationshipId> {
2391        let out = self.outgoing_at(node_id);
2392        let inc = self.incoming_at(node_id);
2393        let mut rel_ids =
2394            Vec::with_capacity(out.map(<[_]>::len).unwrap_or(0) + inc.map(<[_]>::len).unwrap_or(0));
2395
2396        if let Some(ids) = out {
2397            rel_ids.extend(ids.iter().copied());
2398        }
2399        if let Some(ids) = inc {
2400            for &rel_id in ids {
2401                let Some(rel) = self.rel_at(rel_id) else {
2402                    continue;
2403                };
2404                if rel.src == node_id && rel.dst == node_id {
2405                    continue;
2406                }
2407                rel_ids.push(rel_id);
2408            }
2409        }
2410
2411        rel_ids
2412    }
2413
2414    /// Replay a node creation using the id captured in a durable mutation
2415    /// event. This intentionally does not emit a new mutation event: callers
2416    /// must invoke it before installing a recorder on the graph.
2417    #[doc(hidden)]
2418    pub fn replay_create_node(
2419        &mut self,
2420        id: NodeId,
2421        labels: Vec<String>,
2422        properties: Properties,
2423    ) -> Result<NodeRecord, String> {
2424        if self.recorder.is_some() {
2425            return Err(
2426                "cannot replay node creation while a mutation recorder is installed".into(),
2427            );
2428        }
2429        if self.node_at(id).is_some() {
2430            return Err(format!("node id {id} already exists"));
2431        }
2432        let idx = self.ensure_node_slot_checked(id)?;
2433        self.bump_next_node_id_past(id)?;
2434
2435        let labels = Self::normalize_labels(labels);
2436        let node = NodeRecord {
2437            id,
2438            labels: labels.clone(),
2439            properties,
2440        };
2441
2442        self.put_node_at_slot(idx, node.clone());
2443        self.on_node_replayed(&node);
2444
2445        Ok(node)
2446    }
2447
2448    /// Replay a relationship creation using the id captured in a durable
2449    /// mutation event. This intentionally does not emit a new mutation event:
2450    /// callers must invoke it before installing a recorder on the graph.
2451    #[doc(hidden)]
2452    pub fn replay_create_relationship(
2453        &mut self,
2454        id: RelationshipId,
2455        src: NodeId,
2456        dst: NodeId,
2457        rel_type: &str,
2458        properties: Properties,
2459    ) -> Result<RelationshipRecord, String> {
2460        if self.recorder.is_some() {
2461            return Err(
2462                "cannot replay relationship creation while a mutation recorder is installed".into(),
2463            );
2464        }
2465        if self.rel_at(id).is_some() {
2466            return Err(format!("relationship id {id} already exists"));
2467        }
2468        if self.node_at(src).is_none() {
2469            return Err(format!(
2470                "relationship {id} references missing source node {src}"
2471            ));
2472        }
2473        if self.node_at(dst).is_none() {
2474            return Err(format!(
2475                "relationship {id} references missing target node {dst}"
2476            ));
2477        }
2478
2479        let trimmed = rel_type.trim();
2480        if trimmed.is_empty() {
2481            return Err(format!("relationship {id} has an empty type"));
2482        }
2483        let idx = self.ensure_rel_slot_checked(id)?;
2484        self.bump_next_rel_id_past(id)?;
2485
2486        let rel = RelationshipRecord {
2487            id,
2488            src,
2489            dst,
2490            rel_type: trimmed.to_string(),
2491            properties,
2492        };
2493
2494        self.put_rel_at_slot(idx, rel.clone());
2495        self.on_relationship_replayed(&rel);
2496
2497        Ok(rel)
2498    }
2499
2500    #[cfg(test)]
2501    pub(super) fn assert_property_indexes_match_scan(&self) {
2502        let indexes = self.indexes_read();
2503        assert_eq!(
2504            indexes.node_properties.active_keys.len(),
2505            self.active_node_property_index_count(),
2506            "node property index counter diverged from active key set"
2507        );
2508        assert_eq!(
2509            indexes.relationship_properties.active_keys.len(),
2510            self.active_relationship_property_index_count(),
2511            "relationship property index counter diverged from active key set"
2512        );
2513
2514        let mut expected_nodes = PropertyIndexState {
2515            active_keys: indexes.node_properties.active_keys.clone(),
2516            ..PropertyIndexState::default()
2517        };
2518        for (id, node) in self.iter_nodes() {
2519            for (key, value) in &node.properties {
2520                if expected_nodes.is_active(key) {
2521                    expected_nodes.insert_with_scopes(
2522                        id,
2523                        node.labels.iter().map(String::as_str),
2524                        key,
2525                        value,
2526                    );
2527                }
2528            }
2529        }
2530        assert_eq!(
2531            indexes.node_properties.values, expected_nodes.values,
2532            "node property index values diverged from scan"
2533        );
2534        assert_eq!(
2535            indexes.node_properties.scoped_values, expected_nodes.scoped_values,
2536            "node property scoped index values diverged from scan"
2537        );
2538
2539        let mut expected_relationships = PropertyIndexState {
2540            active_keys: indexes.relationship_properties.active_keys.clone(),
2541            ..PropertyIndexState::default()
2542        };
2543        for (id, rel) in self.iter_rels() {
2544            for (key, value) in &rel.properties {
2545                if expected_relationships.is_active(key) {
2546                    expected_relationships.insert_with_scopes(
2547                        id,
2548                        [rel.rel_type.as_str()],
2549                        key,
2550                        value,
2551                    );
2552                }
2553            }
2554        }
2555        assert_eq!(
2556            indexes.relationship_properties.values, expected_relationships.values,
2557            "relationship property index values diverged from scan"
2558        );
2559        assert_eq!(
2560            indexes.relationship_properties.scoped_values, expected_relationships.scoped_values,
2561            "relationship property scoped index values diverged from scan"
2562        );
2563    }
2564}