Skip to main content

lora_store/memory/
graph.rs

1//! The [`InMemoryGraph`] data structure: slot-indexed node/relationship
2//! storage, adjacency lists, label/type indexes, and the inherent
3//! helpers that the trait impls in `super::impls` delegate to.
4
5use std::collections::{BTreeMap, BTreeSet};
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::{Arc, RwLock, RwLockWriteGuard};
8
9use lora_ast::Direction;
10
11use crate::{
12    MutationEvent, MutationRecorder, NodeId, NodeRecord, Properties, PropertyValue, RelationshipId,
13    RelationshipRecord,
14};
15
16#[cfg(test)]
17use super::property_index::PropertyIndexState;
18use super::property_index::{PropertyIndexKey, PropertyIndexRegistry};
19
20#[derive(Default)]
21pub struct InMemoryGraph {
22    pub(super) next_node_id: NodeId,
23    pub(super) next_rel_id: RelationshipId,
24
25    /// Slot-indexed node storage: `nodes[id as usize]` is the record at `id`.
26    /// `None` slots are tombstones from deletes (we don't compact). Because
27    /// `next_node_id` is monotonic the slot at `id` is initialized exactly
28    /// when `id < next_node_id` — same identity guarantee the previous
29    /// `BTreeMap<NodeId, NodeRecord>` had, just with O(1) lookup and
30    /// cache-coherent layout.
31    ///
32    /// Records are wrapped in `Arc` so [`Self::clone`] (called on every
33    /// auto-commit write to build a working copy) is `O(N)` atomic
34    /// refcount bumps instead of `O(N)` deep record clones — for a
35    /// 100k-node graph the difference is microseconds vs. tens of
36    /// milliseconds. Mutating a record uses `Arc::make_mut`, which
37    /// clones in place when the refcount is 1 (no concurrent reader)
38    /// and falls back to a single-record clone-on-write when readers
39    /// still hold a snapshot.
40    pub(super) nodes: Vec<Option<Arc<NodeRecord>>>,
41    pub(super) relationships: Vec<Option<Arc<RelationshipRecord>>>,
42    /// Live (non-tombstoned) counts kept in sync with `put_*`/`take_*` so
43    /// `node_count` / `relationship_count` stay O(1) — without a counter
44    /// they'd have to scan the slab.
45    pub(super) live_node_count: usize,
46    pub(super) live_rel_count: usize,
47
48    /// Adjacency keyed by NodeId. `outgoing[id]` is the list of relationship
49    /// ids that leave `id`; mirrored on `incoming[id]`. Inner `Vec` instead
50    /// of `BTreeSet` because edges are inserted exactly once and traversal
51    /// only needs sequential iteration; the cache-friendly contiguous layout
52    /// shows up on every traversal hop.
53    pub(super) outgoing: Vec<Vec<RelationshipId>>,
54    pub(super) incoming: Vec<Vec<RelationshipId>>,
55
56    // secondary indexes
57    /// Label -> the (unique, monotonic) node ids that carry it. The inner
58    /// `Vec` instead of `BTreeSet` because every node id is inserted at most
59    /// once per label (no dedup needed) and every consumer iterates the
60    /// whole list anyway — contiguous storage iterates faster than a
61    /// tree-of-pointers, and removes via `swap_remove` stay O(degree-of-label).
62    pub(super) nodes_by_label: BTreeMap<String, Vec<NodeId>>,
63    pub(super) relationships_by_type: BTreeMap<String, Vec<RelationshipId>>,
64    pub(super) indexes: RwLock<PropertyIndexRegistry>,
65    pub(super) active_node_property_indexes: AtomicUsize,
66    pub(super) active_relationship_property_indexes: AtomicUsize,
67
68    /// Optional mutation observer. When `Some`, every committed mutation
69    /// fans out to this recorder *after* the in-memory state has been
70    /// updated. The recorder is not part of the graph's identity, so Clone
71    /// and snapshot restore both reset it to `None`.
72    pub(super) recorder: Option<Arc<dyn MutationRecorder>>,
73}
74
75impl std::fmt::Debug for InMemoryGraph {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        f.debug_struct("InMemoryGraph")
78            .field("next_node_id", &self.next_node_id)
79            .field("next_rel_id", &self.next_rel_id)
80            .field("nodes", &self.nodes)
81            .field("relationships", &self.relationships)
82            .field("outgoing", &self.outgoing)
83            .field("incoming", &self.incoming)
84            .field("nodes_by_label", &self.nodes_by_label)
85            .field("relationships_by_type", &self.relationships_by_type)
86            .field("indexes", &self.indexes)
87            .field(
88                "active_node_property_indexes",
89                &self.active_node_property_index_count(),
90            )
91            .field(
92                "active_relationship_property_indexes",
93                &self.active_relationship_property_index_count(),
94            )
95            .field("recorder", &self.recorder.as_ref().map(|_| "installed"))
96            .finish()
97    }
98}
99
100impl Clone for InMemoryGraph {
101    fn clone(&self) -> Self {
102        // Deliberately drop the recorder on clone: a cloned store is a
103        // separate identity; it should not silently share the observer.
104        Self {
105            next_node_id: self.next_node_id,
106            next_rel_id: self.next_rel_id,
107            nodes: self.nodes.clone(),
108            relationships: self.relationships.clone(),
109            live_node_count: self.live_node_count,
110            live_rel_count: self.live_rel_count,
111            outgoing: self.outgoing.clone(),
112            incoming: self.incoming.clone(),
113            nodes_by_label: self.nodes_by_label.clone(),
114            relationships_by_type: self.relationships_by_type.clone(),
115            indexes: RwLock::new(if self.has_active_property_indexes() {
116                self.indexes_read().clone()
117            } else {
118                PropertyIndexRegistry::default()
119            }),
120            active_node_property_indexes: AtomicUsize::new(self.active_node_property_index_count()),
121            active_relationship_property_indexes: AtomicUsize::new(
122                self.active_relationship_property_index_count(),
123            ),
124            recorder: None,
125        }
126    }
127}
128
129impl InMemoryGraph {
130    pub fn new() -> Self {
131        Self::default()
132    }
133
134    pub fn with_capacity_hint(_nodes: usize, _relationships: usize) -> Self {
135        // BTreeMap/BTreeSet do not support capacity reservation.
136        Self::default()
137    }
138
139    pub fn contains_node(&self, node_id: NodeId) -> bool {
140        self.node_at(node_id).is_some()
141    }
142
143    pub fn contains_relationship(&self, rel_id: RelationshipId) -> bool {
144        self.rel_at(rel_id).is_some()
145    }
146
147    /// Install (or clear) the mutation recorder. Passing `None` detaches any
148    /// currently-installed recorder. The recorder observes every committed
149    /// mutation *after* it has been applied.
150    pub fn set_mutation_recorder(&mut self, recorder: Option<Arc<dyn MutationRecorder>>) {
151        self.recorder = recorder;
152    }
153
154    /// Handle to the currently-installed recorder, if any.
155    pub fn mutation_recorder(&self) -> Option<&Arc<dyn MutationRecorder>> {
156        self.recorder.as_ref()
157    }
158
159    /// Emit a mutation event only if a recorder is installed. The event is
160    /// built lazily — callers pass a closure, so when no recorder is
161    /// attached we pay only a `None` check and the cost of constructing the
162    /// event (labels/properties clones) is avoided.
163    #[inline]
164    pub(super) fn emit<F: FnOnce() -> MutationEvent>(&self, build: F) {
165        if let Some(rec) = &self.recorder {
166            rec.record(build());
167        }
168    }
169
170    pub(super) fn alloc_node_id(&mut self) -> NodeId {
171        let id = self.next_node_id;
172        self.next_node_id += 1;
173        id
174    }
175
176    pub(super) fn alloc_rel_id(&mut self) -> RelationshipId {
177        let id = self.next_rel_id;
178        self.next_rel_id += 1;
179        id
180    }
181
182    fn bump_next_node_id_past(&mut self, id: NodeId) -> Result<(), String> {
183        let next = id
184            .checked_add(1)
185            .ok_or_else(|| format!("node id {id} leaves no valid next node id"))?;
186        self.next_node_id = self.next_node_id.max(next);
187        Ok(())
188    }
189
190    fn bump_next_rel_id_past(&mut self, id: RelationshipId) -> Result<(), String> {
191        let next = id
192            .checked_add(1)
193            .ok_or_else(|| format!("relationship id {id} leaves no valid next relationship id"))?;
194        self.next_rel_id = self.next_rel_id.max(next);
195        Ok(())
196    }
197
198    // ---------- Slab access helpers ----------
199    //
200    // Stand-in for the BTreeMap API the previous storage used. They keep the
201    // call sites readable while the underlying layout is positional Vec.
202
203    #[inline]
204    pub(super) fn node_at(&self, id: NodeId) -> Option<&NodeRecord> {
205        self.nodes
206            .get(id as usize)
207            .and_then(|s| s.as_ref())
208            .map(|arc| arc.as_ref())
209    }
210
211    /// Mutable handle to a node record, doing copy-on-write only when the
212    /// `Arc` is shared with a concurrent reader. With no readers (the
213    /// common case after a fresh write_store clone), `Arc::make_mut`
214    /// upgrades in place — no record clone.
215    #[inline]
216    pub(super) fn node_at_mut(&mut self, id: NodeId) -> Option<&mut NodeRecord> {
217        self.nodes
218            .get_mut(id as usize)
219            .and_then(|s| s.as_mut())
220            .map(Arc::make_mut)
221    }
222
223    #[inline]
224    pub(super) fn rel_at(&self, id: RelationshipId) -> Option<&RelationshipRecord> {
225        self.relationships
226            .get(id as usize)
227            .and_then(|s| s.as_ref())
228            .map(|arc| arc.as_ref())
229    }
230
231    #[inline]
232    pub(super) fn rel_at_mut(&mut self, id: RelationshipId) -> Option<&mut RelationshipRecord> {
233        self.relationships
234            .get_mut(id as usize)
235            .and_then(|s| s.as_mut())
236            .map(Arc::make_mut)
237    }
238
239    /// Resize the node-keyed Vecs so `id as usize` is in range. Adjacency
240    /// lists are kept in lockstep with `nodes`, so a freshly-grown slot has
241    /// empty outgoing/incoming Vecs ready to receive edges.
242    fn ensure_node_slot(&mut self, id: NodeId) {
243        let target = id as usize + 1;
244        if self.nodes.len() < target {
245            self.nodes.resize_with(target, || None);
246            self.outgoing.resize_with(target, Vec::new);
247            self.incoming.resize_with(target, Vec::new);
248        }
249    }
250
251    fn ensure_rel_slot(&mut self, id: RelationshipId) {
252        let target = id as usize + 1;
253        if self.relationships.len() < target {
254            self.relationships.resize_with(target, || None);
255        }
256    }
257
258    pub(super) fn put_node(&mut self, id: NodeId, node: NodeRecord) {
259        self.ensure_node_slot(id);
260        let was_present = self.nodes[id as usize].is_some();
261        self.nodes[id as usize] = Some(Arc::new(node));
262        if !was_present {
263            self.live_node_count += 1;
264        }
265    }
266
267    pub(super) fn put_rel(&mut self, id: RelationshipId, rel: RelationshipRecord) {
268        self.ensure_rel_slot(id);
269        let was_present = self.relationships[id as usize].is_some();
270        self.relationships[id as usize] = Some(Arc::new(rel));
271        if !was_present {
272            self.live_rel_count += 1;
273        }
274    }
275
276    pub(super) fn take_node(&mut self, id: NodeId) -> Option<NodeRecord> {
277        let idx = id as usize;
278        let removed = self.nodes.get_mut(idx).and_then(|s| s.take());
279        if removed.is_some() {
280            self.live_node_count -= 1;
281            // Also clear the per-id adjacency entries so the memory is reclaimed
282            // on the typical "delete every node" pattern. We deliberately do not
283            // shrink the outer Vec — leaving the slot lets new ids reuse the
284            // same index without growth churn (and `next_node_id` is monotonic
285            // anyway, so no immediate reuse).
286            if let Some(out) = self.outgoing.get_mut(idx) {
287                out.clear();
288            }
289            if let Some(inc) = self.incoming.get_mut(idx) {
290                inc.clear();
291            }
292        }
293        // Unwrap the Arc — `try_unwrap` returns the inner `NodeRecord`
294        // without cloning when our slab held the only reference, falling
295        // back to a clone only when concurrent readers still hold a
296        // snapshot Arc.
297        removed.map(|arc| Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone()))
298    }
299
300    pub(super) fn take_rel(&mut self, id: RelationshipId) -> Option<RelationshipRecord> {
301        let idx = id as usize;
302        let removed = self.relationships.get_mut(idx).and_then(|s| s.take());
303        if removed.is_some() {
304            self.live_rel_count -= 1;
305        }
306        removed.map(|arc| Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone()))
307    }
308
309    #[inline]
310    pub(super) fn outgoing_at(&self, id: NodeId) -> Option<&Vec<RelationshipId>> {
311        self.outgoing.get(id as usize)
312    }
313
314    #[inline]
315    pub(super) fn incoming_at(&self, id: NodeId) -> Option<&Vec<RelationshipId>> {
316        self.incoming.get(id as usize)
317    }
318
319    pub(super) fn iter_node_ids(&self) -> impl Iterator<Item = NodeId> + '_ {
320        self.nodes
321            .iter()
322            .enumerate()
323            .filter_map(|(i, slot)| slot.as_ref().map(|_| i as NodeId))
324    }
325
326    pub(super) fn iter_node_records(&self) -> impl Iterator<Item = &NodeRecord> + '_ {
327        self.nodes
328            .iter()
329            .filter_map(|s| s.as_ref())
330            .map(|arc| arc.as_ref())
331    }
332
333    pub(super) fn iter_rel_ids(&self) -> impl Iterator<Item = RelationshipId> + '_ {
334        self.relationships
335            .iter()
336            .enumerate()
337            .filter_map(|(i, slot)| slot.as_ref().map(|_| i as RelationshipId))
338    }
339
340    pub(super) fn iter_rel_records(&self) -> impl Iterator<Item = &RelationshipRecord> + '_ {
341        self.relationships
342            .iter()
343            .filter_map(|s| s.as_ref())
344            .map(|arc| arc.as_ref())
345    }
346
347    pub(super) fn iter_nodes(&self) -> impl Iterator<Item = (NodeId, &NodeRecord)> + '_ {
348        self.nodes
349            .iter()
350            .enumerate()
351            .filter_map(|(i, slot)| slot.as_ref().map(|n| (i as NodeId, n.as_ref())))
352    }
353
354    pub(super) fn iter_rels(
355        &self,
356    ) -> impl Iterator<Item = (RelationshipId, &RelationshipRecord)> + '_ {
357        self.relationships
358            .iter()
359            .enumerate()
360            .filter_map(|(i, slot)| slot.as_ref().map(|r| (i as RelationshipId, r.as_ref())))
361    }
362
363    /// Add `rel_id` to `node_id`'s outgoing list. Relies on the monotonic-id
364    /// invariant: relationship ids are allocated once and never re-used, so
365    /// the bucket can never see a duplicate.
366    fn outgoing_push(&mut self, node_id: NodeId, rel_id: RelationshipId) {
367        self.ensure_node_slot(node_id);
368        self.outgoing[node_id as usize].push(rel_id);
369    }
370
371    fn incoming_push(&mut self, node_id: NodeId, rel_id: RelationshipId) {
372        self.ensure_node_slot(node_id);
373        self.incoming[node_id as usize].push(rel_id);
374    }
375
376    /// Remove `rel_id` from `node_id`'s outgoing list. `swap_remove` keeps
377    /// the operation O(1) — adjacency order doesn't carry semantic meaning.
378    fn outgoing_remove(&mut self, node_id: NodeId, rel_id: RelationshipId) {
379        if let Some(v) = self.outgoing.get_mut(node_id as usize) {
380            if let Some(pos) = v.iter().position(|&id| id == rel_id) {
381                v.swap_remove(pos);
382            }
383        }
384    }
385
386    fn incoming_remove(&mut self, node_id: NodeId, rel_id: RelationshipId) {
387        if let Some(v) = self.incoming.get_mut(node_id as usize) {
388            if let Some(pos) = v.iter().position(|&id| id == rel_id) {
389                v.swap_remove(pos);
390            }
391        }
392    }
393
394    pub(super) fn normalize_labels(labels: Vec<String>) -> Vec<String> {
395        let mut seen = BTreeSet::new();
396
397        labels
398            .into_iter()
399            .map(|s| s.trim().to_string())
400            .filter(|s| !s.is_empty())
401            .filter(|s| seen.insert(s.clone()))
402            .collect()
403    }
404
405    pub(super) fn insert_node_label_index(&mut self, node_id: NodeId, label: &str) {
406        // Hot path: skip the `String` alloc when the label bucket already
407        // exists. The monotonic-id invariant on the create path guarantees
408        // `node_id` is unique, so we push unconditionally; the previous
409        // `contains` guard turned bulk CREATE into O(n²).
410        if let Some(bucket) = self.nodes_by_label.get_mut(label) {
411            bucket.push(node_id);
412        } else {
413            self.nodes_by_label.insert(label.to_string(), vec![node_id]);
414        }
415    }
416
417    fn remove_node_label_index(&mut self, node_id: NodeId, label: &str) {
418        if let Some(ids) = self.nodes_by_label.get_mut(label) {
419            if let Some(pos) = ids.iter().position(|&id| id == node_id) {
420                ids.swap_remove(pos);
421            }
422            if ids.is_empty() {
423                self.nodes_by_label.remove(label);
424            }
425        }
426    }
427
428    fn insert_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
429        // See `insert_node_label_index` for the same hot-path rationale.
430        if let Some(bucket) = self.relationships_by_type.get_mut(rel_type) {
431            bucket.push(rel_id);
432        } else {
433            self.relationships_by_type
434                .insert(rel_type.to_string(), vec![rel_id]);
435        }
436    }
437
438    fn remove_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
439        if let Some(ids) = self.relationships_by_type.get_mut(rel_type) {
440            if let Some(pos) = ids.iter().position(|&id| id == rel_id) {
441                ids.swap_remove(pos);
442            }
443            if ids.is_empty() {
444                self.relationships_by_type.remove(rel_type);
445            }
446        }
447    }
448
449    pub(super) fn indexes_read(&self) -> std::sync::RwLockReadGuard<'_, PropertyIndexRegistry> {
450        self.indexes
451            .read()
452            .unwrap_or_else(|poisoned| poisoned.into_inner())
453    }
454
455    pub(super) fn indexes_write(&self) -> RwLockWriteGuard<'_, PropertyIndexRegistry> {
456        self.indexes
457            .write()
458            .unwrap_or_else(|poisoned| poisoned.into_inner())
459    }
460
461    pub(super) fn indexes_mut(&mut self) -> &mut PropertyIndexRegistry {
462        self.indexes
463            .get_mut()
464            .unwrap_or_else(|poisoned| poisoned.into_inner())
465    }
466
467    #[inline]
468    pub(super) fn active_node_property_index_count(&self) -> usize {
469        self.active_node_property_indexes.load(Ordering::Relaxed)
470    }
471
472    #[inline]
473    pub(super) fn active_relationship_property_index_count(&self) -> usize {
474        self.active_relationship_property_indexes
475            .load(Ordering::Relaxed)
476    }
477
478    #[inline]
479    pub(super) fn has_active_property_indexes(&self) -> bool {
480        self.active_node_property_index_count() != 0
481            || self.active_relationship_property_index_count() != 0
482    }
483
484    pub(super) fn node_property_index_is_active(&mut self, key: &str) -> bool {
485        self.active_node_property_index_count() != 0
486            && self.indexes_mut().node_properties.is_active(key)
487    }
488
489    pub(super) fn relationship_property_index_is_active(&mut self, key: &str) -> bool {
490        self.active_relationship_property_index_count() != 0
491            && self.indexes_mut().relationship_properties.is_active(key)
492    }
493
494    pub(super) fn ensure_node_property_index(&self, key: &str) {
495        {
496            let indexes = self.indexes_read();
497            if indexes.node_properties.is_active(key) {
498                return;
499            }
500        }
501
502        let mut indexes = self.indexes_write();
503        if indexes.node_properties.is_active(key) {
504            return;
505        }
506
507        for (id, node) in self.iter_nodes() {
508            if let Some(value) = node.properties.get(key) {
509                indexes.node_properties.insert_with_scopes(
510                    id,
511                    node.labels.iter().map(String::as_str),
512                    key,
513                    value,
514                );
515            }
516        }
517        if indexes.node_properties.activate(key) {
518            self.active_node_property_indexes
519                .fetch_add(1, Ordering::Relaxed);
520        }
521    }
522
523    pub(super) fn ensure_relationship_property_index(&self, key: &str) {
524        {
525            let indexes = self.indexes_read();
526            if indexes.relationship_properties.is_active(key) {
527                return;
528            }
529        }
530
531        let mut indexes = self.indexes_write();
532        if indexes.relationship_properties.is_active(key) {
533            return;
534        }
535
536        for (id, rel) in self.iter_rels() {
537            if let Some(value) = rel.properties.get(key) {
538                indexes.relationship_properties.insert_with_scopes(
539                    id,
540                    [rel.rel_type.as_str()],
541                    key,
542                    value,
543                );
544            }
545        }
546        if indexes.relationship_properties.activate(key) {
547            self.active_relationship_property_indexes
548                .fetch_add(1, Ordering::Relaxed);
549        }
550    }
551
552    pub(super) fn rebuild_property_indexes(&mut self) {
553        let mut indexes = PropertyIndexRegistry::default();
554
555        for (id, node) in self.iter_nodes() {
556            for (key, value) in &node.properties {
557                if PropertyIndexKey::from_value(value).is_some() {
558                    indexes.node_properties.activate(key);
559                    indexes.node_properties.insert_with_scopes(
560                        id,
561                        node.labels.iter().map(String::as_str),
562                        key,
563                        value,
564                    );
565                }
566            }
567        }
568
569        for (id, rel) in self.iter_rels() {
570            for (key, value) in &rel.properties {
571                if PropertyIndexKey::from_value(value).is_some() {
572                    indexes.relationship_properties.activate(key);
573                    indexes.relationship_properties.insert_with_scopes(
574                        id,
575                        [rel.rel_type.as_str()],
576                        key,
577                        value,
578                    );
579                }
580            }
581        }
582
583        let node_index_count = indexes.node_properties.active_keys.len();
584        let relationship_index_count = indexes.relationship_properties.active_keys.len();
585        *self.indexes_mut() = indexes;
586        self.active_node_property_indexes
587            .store(node_index_count, Ordering::Relaxed);
588        self.active_relationship_property_indexes
589            .store(relationship_index_count, Ordering::Relaxed);
590    }
591
592    pub(super) fn on_node_created(&mut self, node: &NodeRecord) {
593        for label in &node.labels {
594            self.insert_node_label_index(node.id, label);
595        }
596        self.index_node_properties_if_active(
597            node.id,
598            node.labels.iter().map(String::as_str),
599            &node.properties,
600        );
601    }
602
603    pub(super) fn on_node_replayed(&mut self, node: &NodeRecord) {
604        for label in &node.labels {
605            self.insert_node_label_index(node.id, label);
606        }
607        self.index_node_properties_eager(
608            node.id,
609            node.labels.iter().map(String::as_str),
610            &node.properties,
611        );
612    }
613
614    pub(super) fn on_node_property_set(
615        &mut self,
616        node_id: NodeId,
617        key: &str,
618        old: Option<&PropertyValue>,
619        new: &PropertyValue,
620    ) {
621        if !self.node_property_index_is_active(key) {
622            return;
623        }
624
625        let Some(labels) = self.node_at(node_id).map(|node| node.labels.clone()) else {
626            return;
627        };
628
629        if let Some(old) = old {
630            self.unindex_node_property_if_active(
631                node_id,
632                labels.iter().map(String::as_str),
633                key,
634                old,
635            );
636        }
637        self.index_node_property_if_active(node_id, labels.iter().map(String::as_str), key, new);
638    }
639
640    pub(super) fn on_node_property_removed(
641        &mut self,
642        node_id: NodeId,
643        key: &str,
644        old: &PropertyValue,
645    ) {
646        if !self.node_property_index_is_active(key) {
647            return;
648        }
649
650        let Some(labels) = self.node_at(node_id).map(|node| node.labels.clone()) else {
651            return;
652        };
653        self.unindex_node_property_if_active(node_id, labels.iter().map(String::as_str), key, old);
654    }
655
656    pub(super) fn on_node_label_added(&mut self, node_id: NodeId, label: &str) {
657        self.insert_node_label_index(node_id, label);
658
659        if self.active_node_property_index_count() == 0 {
660            return;
661        }
662
663        let Some(properties) = self.node_at(node_id).map(|node| node.properties.clone()) else {
664            return;
665        };
666        self.index_node_scope_properties_if_active(node_id, label, &properties);
667    }
668
669    pub(super) fn on_node_label_removed(&mut self, node_id: NodeId, label: &str) {
670        self.remove_node_label_index(node_id, label);
671
672        if self.active_node_property_index_count() == 0 {
673            return;
674        }
675
676        let Some(properties) = self.node_at(node_id).map(|node| node.properties.clone()) else {
677            return;
678        };
679        self.unindex_node_scope_properties_if_active(node_id, label, &properties);
680    }
681
682    pub(super) fn on_node_deleted(&mut self, node: &NodeRecord) {
683        for label in &node.labels {
684            self.remove_node_label_index(node.id, label);
685        }
686        self.unindex_active_node_properties(
687            node.id,
688            node.labels.iter().map(String::as_str),
689            &node.properties,
690        );
691    }
692
693    pub(super) fn on_relationship_created(&mut self, rel: &RelationshipRecord) {
694        self.attach_relationship(rel);
695        self.index_relationship_properties_if_active(
696            rel.id,
697            [rel.rel_type.as_str()],
698            &rel.properties,
699        );
700    }
701
702    pub(super) fn on_relationship_replayed(&mut self, rel: &RelationshipRecord) {
703        self.attach_relationship(rel);
704        self.index_relationship_properties_eager(rel.id, [rel.rel_type.as_str()], &rel.properties);
705    }
706
707    pub(super) fn on_relationship_property_set(
708        &mut self,
709        rel_id: RelationshipId,
710        key: &str,
711        old: Option<&PropertyValue>,
712        new: &PropertyValue,
713    ) {
714        if !self.relationship_property_index_is_active(key) {
715            return;
716        }
717
718        let Some(rel_type) = self.rel_at(rel_id).map(|rel| rel.rel_type.clone()) else {
719            return;
720        };
721
722        if let Some(old) = old {
723            self.unindex_relationship_property_if_active(rel_id, [rel_type.as_str()], key, old);
724        }
725        self.index_relationship_property_if_active(rel_id, [rel_type.as_str()], key, new);
726    }
727
728    pub(super) fn on_relationship_property_removed(
729        &mut self,
730        rel_id: RelationshipId,
731        key: &str,
732        old: &PropertyValue,
733    ) {
734        if !self.relationship_property_index_is_active(key) {
735            return;
736        }
737
738        let Some(rel_type) = self.rel_at(rel_id).map(|rel| rel.rel_type.clone()) else {
739            return;
740        };
741        self.unindex_relationship_property_if_active(rel_id, [rel_type.as_str()], key, old);
742    }
743
744    pub(super) fn on_relationship_deleted(&mut self, rel: &RelationshipRecord) {
745        self.detach_relationship_indexes(rel);
746        self.unindex_active_relationship_properties(
747            rel.id,
748            [rel.rel_type.as_str()],
749            &rel.properties,
750        );
751    }
752
753    fn index_node_property_eager<'a>(
754        &mut self,
755        node_id: NodeId,
756        labels: impl IntoIterator<Item = &'a str>,
757        key: &str,
758        value: &PropertyValue,
759    ) {
760        if PropertyIndexKey::from_value(value).is_none() {
761            return;
762        }
763
764        let activated = {
765            let indexes = self.indexes_mut();
766            let activated = indexes.node_properties.activate(key);
767            indexes
768                .node_properties
769                .insert_with_scopes(node_id, labels, key, value);
770            activated
771        };
772        if activated {
773            self.active_node_property_indexes
774                .fetch_add(1, Ordering::Relaxed);
775        }
776    }
777
778    fn index_relationship_property_eager<'a>(
779        &mut self,
780        rel_id: RelationshipId,
781        scopes: impl IntoIterator<Item = &'a str>,
782        key: &str,
783        value: &PropertyValue,
784    ) {
785        if PropertyIndexKey::from_value(value).is_none() {
786            return;
787        }
788
789        let activated = {
790            let indexes = self.indexes_mut();
791            let activated = indexes.relationship_properties.activate(key);
792            indexes
793                .relationship_properties
794                .insert_with_scopes(rel_id, scopes, key, value);
795            activated
796        };
797        if activated {
798            self.active_relationship_property_indexes
799                .fetch_add(1, Ordering::Relaxed);
800        }
801    }
802
803    fn index_node_properties_eager<'a>(
804        &mut self,
805        node_id: NodeId,
806        labels: impl IntoIterator<Item = &'a str> + Clone,
807        properties: &Properties,
808    ) {
809        for (key, value) in properties {
810            self.index_node_property_eager(node_id, labels.clone(), key, value);
811        }
812    }
813
814    fn index_relationship_properties_eager<'a>(
815        &mut self,
816        rel_id: RelationshipId,
817        scopes: impl IntoIterator<Item = &'a str> + Clone,
818        properties: &Properties,
819    ) {
820        for (key, value) in properties {
821            self.index_relationship_property_eager(rel_id, scopes.clone(), key, value);
822        }
823    }
824
825    fn index_node_property_if_active<'a>(
826        &mut self,
827        node_id: NodeId,
828        labels: impl IntoIterator<Item = &'a str>,
829        key: &str,
830        value: &PropertyValue,
831    ) {
832        if self.active_node_property_index_count() == 0 {
833            return;
834        }
835        let indexes = self.indexes_mut();
836        if indexes.node_properties.is_active(key) {
837            indexes
838                .node_properties
839                .insert_with_scopes(node_id, labels, key, value);
840        }
841    }
842
843    fn index_node_properties_if_active<'a>(
844        &mut self,
845        node_id: NodeId,
846        labels: impl IntoIterator<Item = &'a str> + Clone,
847        properties: &Properties,
848    ) {
849        if self.active_node_property_index_count() == 0 {
850            return;
851        }
852        let indexes = self.indexes_mut();
853        for (key, value) in properties {
854            if indexes.node_properties.is_active(key) {
855                indexes
856                    .node_properties
857                    .insert_with_scopes(node_id, labels.clone(), key, value);
858            }
859        }
860    }
861
862    fn unindex_node_property_if_active<'a>(
863        &mut self,
864        node_id: NodeId,
865        labels: impl IntoIterator<Item = &'a str>,
866        key: &str,
867        value: &PropertyValue,
868    ) {
869        if self.active_node_property_index_count() == 0 {
870            return;
871        }
872        let indexes = self.indexes_mut();
873        if indexes.node_properties.is_active(key) {
874            indexes
875                .node_properties
876                .remove_with_scopes(node_id, labels, key, value);
877        }
878    }
879
880    fn index_node_scope_properties_if_active(
881        &mut self,
882        node_id: NodeId,
883        scope: &str,
884        properties: &Properties,
885    ) {
886        if self.active_node_property_index_count() == 0 {
887            return;
888        }
889        let indexes = self.indexes_mut();
890        for (key, value) in properties {
891            if indexes.node_properties.is_active(key) {
892                indexes
893                    .node_properties
894                    .insert_scoped(node_id, scope, key, value);
895            }
896        }
897    }
898
899    fn unindex_node_scope_properties_if_active(
900        &mut self,
901        node_id: NodeId,
902        scope: &str,
903        properties: &Properties,
904    ) {
905        if self.active_node_property_index_count() == 0 {
906            return;
907        }
908        let indexes = self.indexes_mut();
909        for (key, value) in properties {
910            if indexes.node_properties.is_active(key) {
911                indexes
912                    .node_properties
913                    .remove_scoped(node_id, scope, key, value);
914            }
915        }
916    }
917
918    fn unindex_active_node_properties<'a>(
919        &mut self,
920        node_id: NodeId,
921        labels: impl IntoIterator<Item = &'a str> + Clone,
922        properties: &Properties,
923    ) {
924        if self.active_node_property_index_count() == 0 {
925            return;
926        }
927        let indexes = self.indexes_mut();
928        for (key, value) in properties {
929            if indexes.node_properties.is_active(key) {
930                indexes
931                    .node_properties
932                    .remove_with_scopes(node_id, labels.clone(), key, value);
933            }
934        }
935    }
936
937    fn index_relationship_property_if_active<'a>(
938        &mut self,
939        rel_id: RelationshipId,
940        scopes: impl IntoIterator<Item = &'a str>,
941        key: &str,
942        value: &PropertyValue,
943    ) {
944        if self.active_relationship_property_index_count() == 0 {
945            return;
946        }
947        let indexes = self.indexes_mut();
948        if indexes.relationship_properties.is_active(key) {
949            indexes
950                .relationship_properties
951                .insert_with_scopes(rel_id, scopes, key, value);
952        }
953    }
954
955    fn index_relationship_properties_if_active<'a>(
956        &mut self,
957        rel_id: RelationshipId,
958        scopes: impl IntoIterator<Item = &'a str> + Clone,
959        properties: &Properties,
960    ) {
961        if self.active_relationship_property_index_count() == 0 {
962            return;
963        }
964        let indexes = self.indexes_mut();
965        for (key, value) in properties {
966            if indexes.relationship_properties.is_active(key) {
967                indexes.relationship_properties.insert_with_scopes(
968                    rel_id,
969                    scopes.clone(),
970                    key,
971                    value,
972                );
973            }
974        }
975    }
976
977    fn unindex_relationship_property_if_active<'a>(
978        &mut self,
979        rel_id: RelationshipId,
980        scopes: impl IntoIterator<Item = &'a str>,
981        key: &str,
982        value: &PropertyValue,
983    ) {
984        if self.active_relationship_property_index_count() == 0 {
985            return;
986        }
987        let indexes = self.indexes_mut();
988        if indexes.relationship_properties.is_active(key) {
989            indexes
990                .relationship_properties
991                .remove_with_scopes(rel_id, scopes, key, value);
992        }
993    }
994
995    fn unindex_active_relationship_properties<'a>(
996        &mut self,
997        rel_id: RelationshipId,
998        scopes: impl IntoIterator<Item = &'a str> + Clone,
999        properties: &Properties,
1000    ) {
1001        if self.active_relationship_property_index_count() == 0 {
1002            return;
1003        }
1004        let indexes = self.indexes_mut();
1005        for (key, value) in properties {
1006            if indexes.relationship_properties.is_active(key) {
1007                indexes.relationship_properties.remove_with_scopes(
1008                    rel_id,
1009                    scopes.clone(),
1010                    key,
1011                    value,
1012                );
1013            }
1014        }
1015    }
1016
1017    pub(super) fn scan_nodes_by_property(
1018        &self,
1019        label: Option<&str>,
1020        key: &str,
1021        value: &PropertyValue,
1022    ) -> Vec<NodeRecord> {
1023        let candidates: Box<dyn Iterator<Item = &NodeRecord> + '_> = match label {
1024            Some(label) => Box::new(
1025                self.nodes_by_label
1026                    .get(label)
1027                    .into_iter()
1028                    .flat_map(|ids| ids.iter())
1029                    .filter_map(|&id| self.node_at(id)),
1030            ),
1031            None => Box::new(self.iter_node_records()),
1032        };
1033
1034        candidates
1035            .filter(|node| node.properties.get(key) == Some(value))
1036            .cloned()
1037            .collect()
1038    }
1039
1040    pub(super) fn scan_relationships_by_property(
1041        &self,
1042        rel_type: Option<&str>,
1043        key: &str,
1044        value: &PropertyValue,
1045    ) -> Vec<RelationshipRecord> {
1046        let candidates: Box<dyn Iterator<Item = &RelationshipRecord> + '_> = match rel_type {
1047            Some(rel_type) => Box::new(
1048                self.relationships_by_type
1049                    .get(rel_type)
1050                    .into_iter()
1051                    .flat_map(|ids| ids.iter())
1052                    .filter_map(|&id| self.rel_at(id)),
1053            ),
1054            None => Box::new(self.iter_rel_records()),
1055        };
1056
1057        candidates
1058            .filter(|rel| rel.properties.get(key) == Some(value))
1059            .cloned()
1060            .collect()
1061    }
1062
1063    pub(super) fn attach_relationship(&mut self, rel: &RelationshipRecord) {
1064        self.outgoing_push(rel.src, rel.id);
1065        self.incoming_push(rel.dst, rel.id);
1066        self.insert_relationship_type_index(rel.id, &rel.rel_type);
1067    }
1068
1069    fn detach_relationship_indexes(&mut self, rel: &RelationshipRecord) {
1070        // Adjacency is now positional `Vec<Vec<RelationshipId>>` — clearing
1071        // the inner Vec leaves the slot in place (the slot is sized for the
1072        // node's lifetime, not the edge's).
1073        self.outgoing_remove(rel.src, rel.id);
1074        self.incoming_remove(rel.dst, rel.id);
1075
1076        self.remove_relationship_type_index(rel.id, &rel.rel_type);
1077    }
1078
1079    pub(super) fn relationship_ids_for_direction(
1080        &self,
1081        node_id: NodeId,
1082        direction: Direction,
1083    ) -> Vec<RelationshipId> {
1084        match direction {
1085            Direction::Left => self.incoming_at(node_id).cloned().unwrap_or_default(),
1086
1087            Direction::Right => self.outgoing_at(node_id).cloned().unwrap_or_default(),
1088
1089            Direction::Undirected => {
1090                let mut ids = BTreeSet::new();
1091
1092                if let Some(out) = self.outgoing_at(node_id) {
1093                    ids.extend(out.iter().copied());
1094                }
1095                if let Some(inc) = self.incoming_at(node_id) {
1096                    ids.extend(inc.iter().copied());
1097                }
1098
1099                ids.into_iter().collect()
1100            }
1101        }
1102    }
1103
1104    pub(super) fn other_endpoint(rel: &RelationshipRecord, node_id: NodeId) -> Option<NodeId> {
1105        if rel.src == node_id {
1106            Some(rel.dst)
1107        } else if rel.dst == node_id {
1108            Some(rel.src)
1109        } else {
1110            None
1111        }
1112    }
1113
1114    pub(super) fn has_incident_relationships(&self, node_id: NodeId) -> bool {
1115        self.outgoing_at(node_id)
1116            .map(|ids| !ids.is_empty())
1117            .unwrap_or(false)
1118            || self
1119                .incoming_at(node_id)
1120                .map(|ids| !ids.is_empty())
1121                .unwrap_or(false)
1122    }
1123
1124    pub(super) fn incident_relationship_ids(&self, node_id: NodeId) -> BTreeSet<RelationshipId> {
1125        let mut rel_ids = BTreeSet::new();
1126
1127        if let Some(ids) = self.outgoing_at(node_id) {
1128            rel_ids.extend(ids.iter().copied());
1129        }
1130        if let Some(ids) = self.incoming_at(node_id) {
1131            rel_ids.extend(ids.iter().copied());
1132        }
1133
1134        rel_ids
1135    }
1136
1137    /// Replay a node creation using the id captured in a durable mutation
1138    /// event. This intentionally does not emit a new mutation event: callers
1139    /// must invoke it before installing a recorder on the graph.
1140    #[doc(hidden)]
1141    pub fn replay_create_node(
1142        &mut self,
1143        id: NodeId,
1144        labels: Vec<String>,
1145        properties: Properties,
1146    ) -> Result<NodeRecord, String> {
1147        if self.recorder.is_some() {
1148            return Err(
1149                "cannot replay node creation while a mutation recorder is installed".into(),
1150            );
1151        }
1152        if self.node_at(id).is_some() {
1153            return Err(format!("node id {id} already exists"));
1154        }
1155
1156        let labels = Self::normalize_labels(labels);
1157        let node = NodeRecord {
1158            id,
1159            labels: labels.clone(),
1160            properties,
1161        };
1162
1163        self.on_node_replayed(&node);
1164        self.put_node(id, node.clone());
1165        // ensure_node_slot grew both adjacency Vecs to cover this id when
1166        // we put_node above.
1167        self.bump_next_node_id_past(id)?;
1168
1169        Ok(node)
1170    }
1171
1172    /// Replay a relationship creation using the id captured in a durable
1173    /// mutation event. This intentionally does not emit a new mutation event:
1174    /// callers must invoke it before installing a recorder on the graph.
1175    #[doc(hidden)]
1176    pub fn replay_create_relationship(
1177        &mut self,
1178        id: RelationshipId,
1179        src: NodeId,
1180        dst: NodeId,
1181        rel_type: &str,
1182        properties: Properties,
1183    ) -> Result<RelationshipRecord, String> {
1184        if self.recorder.is_some() {
1185            return Err(
1186                "cannot replay relationship creation while a mutation recorder is installed".into(),
1187            );
1188        }
1189        if self.rel_at(id).is_some() {
1190            return Err(format!("relationship id {id} already exists"));
1191        }
1192        if self.node_at(src).is_none() {
1193            return Err(format!(
1194                "relationship {id} references missing source node {src}"
1195            ));
1196        }
1197        if self.node_at(dst).is_none() {
1198            return Err(format!(
1199                "relationship {id} references missing target node {dst}"
1200            ));
1201        }
1202
1203        let trimmed = rel_type.trim();
1204        if trimmed.is_empty() {
1205            return Err(format!("relationship {id} has an empty type"));
1206        }
1207
1208        let rel = RelationshipRecord {
1209            id,
1210            src,
1211            dst,
1212            rel_type: trimmed.to_string(),
1213            properties,
1214        };
1215
1216        self.on_relationship_replayed(&rel);
1217        self.put_rel(id, rel.clone());
1218        self.bump_next_rel_id_past(id)?;
1219
1220        Ok(rel)
1221    }
1222
1223    #[cfg(test)]
1224    pub(super) fn assert_property_indexes_match_scan(&self) {
1225        let indexes = self.indexes_read();
1226        assert_eq!(
1227            indexes.node_properties.active_keys.len(),
1228            self.active_node_property_index_count(),
1229            "node property index counter diverged from active key set"
1230        );
1231        assert_eq!(
1232            indexes.relationship_properties.active_keys.len(),
1233            self.active_relationship_property_index_count(),
1234            "relationship property index counter diverged from active key set"
1235        );
1236
1237        let mut expected_nodes = PropertyIndexState {
1238            active_keys: indexes.node_properties.active_keys.clone(),
1239            ..PropertyIndexState::default()
1240        };
1241        for (id, node) in self.iter_nodes() {
1242            for (key, value) in &node.properties {
1243                if expected_nodes.is_active(key) {
1244                    expected_nodes.insert_with_scopes(
1245                        id,
1246                        node.labels.iter().map(String::as_str),
1247                        key,
1248                        value,
1249                    );
1250                }
1251            }
1252        }
1253        assert_eq!(
1254            indexes.node_properties.values, expected_nodes.values,
1255            "node property index values diverged from scan"
1256        );
1257        assert_eq!(
1258            indexes.node_properties.scoped_values, expected_nodes.scoped_values,
1259            "node property scoped index values diverged from scan"
1260        );
1261
1262        let mut expected_relationships = PropertyIndexState {
1263            active_keys: indexes.relationship_properties.active_keys.clone(),
1264            ..PropertyIndexState::default()
1265        };
1266        for (id, rel) in self.iter_rels() {
1267            for (key, value) in &rel.properties {
1268                if expected_relationships.is_active(key) {
1269                    expected_relationships.insert_with_scopes(
1270                        id,
1271                        [rel.rel_type.as_str()],
1272                        key,
1273                        value,
1274                    );
1275                }
1276            }
1277        }
1278        assert_eq!(
1279            indexes.relationship_properties.values, expected_relationships.values,
1280            "relationship property index values diverged from scan"
1281        );
1282        assert_eq!(
1283            indexes.relationship_properties.scoped_values, expected_relationships.scoped_values,
1284            "relationship property scoped index values diverged from scan"
1285        );
1286    }
1287}