Skip to main content

lora_store/memory/
graph.rs

1//! The [`InMemoryGraph`] data structure: slot-indexed node/relationship
2//! storage, adjacency lists, label/type indexes, and the inherent
3//! helpers that the trait impls in `super::impls` delegate to.
4
5use std::collections::{BTreeMap, BTreeSet};
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::{Arc, RwLock, RwLockWriteGuard};
8
9use lora_ast::Direction;
10
11use crate::{
12    MutationEvent, MutationRecorder, NodeId, NodeRecord, Properties, PropertyValue, RelationshipId,
13    RelationshipRecord,
14};
15
16#[cfg(test)]
17use super::property_index::PropertyIndexState;
18use super::property_index::{PropertyIndexKey, PropertyIndexRegistry};
19
20#[derive(Default)]
21pub struct InMemoryGraph {
22    pub(super) next_node_id: NodeId,
23    pub(super) next_rel_id: RelationshipId,
24
25    /// Slot-indexed node storage: `nodes[id as usize]` is the record at `id`.
26    /// `None` slots are tombstones from deletes (we don't compact). Because
27    /// `next_node_id` is monotonic the slot at `id` is initialized exactly
28    /// when `id < next_node_id` — same identity guarantee the previous
29    /// `BTreeMap<NodeId, NodeRecord>` had, just with O(1) lookup and
30    /// cache-coherent layout.
31    ///
32    /// Records are wrapped in `Arc` so [`Self::clone`] (called on every
33    /// auto-commit write to build a working copy) is `O(N)` atomic
34    /// refcount bumps instead of `O(N)` deep record clones — for a
35    /// 100k-node graph the difference is microseconds vs. tens of
36    /// milliseconds. Mutating a record uses `Arc::make_mut`, which
37    /// clones in place when the refcount is 1 (no concurrent reader)
38    /// and falls back to a single-record clone-on-write when readers
39    /// still hold a snapshot.
40    pub(super) nodes: Vec<Option<Arc<NodeRecord>>>,
41    pub(super) relationships: Vec<Option<Arc<RelationshipRecord>>>,
42    /// Live (non-tombstoned) counts kept in sync with `put_*`/`take_*` so
43    /// `node_count` / `relationship_count` stay O(1) — without a counter
44    /// they'd have to scan the slab.
45    pub(super) live_node_count: usize,
46    pub(super) live_rel_count: usize,
47
48    /// Adjacency keyed by NodeId. `outgoing[id]` is the list of relationship
49    /// ids that leave `id`; mirrored on `incoming[id]`. Inner `Vec` instead
50    /// of `BTreeSet` because edges are inserted exactly once and traversal
51    /// only needs sequential iteration; the cache-friendly contiguous layout
52    /// shows up on every traversal hop.
53    pub(super) outgoing: Vec<Vec<RelationshipId>>,
54    pub(super) incoming: Vec<Vec<RelationshipId>>,
55
56    // secondary indexes
57    /// Label -> the (unique, monotonic) node ids that carry it. The inner
58    /// `Vec` instead of `BTreeSet` because every node id is inserted at most
59    /// once per label (no dedup needed) and every consumer iterates the
60    /// whole list anyway — contiguous storage iterates faster than a
61    /// tree-of-pointers, and removes via `swap_remove` stay O(degree-of-label).
62    pub(super) nodes_by_label: BTreeMap<String, Vec<NodeId>>,
63    pub(super) relationships_by_type: BTreeMap<String, Vec<RelationshipId>>,
64    pub(super) indexes: RwLock<PropertyIndexRegistry>,
65    pub(super) active_node_property_indexes: AtomicUsize,
66    pub(super) active_relationship_property_indexes: AtomicUsize,
67
68    /// Optional mutation observer. When `Some`, every committed mutation
69    /// fans out to this recorder *after* the in-memory state has been
70    /// updated. The recorder is not part of the graph's identity, so Clone
71    /// and snapshot restore both reset it to `None`.
72    pub(super) recorder: Option<Arc<dyn MutationRecorder>>,
73}
74
75impl std::fmt::Debug for InMemoryGraph {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        f.debug_struct("InMemoryGraph")
78            .field("next_node_id", &self.next_node_id)
79            .field("next_rel_id", &self.next_rel_id)
80            .field("nodes", &self.nodes)
81            .field("relationships", &self.relationships)
82            .field("outgoing", &self.outgoing)
83            .field("incoming", &self.incoming)
84            .field("nodes_by_label", &self.nodes_by_label)
85            .field("relationships_by_type", &self.relationships_by_type)
86            .field("indexes", &self.indexes)
87            .field(
88                "active_node_property_indexes",
89                &self.active_node_property_index_count(),
90            )
91            .field(
92                "active_relationship_property_indexes",
93                &self.active_relationship_property_index_count(),
94            )
95            .field("recorder", &self.recorder.as_ref().map(|_| "installed"))
96            .finish()
97    }
98}
99
100impl Clone for InMemoryGraph {
101    fn clone(&self) -> Self {
102        // Deliberately drop the recorder on clone: a cloned store is a
103        // separate identity; it should not silently share the observer.
104        Self {
105            next_node_id: self.next_node_id,
106            next_rel_id: self.next_rel_id,
107            nodes: self.nodes.clone(),
108            relationships: self.relationships.clone(),
109            live_node_count: self.live_node_count,
110            live_rel_count: self.live_rel_count,
111            outgoing: self.outgoing.clone(),
112            incoming: self.incoming.clone(),
113            nodes_by_label: self.nodes_by_label.clone(),
114            relationships_by_type: self.relationships_by_type.clone(),
115            indexes: RwLock::new(if self.has_active_property_indexes() {
116                self.indexes_read().clone()
117            } else {
118                PropertyIndexRegistry::default()
119            }),
120            active_node_property_indexes: AtomicUsize::new(self.active_node_property_index_count()),
121            active_relationship_property_indexes: AtomicUsize::new(
122                self.active_relationship_property_index_count(),
123            ),
124            recorder: None,
125        }
126    }
127}
128
129impl InMemoryGraph {
130    pub fn new() -> Self {
131        Self::default()
132    }
133
134    pub fn with_capacity_hint(_nodes: usize, _relationships: usize) -> Self {
135        // BTreeMap/BTreeSet do not support capacity reservation.
136        Self::default()
137    }
138
139    pub fn contains_node(&self, node_id: NodeId) -> bool {
140        self.node_at(node_id).is_some()
141    }
142
143    pub fn contains_relationship(&self, rel_id: RelationshipId) -> bool {
144        self.rel_at(rel_id).is_some()
145    }
146
147    /// Install (or clear) the mutation recorder. Passing `None` detaches any
148    /// currently-installed recorder. The recorder observes every committed
149    /// mutation *after* it has been applied.
150    pub fn set_mutation_recorder(&mut self, recorder: Option<Arc<dyn MutationRecorder>>) {
151        self.recorder = recorder;
152    }
153
154    /// Handle to the currently-installed recorder, if any.
155    pub fn mutation_recorder(&self) -> Option<&Arc<dyn MutationRecorder>> {
156        self.recorder.as_ref()
157    }
158
159    /// Emit a mutation event only if a recorder is installed. The event is
160    /// built lazily — callers pass a closure, so when no recorder is
161    /// attached we pay only a `None` check and the cost of constructing the
162    /// event (labels/properties clones) is avoided.
163    #[inline]
164    pub(super) fn emit<F: FnOnce() -> MutationEvent>(&self, build: F) {
165        if let Some(rec) = &self.recorder {
166            rec.record(build());
167        }
168    }
169
170    pub(super) fn alloc_node_id(&mut self) -> NodeId {
171        let id = self.next_node_id;
172        self.next_node_id += 1;
173        id
174    }
175
176    pub(super) fn alloc_rel_id(&mut self) -> RelationshipId {
177        let id = self.next_rel_id;
178        self.next_rel_id += 1;
179        id
180    }
181
182    fn bump_next_node_id_past(&mut self, id: NodeId) -> Result<(), String> {
183        let next = id
184            .checked_add(1)
185            .ok_or_else(|| format!("node id {id} leaves no valid next node id"))?;
186        self.next_node_id = self.next_node_id.max(next);
187        Ok(())
188    }
189
190    fn bump_next_rel_id_past(&mut self, id: RelationshipId) -> Result<(), String> {
191        let next = id
192            .checked_add(1)
193            .ok_or_else(|| format!("relationship id {id} leaves no valid next relationship id"))?;
194        self.next_rel_id = self.next_rel_id.max(next);
195        Ok(())
196    }
197
198    // ---------- Slab access helpers ----------
199    //
200    // Stand-in for the BTreeMap API the previous storage used. They keep the
201    // call sites readable while the underlying layout is positional Vec.
202
203    #[inline]
204    pub(super) fn node_at(&self, id: NodeId) -> Option<&NodeRecord> {
205        self.nodes
206            .get(id as usize)
207            .and_then(|s| s.as_ref())
208            .map(|arc| arc.as_ref())
209    }
210
211    /// Mutable handle to a node record, doing copy-on-write only when the
212    /// `Arc` is shared with a concurrent reader. With no readers (the
213    /// common case after a fresh write_store clone), `Arc::make_mut`
214    /// upgrades in place — no record clone.
215    #[inline]
216    pub(super) fn node_at_mut(&mut self, id: NodeId) -> Option<&mut NodeRecord> {
217        self.nodes
218            .get_mut(id as usize)
219            .and_then(|s| s.as_mut())
220            .map(Arc::make_mut)
221    }
222
223    #[inline]
224    pub(super) fn rel_at(&self, id: RelationshipId) -> Option<&RelationshipRecord> {
225        self.relationships
226            .get(id as usize)
227            .and_then(|s| s.as_ref())
228            .map(|arc| arc.as_ref())
229    }
230
231    #[inline]
232    pub(super) fn rel_at_mut(&mut self, id: RelationshipId) -> Option<&mut RelationshipRecord> {
233        self.relationships
234            .get_mut(id as usize)
235            .and_then(|s| s.as_mut())
236            .map(Arc::make_mut)
237    }
238
239    /// Resize the node-keyed Vecs so `id as usize` is in range. Adjacency
240    /// lists are kept in lockstep with `nodes`, so a freshly-grown slot has
241    /// empty outgoing/incoming Vecs ready to receive edges.
242    fn ensure_node_slot(&mut self, id: NodeId) {
243        let target = id as usize + 1;
244        if self.nodes.len() < target {
245            self.nodes.resize_with(target, || None);
246            self.outgoing.resize_with(target, Vec::new);
247            self.incoming.resize_with(target, Vec::new);
248        }
249    }
250
251    fn ensure_rel_slot(&mut self, id: RelationshipId) {
252        let target = id as usize + 1;
253        if self.relationships.len() < target {
254            self.relationships.resize_with(target, || None);
255        }
256    }
257
258    pub(super) fn put_node(&mut self, id: NodeId, node: NodeRecord) {
259        self.ensure_node_slot(id);
260        let was_present = self.nodes[id as usize].is_some();
261        self.nodes[id as usize] = Some(Arc::new(node));
262        if !was_present {
263            self.live_node_count += 1;
264        }
265    }
266
267    pub(super) fn put_rel(&mut self, id: RelationshipId, rel: RelationshipRecord) {
268        self.ensure_rel_slot(id);
269        let was_present = self.relationships[id as usize].is_some();
270        self.relationships[id as usize] = Some(Arc::new(rel));
271        if !was_present {
272            self.live_rel_count += 1;
273        }
274    }
275
276    pub(super) fn take_node(&mut self, id: NodeId) -> Option<NodeRecord> {
277        let idx = id as usize;
278        let removed = self.nodes.get_mut(idx).and_then(|s| s.take());
279        if removed.is_some() {
280            self.live_node_count -= 1;
281            // Also clear the per-id adjacency entries so the memory is reclaimed
282            // on the typical "delete every node" pattern. We deliberately do not
283            // shrink the outer Vec — leaving the slot lets new ids reuse the
284            // same index without growth churn (and `next_node_id` is monotonic
285            // anyway, so no immediate reuse).
286            if let Some(out) = self.outgoing.get_mut(idx) {
287                out.clear();
288            }
289            if let Some(inc) = self.incoming.get_mut(idx) {
290                inc.clear();
291            }
292        }
293        // Unwrap the Arc — `try_unwrap` returns the inner `NodeRecord`
294        // without cloning when our slab held the only reference, falling
295        // back to a clone only when concurrent readers still hold a
296        // snapshot Arc.
297        removed.map(|arc| Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone()))
298    }
299
300    pub(super) fn take_rel(&mut self, id: RelationshipId) -> Option<RelationshipRecord> {
301        let idx = id as usize;
302        let removed = self.relationships.get_mut(idx).and_then(|s| s.take());
303        if removed.is_some() {
304            self.live_rel_count -= 1;
305        }
306        removed.map(|arc| Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone()))
307    }
308
309    #[inline]
310    pub(super) fn outgoing_at(&self, id: NodeId) -> Option<&Vec<RelationshipId>> {
311        self.outgoing.get(id as usize)
312    }
313
314    #[inline]
315    pub(super) fn incoming_at(&self, id: NodeId) -> Option<&Vec<RelationshipId>> {
316        self.incoming.get(id as usize)
317    }
318
319    pub(super) fn iter_node_ids(&self) -> impl Iterator<Item = NodeId> + '_ {
320        self.nodes
321            .iter()
322            .enumerate()
323            .filter_map(|(i, slot)| slot.as_ref().map(|_| i as NodeId))
324    }
325
326    pub(super) fn iter_node_records(&self) -> impl Iterator<Item = &NodeRecord> + '_ {
327        self.nodes
328            .iter()
329            .filter_map(|s| s.as_ref())
330            .map(|arc| arc.as_ref())
331    }
332
333    pub(super) fn iter_rel_ids(&self) -> impl Iterator<Item = RelationshipId> + '_ {
334        self.relationships
335            .iter()
336            .enumerate()
337            .filter_map(|(i, slot)| slot.as_ref().map(|_| i as RelationshipId))
338    }
339
340    pub(super) fn iter_rel_records(&self) -> impl Iterator<Item = &RelationshipRecord> + '_ {
341        self.relationships
342            .iter()
343            .filter_map(|s| s.as_ref())
344            .map(|arc| arc.as_ref())
345    }
346
347    pub(super) fn iter_nodes(&self) -> impl Iterator<Item = (NodeId, &NodeRecord)> + '_ {
348        self.nodes
349            .iter()
350            .enumerate()
351            .filter_map(|(i, slot)| slot.as_ref().map(|n| (i as NodeId, n.as_ref())))
352    }
353
354    pub(super) fn iter_rels(
355        &self,
356    ) -> impl Iterator<Item = (RelationshipId, &RelationshipRecord)> + '_ {
357        self.relationships
358            .iter()
359            .enumerate()
360            .filter_map(|(i, slot)| slot.as_ref().map(|r| (i as RelationshipId, r.as_ref())))
361    }
362
363    /// Add `rel_id` to `node_id`'s outgoing list. Idempotent: skips the push
364    /// if the id is already present (it shouldn't be — relationship ids are
365    /// monotonic — but the guard is cheap and keeps the invariant explicit).
366    fn outgoing_push(&mut self, node_id: NodeId, rel_id: RelationshipId) {
367        self.ensure_node_slot(node_id);
368        let v = &mut self.outgoing[node_id as usize];
369        if !v.contains(&rel_id) {
370            v.push(rel_id);
371        }
372    }
373
374    fn incoming_push(&mut self, node_id: NodeId, rel_id: RelationshipId) {
375        self.ensure_node_slot(node_id);
376        let v = &mut self.incoming[node_id as usize];
377        if !v.contains(&rel_id) {
378            v.push(rel_id);
379        }
380    }
381
382    /// Remove `rel_id` from `node_id`'s outgoing list. `swap_remove` keeps
383    /// the operation O(1) — adjacency order doesn't carry semantic meaning.
384    fn outgoing_remove(&mut self, node_id: NodeId, rel_id: RelationshipId) {
385        if let Some(v) = self.outgoing.get_mut(node_id as usize) {
386            if let Some(pos) = v.iter().position(|&id| id == rel_id) {
387                v.swap_remove(pos);
388            }
389        }
390    }
391
392    fn incoming_remove(&mut self, node_id: NodeId, rel_id: RelationshipId) {
393        if let Some(v) = self.incoming.get_mut(node_id as usize) {
394            if let Some(pos) = v.iter().position(|&id| id == rel_id) {
395                v.swap_remove(pos);
396            }
397        }
398    }
399
400    pub(super) fn normalize_labels(labels: Vec<String>) -> Vec<String> {
401        let mut seen = BTreeSet::new();
402
403        labels
404            .into_iter()
405            .map(|s| s.trim().to_string())
406            .filter(|s| !s.is_empty())
407            .filter(|s| seen.insert(s.clone()))
408            .collect()
409    }
410
411    pub(super) fn insert_node_label_index(&mut self, node_id: NodeId, label: &str) {
412        let bucket = self.nodes_by_label.entry(label.to_string()).or_default();
413        // Same monotonic-id invariant as `outgoing_push`: ids never appear
414        // twice, but the `contains` guard is cheap on small buckets and
415        // makes the invariant explicit for replay paths.
416        if !bucket.contains(&node_id) {
417            bucket.push(node_id);
418        }
419    }
420
421    fn remove_node_label_index(&mut self, node_id: NodeId, label: &str) {
422        if let Some(ids) = self.nodes_by_label.get_mut(label) {
423            if let Some(pos) = ids.iter().position(|&id| id == node_id) {
424                ids.swap_remove(pos);
425            }
426            if ids.is_empty() {
427                self.nodes_by_label.remove(label);
428            }
429        }
430    }
431
432    fn insert_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
433        let bucket = self
434            .relationships_by_type
435            .entry(rel_type.to_string())
436            .or_default();
437        if !bucket.contains(&rel_id) {
438            bucket.push(rel_id);
439        }
440    }
441
442    fn remove_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
443        if let Some(ids) = self.relationships_by_type.get_mut(rel_type) {
444            if let Some(pos) = ids.iter().position(|&id| id == rel_id) {
445                ids.swap_remove(pos);
446            }
447            if ids.is_empty() {
448                self.relationships_by_type.remove(rel_type);
449            }
450        }
451    }
452
453    pub(super) fn indexes_read(&self) -> std::sync::RwLockReadGuard<'_, PropertyIndexRegistry> {
454        self.indexes
455            .read()
456            .unwrap_or_else(|poisoned| poisoned.into_inner())
457    }
458
459    pub(super) fn indexes_write(&self) -> RwLockWriteGuard<'_, PropertyIndexRegistry> {
460        self.indexes
461            .write()
462            .unwrap_or_else(|poisoned| poisoned.into_inner())
463    }
464
465    pub(super) fn indexes_mut(&mut self) -> &mut PropertyIndexRegistry {
466        self.indexes
467            .get_mut()
468            .unwrap_or_else(|poisoned| poisoned.into_inner())
469    }
470
471    #[inline]
472    pub(super) fn active_node_property_index_count(&self) -> usize {
473        self.active_node_property_indexes.load(Ordering::Relaxed)
474    }
475
476    #[inline]
477    pub(super) fn active_relationship_property_index_count(&self) -> usize {
478        self.active_relationship_property_indexes
479            .load(Ordering::Relaxed)
480    }
481
482    #[inline]
483    pub(super) fn has_active_property_indexes(&self) -> bool {
484        self.active_node_property_index_count() != 0
485            || self.active_relationship_property_index_count() != 0
486    }
487
488    pub(super) fn node_property_index_is_active(&mut self, key: &str) -> bool {
489        self.active_node_property_index_count() != 0
490            && self.indexes_mut().node_properties.is_active(key)
491    }
492
493    pub(super) fn relationship_property_index_is_active(&mut self, key: &str) -> bool {
494        self.active_relationship_property_index_count() != 0
495            && self.indexes_mut().relationship_properties.is_active(key)
496    }
497
498    pub(super) fn ensure_node_property_index(&self, key: &str) {
499        {
500            let indexes = self.indexes_read();
501            if indexes.node_properties.is_active(key) {
502                return;
503            }
504        }
505
506        let mut indexes = self.indexes_write();
507        if indexes.node_properties.is_active(key) {
508            return;
509        }
510
511        for (id, node) in self.iter_nodes() {
512            if let Some(value) = node.properties.get(key) {
513                indexes.node_properties.insert_with_scopes(
514                    id,
515                    node.labels.iter().map(String::as_str),
516                    key,
517                    value,
518                );
519            }
520        }
521        if indexes.node_properties.activate(key) {
522            self.active_node_property_indexes
523                .fetch_add(1, Ordering::Relaxed);
524        }
525    }
526
527    pub(super) fn ensure_relationship_property_index(&self, key: &str) {
528        {
529            let indexes = self.indexes_read();
530            if indexes.relationship_properties.is_active(key) {
531                return;
532            }
533        }
534
535        let mut indexes = self.indexes_write();
536        if indexes.relationship_properties.is_active(key) {
537            return;
538        }
539
540        for (id, rel) in self.iter_rels() {
541            if let Some(value) = rel.properties.get(key) {
542                indexes.relationship_properties.insert_with_scopes(
543                    id,
544                    [rel.rel_type.as_str()],
545                    key,
546                    value,
547                );
548            }
549        }
550        if indexes.relationship_properties.activate(key) {
551            self.active_relationship_property_indexes
552                .fetch_add(1, Ordering::Relaxed);
553        }
554    }
555
556    pub(super) fn rebuild_property_indexes(&mut self) {
557        let mut indexes = PropertyIndexRegistry::default();
558
559        for (id, node) in self.iter_nodes() {
560            for (key, value) in &node.properties {
561                if PropertyIndexKey::from_value(value).is_some() {
562                    indexes.node_properties.activate(key);
563                    indexes.node_properties.insert_with_scopes(
564                        id,
565                        node.labels.iter().map(String::as_str),
566                        key,
567                        value,
568                    );
569                }
570            }
571        }
572
573        for (id, rel) in self.iter_rels() {
574            for (key, value) in &rel.properties {
575                if PropertyIndexKey::from_value(value).is_some() {
576                    indexes.relationship_properties.activate(key);
577                    indexes.relationship_properties.insert_with_scopes(
578                        id,
579                        [rel.rel_type.as_str()],
580                        key,
581                        value,
582                    );
583                }
584            }
585        }
586
587        let node_index_count = indexes.node_properties.active_keys.len();
588        let relationship_index_count = indexes.relationship_properties.active_keys.len();
589        *self.indexes_mut() = indexes;
590        self.active_node_property_indexes
591            .store(node_index_count, Ordering::Relaxed);
592        self.active_relationship_property_indexes
593            .store(relationship_index_count, Ordering::Relaxed);
594    }
595
596    pub(super) fn on_node_created(&mut self, node: &NodeRecord) {
597        for label in &node.labels {
598            self.insert_node_label_index(node.id, label);
599        }
600        self.index_node_properties_if_active(
601            node.id,
602            node.labels.iter().map(String::as_str),
603            &node.properties,
604        );
605    }
606
607    pub(super) fn on_node_replayed(&mut self, node: &NodeRecord) {
608        for label in &node.labels {
609            self.insert_node_label_index(node.id, label);
610        }
611        self.index_node_properties_eager(
612            node.id,
613            node.labels.iter().map(String::as_str),
614            &node.properties,
615        );
616    }
617
618    pub(super) fn on_node_property_set(
619        &mut self,
620        node_id: NodeId,
621        key: &str,
622        old: Option<&PropertyValue>,
623        new: &PropertyValue,
624    ) {
625        if !self.node_property_index_is_active(key) {
626            return;
627        }
628
629        let Some(labels) = self.node_at(node_id).map(|node| node.labels.clone()) else {
630            return;
631        };
632
633        if let Some(old) = old {
634            self.unindex_node_property_if_active(
635                node_id,
636                labels.iter().map(String::as_str),
637                key,
638                old,
639            );
640        }
641        self.index_node_property_if_active(node_id, labels.iter().map(String::as_str), key, new);
642    }
643
644    pub(super) fn on_node_property_removed(
645        &mut self,
646        node_id: NodeId,
647        key: &str,
648        old: &PropertyValue,
649    ) {
650        if !self.node_property_index_is_active(key) {
651            return;
652        }
653
654        let Some(labels) = self.node_at(node_id).map(|node| node.labels.clone()) else {
655            return;
656        };
657        self.unindex_node_property_if_active(node_id, labels.iter().map(String::as_str), key, old);
658    }
659
660    pub(super) fn on_node_label_added(&mut self, node_id: NodeId, label: &str) {
661        self.insert_node_label_index(node_id, label);
662
663        if self.active_node_property_index_count() == 0 {
664            return;
665        }
666
667        let Some(properties) = self.node_at(node_id).map(|node| node.properties.clone()) else {
668            return;
669        };
670        self.index_node_scope_properties_if_active(node_id, label, &properties);
671    }
672
673    pub(super) fn on_node_label_removed(&mut self, node_id: NodeId, label: &str) {
674        self.remove_node_label_index(node_id, label);
675
676        if self.active_node_property_index_count() == 0 {
677            return;
678        }
679
680        let Some(properties) = self.node_at(node_id).map(|node| node.properties.clone()) else {
681            return;
682        };
683        self.unindex_node_scope_properties_if_active(node_id, label, &properties);
684    }
685
686    pub(super) fn on_node_deleted(&mut self, node: &NodeRecord) {
687        for label in &node.labels {
688            self.remove_node_label_index(node.id, label);
689        }
690        self.unindex_active_node_properties(
691            node.id,
692            node.labels.iter().map(String::as_str),
693            &node.properties,
694        );
695    }
696
697    pub(super) fn on_relationship_created(&mut self, rel: &RelationshipRecord) {
698        self.attach_relationship(rel);
699        self.index_relationship_properties_if_active(
700            rel.id,
701            [rel.rel_type.as_str()],
702            &rel.properties,
703        );
704    }
705
706    pub(super) fn on_relationship_replayed(&mut self, rel: &RelationshipRecord) {
707        self.attach_relationship(rel);
708        self.index_relationship_properties_eager(rel.id, [rel.rel_type.as_str()], &rel.properties);
709    }
710
711    pub(super) fn on_relationship_property_set(
712        &mut self,
713        rel_id: RelationshipId,
714        key: &str,
715        old: Option<&PropertyValue>,
716        new: &PropertyValue,
717    ) {
718        if !self.relationship_property_index_is_active(key) {
719            return;
720        }
721
722        let Some(rel_type) = self.rel_at(rel_id).map(|rel| rel.rel_type.clone()) else {
723            return;
724        };
725
726        if let Some(old) = old {
727            self.unindex_relationship_property_if_active(rel_id, [rel_type.as_str()], key, old);
728        }
729        self.index_relationship_property_if_active(rel_id, [rel_type.as_str()], key, new);
730    }
731
732    pub(super) fn on_relationship_property_removed(
733        &mut self,
734        rel_id: RelationshipId,
735        key: &str,
736        old: &PropertyValue,
737    ) {
738        if !self.relationship_property_index_is_active(key) {
739            return;
740        }
741
742        let Some(rel_type) = self.rel_at(rel_id).map(|rel| rel.rel_type.clone()) else {
743            return;
744        };
745        self.unindex_relationship_property_if_active(rel_id, [rel_type.as_str()], key, old);
746    }
747
748    pub(super) fn on_relationship_deleted(&mut self, rel: &RelationshipRecord) {
749        self.detach_relationship_indexes(rel);
750        self.unindex_active_relationship_properties(
751            rel.id,
752            [rel.rel_type.as_str()],
753            &rel.properties,
754        );
755    }
756
757    fn index_node_property_eager<'a>(
758        &mut self,
759        node_id: NodeId,
760        labels: impl IntoIterator<Item = &'a str>,
761        key: &str,
762        value: &PropertyValue,
763    ) {
764        if PropertyIndexKey::from_value(value).is_none() {
765            return;
766        }
767
768        let activated = {
769            let indexes = self.indexes_mut();
770            let activated = indexes.node_properties.activate(key);
771            indexes
772                .node_properties
773                .insert_with_scopes(node_id, labels, key, value);
774            activated
775        };
776        if activated {
777            self.active_node_property_indexes
778                .fetch_add(1, Ordering::Relaxed);
779        }
780    }
781
782    fn index_relationship_property_eager<'a>(
783        &mut self,
784        rel_id: RelationshipId,
785        scopes: impl IntoIterator<Item = &'a str>,
786        key: &str,
787        value: &PropertyValue,
788    ) {
789        if PropertyIndexKey::from_value(value).is_none() {
790            return;
791        }
792
793        let activated = {
794            let indexes = self.indexes_mut();
795            let activated = indexes.relationship_properties.activate(key);
796            indexes
797                .relationship_properties
798                .insert_with_scopes(rel_id, scopes, key, value);
799            activated
800        };
801        if activated {
802            self.active_relationship_property_indexes
803                .fetch_add(1, Ordering::Relaxed);
804        }
805    }
806
807    fn index_node_properties_eager<'a>(
808        &mut self,
809        node_id: NodeId,
810        labels: impl IntoIterator<Item = &'a str> + Clone,
811        properties: &Properties,
812    ) {
813        for (key, value) in properties {
814            self.index_node_property_eager(node_id, labels.clone(), key, value);
815        }
816    }
817
818    fn index_relationship_properties_eager<'a>(
819        &mut self,
820        rel_id: RelationshipId,
821        scopes: impl IntoIterator<Item = &'a str> + Clone,
822        properties: &Properties,
823    ) {
824        for (key, value) in properties {
825            self.index_relationship_property_eager(rel_id, scopes.clone(), key, value);
826        }
827    }
828
829    fn index_node_property_if_active<'a>(
830        &mut self,
831        node_id: NodeId,
832        labels: impl IntoIterator<Item = &'a str>,
833        key: &str,
834        value: &PropertyValue,
835    ) {
836        if self.active_node_property_index_count() == 0 {
837            return;
838        }
839        let indexes = self.indexes_mut();
840        if indexes.node_properties.is_active(key) {
841            indexes
842                .node_properties
843                .insert_with_scopes(node_id, labels, key, value);
844        }
845    }
846
847    fn index_node_properties_if_active<'a>(
848        &mut self,
849        node_id: NodeId,
850        labels: impl IntoIterator<Item = &'a str> + Clone,
851        properties: &Properties,
852    ) {
853        if self.active_node_property_index_count() == 0 {
854            return;
855        }
856        let indexes = self.indexes_mut();
857        for (key, value) in properties {
858            if indexes.node_properties.is_active(key) {
859                indexes
860                    .node_properties
861                    .insert_with_scopes(node_id, labels.clone(), key, value);
862            }
863        }
864    }
865
866    fn unindex_node_property_if_active<'a>(
867        &mut self,
868        node_id: NodeId,
869        labels: impl IntoIterator<Item = &'a str>,
870        key: &str,
871        value: &PropertyValue,
872    ) {
873        if self.active_node_property_index_count() == 0 {
874            return;
875        }
876        let indexes = self.indexes_mut();
877        if indexes.node_properties.is_active(key) {
878            indexes
879                .node_properties
880                .remove_with_scopes(node_id, labels, key, value);
881        }
882    }
883
884    fn index_node_scope_properties_if_active(
885        &mut self,
886        node_id: NodeId,
887        scope: &str,
888        properties: &Properties,
889    ) {
890        if self.active_node_property_index_count() == 0 {
891            return;
892        }
893        let indexes = self.indexes_mut();
894        for (key, value) in properties {
895            if indexes.node_properties.is_active(key) {
896                indexes
897                    .node_properties
898                    .insert_scoped(node_id, scope, key, value);
899            }
900        }
901    }
902
903    fn unindex_node_scope_properties_if_active(
904        &mut self,
905        node_id: NodeId,
906        scope: &str,
907        properties: &Properties,
908    ) {
909        if self.active_node_property_index_count() == 0 {
910            return;
911        }
912        let indexes = self.indexes_mut();
913        for (key, value) in properties {
914            if indexes.node_properties.is_active(key) {
915                indexes
916                    .node_properties
917                    .remove_scoped(node_id, scope, key, value);
918            }
919        }
920    }
921
922    fn unindex_active_node_properties<'a>(
923        &mut self,
924        node_id: NodeId,
925        labels: impl IntoIterator<Item = &'a str> + Clone,
926        properties: &Properties,
927    ) {
928        if self.active_node_property_index_count() == 0 {
929            return;
930        }
931        let indexes = self.indexes_mut();
932        for (key, value) in properties {
933            if indexes.node_properties.is_active(key) {
934                indexes
935                    .node_properties
936                    .remove_with_scopes(node_id, labels.clone(), key, value);
937            }
938        }
939    }
940
941    fn index_relationship_property_if_active<'a>(
942        &mut self,
943        rel_id: RelationshipId,
944        scopes: impl IntoIterator<Item = &'a str>,
945        key: &str,
946        value: &PropertyValue,
947    ) {
948        if self.active_relationship_property_index_count() == 0 {
949            return;
950        }
951        let indexes = self.indexes_mut();
952        if indexes.relationship_properties.is_active(key) {
953            indexes
954                .relationship_properties
955                .insert_with_scopes(rel_id, scopes, key, value);
956        }
957    }
958
959    fn index_relationship_properties_if_active<'a>(
960        &mut self,
961        rel_id: RelationshipId,
962        scopes: impl IntoIterator<Item = &'a str> + Clone,
963        properties: &Properties,
964    ) {
965        if self.active_relationship_property_index_count() == 0 {
966            return;
967        }
968        let indexes = self.indexes_mut();
969        for (key, value) in properties {
970            if indexes.relationship_properties.is_active(key) {
971                indexes.relationship_properties.insert_with_scopes(
972                    rel_id,
973                    scopes.clone(),
974                    key,
975                    value,
976                );
977            }
978        }
979    }
980
981    fn unindex_relationship_property_if_active<'a>(
982        &mut self,
983        rel_id: RelationshipId,
984        scopes: impl IntoIterator<Item = &'a str>,
985        key: &str,
986        value: &PropertyValue,
987    ) {
988        if self.active_relationship_property_index_count() == 0 {
989            return;
990        }
991        let indexes = self.indexes_mut();
992        if indexes.relationship_properties.is_active(key) {
993            indexes
994                .relationship_properties
995                .remove_with_scopes(rel_id, scopes, key, value);
996        }
997    }
998
999    fn unindex_active_relationship_properties<'a>(
1000        &mut self,
1001        rel_id: RelationshipId,
1002        scopes: impl IntoIterator<Item = &'a str> + Clone,
1003        properties: &Properties,
1004    ) {
1005        if self.active_relationship_property_index_count() == 0 {
1006            return;
1007        }
1008        let indexes = self.indexes_mut();
1009        for (key, value) in properties {
1010            if indexes.relationship_properties.is_active(key) {
1011                indexes.relationship_properties.remove_with_scopes(
1012                    rel_id,
1013                    scopes.clone(),
1014                    key,
1015                    value,
1016                );
1017            }
1018        }
1019    }
1020
1021    pub(super) fn scan_nodes_by_property(
1022        &self,
1023        label: Option<&str>,
1024        key: &str,
1025        value: &PropertyValue,
1026    ) -> Vec<NodeRecord> {
1027        let candidates: Box<dyn Iterator<Item = &NodeRecord> + '_> = match label {
1028            Some(label) => Box::new(
1029                self.nodes_by_label
1030                    .get(label)
1031                    .into_iter()
1032                    .flat_map(|ids| ids.iter())
1033                    .filter_map(|&id| self.node_at(id)),
1034            ),
1035            None => Box::new(self.iter_node_records()),
1036        };
1037
1038        candidates
1039            .filter(|node| node.properties.get(key) == Some(value))
1040            .cloned()
1041            .collect()
1042    }
1043
1044    pub(super) fn scan_relationships_by_property(
1045        &self,
1046        rel_type: Option<&str>,
1047        key: &str,
1048        value: &PropertyValue,
1049    ) -> Vec<RelationshipRecord> {
1050        let candidates: Box<dyn Iterator<Item = &RelationshipRecord> + '_> = match rel_type {
1051            Some(rel_type) => Box::new(
1052                self.relationships_by_type
1053                    .get(rel_type)
1054                    .into_iter()
1055                    .flat_map(|ids| ids.iter())
1056                    .filter_map(|&id| self.rel_at(id)),
1057            ),
1058            None => Box::new(self.iter_rel_records()),
1059        };
1060
1061        candidates
1062            .filter(|rel| rel.properties.get(key) == Some(value))
1063            .cloned()
1064            .collect()
1065    }
1066
1067    pub(super) fn attach_relationship(&mut self, rel: &RelationshipRecord) {
1068        self.outgoing_push(rel.src, rel.id);
1069        self.incoming_push(rel.dst, rel.id);
1070        self.insert_relationship_type_index(rel.id, &rel.rel_type);
1071    }
1072
1073    fn detach_relationship_indexes(&mut self, rel: &RelationshipRecord) {
1074        // Adjacency is now positional `Vec<Vec<RelationshipId>>` — clearing
1075        // the inner Vec leaves the slot in place (the slot is sized for the
1076        // node's lifetime, not the edge's).
1077        self.outgoing_remove(rel.src, rel.id);
1078        self.incoming_remove(rel.dst, rel.id);
1079
1080        self.remove_relationship_type_index(rel.id, &rel.rel_type);
1081    }
1082
1083    pub(super) fn relationship_ids_for_direction(
1084        &self,
1085        node_id: NodeId,
1086        direction: Direction,
1087    ) -> Vec<RelationshipId> {
1088        match direction {
1089            Direction::Left => self.incoming_at(node_id).cloned().unwrap_or_default(),
1090
1091            Direction::Right => self.outgoing_at(node_id).cloned().unwrap_or_default(),
1092
1093            Direction::Undirected => {
1094                let mut ids = BTreeSet::new();
1095
1096                if let Some(out) = self.outgoing_at(node_id) {
1097                    ids.extend(out.iter().copied());
1098                }
1099                if let Some(inc) = self.incoming_at(node_id) {
1100                    ids.extend(inc.iter().copied());
1101                }
1102
1103                ids.into_iter().collect()
1104            }
1105        }
1106    }
1107
1108    pub(super) fn other_endpoint(rel: &RelationshipRecord, node_id: NodeId) -> Option<NodeId> {
1109        if rel.src == node_id {
1110            Some(rel.dst)
1111        } else if rel.dst == node_id {
1112            Some(rel.src)
1113        } else {
1114            None
1115        }
1116    }
1117
1118    pub(super) fn has_incident_relationships(&self, node_id: NodeId) -> bool {
1119        self.outgoing_at(node_id)
1120            .map(|ids| !ids.is_empty())
1121            .unwrap_or(false)
1122            || self
1123                .incoming_at(node_id)
1124                .map(|ids| !ids.is_empty())
1125                .unwrap_or(false)
1126    }
1127
1128    pub(super) fn incident_relationship_ids(&self, node_id: NodeId) -> BTreeSet<RelationshipId> {
1129        let mut rel_ids = BTreeSet::new();
1130
1131        if let Some(ids) = self.outgoing_at(node_id) {
1132            rel_ids.extend(ids.iter().copied());
1133        }
1134        if let Some(ids) = self.incoming_at(node_id) {
1135            rel_ids.extend(ids.iter().copied());
1136        }
1137
1138        rel_ids
1139    }
1140
1141    /// Replay a node creation using the id captured in a durable mutation
1142    /// event. This intentionally does not emit a new mutation event: callers
1143    /// must invoke it before installing a recorder on the graph.
1144    #[doc(hidden)]
1145    pub fn replay_create_node(
1146        &mut self,
1147        id: NodeId,
1148        labels: Vec<String>,
1149        properties: Properties,
1150    ) -> Result<NodeRecord, String> {
1151        if self.recorder.is_some() {
1152            return Err(
1153                "cannot replay node creation while a mutation recorder is installed".into(),
1154            );
1155        }
1156        if self.node_at(id).is_some() {
1157            return Err(format!("node id {id} already exists"));
1158        }
1159
1160        let labels = Self::normalize_labels(labels);
1161        let node = NodeRecord {
1162            id,
1163            labels: labels.clone(),
1164            properties,
1165        };
1166
1167        self.on_node_replayed(&node);
1168        self.put_node(id, node.clone());
1169        // ensure_node_slot grew both adjacency Vecs to cover this id when
1170        // we put_node above.
1171        self.bump_next_node_id_past(id)?;
1172
1173        Ok(node)
1174    }
1175
1176    /// Replay a relationship creation using the id captured in a durable
1177    /// mutation event. This intentionally does not emit a new mutation event:
1178    /// callers must invoke it before installing a recorder on the graph.
1179    #[doc(hidden)]
1180    pub fn replay_create_relationship(
1181        &mut self,
1182        id: RelationshipId,
1183        src: NodeId,
1184        dst: NodeId,
1185        rel_type: &str,
1186        properties: Properties,
1187    ) -> Result<RelationshipRecord, String> {
1188        if self.recorder.is_some() {
1189            return Err(
1190                "cannot replay relationship creation while a mutation recorder is installed".into(),
1191            );
1192        }
1193        if self.rel_at(id).is_some() {
1194            return Err(format!("relationship id {id} already exists"));
1195        }
1196        if self.node_at(src).is_none() {
1197            return Err(format!(
1198                "relationship {id} references missing source node {src}"
1199            ));
1200        }
1201        if self.node_at(dst).is_none() {
1202            return Err(format!(
1203                "relationship {id} references missing target node {dst}"
1204            ));
1205        }
1206
1207        let trimmed = rel_type.trim();
1208        if trimmed.is_empty() {
1209            return Err(format!("relationship {id} has an empty type"));
1210        }
1211
1212        let rel = RelationshipRecord {
1213            id,
1214            src,
1215            dst,
1216            rel_type: trimmed.to_string(),
1217            properties,
1218        };
1219
1220        self.on_relationship_replayed(&rel);
1221        self.put_rel(id, rel.clone());
1222        self.bump_next_rel_id_past(id)?;
1223
1224        Ok(rel)
1225    }
1226
1227    #[cfg(test)]
1228    pub(super) fn assert_property_indexes_match_scan(&self) {
1229        let indexes = self.indexes_read();
1230        assert_eq!(
1231            indexes.node_properties.active_keys.len(),
1232            self.active_node_property_index_count(),
1233            "node property index counter diverged from active key set"
1234        );
1235        assert_eq!(
1236            indexes.relationship_properties.active_keys.len(),
1237            self.active_relationship_property_index_count(),
1238            "relationship property index counter diverged from active key set"
1239        );
1240
1241        let mut expected_nodes = PropertyIndexState {
1242            active_keys: indexes.node_properties.active_keys.clone(),
1243            ..PropertyIndexState::default()
1244        };
1245        for (id, node) in self.iter_nodes() {
1246            for (key, value) in &node.properties {
1247                if expected_nodes.is_active(key) {
1248                    expected_nodes.insert_with_scopes(
1249                        id,
1250                        node.labels.iter().map(String::as_str),
1251                        key,
1252                        value,
1253                    );
1254                }
1255            }
1256        }
1257        assert_eq!(
1258            indexes.node_properties.values, expected_nodes.values,
1259            "node property index values diverged from scan"
1260        );
1261        assert_eq!(
1262            indexes.node_properties.scoped_values, expected_nodes.scoped_values,
1263            "node property scoped index values diverged from scan"
1264        );
1265
1266        let mut expected_relationships = PropertyIndexState {
1267            active_keys: indexes.relationship_properties.active_keys.clone(),
1268            ..PropertyIndexState::default()
1269        };
1270        for (id, rel) in self.iter_rels() {
1271            for (key, value) in &rel.properties {
1272                if expected_relationships.is_active(key) {
1273                    expected_relationships.insert_with_scopes(
1274                        id,
1275                        [rel.rel_type.as_str()],
1276                        key,
1277                        value,
1278                    );
1279                }
1280            }
1281        }
1282        assert_eq!(
1283            indexes.relationship_properties.values, expected_relationships.values,
1284            "relationship property index values diverged from scan"
1285        );
1286        assert_eq!(
1287            indexes.relationship_properties.scoped_values, expected_relationships.scoped_values,
1288            "relationship property scoped index values diverged from scan"
1289        );
1290    }
1291}