Skip to main content

lora_store/memory/
graph.rs

1//! The [`InMemoryGraph`] data structure: slot-indexed node/relationship
2//! storage, adjacency lists, label/type indexes, and the inherent
3//! helpers that the trait impls in `super::impls` delegate to.
4
5use std::collections::{BTreeMap, BTreeSet};
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::{Arc, RwLock, RwLockWriteGuard};
8
9use lora_ast::Direction;
10
11use crate::{
12    LoraPoint, MutationEvent, MutationRecorder, NodeId, NodeRecord, Properties, PropertyValue,
13    RelationshipId, RelationshipRecord,
14};
15
16use super::constraint_catalog::{
17    ConstraintCatalog, ConstraintRequest, CreateConstraintError, CreateConstraintOutcome,
18    DropConstraintError, DropConstraintOutcome,
19};
20use super::entity_index_store::IndexBundle;
21use super::fulltext_index::FulltextRegistry;
22use super::index_catalog::IndexConfigValue;
23use super::index_catalog::{
24    CreateIndexError, CreateIndexOutcome, DropIndexError, DropIndexOutcome, IndexCatalog,
25    IndexDefinition, IndexRequest, StoredIndexEntity, StoredIndexKind, StoredIndexState,
26};
27use super::point_index::PointRegistry;
28#[cfg(test)]
29use super::property_index::PropertyIndexState;
30use super::property_index::{PropertyIndexKey, PropertyIndexRegistry};
31use super::secondary_index_maintenance::SecondaryIndexMutation;
32use super::sorted_property_index::SortedPropertyIndex;
33use super::stats::GraphStats;
34use super::text_index::TrigramRegistry;
35
36#[derive(Default)]
37pub struct InMemoryGraph {
38    pub(super) next_node_id: NodeId,
39    pub(super) next_rel_id: RelationshipId,
40
41    /// Slot-indexed node storage: `nodes[id as usize]` is the record at `id`.
42    /// `None` slots are tombstones from deletes (we don't compact). Because
43    /// `next_node_id` is monotonic the slot at `id` is initialized exactly
44    /// when `id < next_node_id` — same identity guarantee the previous
45    /// `BTreeMap<NodeId, NodeRecord>` had, just with O(1) lookup and
46    /// cache-coherent layout.
47    ///
48    /// Records are wrapped in `Arc` so [`Self::clone`] (called on every
49    /// auto-commit write to build a working copy) is `O(N)` atomic
50    /// refcount bumps instead of `O(N)` deep record clones — for a
51    /// 100k-node graph the difference is microseconds vs. tens of
52    /// milliseconds. Mutating a record uses `Arc::make_mut`, which
53    /// clones in place when the refcount is 1 (no concurrent reader)
54    /// and falls back to a single-record clone-on-write when readers
55    /// still hold a snapshot.
56    pub(super) nodes: Vec<Option<Arc<NodeRecord>>>,
57    pub(super) relationships: Vec<Option<Arc<RelationshipRecord>>>,
58    /// Live (non-tombstoned) counts kept in sync with `put_*`/`take_*` so
59    /// `node_count` / `relationship_count` stay O(1) — without a counter
60    /// they'd have to scan the slab.
61    pub(super) live_node_count: usize,
62    pub(super) live_rel_count: usize,
63
64    /// Adjacency keyed by NodeId. `outgoing[id]` is the list of relationship
65    /// ids that leave `id`; mirrored on `incoming[id]`. Inner `Vec` instead
66    /// of `BTreeSet` because edges are inserted exactly once and traversal
67    /// only needs sequential iteration; the cache-friendly contiguous layout
68    /// shows up on every traversal hop.
69    pub(super) outgoing: Vec<Vec<RelationshipId>>,
70    pub(super) incoming: Vec<Vec<RelationshipId>>,
71
72    // secondary indexes
73    /// Label -> the (unique, monotonic) node ids that carry it. The inner
74    /// `Vec` instead of `BTreeSet` because every node id is inserted at most
75    /// once per label (no dedup needed) and every consumer iterates the
76    /// whole list anyway — contiguous storage iterates faster than a
77    /// tree-of-pointers, and removes via `swap_remove` stay O(degree-of-label).
78    pub(super) nodes_by_label: BTreeMap<String, Vec<NodeId>>,
79    pub(super) relationships_by_type: BTreeMap<String, Vec<RelationshipId>>,
80
81    /// All index machinery — the declared-index catalog, hash-bucket
82    /// property registry, and the per-entity-kind secondary index
83    /// registries (text, sorted, point, fulltext) plus their active
84    /// counters — collapsed into one bundle. See [`IndexBundle`] for
85    /// the rationale. The bundle is a packaging-only abstraction:
86    /// every field accessed through `self.indexes.<x>` lives at the
87    /// same address it would have as a top-level field.
88    pub(super) indexes: IndexBundle,
89
90    /// Catalog of explicitly-created constraints (CREATE CONSTRAINT).
91    /// Deliberately not part of [`IndexBundle`] — constraints describe
92    /// data invariants, not indexed access. The fact that uniqueness /
93    /// key constraints back range indexes is handled in the
94    /// constraint code path, not by the bundle's layout.
95    pub(super) constraint_catalog: RwLock<ConstraintCatalog>,
96    /// Fast-path counter for mutation-time constraint checks. Most
97    /// workloads have no constraints installed; this lets the executor
98    /// skip taking the catalog lock in that case.
99    pub(super) active_constraints: AtomicUsize,
100
101    /// Optional mutation observer. When `Some`, every committed mutation
102    /// fans out to this recorder *after* the in-memory state has been
103    /// updated. The recorder is not part of the graph's identity, so Clone
104    /// and snapshot restore both reset it to `None`.
105    pub(super) recorder: Option<Arc<dyn MutationRecorder>>,
106}
107
108impl std::fmt::Debug for InMemoryGraph {
109    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110        f.debug_struct("InMemoryGraph")
111            .field("next_node_id", &self.next_node_id)
112            .field("next_rel_id", &self.next_rel_id)
113            .field("nodes", &self.nodes)
114            .field("relationships", &self.relationships)
115            .field("outgoing", &self.outgoing)
116            .field("incoming", &self.incoming)
117            .field("nodes_by_label", &self.nodes_by_label)
118            .field("relationships_by_type", &self.relationships_by_type)
119            .field("indexes", &self.indexes)
120            .field(
121                "active_node_property_indexes",
122                &self.active_node_property_index_count(),
123            )
124            .field(
125                "active_relationship_property_indexes",
126                &self.active_relationship_property_index_count(),
127            )
128            .field(
129                "index_catalog_entries",
130                &self
131                    .indexes
132                    .catalog
133                    .read()
134                    .map(|c| c.list().len())
135                    .unwrap_or(0),
136            )
137            .field("active_constraints", &self.active_constraint_count())
138            .field(
139                "active_fulltext_indexes",
140                &self.active_fulltext_index_count(),
141            )
142            .field("recorder", &self.recorder.as_ref().map(|_| "installed"))
143            .finish()
144    }
145}
146
147impl Clone for InMemoryGraph {
148    fn clone(&self) -> Self {
149        // Deliberately drop the recorder on clone: a cloned store is a
150        // separate identity; it should not silently share the observer.
151        Self {
152            next_node_id: self.next_node_id,
153            next_rel_id: self.next_rel_id,
154            nodes: self.nodes.clone(),
155            relationships: self.relationships.clone(),
156            live_node_count: self.live_node_count,
157            live_rel_count: self.live_rel_count,
158            outgoing: self.outgoing.clone(),
159            incoming: self.incoming.clone(),
160            nodes_by_label: self.nodes_by_label.clone(),
161            relationships_by_type: self.relationships_by_type.clone(),
162            // IndexBundle::clone deep-copies every owned registry under
163            // its locks, mirroring what the old per-field clones did.
164            // The hash-bucket registry skip-on-empty optimisation is
165            // preserved: `PropertyIndexRegistry::clone` itself is cheap
166            // when no entries exist.
167            indexes: self.indexes.clone(),
168            constraint_catalog: RwLock::new(self.constraint_catalog_read().clone()),
169            active_constraints: AtomicUsize::new(self.active_constraint_count()),
170            recorder: None,
171        }
172    }
173}
174
175impl InMemoryGraph {
176    pub fn new() -> Self {
177        Self::default()
178    }
179
180    pub fn with_capacity_hint(nodes: usize, relationships: usize) -> Self {
181        Self {
182            nodes: Vec::with_capacity(nodes),
183            relationships: Vec::with_capacity(relationships),
184            outgoing: Vec::with_capacity(nodes),
185            incoming: Vec::with_capacity(nodes),
186            ..Self::default()
187        }
188    }
189
190    pub fn contains_node(&self, node_id: NodeId) -> bool {
191        self.node_at(node_id).is_some()
192    }
193
194    pub fn contains_relationship(&self, rel_id: RelationshipId) -> bool {
195        self.rel_at(rel_id).is_some()
196    }
197
198    /// Install (or clear) the mutation recorder. Passing `None` detaches any
199    /// currently-installed recorder. The recorder observes every committed
200    /// mutation *after* it has been applied.
201    pub fn set_mutation_recorder(&mut self, recorder: Option<Arc<dyn MutationRecorder>>) {
202        self.recorder = recorder;
203    }
204
205    /// Handle to the currently-installed recorder, if any.
206    pub fn mutation_recorder(&self) -> Option<&Arc<dyn MutationRecorder>> {
207        self.recorder.as_ref()
208    }
209
210    /// Emit a mutation event only if a recorder is installed. The event is
211    /// built lazily — callers pass a closure, so when no recorder is
212    /// attached we pay only a `None` check and the cost of constructing the
213    /// event (labels/properties clones) is avoided.
214    #[inline]
215    pub(super) fn emit<F: FnOnce() -> MutationEvent>(&self, build: F) {
216        if let Some(rec) = &self.recorder {
217            rec.record(build());
218        }
219    }
220
221    fn bump_next_node_id_past(&mut self, id: NodeId) -> Result<(), String> {
222        let next = id
223            .checked_add(1)
224            .ok_or_else(|| format!("node id {id} leaves no valid next node id"))?;
225        self.next_node_id = self.next_node_id.max(next);
226        Ok(())
227    }
228
229    fn bump_next_rel_id_past(&mut self, id: RelationshipId) -> Result<(), String> {
230        let next = id
231            .checked_add(1)
232            .ok_or_else(|| format!("relationship id {id} leaves no valid next relationship id"))?;
233        self.next_rel_id = self.next_rel_id.max(next);
234        Ok(())
235    }
236
237    pub(super) fn reserve_next_node_slot(&mut self) -> (NodeId, usize) {
238        let id = self.next_node_id;
239        let idx = self
240            .ensure_node_slot_checked(id)
241            .expect("next node id should fit in memory-backed slab");
242        self.bump_next_node_id_past(id)
243            .expect("next node id should leave a valid successor");
244        (id, idx)
245    }
246
247    pub(super) fn try_reserve_next_rel_slot(&mut self) -> Option<(RelationshipId, usize)> {
248        let id = self.next_rel_id;
249        let idx = self.ensure_rel_slot_checked(id).ok()?;
250        self.bump_next_rel_id_past(id).ok()?;
251        Some((id, idx))
252    }
253
254    // ---------- Slab access helpers ----------
255    //
256    // Stand-in for the BTreeMap API the previous storage used. They keep the
257    // call sites readable while the underlying layout is positional Vec.
258
259    #[inline]
260    pub(super) fn node_at(&self, id: NodeId) -> Option<&NodeRecord> {
261        self.nodes
262            .get(Self::slot_index(id)?)
263            .and_then(|s| s.as_ref())
264            .map(|arc| arc.as_ref())
265    }
266
267    /// Mutable handle to a node record, doing copy-on-write only when the
268    /// `Arc` is shared with a concurrent reader. With no readers (the
269    /// common case after a fresh write_store clone), `Arc::make_mut`
270    /// upgrades in place — no record clone.
271    #[inline]
272    pub(super) fn node_at_mut(&mut self, id: NodeId) -> Option<&mut NodeRecord> {
273        self.nodes
274            .get_mut(Self::slot_index(id)?)
275            .and_then(|s| s.as_mut())
276            .map(Arc::make_mut)
277    }
278
279    #[inline]
280    pub(super) fn rel_at(&self, id: RelationshipId) -> Option<&RelationshipRecord> {
281        self.relationships
282            .get(Self::slot_index(id)?)
283            .and_then(|s| s.as_ref())
284            .map(|arc| arc.as_ref())
285    }
286
287    #[inline]
288    pub(super) fn rel_at_mut(&mut self, id: RelationshipId) -> Option<&mut RelationshipRecord> {
289        self.relationships
290            .get_mut(Self::slot_index(id)?)
291            .and_then(|s| s.as_mut())
292            .map(Arc::make_mut)
293    }
294
295    /// Resize the node-keyed Vecs so `id as usize` is in range. Adjacency
296    /// lists are kept in lockstep with `nodes`, so a freshly-grown slot has
297    /// empty outgoing/incoming Vecs ready to receive edges.
298    fn slot_len_for_id(id: u64, kind: &str) -> Result<usize, String> {
299        let idx = usize::try_from(id)
300            .map_err(|_| format!("{kind} id {id} does not fit in usize on this platform"))?;
301        idx.checked_add(1)
302            .ok_or_else(|| format!("{kind} id {id} leaves no valid slab slot"))
303    }
304
305    #[inline]
306    fn slot_index(id: u64) -> Option<usize> {
307        usize::try_from(id).ok()
308    }
309
310    fn ensure_node_slot_checked(&mut self, id: NodeId) -> Result<usize, String> {
311        let target = Self::slot_len_for_id(id, "node")?;
312        if self.nodes.len() < target {
313            let additional = target - self.nodes.len();
314            self.nodes.try_reserve_exact(additional).map_err(|e| {
315                format!("node id {id} requires {target} slots, but allocation failed: {e}")
316            })?;
317            self.outgoing.try_reserve_exact(additional).map_err(|e| {
318                format!(
319                    "node id {id} requires {target} adjacency slots, but allocation failed: {e}"
320                )
321            })?;
322            self.incoming.try_reserve_exact(additional).map_err(|e| {
323                format!(
324                    "node id {id} requires {target} adjacency slots, but allocation failed: {e}"
325                )
326            })?;
327            self.nodes.resize_with(target, || None);
328            self.outgoing.resize_with(target, Vec::new);
329            self.incoming.resize_with(target, Vec::new);
330        }
331        Ok(target - 1)
332    }
333
334    fn ensure_rel_slot_checked(&mut self, id: RelationshipId) -> Result<usize, String> {
335        let target = Self::slot_len_for_id(id, "relationship")?;
336        if self.relationships.len() < target {
337            self.relationships
338                .try_reserve_exact(target - self.relationships.len())
339                .map_err(|e| {
340                    format!(
341                        "relationship id {id} requires {target} slots, but allocation failed: {e}"
342                    )
343                })?;
344            self.relationships.resize_with(target, || None);
345        }
346        Ok(target - 1)
347    }
348
349    fn ensure_node_slot(&mut self, id: NodeId) -> usize {
350        self.ensure_node_slot_checked(id)
351            .expect("node id should fit in memory-backed slab")
352    }
353
354    pub(super) fn put_node_checked(&mut self, id: NodeId, node: NodeRecord) -> Result<(), String> {
355        let idx = self.ensure_node_slot_checked(id)?;
356        self.put_node_at_slot(idx, node);
357        Ok(())
358    }
359
360    pub(super) fn put_rel_checked(
361        &mut self,
362        id: RelationshipId,
363        rel: RelationshipRecord,
364    ) -> Result<(), String> {
365        let idx = self.ensure_rel_slot_checked(id)?;
366        self.put_rel_at_slot(idx, rel);
367        Ok(())
368    }
369
370    pub(super) fn put_node_at_slot(&mut self, idx: usize, node: NodeRecord) {
371        let was_present = self.nodes[idx].is_some();
372        self.nodes[idx] = Some(Arc::new(node));
373        if !was_present {
374            self.live_node_count += 1;
375        }
376    }
377
378    pub(super) fn put_rel_at_slot(&mut self, idx: usize, rel: RelationshipRecord) {
379        let was_present = self.relationships[idx].is_some();
380        self.relationships[idx] = Some(Arc::new(rel));
381        if !was_present {
382            self.live_rel_count += 1;
383        }
384    }
385
386    pub(super) fn take_node(&mut self, id: NodeId) -> Option<NodeRecord> {
387        let idx = Self::slot_index(id)?;
388        let removed = self.nodes.get_mut(idx).and_then(|s| s.take());
389        if removed.is_some() {
390            self.live_node_count -= 1;
391            // Also clear the per-id adjacency entries so the memory is reclaimed
392            // on the typical "delete every node" pattern. We deliberately do not
393            // shrink the outer Vec — leaving the slot lets new ids reuse the
394            // same index without growth churn (and `next_node_id` is monotonic
395            // anyway, so no immediate reuse).
396            if let Some(out) = self.outgoing.get_mut(idx) {
397                out.clear();
398            }
399            if let Some(inc) = self.incoming.get_mut(idx) {
400                inc.clear();
401            }
402        }
403        // Unwrap the Arc — `try_unwrap` returns the inner `NodeRecord`
404        // without cloning when our slab held the only reference, falling
405        // back to a clone only when concurrent readers still hold a
406        // snapshot Arc.
407        removed.map(|arc| Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone()))
408    }
409
410    pub(super) fn take_rel(&mut self, id: RelationshipId) -> Option<RelationshipRecord> {
411        let idx = Self::slot_index(id)?;
412        let removed = self.relationships.get_mut(idx).and_then(|s| s.take());
413        if removed.is_some() {
414            self.live_rel_count -= 1;
415        }
416        removed.map(|arc| Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone()))
417    }
418
419    #[inline]
420    pub(super) fn outgoing_at(&self, id: NodeId) -> Option<&[RelationshipId]> {
421        self.outgoing.get(Self::slot_index(id)?).map(Vec::as_slice)
422    }
423
424    #[inline]
425    pub(super) fn incoming_at(&self, id: NodeId) -> Option<&[RelationshipId]> {
426        self.incoming.get(Self::slot_index(id)?).map(Vec::as_slice)
427    }
428
429    #[inline]
430    fn try_for_each_adjacent_slice<F, E>(
431        &self,
432        node_id: NodeId,
433        types: &[String],
434        adj: &[RelationshipId],
435        skip_self_loops: bool,
436        visit: &mut F,
437    ) -> Result<(), E>
438    where
439        F: FnMut(RelationshipId, NodeId) -> Result<(), E>,
440    {
441        let single_type = match types {
442            [single] => Some(single.as_str()),
443            _ => None,
444        };
445        let has_type_filter = !types.is_empty();
446
447        for &rel_id in adj {
448            let Some(rel) = self.rel_at(rel_id) else {
449                continue;
450            };
451            if skip_self_loops && rel.src == node_id && rel.dst == node_id {
452                continue;
453            }
454            if let Some(single) = single_type {
455                if rel.rel_type != single {
456                    continue;
457                }
458            } else if has_type_filter && !types.iter().any(|t| t == &rel.rel_type) {
459                continue;
460            }
461            let Some(other_id) = Self::other_endpoint(rel, node_id) else {
462                continue;
463            };
464            visit(rel_id, other_id)?;
465        }
466        Ok(())
467    }
468
469    #[inline]
470    pub(super) fn try_for_each_adjacent_id_unchecked<F, E>(
471        &self,
472        node_id: NodeId,
473        direction: Direction,
474        types: &[String],
475        mut visit: F,
476    ) -> Result<(), E>
477    where
478        F: FnMut(RelationshipId, NodeId) -> Result<(), E>,
479    {
480        match direction {
481            Direction::Right => {
482                if let Some(adj) = self.outgoing_at(node_id) {
483                    self.try_for_each_adjacent_slice(node_id, types, adj, false, &mut visit)?;
484                }
485            }
486            Direction::Left => {
487                if let Some(adj) = self.incoming_at(node_id) {
488                    self.try_for_each_adjacent_slice(node_id, types, adj, false, &mut visit)?;
489                }
490            }
491            Direction::Undirected => {
492                if let Some(adj) = self.outgoing_at(node_id) {
493                    self.try_for_each_adjacent_slice(node_id, types, adj, false, &mut visit)?;
494                }
495                if let Some(adj) = self.incoming_at(node_id) {
496                    self.try_for_each_adjacent_slice(node_id, types, adj, true, &mut visit)?;
497                }
498            }
499        }
500
501        Ok(())
502    }
503
504    #[inline]
505    pub(super) fn try_for_each_adjacent_id<F, E>(
506        &self,
507        node_id: NodeId,
508        direction: Direction,
509        types: &[String],
510        visit: F,
511    ) -> Result<(), E>
512    where
513        F: FnMut(RelationshipId, NodeId) -> Result<(), E>,
514    {
515        if self.node_at(node_id).is_none() {
516            return Ok(());
517        }
518        self.try_for_each_adjacent_id_unchecked(node_id, direction, types, visit)
519    }
520
521    pub(super) fn iter_node_ids(&self) -> impl Iterator<Item = NodeId> + '_ {
522        self.nodes
523            .iter()
524            .enumerate()
525            .filter_map(|(i, slot)| slot.as_ref().map(|_| i as NodeId))
526    }
527
528    pub(super) fn iter_node_records(&self) -> impl Iterator<Item = &NodeRecord> + '_ {
529        self.nodes
530            .iter()
531            .filter_map(|s| s.as_ref())
532            .map(|arc| arc.as_ref())
533    }
534
535    pub(super) fn iter_rel_ids(&self) -> impl Iterator<Item = RelationshipId> + '_ {
536        self.relationships
537            .iter()
538            .enumerate()
539            .filter_map(|(i, slot)| slot.as_ref().map(|_| i as RelationshipId))
540    }
541
542    pub(super) fn iter_rel_records(&self) -> impl Iterator<Item = &RelationshipRecord> + '_ {
543        self.relationships
544            .iter()
545            .filter_map(|s| s.as_ref())
546            .map(|arc| arc.as_ref())
547    }
548
549    pub(super) fn iter_nodes(&self) -> impl Iterator<Item = (NodeId, &NodeRecord)> + '_ {
550        self.nodes
551            .iter()
552            .enumerate()
553            .filter_map(|(i, slot)| slot.as_ref().map(|n| (i as NodeId, n.as_ref())))
554    }
555
556    pub(super) fn iter_rels(
557        &self,
558    ) -> impl Iterator<Item = (RelationshipId, &RelationshipRecord)> + '_ {
559        self.relationships
560            .iter()
561            .enumerate()
562            .filter_map(|(i, slot)| slot.as_ref().map(|r| (i as RelationshipId, r.as_ref())))
563    }
564
565    /// Add `rel_id` to `node_id`'s outgoing list. Relies on the monotonic-id
566    /// invariant: relationship ids are allocated once and never re-used, so
567    /// the bucket can never see a duplicate.
568    fn outgoing_push(&mut self, node_id: NodeId, rel_id: RelationshipId) {
569        let idx = self.ensure_node_slot(node_id);
570        self.outgoing[idx].push(rel_id);
571    }
572
573    fn incoming_push(&mut self, node_id: NodeId, rel_id: RelationshipId) {
574        let idx = self.ensure_node_slot(node_id);
575        self.incoming[idx].push(rel_id);
576    }
577
578    /// Remove `rel_id` from `node_id`'s outgoing list. `swap_remove` keeps
579    /// the operation O(1) — adjacency order doesn't carry semantic meaning.
580    fn outgoing_remove(&mut self, node_id: NodeId, rel_id: RelationshipId) {
581        if let Some(v) = Self::slot_index(node_id).and_then(|idx| self.outgoing.get_mut(idx)) {
582            if let Some(pos) = v.iter().position(|&id| id == rel_id) {
583                v.swap_remove(pos);
584            }
585        }
586    }
587
588    fn incoming_remove(&mut self, node_id: NodeId, rel_id: RelationshipId) {
589        if let Some(v) = Self::slot_index(node_id).and_then(|idx| self.incoming.get_mut(idx)) {
590            if let Some(pos) = v.iter().position(|&id| id == rel_id) {
591                v.swap_remove(pos);
592            }
593        }
594    }
595
596    pub(super) fn normalize_labels(labels: Vec<String>) -> Vec<String> {
597        let mut seen = BTreeSet::new();
598
599        labels
600            .into_iter()
601            .map(|s| s.trim().to_string())
602            .filter(|s| !s.is_empty())
603            .filter(|s| seen.insert(s.clone()))
604            .collect()
605    }
606
607    pub(super) fn insert_node_label_index(&mut self, node_id: NodeId, label: &str) {
608        // Hot path: skip the `String` alloc when the label bucket already
609        // exists. The monotonic-id invariant on the create path guarantees
610        // `node_id` is unique, so we push unconditionally; the previous
611        // `contains` guard turned bulk CREATE into O(n²).
612        if let Some(bucket) = self.nodes_by_label.get_mut(label) {
613            bucket.push(node_id);
614        } else {
615            self.nodes_by_label.insert(label.to_string(), vec![node_id]);
616        }
617    }
618
619    fn remove_node_label_index(&mut self, node_id: NodeId, label: &str) {
620        if let Some(ids) = self.nodes_by_label.get_mut(label) {
621            if let Some(pos) = ids.iter().position(|&id| id == node_id) {
622                ids.swap_remove(pos);
623            }
624            if ids.is_empty() {
625                self.nodes_by_label.remove(label);
626            }
627        }
628    }
629
630    fn insert_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
631        // See `insert_node_label_index` for the same hot-path rationale.
632        if let Some(bucket) = self.relationships_by_type.get_mut(rel_type) {
633            bucket.push(rel_id);
634        } else {
635            self.relationships_by_type
636                .insert(rel_type.to_string(), vec![rel_id]);
637        }
638    }
639
640    fn remove_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
641        if let Some(ids) = self.relationships_by_type.get_mut(rel_type) {
642            if let Some(pos) = ids.iter().position(|&id| id == rel_id) {
643                ids.swap_remove(pos);
644            }
645            if ids.is_empty() {
646                self.relationships_by_type.remove(rel_type);
647            }
648        }
649    }
650
651    pub(super) fn indexes_read(&self) -> std::sync::RwLockReadGuard<'_, PropertyIndexRegistry> {
652        self.indexes
653            .properties
654            .read()
655            .unwrap_or_else(|poisoned| poisoned.into_inner())
656    }
657
658    pub(super) fn indexes_write(&self) -> RwLockWriteGuard<'_, PropertyIndexRegistry> {
659        self.indexes
660            .properties
661            .write()
662            .unwrap_or_else(|poisoned| poisoned.into_inner())
663    }
664
665    pub(super) fn indexes_mut(&mut self) -> &mut PropertyIndexRegistry {
666        self.indexes
667            .properties
668            .get_mut()
669            .unwrap_or_else(|poisoned| poisoned.into_inner())
670    }
671
672    #[inline]
673    pub(super) fn active_node_property_index_count(&self) -> usize {
674        self.indexes
675            .active_node_property_indexes
676            .load(Ordering::Relaxed)
677    }
678
679    #[inline]
680    pub(super) fn active_relationship_property_index_count(&self) -> usize {
681        self.indexes
682            .active_relationship_property_indexes
683            .load(Ordering::Relaxed)
684    }
685
686    #[inline]
687    pub(super) fn active_constraint_count(&self) -> usize {
688        self.active_constraints.load(Ordering::Relaxed)
689    }
690
691    #[inline]
692    pub(super) fn has_active_constraints(&self) -> bool {
693        self.active_constraint_count() != 0
694    }
695
696    #[inline]
697    pub(super) fn active_fulltext_index_count(&self) -> usize {
698        self.indexes.active_fulltext_indexes.load(Ordering::Relaxed)
699    }
700
701    #[inline]
702    pub(super) fn has_active_fulltext_indexes(&self) -> bool {
703        self.active_fulltext_index_count() != 0
704    }
705
706    pub(super) fn node_property_index_is_active(&mut self, key: &str) -> bool {
707        self.active_node_property_index_count() != 0
708            && self.indexes_mut().node_properties.is_active(key)
709    }
710
711    pub(super) fn relationship_property_index_is_active(&mut self, key: &str) -> bool {
712        self.active_relationship_property_index_count() != 0
713            && self.indexes_mut().relationship_properties.is_active(key)
714    }
715
716    pub(super) fn ensure_node_property_index(&self, key: &str) {
717        {
718            let indexes = self.indexes_read();
719            if indexes.node_properties.is_active(key) {
720                return;
721            }
722        }
723
724        let mut indexes = self.indexes_write();
725        if indexes.node_properties.is_active(key) {
726            return;
727        }
728
729        for (id, node) in self.iter_nodes() {
730            if let Some(value) = node.properties.get(key) {
731                indexes.node_properties.insert_with_scopes(
732                    id,
733                    node.labels.iter().map(String::as_str),
734                    key,
735                    value,
736                );
737            }
738        }
739        if indexes.node_properties.activate(key) {
740            self.indexes
741                .active_node_property_indexes
742                .fetch_add(1, Ordering::Relaxed);
743        }
744    }
745
746    pub(super) fn ensure_relationship_property_index(&self, key: &str) {
747        {
748            let indexes = self.indexes_read();
749            if indexes.relationship_properties.is_active(key) {
750                return;
751            }
752        }
753
754        let mut indexes = self.indexes_write();
755        if indexes.relationship_properties.is_active(key) {
756            return;
757        }
758
759        for (id, rel) in self.iter_rels() {
760            if let Some(value) = rel.properties.get(key) {
761                indexes.relationship_properties.insert_with_scopes(
762                    id,
763                    [rel.rel_type.as_str()],
764                    key,
765                    value,
766                );
767            }
768        }
769        if indexes.relationship_properties.activate(key) {
770            self.indexes
771                .active_relationship_property_indexes
772                .fetch_add(1, Ordering::Relaxed);
773        }
774    }
775
776    pub(super) fn index_catalog_read(&self) -> std::sync::RwLockReadGuard<'_, IndexCatalog> {
777        self.indexes
778            .catalog
779            .read()
780            .unwrap_or_else(|poisoned| poisoned.into_inner())
781    }
782
783    pub(super) fn index_catalog_write(&self) -> RwLockWriteGuard<'_, IndexCatalog> {
784        self.indexes
785            .catalog
786            .write()
787            .unwrap_or_else(|poisoned| poisoned.into_inner())
788    }
789
790    pub(super) fn constraint_catalog_read(
791        &self,
792    ) -> std::sync::RwLockReadGuard<'_, ConstraintCatalog> {
793        self.constraint_catalog
794            .read()
795            .unwrap_or_else(|poisoned| poisoned.into_inner())
796    }
797
798    pub(super) fn constraint_catalog_write(&self) -> RwLockWriteGuard<'_, ConstraintCatalog> {
799        self.constraint_catalog
800            .write()
801            .unwrap_or_else(|poisoned| poisoned.into_inner())
802    }
803
804    /// Register an explicitly-declared index in the catalog and, when
805    /// applicable, force the underlying property-index buckets to be
806    /// populated so equality lookups can use them immediately.
807    ///
808    /// Named with a `register_` prefix to avoid colliding with the
809    /// trait method `GraphStorageMut::create_index` — the trait impl
810    /// in `impls.rs` delegates here.
811    #[allow(clippy::result_large_err)]
812    pub(super) fn register_index(
813        &self,
814        request: IndexRequest,
815        if_not_exists: bool,
816    ) -> Result<CreateIndexOutcome, CreateIndexError> {
817        self.register_index_with_recording(request, if_not_exists, true)
818    }
819
820    #[allow(clippy::result_large_err)]
821    fn register_index_with_recording(
822        &self,
823        request: IndexRequest,
824        if_not_exists: bool,
825        record_event: bool,
826    ) -> Result<CreateIndexOutcome, CreateIndexError> {
827        let request_for_event = record_event.then(|| request.clone());
828        let outcome = {
829            let mut catalog = self.index_catalog_write();
830            catalog.try_create(request, if_not_exists)?
831        };
832
833        if let CreateIndexOutcome::Created(def) = &outcome {
834            self.populate_index_data(def);
835        }
836
837        // Both Created and NoOpExists are committed catalog states; we
838        // log only Created because NoOpExists implies a redundant DDL
839        // that adds nothing to durable state.
840        if matches!(outcome, CreateIndexOutcome::Created(_)) {
841            if let Some(request_for_event) = request_for_event {
842                self.emit(|| crate::MutationEvent::CreateIndex {
843                    request: request_for_event,
844                    if_not_exists,
845                });
846            }
847        }
848
849        Ok(outcome)
850    }
851
852    /// Replay a CreateIndex event against an empty graph. Mirrors the
853    /// `replay_create_node` shape: callers must invoke before installing
854    /// a recorder so we don't re-emit during recovery.
855    #[doc(hidden)]
856    pub fn replay_create_index(
857        &mut self,
858        request: IndexRequest,
859        if_not_exists: bool,
860    ) -> Result<(), String> {
861        if self.recorder.is_some() {
862            return Err("cannot replay create_index while a mutation recorder is installed".into());
863        }
864        self.register_index(request, if_not_exists)
865            .map(|_| ())
866            .map_err(|e| e.to_string())
867    }
868
869    /// Replay a DropIndex event.
870    #[doc(hidden)]
871    pub fn replay_drop_index(&mut self, name: &str, if_exists: bool) -> Result<(), String> {
872        if self.recorder.is_some() {
873            return Err("cannot replay drop_index while a mutation recorder is installed".into());
874        }
875        self.drop_named_index(name, if_exists)
876            .map(|_| ())
877            .map_err(|e| e.to_string())
878    }
879
880    /// Register a constraint. For uniqueness/key kinds this also
881    /// registers a backing RANGE index in the index catalog under the
882    /// same name. Validation of existing data is the caller's
883    /// responsibility (the enforcement layer runs a pre-create scan
884    /// before this method commits).
885    pub(super) fn register_constraint(
886        &self,
887        request: ConstraintRequest,
888        if_not_exists: bool,
889    ) -> Result<CreateConstraintOutcome, CreateConstraintError> {
890        // Constraint-level conflicts (22N65/66/67) take precedence over
891        // index-catalog conflicts: if the request collides with an
892        // existing *constraint* shape or name, we never get to the
893        // backing-index step.
894        {
895            let constraint_catalog = self.constraint_catalog_read();
896            if let Some(existing) = constraint_catalog.find_equivalent(&request) {
897                let cloned = existing.clone();
898                drop(constraint_catalog);
899                if if_not_exists {
900                    return Ok(CreateConstraintOutcome::NoOpExists(cloned));
901                }
902                return Err(CreateConstraintError::EquivalentConstraintExists(
903                    cloned.name,
904                ));
905            }
906            if let Some(existing) = constraint_catalog.get(&request.name) {
907                let cloned = existing.clone();
908                drop(constraint_catalog);
909                if if_not_exists {
910                    return Ok(CreateConstraintOutcome::NoOpExists(cloned));
911                }
912                return Err(CreateConstraintError::DuplicateName(cloned.name));
913            }
914            if let Some(existing) = constraint_catalog.find_same_schema(&request) {
915                let cloned = existing.clone();
916                drop(constraint_catalog);
917                if super::constraint_catalog::kinds_conflict_for_validation(
918                    &cloned.kind,
919                    &request.kind,
920                ) {
921                    if if_not_exists {
922                        return Ok(CreateConstraintOutcome::NoOpExists(cloned));
923                    }
924                    return Err(CreateConstraintError::ConflictingConstraint(cloned.name));
925                }
926            }
927        }
928
929        // Index-catalog conflicts only matter for constraints that need
930        // a backing range index. The catalog won't yet own one for this
931        // request — that registration happens below — so any existing
932        // entry under the same name or schema is from a foreign index.
933        if request.kind.requires_backing_index() {
934            let idx_catalog = self.index_catalog_read();
935            if idx_catalog.get(&request.name).is_some() {
936                return Err(CreateConstraintError::DuplicateIndexName(
937                    request.name.clone(),
938                ));
939            }
940            let conflict = idx_catalog.list().into_iter().find(|def| {
941                def.kind == StoredIndexKind::Range
942                    && def.entity == request.entity
943                    && def.label.as_deref() == Some(request.label.as_str())
944                    && def.properties == request.properties
945                    && def.name != request.name
946            });
947            drop(idx_catalog);
948            if let Some(def) = conflict {
949                return Err(CreateConstraintError::BackingIndexConflict(format!(
950                    "(:{} {{{}}}) already covered by index `{}`",
951                    request.label,
952                    request.properties.join(", "),
953                    def.name,
954                )));
955            }
956        }
957
958        let owns_backing = request.kind.requires_backing_index();
959        let request_for_event = request.clone();
960        let outcome = {
961            let mut catalog = self.constraint_catalog_write();
962            catalog.try_create(request, if_not_exists)?
963        };
964
965        if let CreateConstraintOutcome::Created(def) = &outcome {
966            // Pre-create data scan: if the live graph already violates
967            // the constraint, fail and roll back the catalog write.
968            // Creating constraints when conflicting data exists fails
969            // before the catalog change is retained (22N77/79/80).
970            if let Err(violation) = self.validate_existing_data_for_constraint(def) {
971                let mut catalog = self.constraint_catalog_write();
972                let _ = catalog.try_drop(&def.name, true);
973                return Err(CreateConstraintError::DataViolation(violation.to_string()));
974            }
975        }
976
977        if let CreateConstraintOutcome::Created(def) = &outcome {
978            if owns_backing {
979                // Register a backing RANGE index under the same name. This
980                // is an implementation detail of the constraint, so WAL and
981                // snapshot replay record only the constraint mutation.
982                let idx_request = IndexRequest {
983                    explicit_name: Some(def.name.clone()),
984                    kind: StoredIndexKind::Range,
985                    entity: def.entity,
986                    label: Some(def.label.clone()),
987                    additional_labels: Vec::new(),
988                    properties: def.properties.clone(),
989                    options: Default::default(),
990                };
991                // Errors from the backing-index registration unwind the
992                // constraint registration to keep the two catalogs in
993                // step.
994                if let Err(err) = self.register_index_with_recording(idx_request, true, false) {
995                    let mut catalog = self.constraint_catalog_write();
996                    let _ = catalog.try_drop(&def.name, true);
997                    return Err(CreateConstraintError::BackingIndexConflict(err.to_string()));
998                }
999            }
1000            self.emit(|| crate::MutationEvent::CreateConstraint {
1001                request: request_for_event,
1002                if_not_exists,
1003            });
1004            self.active_constraints.fetch_add(1, Ordering::Relaxed);
1005        }
1006
1007        Ok(outcome)
1008    }
1009
1010    /// Replay a CreateConstraint event against a recorder-detached graph.
1011    #[doc(hidden)]
1012    pub fn replay_create_constraint(
1013        &mut self,
1014        request: ConstraintRequest,
1015        if_not_exists: bool,
1016    ) -> Result<(), String> {
1017        if self.recorder.is_some() {
1018            return Err(
1019                "cannot replay create_constraint while a mutation recorder is installed".into(),
1020            );
1021        }
1022        self.register_constraint(request, if_not_exists)
1023            .map(|_| ())
1024            .map_err(|e| e.to_string())
1025    }
1026
1027    /// Replay a DropConstraint event.
1028    #[doc(hidden)]
1029    pub fn replay_drop_constraint(&mut self, name: &str, if_exists: bool) -> Result<(), String> {
1030        if self.recorder.is_some() {
1031            return Err(
1032                "cannot replay drop_constraint while a mutation recorder is installed".into(),
1033            );
1034        }
1035        self.drop_named_constraint(name, if_exists)
1036            .map(|_| ())
1037            .map_err(|e| e.to_string())
1038    }
1039
1040    /// Inverse of [`Self::register_constraint`]. Cascades to the backing
1041    /// range index when one is owned.
1042    pub(super) fn drop_named_constraint(
1043        &self,
1044        name: &str,
1045        if_exists: bool,
1046    ) -> Result<DropConstraintOutcome, DropConstraintError> {
1047        let outcome = {
1048            let mut catalog = self.constraint_catalog_write();
1049            catalog.try_drop(name, if_exists)?
1050        };
1051        if let DropConstraintOutcome::Dropped(def) = &outcome {
1052            if let Some(index_name) = def.owned_index.as_deref() {
1053                // The backing index is owned exclusively by the
1054                // constraint, so dropping it is unconditional.
1055                let _ = self.drop_named_index_inner(index_name, true, false);
1056            }
1057            self.active_constraints.fetch_sub(1, Ordering::Relaxed);
1058            self.emit(|| crate::MutationEvent::DropConstraint {
1059                name: name.to_string(),
1060                if_exists,
1061            });
1062        }
1063        Ok(outcome)
1064    }
1065
1066    /// Inverse of [`Self::register_index`]. Removes the catalog entry
1067    /// and (for RANGE) leaves the underlying property-index buckets in
1068    /// place — they may still be needed for lazy-activation lookups
1069    /// even after the explicit DDL declaration is gone.
1070    pub(super) fn drop_named_index(
1071        &self,
1072        name: &str,
1073        if_exists: bool,
1074    ) -> Result<DropIndexOutcome, DropIndexError> {
1075        self.drop_named_index_inner(name, if_exists, true)
1076    }
1077
1078    fn drop_named_index_inner(
1079        &self,
1080        name: &str,
1081        if_exists: bool,
1082        emit_event: bool,
1083    ) -> Result<DropIndexOutcome, DropIndexError> {
1084        if let Some(owner) = self
1085            .constraint_catalog_read()
1086            .constraint_owning_index(name)
1087            .cloned()
1088        {
1089            return Err(DropIndexError::ConstraintOwned {
1090                index: name.to_string(),
1091                constraint: owner.name,
1092            });
1093        }
1094
1095        let outcome = {
1096            let mut catalog = self.index_catalog_write();
1097            catalog.try_drop(name, if_exists)?
1098        };
1099        if let DropIndexOutcome::Dropped(def) = &outcome {
1100            // Release backing structures keyed off the dropped def.
1101            match def.kind {
1102                StoredIndexKind::Text => {
1103                    if let Some(label) = def.label.as_deref() {
1104                        for prop in &def.properties {
1105                            self.deactivate_text_scope(def.entity, label, prop);
1106                        }
1107                    }
1108                }
1109                StoredIndexKind::Range => {
1110                    if let Some(label) = def.label.as_deref() {
1111                        for prop in &def.properties {
1112                            self.deactivate_sorted_scope(def.entity, label, prop);
1113                        }
1114                    }
1115                }
1116                StoredIndexKind::Point => {
1117                    if let Some(label) = def.label.as_deref() {
1118                        for prop in &def.properties {
1119                            self.deactivate_point_scope(def.entity, label, prop);
1120                        }
1121                    }
1122                }
1123                StoredIndexKind::Lookup | StoredIndexKind::Vector => {
1124                    // VECTOR uses a flat per-query scan today; no
1125                    // backing structure to release.
1126                }
1127                StoredIndexKind::Fulltext => {
1128                    self.deactivate_fulltext_index(def.entity, &def.name);
1129                }
1130            }
1131            if emit_event {
1132                self.emit(|| crate::MutationEvent::DropIndex {
1133                    name: name.to_string(),
1134                    if_exists,
1135                });
1136            }
1137        }
1138        Ok(outcome)
1139    }
1140
1141    fn populate_index_data(&self, def: &IndexDefinition) {
1142        // RANGE: piggy-back on the existing lazy property-index buckets.
1143        // TEXT: build a trigram inverted index over the existing entity
1144        //       data for the (label, property) tuple.
1145        // POINT: build a grid-bucket spatial index over the existing
1146        //        entity data.
1147        // LOOKUP: catalog-only; existing label/type indexes already
1148        //         answer the predicates.
1149        match def.kind {
1150            StoredIndexKind::Range => {
1151                for key in &def.properties {
1152                    match def.entity {
1153                        StoredIndexEntity::Node => self.ensure_node_property_index(key),
1154                        StoredIndexEntity::Relationship => {
1155                            self.ensure_relationship_property_index(key)
1156                        }
1157                    }
1158                    if let Some(label) = def.label.as_deref() {
1159                        self.activate_sorted_scope(def.entity, label, key);
1160                    }
1161                }
1162            }
1163            StoredIndexKind::Text => {
1164                let label = match def.label.as_deref() {
1165                    Some(l) => l,
1166                    None => return,
1167                };
1168                for property in &def.properties {
1169                    self.activate_text_scope(def.entity, label, property);
1170                }
1171            }
1172            StoredIndexKind::Point => {
1173                let label = match def.label.as_deref() {
1174                    Some(l) => l,
1175                    None => return,
1176                };
1177                let cell_size = point_cell_size_from_options(&def.options);
1178                for property in &def.properties {
1179                    self.activate_point_scope(def.entity, label, property, cell_size);
1180                }
1181            }
1182            StoredIndexKind::Fulltext => {
1183                let labels: Vec<String> = def.all_labels().map(String::from).collect();
1184                if labels.is_empty() {
1185                    return;
1186                }
1187                self.activate_fulltext_index(def.entity, &def.name, &labels, &def.properties);
1188            }
1189            // LOOKUP rides on the label/type indexes maintained eagerly.
1190            // VECTOR runs flat scans per query — no precomputed structure
1191            // to populate here.
1192            StoredIndexKind::Lookup | StoredIndexKind::Vector => {}
1193        }
1194    }
1195
1196    pub(super) fn text_indexes_read(
1197        &self,
1198        entity: StoredIndexEntity,
1199    ) -> std::sync::RwLockReadGuard<'_, TrigramRegistry> {
1200        self.indexes.text.read(entity)
1201    }
1202
1203    pub(super) fn text_indexes_write(
1204        &self,
1205        entity: StoredIndexEntity,
1206    ) -> RwLockWriteGuard<'_, TrigramRegistry> {
1207        self.indexes.text.write(entity)
1208    }
1209
1210    pub(super) fn fulltext_indexes_read(
1211        &self,
1212        entity: StoredIndexEntity,
1213    ) -> std::sync::RwLockReadGuard<'_, FulltextRegistry> {
1214        self.indexes.fulltext.read(entity)
1215    }
1216
1217    #[allow(dead_code)]
1218    pub(super) fn fulltext_indexes_write(
1219        &self,
1220        entity: StoredIndexEntity,
1221    ) -> RwLockWriteGuard<'_, FulltextRegistry> {
1222        self.indexes.fulltext.write(entity)
1223    }
1224
1225    fn activate_text_scope(&self, entity: StoredIndexEntity, label: &str, property: &str) {
1226        if !self.text_indexes_write(entity).add_scope(label, property) {
1227            return;
1228        }
1229
1230        let backfill: Vec<(u64, String)> = match entity {
1231            StoredIndexEntity::Node => self
1232                .iter_nodes()
1233                .filter(|(_, node)| node.labels.iter().any(|l| l == label))
1234                .filter_map(|(id, node)| match node.properties.get(property) {
1235                    Some(PropertyValue::String(value)) => Some((id, value.clone())),
1236                    _ => None,
1237                })
1238                .collect(),
1239            StoredIndexEntity::Relationship => self
1240                .iter_rels()
1241                .filter(|(_, rel)| rel.rel_type == label)
1242                .filter_map(|(id, rel)| match rel.properties.get(property) {
1243                    Some(PropertyValue::String(value)) => Some((id, value.clone())),
1244                    _ => None,
1245                })
1246                .collect(),
1247        };
1248
1249        let mut registry = self.text_indexes_write(entity);
1250        for (id, value) in backfill {
1251            registry.insert(label, property, id, &value);
1252        }
1253    }
1254
1255    /// Drop a (label, property) text scope, decrementing the refcount.
1256    pub(super) fn deactivate_text_scope(
1257        &self,
1258        entity: StoredIndexEntity,
1259        label: &str,
1260        property: &str,
1261    ) {
1262        self.text_indexes_write(entity)
1263            .remove_scope(label, property);
1264    }
1265
1266    fn activate_fulltext_index(
1267        &self,
1268        entity: StoredIndexEntity,
1269        name: &str,
1270        labels: &[String],
1271        properties: &[String],
1272    ) {
1273        use super::fulltext_index::{term_counts_for_properties, TermCounts};
1274
1275        {
1276            let mut registry = self.fulltext_indexes_write(entity);
1277            registry.register(name.to_string(), labels.to_vec(), properties.to_vec());
1278        }
1279        self.indexes
1280            .active_fulltext_indexes
1281            .fetch_add(1, Ordering::Relaxed);
1282
1283        // Backfill: walk every entity matching any label, tokenise covered
1284        // string properties, install one posting batch per entity.
1285        let backfill: Vec<(u64, TermCounts)> = match entity {
1286            StoredIndexEntity::Node => self
1287                .iter_nodes()
1288                .filter(|(_, node)| {
1289                    labels
1290                        .iter()
1291                        .any(|wanted| node.labels.iter().any(|l| l == wanted))
1292                })
1293                .map(|(id, node)| {
1294                    let counts = term_counts_for_properties(&node.properties, properties);
1295                    (id, counts)
1296                })
1297                .filter(|(_, c)| !c.is_empty())
1298                .collect(),
1299            StoredIndexEntity::Relationship => self
1300                .iter_rels()
1301                .filter(|(_, rel)| labels.iter().any(|wanted| wanted == &rel.rel_type))
1302                .map(|(id, rel)| {
1303                    let counts = term_counts_for_properties(&rel.properties, properties);
1304                    (id, counts)
1305                })
1306                .filter(|(_, c)| !c.is_empty())
1307                .collect(),
1308        };
1309
1310        let mut registry = self.fulltext_indexes_write(entity);
1311        if let Some(index) = registry.get_mut(name) {
1312            for (id, counts) in backfill {
1313                index.reindex_entity(id, counts);
1314            }
1315        }
1316    }
1317
1318    pub(super) fn deactivate_fulltext_index(&self, entity: StoredIndexEntity, name: &str) {
1319        self.fulltext_indexes_write(entity).deregister(name);
1320        self.indexes
1321            .active_fulltext_indexes
1322            .fetch_sub(1, Ordering::Relaxed);
1323    }
1324
1325    pub(super) fn sorted_indexes_read(
1326        &self,
1327        entity: StoredIndexEntity,
1328    ) -> std::sync::RwLockReadGuard<'_, SortedPropertyIndex> {
1329        self.indexes.sorted.read(entity)
1330    }
1331
1332    pub(super) fn sorted_indexes_write(
1333        &self,
1334        entity: StoredIndexEntity,
1335    ) -> RwLockWriteGuard<'_, SortedPropertyIndex> {
1336        self.indexes.sorted.write(entity)
1337    }
1338
1339    fn activate_sorted_scope(&self, entity: StoredIndexEntity, label: &str, property: &str) {
1340        if !self.sorted_indexes_write(entity).add_scope(label, property) {
1341            return;
1342        }
1343
1344        let backfill: Vec<(u64, PropertyValue)> = match entity {
1345            StoredIndexEntity::Node => self
1346                .iter_nodes()
1347                .filter(|(_, node)| node.labels.iter().any(|l| l == label))
1348                .filter_map(|(id, node)| {
1349                    node.properties
1350                        .get(property)
1351                        .map(|value| (id, value.clone()))
1352                })
1353                .collect(),
1354            StoredIndexEntity::Relationship => self
1355                .iter_rels()
1356                .filter(|(_, rel)| rel.rel_type == label)
1357                .filter_map(|(id, rel)| {
1358                    rel.properties
1359                        .get(property)
1360                        .map(|value| (id, value.clone()))
1361                })
1362                .collect(),
1363        };
1364
1365        let mut registry = self.sorted_indexes_write(entity);
1366        for (id, value) in backfill {
1367            registry.insert(label, property, id, &value);
1368        }
1369    }
1370
1371    pub(super) fn deactivate_sorted_scope(
1372        &self,
1373        entity: StoredIndexEntity,
1374        label: &str,
1375        property: &str,
1376    ) {
1377        self.sorted_indexes_write(entity)
1378            .remove_scope(label, property);
1379    }
1380
1381    pub(super) fn point_indexes_read(
1382        &self,
1383        entity: StoredIndexEntity,
1384    ) -> std::sync::RwLockReadGuard<'_, PointRegistry> {
1385        self.indexes.point.read(entity)
1386    }
1387
1388    pub(super) fn point_indexes_write(
1389        &self,
1390        entity: StoredIndexEntity,
1391    ) -> RwLockWriteGuard<'_, PointRegistry> {
1392        self.indexes.point.write(entity)
1393    }
1394
1395    fn activate_point_scope(
1396        &self,
1397        entity: StoredIndexEntity,
1398        label: &str,
1399        property: &str,
1400        cell_size: Option<f64>,
1401    ) {
1402        if !self
1403            .point_indexes_write(entity)
1404            .add_scope(label, property, cell_size)
1405        {
1406            return;
1407        }
1408
1409        let backfill: Vec<(u64, LoraPoint)> = match entity {
1410            StoredIndexEntity::Node => self
1411                .iter_nodes()
1412                .filter(|(_, node)| node.labels.iter().any(|l| l == label))
1413                .filter_map(|(id, node)| match node.properties.get(property) {
1414                    Some(PropertyValue::Point(point)) => Some((id, point.clone())),
1415                    _ => None,
1416                })
1417                .collect(),
1418            StoredIndexEntity::Relationship => self
1419                .iter_rels()
1420                .filter(|(_, rel)| rel.rel_type == label)
1421                .filter_map(|(id, rel)| match rel.properties.get(property) {
1422                    Some(PropertyValue::Point(point)) => Some((id, point.clone())),
1423                    _ => None,
1424                })
1425                .collect(),
1426        };
1427
1428        let mut registry = self.point_indexes_write(entity);
1429        for (id, point) in backfill {
1430            registry.insert(label, property, id, point);
1431        }
1432    }
1433
1434    pub(super) fn deactivate_point_scope(
1435        &self,
1436        entity: StoredIndexEntity,
1437        label: &str,
1438        property: &str,
1439    ) {
1440        self.point_indexes_write(entity)
1441            .remove_scope(label, property);
1442    }
1443
1444    /// Snapshot of cardinality stats. Cheap: derived from already-tracked
1445    /// `nodes_by_label` / `relationships_by_type` lengths and the active
1446    /// property-index buckets. The cost model uses this to populate
1447    /// `estimated_rows` on plan-tree nodes.
1448    pub fn graph_stats(&self) -> GraphStats {
1449        let mut stats = GraphStats {
1450            node_count: self.live_node_count,
1451            relationship_count: self.live_rel_count,
1452            ..Default::default()
1453        };
1454        for (label, ids) in &self.nodes_by_label {
1455            stats.nodes_by_label.insert(label.clone(), ids.len());
1456        }
1457        for (rel_type, ids) in &self.relationships_by_type {
1458            stats
1459                .relationships_by_type
1460                .insert(rel_type.clone(), ids.len());
1461        }
1462        // Distinct values per (label, property): pulled from the
1463        // property-index scoped buckets, where we already track the
1464        // per-scope value distribution. Empty for properties without
1465        // an active hash-index — the cost model falls back to a
1466        // conservative estimate in that case.
1467        let prop_indexes = self.indexes_read();
1468        for (scope, props) in &prop_indexes.node_properties.scoped_values {
1469            for (key, values) in props {
1470                stats
1471                    .node_distinct_values
1472                    .insert((scope.clone(), key.clone()), values.len());
1473            }
1474        }
1475        for (scope, props) in &prop_indexes.relationship_properties.scoped_values {
1476            for (key, values) in props {
1477                stats
1478                    .relationship_distinct_values
1479                    .insert((scope.clone(), key.clone()), values.len());
1480            }
1481        }
1482
1483        for def in self.index_catalog_read().list() {
1484            if def.state != StoredIndexState::Online {
1485                continue;
1486            }
1487            let Some(label) = def.label else {
1488                continue;
1489            };
1490            for property in def.properties {
1491                let scope = (label.clone(), property);
1492                match (def.entity, def.kind) {
1493                    (StoredIndexEntity::Node, StoredIndexKind::Range) => {
1494                        stats.node_range_indexes.insert(scope);
1495                    }
1496                    (StoredIndexEntity::Node, StoredIndexKind::Text) => {
1497                        stats.node_text_indexes.insert(scope);
1498                    }
1499                    (StoredIndexEntity::Node, StoredIndexKind::Point) => {
1500                        stats.node_point_indexes.insert(scope);
1501                    }
1502                    (StoredIndexEntity::Relationship, StoredIndexKind::Range) => {
1503                        stats.relationship_range_indexes.insert(scope);
1504                    }
1505                    (StoredIndexEntity::Relationship, StoredIndexKind::Text) => {
1506                        stats.relationship_text_indexes.insert(scope);
1507                    }
1508                    (StoredIndexEntity::Relationship, StoredIndexKind::Point) => {
1509                        stats.relationship_point_indexes.insert(scope);
1510                    }
1511                    (StoredIndexEntity::Node, StoredIndexKind::Vector) => {
1512                        stats.node_vector_indexes.insert(scope);
1513                    }
1514                    (StoredIndexEntity::Relationship, StoredIndexKind::Vector) => {
1515                        stats.relationship_vector_indexes.insert(scope);
1516                    }
1517                    (_, StoredIndexKind::Lookup | StoredIndexKind::Fulltext) => {}
1518                }
1519            }
1520        }
1521        stats
1522    }
1523
1524    pub(super) fn rebuild_property_indexes(&mut self) {
1525        let mut indexes = PropertyIndexRegistry::default();
1526
1527        for (id, node) in self.iter_nodes() {
1528            for (key, value) in &node.properties {
1529                if PropertyIndexKey::from_value(value).is_some() {
1530                    indexes.node_properties.activate(key);
1531                    indexes.node_properties.insert_with_scopes(
1532                        id,
1533                        node.labels.iter().map(String::as_str),
1534                        key,
1535                        value,
1536                    );
1537                }
1538            }
1539        }
1540
1541        for (id, rel) in self.iter_rels() {
1542            for (key, value) in &rel.properties {
1543                if PropertyIndexKey::from_value(value).is_some() {
1544                    indexes.relationship_properties.activate(key);
1545                    indexes.relationship_properties.insert_with_scopes(
1546                        id,
1547                        [rel.rel_type.as_str()],
1548                        key,
1549                        value,
1550                    );
1551                }
1552            }
1553        }
1554
1555        let node_index_count = indexes.node_properties.active_keys.len();
1556        let relationship_index_count = indexes.relationship_properties.active_keys.len();
1557        *self.indexes_mut() = indexes;
1558        self.indexes
1559            .active_node_property_indexes
1560            .store(node_index_count, Ordering::Relaxed);
1561        self.indexes
1562            .active_relationship_property_indexes
1563            .store(relationship_index_count, Ordering::Relaxed);
1564    }
1565
1566    pub(super) fn on_node_created(&mut self, node: &NodeRecord) {
1567        for label in &node.labels {
1568            self.insert_node_label_index(node.id, label);
1569        }
1570        self.index_node_properties_if_active(
1571            node.id,
1572            node.labels.iter().map(String::as_str),
1573            &node.properties,
1574        );
1575        self.maintain_node_secondary_indexes(node, SecondaryIndexMutation::Insert);
1576    }
1577
1578    pub(super) fn on_node_replayed(&mut self, node: &NodeRecord) {
1579        for label in &node.labels {
1580            self.insert_node_label_index(node.id, label);
1581        }
1582        self.index_node_properties_eager(
1583            node.id,
1584            node.labels.iter().map(String::as_str),
1585            &node.properties,
1586        );
1587        self.maintain_node_secondary_indexes(node, SecondaryIndexMutation::Insert);
1588    }
1589
1590    pub(super) fn on_node_property_set(
1591        &mut self,
1592        node_id: NodeId,
1593        key: &str,
1594        old: Option<&PropertyValue>,
1595        new: &PropertyValue,
1596    ) {
1597        let Some(labels) = self.node_at(node_id).map(|node| node.labels.clone()) else {
1598            return;
1599        };
1600
1601        if self.node_property_index_is_active(key) {
1602            if let Some(old) = old {
1603                self.unindex_node_property_if_active(
1604                    node_id,
1605                    labels.iter().map(String::as_str),
1606                    key,
1607                    old,
1608                );
1609            }
1610            self.index_node_property_if_active(
1611                node_id,
1612                labels.iter().map(String::as_str),
1613                key,
1614                new,
1615            );
1616        }
1617
1618        self.update_secondary_property(
1619            StoredIndexEntity::Node,
1620            labels.iter().map(String::as_str),
1621            node_id,
1622            key,
1623            old,
1624            Some(new),
1625        );
1626    }
1627
1628    pub(super) fn on_node_property_removed(
1629        &mut self,
1630        node_id: NodeId,
1631        key: &str,
1632        old: &PropertyValue,
1633    ) {
1634        let Some(labels) = self.node_at(node_id).map(|node| node.labels.clone()) else {
1635            return;
1636        };
1637        if self.node_property_index_is_active(key) {
1638            self.unindex_node_property_if_active(
1639                node_id,
1640                labels.iter().map(String::as_str),
1641                key,
1642                old,
1643            );
1644        }
1645        self.update_secondary_property(
1646            StoredIndexEntity::Node,
1647            labels.iter().map(String::as_str),
1648            node_id,
1649            key,
1650            Some(old),
1651            None,
1652        );
1653    }
1654
1655    pub(super) fn on_node_label_added(&mut self, node_id: NodeId, label: &str) {
1656        self.insert_node_label_index(node_id, label);
1657
1658        let Some(properties) = self.node_at(node_id).map(|node| node.properties.clone()) else {
1659            return;
1660        };
1661        if self.active_node_property_index_count() != 0 {
1662            self.index_node_scope_properties_if_active(node_id, label, &properties);
1663        }
1664        for (key, value) in &properties {
1665            self.update_secondary_property(
1666                StoredIndexEntity::Node,
1667                [label],
1668                node_id,
1669                key,
1670                None,
1671                Some(value),
1672            );
1673        }
1674    }
1675
1676    pub(super) fn on_node_label_removed(&mut self, node_id: NodeId, label: &str) {
1677        self.remove_node_label_index(node_id, label);
1678
1679        let Some(properties) = self.node_at(node_id).map(|node| node.properties.clone()) else {
1680            return;
1681        };
1682        if self.active_node_property_index_count() != 0 {
1683            self.unindex_node_scope_properties_if_active(node_id, label, &properties);
1684        }
1685        for (key, value) in &properties {
1686            self.update_secondary_property(
1687                StoredIndexEntity::Node,
1688                [label],
1689                node_id,
1690                key,
1691                Some(value),
1692                None,
1693            );
1694        }
1695    }
1696
1697    pub(super) fn on_node_deleted(&mut self, node: &NodeRecord) {
1698        for label in &node.labels {
1699            self.remove_node_label_index(node.id, label);
1700        }
1701        self.unindex_active_node_properties(
1702            node.id,
1703            node.labels.iter().map(String::as_str),
1704            &node.properties,
1705        );
1706        self.maintain_node_secondary_indexes(node, SecondaryIndexMutation::Remove);
1707    }
1708
1709    pub(super) fn on_relationship_created(&mut self, rel: &RelationshipRecord) {
1710        self.attach_relationship(rel);
1711        self.index_relationship_properties_if_active(
1712            rel.id,
1713            [rel.rel_type.as_str()],
1714            &rel.properties,
1715        );
1716        self.maintain_relationship_secondary_indexes(rel, SecondaryIndexMutation::Insert);
1717    }
1718
1719    pub(super) fn on_relationship_replayed(&mut self, rel: &RelationshipRecord) {
1720        self.attach_relationship(rel);
1721        self.index_relationship_properties_eager(rel.id, [rel.rel_type.as_str()], &rel.properties);
1722        self.maintain_relationship_secondary_indexes(rel, SecondaryIndexMutation::Insert);
1723    }
1724
1725    pub(super) fn on_relationship_property_set(
1726        &mut self,
1727        rel_id: RelationshipId,
1728        key: &str,
1729        old: Option<&PropertyValue>,
1730        new: &PropertyValue,
1731    ) {
1732        let Some(rel_type) = self.rel_at(rel_id).map(|rel| rel.rel_type.clone()) else {
1733            return;
1734        };
1735
1736        if self.relationship_property_index_is_active(key) {
1737            if let Some(old) = old {
1738                self.unindex_relationship_property_if_active(rel_id, [rel_type.as_str()], key, old);
1739            }
1740            self.index_relationship_property_if_active(rel_id, [rel_type.as_str()], key, new);
1741        }
1742
1743        self.update_secondary_property(
1744            StoredIndexEntity::Relationship,
1745            [rel_type.as_str()],
1746            rel_id,
1747            key,
1748            old,
1749            Some(new),
1750        );
1751    }
1752
1753    pub(super) fn on_relationship_property_removed(
1754        &mut self,
1755        rel_id: RelationshipId,
1756        key: &str,
1757        old: &PropertyValue,
1758    ) {
1759        let Some(rel_type) = self.rel_at(rel_id).map(|rel| rel.rel_type.clone()) else {
1760            return;
1761        };
1762        if self.relationship_property_index_is_active(key) {
1763            self.unindex_relationship_property_if_active(rel_id, [rel_type.as_str()], key, old);
1764        }
1765        self.update_secondary_property(
1766            StoredIndexEntity::Relationship,
1767            [rel_type.as_str()],
1768            rel_id,
1769            key,
1770            Some(old),
1771            None,
1772        );
1773    }
1774
1775    pub(super) fn on_relationship_deleted(&mut self, rel: &RelationshipRecord) {
1776        self.detach_relationship_indexes(rel);
1777        self.unindex_active_relationship_properties(
1778            rel.id,
1779            [rel.rel_type.as_str()],
1780            &rel.properties,
1781        );
1782        self.maintain_relationship_secondary_indexes(rel, SecondaryIndexMutation::Remove);
1783    }
1784
1785    fn index_node_property_eager<'a>(
1786        &mut self,
1787        node_id: NodeId,
1788        labels: impl IntoIterator<Item = &'a str>,
1789        key: &str,
1790        value: &PropertyValue,
1791    ) {
1792        if PropertyIndexKey::from_value(value).is_none() {
1793            return;
1794        }
1795
1796        let activated = {
1797            let indexes = self.indexes_mut();
1798            let activated = indexes.node_properties.activate(key);
1799            indexes
1800                .node_properties
1801                .insert_with_scopes(node_id, labels, key, value);
1802            activated
1803        };
1804        if activated {
1805            self.indexes
1806                .active_node_property_indexes
1807                .fetch_add(1, Ordering::Relaxed);
1808        }
1809    }
1810
1811    fn index_relationship_property_eager<'a>(
1812        &mut self,
1813        rel_id: RelationshipId,
1814        scopes: impl IntoIterator<Item = &'a str>,
1815        key: &str,
1816        value: &PropertyValue,
1817    ) {
1818        if PropertyIndexKey::from_value(value).is_none() {
1819            return;
1820        }
1821
1822        let activated = {
1823            let indexes = self.indexes_mut();
1824            let activated = indexes.relationship_properties.activate(key);
1825            indexes
1826                .relationship_properties
1827                .insert_with_scopes(rel_id, scopes, key, value);
1828            activated
1829        };
1830        if activated {
1831            self.indexes
1832                .active_relationship_property_indexes
1833                .fetch_add(1, Ordering::Relaxed);
1834        }
1835    }
1836
1837    fn index_node_properties_eager<'a>(
1838        &mut self,
1839        node_id: NodeId,
1840        labels: impl IntoIterator<Item = &'a str> + Clone,
1841        properties: &Properties,
1842    ) {
1843        for (key, value) in properties {
1844            self.index_node_property_eager(node_id, labels.clone(), key, value);
1845        }
1846    }
1847
1848    fn index_relationship_properties_eager<'a>(
1849        &mut self,
1850        rel_id: RelationshipId,
1851        scopes: impl IntoIterator<Item = &'a str> + Clone,
1852        properties: &Properties,
1853    ) {
1854        for (key, value) in properties {
1855            self.index_relationship_property_eager(rel_id, scopes.clone(), key, value);
1856        }
1857    }
1858
1859    fn index_node_property_if_active<'a>(
1860        &mut self,
1861        node_id: NodeId,
1862        labels: impl IntoIterator<Item = &'a str>,
1863        key: &str,
1864        value: &PropertyValue,
1865    ) {
1866        if self.active_node_property_index_count() == 0 {
1867            return;
1868        }
1869        let indexes = self.indexes_mut();
1870        if indexes.node_properties.is_active(key) {
1871            indexes
1872                .node_properties
1873                .insert_with_scopes(node_id, labels, key, value);
1874        }
1875    }
1876
1877    fn index_node_properties_if_active<'a>(
1878        &mut self,
1879        node_id: NodeId,
1880        labels: impl IntoIterator<Item = &'a str> + Clone,
1881        properties: &Properties,
1882    ) {
1883        if self.active_node_property_index_count() == 0 {
1884            return;
1885        }
1886        let indexes = self.indexes_mut();
1887        for (key, value) in properties {
1888            if indexes.node_properties.is_active(key) {
1889                indexes
1890                    .node_properties
1891                    .insert_with_scopes(node_id, labels.clone(), key, value);
1892            }
1893        }
1894    }
1895
1896    fn unindex_node_property_if_active<'a>(
1897        &mut self,
1898        node_id: NodeId,
1899        labels: impl IntoIterator<Item = &'a str>,
1900        key: &str,
1901        value: &PropertyValue,
1902    ) {
1903        if self.active_node_property_index_count() == 0 {
1904            return;
1905        }
1906        let indexes = self.indexes_mut();
1907        if indexes.node_properties.is_active(key) {
1908            indexes
1909                .node_properties
1910                .remove_with_scopes(node_id, labels, key, value);
1911        }
1912    }
1913
1914    fn index_node_scope_properties_if_active(
1915        &mut self,
1916        node_id: NodeId,
1917        scope: &str,
1918        properties: &Properties,
1919    ) {
1920        if self.active_node_property_index_count() == 0 {
1921            return;
1922        }
1923        let indexes = self.indexes_mut();
1924        for (key, value) in properties {
1925            if indexes.node_properties.is_active(key) {
1926                indexes
1927                    .node_properties
1928                    .insert_scoped(node_id, scope, key, value);
1929            }
1930        }
1931    }
1932
1933    fn unindex_node_scope_properties_if_active(
1934        &mut self,
1935        node_id: NodeId,
1936        scope: &str,
1937        properties: &Properties,
1938    ) {
1939        if self.active_node_property_index_count() == 0 {
1940            return;
1941        }
1942        let indexes = self.indexes_mut();
1943        for (key, value) in properties {
1944            if indexes.node_properties.is_active(key) {
1945                indexes
1946                    .node_properties
1947                    .remove_scoped(node_id, scope, key, value);
1948            }
1949        }
1950    }
1951
1952    fn unindex_active_node_properties<'a>(
1953        &mut self,
1954        node_id: NodeId,
1955        labels: impl IntoIterator<Item = &'a str> + Clone,
1956        properties: &Properties,
1957    ) {
1958        if self.active_node_property_index_count() == 0 {
1959            return;
1960        }
1961        let indexes = self.indexes_mut();
1962        for (key, value) in properties {
1963            if indexes.node_properties.is_active(key) {
1964                indexes
1965                    .node_properties
1966                    .remove_with_scopes(node_id, labels.clone(), key, value);
1967            }
1968        }
1969    }
1970
1971    fn index_relationship_property_if_active<'a>(
1972        &mut self,
1973        rel_id: RelationshipId,
1974        scopes: impl IntoIterator<Item = &'a str>,
1975        key: &str,
1976        value: &PropertyValue,
1977    ) {
1978        if self.active_relationship_property_index_count() == 0 {
1979            return;
1980        }
1981        let indexes = self.indexes_mut();
1982        if indexes.relationship_properties.is_active(key) {
1983            indexes
1984                .relationship_properties
1985                .insert_with_scopes(rel_id, scopes, key, value);
1986        }
1987    }
1988
1989    fn index_relationship_properties_if_active<'a>(
1990        &mut self,
1991        rel_id: RelationshipId,
1992        scopes: impl IntoIterator<Item = &'a str> + Clone,
1993        properties: &Properties,
1994    ) {
1995        if self.active_relationship_property_index_count() == 0 {
1996            return;
1997        }
1998        let indexes = self.indexes_mut();
1999        for (key, value) in properties {
2000            if indexes.relationship_properties.is_active(key) {
2001                indexes.relationship_properties.insert_with_scopes(
2002                    rel_id,
2003                    scopes.clone(),
2004                    key,
2005                    value,
2006                );
2007            }
2008        }
2009    }
2010
2011    fn unindex_relationship_property_if_active<'a>(
2012        &mut self,
2013        rel_id: RelationshipId,
2014        scopes: impl IntoIterator<Item = &'a str>,
2015        key: &str,
2016        value: &PropertyValue,
2017    ) {
2018        if self.active_relationship_property_index_count() == 0 {
2019            return;
2020        }
2021        let indexes = self.indexes_mut();
2022        if indexes.relationship_properties.is_active(key) {
2023            indexes
2024                .relationship_properties
2025                .remove_with_scopes(rel_id, scopes, key, value);
2026        }
2027    }
2028
2029    fn unindex_active_relationship_properties<'a>(
2030        &mut self,
2031        rel_id: RelationshipId,
2032        scopes: impl IntoIterator<Item = &'a str> + Clone,
2033        properties: &Properties,
2034    ) {
2035        if self.active_relationship_property_index_count() == 0 {
2036            return;
2037        }
2038        let indexes = self.indexes_mut();
2039        for (key, value) in properties {
2040            if indexes.relationship_properties.is_active(key) {
2041                indexes.relationship_properties.remove_with_scopes(
2042                    rel_id,
2043                    scopes.clone(),
2044                    key,
2045                    value,
2046                );
2047            }
2048        }
2049    }
2050
2051    pub(super) fn scan_nodes_by_property(
2052        &self,
2053        label: Option<&str>,
2054        key: &str,
2055        value: &PropertyValue,
2056    ) -> Vec<NodeRecord> {
2057        match label {
2058            Some(label) => self
2059                .nodes_by_label
2060                .get(label)
2061                .into_iter()
2062                .flat_map(|ids| ids.iter())
2063                .filter_map(|&id| self.node_at(id))
2064                .filter(|node| node.properties.get(key) == Some(value))
2065                .cloned()
2066                .collect(),
2067            None => self
2068                .iter_node_records()
2069                .filter(|node| node.properties.get(key) == Some(value))
2070                .cloned()
2071                .collect(),
2072        }
2073    }
2074
2075    pub(super) fn scan_node_ids_by_property(
2076        &self,
2077        label: Option<&str>,
2078        key: &str,
2079        value: &PropertyValue,
2080    ) -> Vec<NodeId> {
2081        match label {
2082            Some(label) => self
2083                .nodes_by_label
2084                .get(label)
2085                .into_iter()
2086                .flat_map(|ids| ids.iter())
2087                .filter_map(|&id| {
2088                    (self.node_at(id)?.properties.get(key) == Some(value)).then_some(id)
2089                })
2090                .collect(),
2091            None => self
2092                .iter_nodes()
2093                .filter_map(|(id, node)| (node.properties.get(key) == Some(value)).then_some(id))
2094                .collect(),
2095        }
2096    }
2097
2098    pub(super) fn any_node_by_property(
2099        &self,
2100        label: &str,
2101        key: &str,
2102        value: &PropertyValue,
2103    ) -> bool {
2104        self.nodes_by_label
2105            .get(label)
2106            .into_iter()
2107            .flat_map(|ids| ids.iter())
2108            .filter_map(|&id| self.node_at(id))
2109            .any(|node| node.properties.get(key) == Some(value))
2110    }
2111
2112    pub(super) fn scan_relationships_by_property(
2113        &self,
2114        rel_type: Option<&str>,
2115        key: &str,
2116        value: &PropertyValue,
2117    ) -> Vec<RelationshipRecord> {
2118        match rel_type {
2119            Some(rel_type) => self
2120                .relationships_by_type
2121                .get(rel_type)
2122                .into_iter()
2123                .flat_map(|ids| ids.iter())
2124                .filter_map(|&id| self.rel_at(id))
2125                .filter(|rel| rel.properties.get(key) == Some(value))
2126                .cloned()
2127                .collect(),
2128            None => self
2129                .iter_rel_records()
2130                .filter(|rel| rel.properties.get(key) == Some(value))
2131                .cloned()
2132                .collect(),
2133        }
2134    }
2135
2136    pub(super) fn scan_relationship_ids_by_property(
2137        &self,
2138        rel_type: Option<&str>,
2139        key: &str,
2140        value: &PropertyValue,
2141    ) -> Vec<RelationshipId> {
2142        match rel_type {
2143            Some(rel_type) => self
2144                .relationships_by_type
2145                .get(rel_type)
2146                .into_iter()
2147                .flat_map(|ids| ids.iter())
2148                .filter_map(|&id| {
2149                    (self.rel_at(id)?.properties.get(key) == Some(value)).then_some(id)
2150                })
2151                .collect(),
2152            None => self
2153                .iter_rels()
2154                .filter_map(|(id, rel)| (rel.properties.get(key) == Some(value)).then_some(id))
2155                .collect(),
2156        }
2157    }
2158
2159    pub(super) fn any_relationship_by_property(
2160        &self,
2161        rel_type: &str,
2162        key: &str,
2163        value: &PropertyValue,
2164    ) -> bool {
2165        self.relationships_by_type
2166            .get(rel_type)
2167            .into_iter()
2168            .flat_map(|ids| ids.iter())
2169            .filter_map(|&id| self.rel_at(id))
2170            .any(|rel| rel.properties.get(key) == Some(value))
2171    }
2172
2173    pub(super) fn attach_relationship(&mut self, rel: &RelationshipRecord) {
2174        self.outgoing_push(rel.src, rel.id);
2175        self.incoming_push(rel.dst, rel.id);
2176        self.insert_relationship_type_index(rel.id, &rel.rel_type);
2177    }
2178
2179    fn detach_relationship_indexes(&mut self, rel: &RelationshipRecord) {
2180        // Adjacency is now positional `Vec<Vec<RelationshipId>>` — clearing
2181        // the inner Vec leaves the slot in place (the slot is sized for the
2182        // node's lifetime, not the edge's).
2183        self.outgoing_remove(rel.src, rel.id);
2184        self.incoming_remove(rel.dst, rel.id);
2185
2186        self.remove_relationship_type_index(rel.id, &rel.rel_type);
2187    }
2188
2189    pub(super) fn relationship_ids_for_direction(
2190        &self,
2191        node_id: NodeId,
2192        direction: Direction,
2193    ) -> Vec<RelationshipId> {
2194        match direction {
2195            Direction::Left => self
2196                .incoming_at(node_id)
2197                .map(<[_]>::to_vec)
2198                .unwrap_or_default(),
2199
2200            Direction::Right => self
2201                .outgoing_at(node_id)
2202                .map(<[_]>::to_vec)
2203                .unwrap_or_default(),
2204
2205            Direction::Undirected => {
2206                let out = self.outgoing_at(node_id);
2207                let inc = self.incoming_at(node_id);
2208                let mut ids = Vec::with_capacity(
2209                    out.map(<[_]>::len).unwrap_or(0) + inc.map(<[_]>::len).unwrap_or(0),
2210                );
2211
2212                if let Some(out) = out {
2213                    ids.extend(out.iter().copied());
2214                }
2215                if let Some(inc) = inc {
2216                    for &rel_id in inc {
2217                        let Some(rel) = self.rel_at(rel_id) else {
2218                            continue;
2219                        };
2220                        if rel.src == node_id && rel.dst == node_id {
2221                            continue;
2222                        }
2223                        ids.push(rel_id);
2224                    }
2225                }
2226
2227                ids
2228            }
2229        }
2230    }
2231
2232    pub(super) fn other_endpoint(rel: &RelationshipRecord, node_id: NodeId) -> Option<NodeId> {
2233        if rel.src == node_id {
2234            Some(rel.dst)
2235        } else if rel.dst == node_id {
2236            Some(rel.src)
2237        } else {
2238            None
2239        }
2240    }
2241
2242    pub(super) fn has_incident_relationships(&self, node_id: NodeId) -> bool {
2243        self.outgoing_at(node_id)
2244            .map(|ids| !ids.is_empty())
2245            .unwrap_or(false)
2246            || self
2247                .incoming_at(node_id)
2248                .map(|ids| !ids.is_empty())
2249                .unwrap_or(false)
2250    }
2251
2252    pub(super) fn incident_relationship_ids(&self, node_id: NodeId) -> Vec<RelationshipId> {
2253        let out = self.outgoing_at(node_id);
2254        let inc = self.incoming_at(node_id);
2255        let mut rel_ids =
2256            Vec::with_capacity(out.map(<[_]>::len).unwrap_or(0) + inc.map(<[_]>::len).unwrap_or(0));
2257
2258        if let Some(ids) = out {
2259            rel_ids.extend(ids.iter().copied());
2260        }
2261        if let Some(ids) = inc {
2262            for &rel_id in ids {
2263                let Some(rel) = self.rel_at(rel_id) else {
2264                    continue;
2265                };
2266                if rel.src == node_id && rel.dst == node_id {
2267                    continue;
2268                }
2269                rel_ids.push(rel_id);
2270            }
2271        }
2272
2273        rel_ids
2274    }
2275
2276    /// Replay a node creation using the id captured in a durable mutation
2277    /// event. This intentionally does not emit a new mutation event: callers
2278    /// must invoke it before installing a recorder on the graph.
2279    #[doc(hidden)]
2280    pub fn replay_create_node(
2281        &mut self,
2282        id: NodeId,
2283        labels: Vec<String>,
2284        properties: Properties,
2285    ) -> Result<NodeRecord, String> {
2286        if self.recorder.is_some() {
2287            return Err(
2288                "cannot replay node creation while a mutation recorder is installed".into(),
2289            );
2290        }
2291        if self.node_at(id).is_some() {
2292            return Err(format!("node id {id} already exists"));
2293        }
2294        let idx = self.ensure_node_slot_checked(id)?;
2295        self.bump_next_node_id_past(id)?;
2296
2297        let labels = Self::normalize_labels(labels);
2298        let node = NodeRecord {
2299            id,
2300            labels: labels.clone(),
2301            properties,
2302        };
2303
2304        self.put_node_at_slot(idx, node.clone());
2305        self.on_node_replayed(&node);
2306
2307        Ok(node)
2308    }
2309
2310    /// Replay a relationship creation using the id captured in a durable
2311    /// mutation event. This intentionally does not emit a new mutation event:
2312    /// callers must invoke it before installing a recorder on the graph.
2313    #[doc(hidden)]
2314    pub fn replay_create_relationship(
2315        &mut self,
2316        id: RelationshipId,
2317        src: NodeId,
2318        dst: NodeId,
2319        rel_type: &str,
2320        properties: Properties,
2321    ) -> Result<RelationshipRecord, String> {
2322        if self.recorder.is_some() {
2323            return Err(
2324                "cannot replay relationship creation while a mutation recorder is installed".into(),
2325            );
2326        }
2327        if self.rel_at(id).is_some() {
2328            return Err(format!("relationship id {id} already exists"));
2329        }
2330        if self.node_at(src).is_none() {
2331            return Err(format!(
2332                "relationship {id} references missing source node {src}"
2333            ));
2334        }
2335        if self.node_at(dst).is_none() {
2336            return Err(format!(
2337                "relationship {id} references missing target node {dst}"
2338            ));
2339        }
2340
2341        let trimmed = rel_type.trim();
2342        if trimmed.is_empty() {
2343            return Err(format!("relationship {id} has an empty type"));
2344        }
2345        let idx = self.ensure_rel_slot_checked(id)?;
2346        self.bump_next_rel_id_past(id)?;
2347
2348        let rel = RelationshipRecord {
2349            id,
2350            src,
2351            dst,
2352            rel_type: trimmed.to_string(),
2353            properties,
2354        };
2355
2356        self.put_rel_at_slot(idx, rel.clone());
2357        self.on_relationship_replayed(&rel);
2358
2359        Ok(rel)
2360    }
2361
2362    #[cfg(test)]
2363    pub(super) fn assert_property_indexes_match_scan(&self) {
2364        let indexes = self.indexes_read();
2365        assert_eq!(
2366            indexes.node_properties.active_keys.len(),
2367            self.active_node_property_index_count(),
2368            "node property index counter diverged from active key set"
2369        );
2370        assert_eq!(
2371            indexes.relationship_properties.active_keys.len(),
2372            self.active_relationship_property_index_count(),
2373            "relationship property index counter diverged from active key set"
2374        );
2375
2376        let mut expected_nodes = PropertyIndexState {
2377            active_keys: indexes.node_properties.active_keys.clone(),
2378            ..PropertyIndexState::default()
2379        };
2380        for (id, node) in self.iter_nodes() {
2381            for (key, value) in &node.properties {
2382                if expected_nodes.is_active(key) {
2383                    expected_nodes.insert_with_scopes(
2384                        id,
2385                        node.labels.iter().map(String::as_str),
2386                        key,
2387                        value,
2388                    );
2389                }
2390            }
2391        }
2392        assert_eq!(
2393            indexes.node_properties.values, expected_nodes.values,
2394            "node property index values diverged from scan"
2395        );
2396        assert_eq!(
2397            indexes.node_properties.scoped_values, expected_nodes.scoped_values,
2398            "node property scoped index values diverged from scan"
2399        );
2400
2401        let mut expected_relationships = PropertyIndexState {
2402            active_keys: indexes.relationship_properties.active_keys.clone(),
2403            ..PropertyIndexState::default()
2404        };
2405        for (id, rel) in self.iter_rels() {
2406            for (key, value) in &rel.properties {
2407                if expected_relationships.is_active(key) {
2408                    expected_relationships.insert_with_scopes(
2409                        id,
2410                        [rel.rel_type.as_str()],
2411                        key,
2412                        value,
2413                    );
2414                }
2415            }
2416        }
2417        assert_eq!(
2418            indexes.relationship_properties.values, expected_relationships.values,
2419            "relationship property index values diverged from scan"
2420        );
2421        assert_eq!(
2422            indexes.relationship_properties.scoped_values, expected_relationships.scoped_values,
2423            "relationship property scoped index values diverged from scan"
2424        );
2425    }
2426}
2427
2428/// Read the optional `cell_size` from a POINT index `OPTIONS` map.
2429/// Falls back to the registry's default when the key is missing,
2430/// not numeric, or non-positive.
2431fn point_cell_size_from_options(
2432    options: &std::collections::BTreeMap<String, IndexConfigValue>,
2433) -> Option<f64> {
2434    let raw = options.get("cellSize")?;
2435    match raw {
2436        IndexConfigValue::Number(v) if *v > 0.0 && v.is_finite() => Some(*v),
2437        IndexConfigValue::Integer(v) if *v > 0 => Some(*v as f64),
2438        _ => None,
2439    }
2440}