Skip to main content

lora_store/
memory.rs

1use std::collections::{BTreeMap, BTreeSet, HashMap};
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::sync::{Arc, RwLock, RwLockWriteGuard};
4
5use lora_ast::Direction;
6
7use crate::snapshot::{read_snapshot, write_snapshot};
8use crate::{
9    BorrowedGraphStorage, GraphStorage, GraphStorageMut, MutationEvent, MutationRecorder, NodeId,
10    NodeRecord, Properties, PropertyValue, RelationshipId, RelationshipRecord, SnapshotError,
11    SnapshotMeta, SnapshotPayload, Snapshotable,
12};
13
/// Maps one normalised property value to the ids of the entities holding it.
type PropertyValueBuckets = HashMap<PropertyIndexKey, BTreeSet<u64>>;
/// Maps a property key to the value buckets recorded for that key.
type PropertyIndex = HashMap<String, PropertyValueBuckets>;
/// Maps a scope name to the property index restricted to that scope. In this
/// file the scopes are node labels and relationship types.
type ScopedPropertyIndex = HashMap<String, PropertyIndex>;
17
18#[derive(Default)]
19struct PropertyIndexRegistry {
20    node_properties: PropertyIndexState,
21    relationship_properties: PropertyIndexState,
22}
23
24impl std::fmt::Debug for PropertyIndexRegistry {
25    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26        f.debug_struct("PropertyIndexRegistry")
27            .field("node_properties", &self.node_properties)
28            .field("relationship_properties", &self.relationship_properties)
29            .finish()
30    }
31}
32
33impl Clone for PropertyIndexRegistry {
34    fn clone(&self) -> Self {
35        Self {
36            node_properties: self.node_properties.clone(),
37            relationship_properties: self.relationship_properties.clone(),
38        }
39    }
40}
41
/// Index bookkeeping for one entity kind (nodes or relationships).
#[derive(Debug, Default, Clone)]
struct PropertyIndexState {
    /// Property keys for which an index has been activated.
    active_keys: BTreeSet<String>,
    /// Unscoped index: property key -> indexed value -> entity ids.
    values: PropertyIndex,
    /// Scoped index: scope -> property key -> indexed value -> entity ids.
    scoped_values: ScopedPropertyIndex,
}
48
49impl PropertyIndexState {
50    fn is_active(&self, key: &str) -> bool {
51        self.active_keys.contains(key)
52    }
53
54    fn activate(&mut self, key: &str) -> bool {
55        self.active_keys.insert(key.to_string())
56    }
57
58    fn insert_value(
59        values: &mut PropertyIndex,
60        entity_id: u64,
61        key: &str,
62        value: PropertyIndexKey,
63    ) {
64        values
65            .entry(key.to_string())
66            .or_default()
67            .entry(value)
68            .or_default()
69            .insert(entity_id);
70    }
71
72    fn remove_value(
73        values: &mut PropertyIndex,
74        entity_id: u64,
75        key: &str,
76        value: &PropertyIndexKey,
77    ) {
78        let mut remove_key = false;
79        if let Some(buckets) = values.get_mut(key) {
80            if let Some(ids) = buckets.get_mut(value) {
81                ids.remove(&entity_id);
82                if ids.is_empty() {
83                    buckets.remove(value);
84                }
85            }
86            remove_key = buckets.is_empty();
87        }
88        if remove_key {
89            values.remove(key);
90        }
91    }
92
93    fn insert_scoped(&mut self, entity_id: u64, scope: &str, key: &str, value: &PropertyValue) {
94        let Some(indexed_value) = PropertyIndexKey::from_value(value) else {
95            return;
96        };
97
98        Self::insert_value(
99            self.scoped_values.entry(scope.to_string()).or_default(),
100            entity_id,
101            key,
102            indexed_value,
103        );
104    }
105
106    fn insert_with_scopes<'a>(
107        &mut self,
108        entity_id: u64,
109        scopes: impl IntoIterator<Item = &'a str>,
110        key: &str,
111        value: &PropertyValue,
112    ) {
113        let Some(indexed_value) = PropertyIndexKey::from_value(value) else {
114            return;
115        };
116
117        Self::insert_value(&mut self.values, entity_id, key, indexed_value.clone());
118        for scope in scopes {
119            Self::insert_value(
120                self.scoped_values.entry(scope.to_string()).or_default(),
121                entity_id,
122                key,
123                indexed_value.clone(),
124            );
125        }
126    }
127
128    fn remove_scoped(&mut self, entity_id: u64, scope: &str, key: &str, value: &PropertyValue) {
129        let Some(indexed_value) = PropertyIndexKey::from_value(value) else {
130            return;
131        };
132
133        let mut remove_scope = false;
134        if let Some(values) = self.scoped_values.get_mut(scope) {
135            Self::remove_value(values, entity_id, key, &indexed_value);
136            remove_scope = values.is_empty();
137        }
138        if remove_scope {
139            self.scoped_values.remove(scope);
140        }
141    }
142
143    fn remove_with_scopes<'a>(
144        &mut self,
145        entity_id: u64,
146        scopes: impl IntoIterator<Item = &'a str>,
147        key: &str,
148        value: &PropertyValue,
149    ) {
150        let Some(indexed_value) = PropertyIndexKey::from_value(value) else {
151            return;
152        };
153
154        Self::remove_value(&mut self.values, entity_id, key, &indexed_value);
155        for scope in scopes {
156            let mut remove_scope = false;
157            if let Some(values) = self.scoped_values.get_mut(scope) {
158                Self::remove_value(values, entity_id, key, &indexed_value);
159                remove_scope = values.is_empty();
160            }
161            if remove_scope {
162                self.scoped_values.remove(scope);
163            }
164        }
165    }
166
167    fn ids_for(&self, key: &str, value: &PropertyValue) -> Option<&BTreeSet<u64>> {
168        let indexed_value = PropertyIndexKey::from_value(value)?;
169        self.values
170            .get(key)
171            .and_then(|values| values.get(&indexed_value))
172    }
173
174    fn scoped_ids_for(
175        &self,
176        scope: &str,
177        key: &str,
178        value: &PropertyValue,
179    ) -> Option<&BTreeSet<u64>> {
180        let indexed_value = PropertyIndexKey::from_value(value)?;
181        self.scoped_values
182            .get(scope)
183            .and_then(|values| values.get(key))
184            .and_then(|values| values.get(&indexed_value))
185    }
186}
187
/// Hashable, `Eq`-comparable form of a property value, used as the bucket
/// key of the property indexes. Built by `PropertyIndexKey::from_value`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum PropertyIndexKey {
    Null,
    Bool(bool),
    Int(i64),
    /// Bit pattern of the float as produced by `to_bits`; `from_value` never
    /// builds this variant for NaN and stores zero as the bits of `0.0`.
    Float(u64),
    String(String),
    /// Element-wise converted list.
    List(Vec<PropertyIndexKey>),
    /// Entry-wise converted map, keyed by the original entry names.
    Map(BTreeMap<String, PropertyIndexKey>),
}
198
199impl PropertyIndexKey {
200    fn from_value(value: &PropertyValue) -> Option<Self> {
201        match value {
202            PropertyValue::Null => Some(Self::Null),
203            PropertyValue::Bool(v) => Some(Self::Bool(*v)),
204            PropertyValue::Int(v) => Some(Self::Int(*v)),
205            PropertyValue::Float(v) => {
206                if v.is_nan() {
207                    None
208                } else if *v == 0.0 {
209                    Some(Self::Float(0.0f64.to_bits()))
210                } else {
211                    Some(Self::Float(v.to_bits()))
212                }
213            }
214            PropertyValue::String(v) => Some(Self::String(v.clone())),
215            PropertyValue::List(values) => values
216                .iter()
217                .map(Self::from_value)
218                .collect::<Option<Vec<_>>>()
219                .map(Self::List),
220            PropertyValue::Map(values) => values
221                .iter()
222                .map(|(k, v)| Self::from_value(v).map(|indexed| (k.clone(), indexed)))
223                .collect::<Option<BTreeMap<_, _>>>()
224                .map(Self::Map),
225            // Temporal, spatial, and vector values have richer equality
226            // semantics and/or no stable hash representation in the storage
227            // crate today. Those continue to use the scan fallback.
228            PropertyValue::Date(_)
229            | PropertyValue::Time(_)
230            | PropertyValue::LocalTime(_)
231            | PropertyValue::DateTime(_)
232            | PropertyValue::LocalDateTime(_)
233            | PropertyValue::Duration(_)
234            | PropertyValue::Point(_)
235            | PropertyValue::Vector(_) => None,
236        }
237    }
238}
239
/// In-memory graph store: node and relationship records plus the adjacency
/// sets and secondary indexes derived from them.
#[derive(Default)]
pub struct InMemoryGraph {
    /// Id returned by the next `alloc_node_id` call.
    next_node_id: NodeId,
    /// Id returned by the next `alloc_rel_id` call.
    next_rel_id: RelationshipId,

    /// Node records keyed by id.
    nodes: BTreeMap<NodeId, NodeRecord>,
    /// Relationship records keyed by id.
    relationships: BTreeMap<RelationshipId, RelationshipRecord>,

    // adjacency
    /// Relationship ids keyed by their source node (`rel.src`).
    outgoing: BTreeMap<NodeId, BTreeSet<RelationshipId>>,
    /// Relationship ids keyed by their destination node (`rel.dst`).
    incoming: BTreeMap<NodeId, BTreeSet<RelationshipId>>,

    // secondary indexes
    /// Node ids grouped by label.
    nodes_by_label: BTreeMap<String, BTreeSet<NodeId>>,
    /// Relationship ids grouped by relationship type.
    relationships_by_type: BTreeMap<String, BTreeSet<RelationshipId>>,
    /// Property indexes; behind a lock because some index-building methods
    /// only take `&self`.
    indexes: RwLock<PropertyIndexRegistry>,
    /// Number of active node property indexes, readable without the lock.
    active_node_property_indexes: AtomicUsize,
    /// Number of active relationship property indexes, readable without the
    /// lock.
    active_relationship_property_indexes: AtomicUsize,

    /// Optional mutation observer. When `Some`, every committed mutation
    /// fans out to this recorder *after* the in-memory state has been
    /// updated. The recorder is not part of the graph's identity, so Clone
    /// and snapshot restore both reset it to `None`.
    recorder: Option<Arc<dyn MutationRecorder>>,
}
265
266impl std::fmt::Debug for InMemoryGraph {
267    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
268        f.debug_struct("InMemoryGraph")
269            .field("next_node_id", &self.next_node_id)
270            .field("next_rel_id", &self.next_rel_id)
271            .field("nodes", &self.nodes)
272            .field("relationships", &self.relationships)
273            .field("outgoing", &self.outgoing)
274            .field("incoming", &self.incoming)
275            .field("nodes_by_label", &self.nodes_by_label)
276            .field("relationships_by_type", &self.relationships_by_type)
277            .field("indexes", &self.indexes)
278            .field(
279                "active_node_property_indexes",
280                &self.active_node_property_index_count(),
281            )
282            .field(
283                "active_relationship_property_indexes",
284                &self.active_relationship_property_index_count(),
285            )
286            .field("recorder", &self.recorder.as_ref().map(|_| "installed"))
287            .finish()
288    }
289}
290
291impl Clone for InMemoryGraph {
292    fn clone(&self) -> Self {
293        // Deliberately drop the recorder on clone: a cloned store is a
294        // separate identity; it should not silently share the observer.
295        Self {
296            next_node_id: self.next_node_id,
297            next_rel_id: self.next_rel_id,
298            nodes: self.nodes.clone(),
299            relationships: self.relationships.clone(),
300            outgoing: self.outgoing.clone(),
301            incoming: self.incoming.clone(),
302            nodes_by_label: self.nodes_by_label.clone(),
303            relationships_by_type: self.relationships_by_type.clone(),
304            indexes: RwLock::new(if self.has_active_property_indexes() {
305                self.indexes_read().clone()
306            } else {
307                PropertyIndexRegistry::default()
308            }),
309            active_node_property_indexes: AtomicUsize::new(self.active_node_property_index_count()),
310            active_relationship_property_indexes: AtomicUsize::new(
311                self.active_relationship_property_index_count(),
312            ),
313            recorder: None,
314        }
315    }
316}
317
318impl InMemoryGraph {
319    pub fn new() -> Self {
320        Self::default()
321    }
322
323    pub fn with_capacity_hint(_nodes: usize, _relationships: usize) -> Self {
324        // BTreeMap/BTreeSet do not support capacity reservation.
325        Self::default()
326    }
327
328    pub fn contains_node(&self, node_id: NodeId) -> bool {
329        self.nodes.contains_key(&node_id)
330    }
331
332    pub fn contains_relationship(&self, rel_id: RelationshipId) -> bool {
333        self.relationships.contains_key(&rel_id)
334    }
335
336    /// Install (or clear) the mutation recorder. Passing `None` detaches any
337    /// currently-installed recorder. The recorder observes every committed
338    /// mutation *after* it has been applied.
339    pub fn set_mutation_recorder(&mut self, recorder: Option<Arc<dyn MutationRecorder>>) {
340        self.recorder = recorder;
341    }
342
343    /// Handle to the currently-installed recorder, if any.
344    pub fn mutation_recorder(&self) -> Option<&Arc<dyn MutationRecorder>> {
345        self.recorder.as_ref()
346    }
347
348    /// Emit a mutation event only if a recorder is installed. The event is
349    /// built lazily — callers pass a closure, so when no recorder is
350    /// attached we pay only a `None` check and the cost of constructing the
351    /// event (labels/properties clones) is avoided.
352    #[inline]
353    fn emit<F: FnOnce() -> MutationEvent>(&self, build: F) {
354        if let Some(rec) = &self.recorder {
355            rec.record(&build());
356        }
357    }
358
359    fn alloc_node_id(&mut self) -> NodeId {
360        let id = self.next_node_id;
361        self.next_node_id += 1;
362        id
363    }
364
365    fn alloc_rel_id(&mut self) -> RelationshipId {
366        let id = self.next_rel_id;
367        self.next_rel_id += 1;
368        id
369    }
370
371    fn bump_next_node_id_past(&mut self, id: NodeId) -> Result<(), String> {
372        let next = id
373            .checked_add(1)
374            .ok_or_else(|| format!("node id {id} leaves no valid next node id"))?;
375        self.next_node_id = self.next_node_id.max(next);
376        Ok(())
377    }
378
379    fn bump_next_rel_id_past(&mut self, id: RelationshipId) -> Result<(), String> {
380        let next = id
381            .checked_add(1)
382            .ok_or_else(|| format!("relationship id {id} leaves no valid next relationship id"))?;
383        self.next_rel_id = self.next_rel_id.max(next);
384        Ok(())
385    }
386
387    fn normalize_labels(labels: Vec<String>) -> Vec<String> {
388        let mut seen = BTreeSet::new();
389
390        labels
391            .into_iter()
392            .map(|s| s.trim().to_string())
393            .filter(|s| !s.is_empty())
394            .filter(|s| seen.insert(s.clone()))
395            .collect()
396    }
397
398    fn insert_node_label_index(&mut self, node_id: NodeId, label: &str) {
399        self.nodes_by_label
400            .entry(label.to_string())
401            .or_default()
402            .insert(node_id);
403    }
404
405    fn remove_node_label_index(&mut self, node_id: NodeId, label: &str) {
406        if let Some(ids) = self.nodes_by_label.get_mut(label) {
407            ids.remove(&node_id);
408            if ids.is_empty() {
409                self.nodes_by_label.remove(label);
410            }
411        }
412    }
413
414    fn insert_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
415        self.relationships_by_type
416            .entry(rel_type.to_string())
417            .or_default()
418            .insert(rel_id);
419    }
420
421    fn remove_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
422        if let Some(ids) = self.relationships_by_type.get_mut(rel_type) {
423            ids.remove(&rel_id);
424            if ids.is_empty() {
425                self.relationships_by_type.remove(rel_type);
426            }
427        }
428    }
429
430    fn indexes_read(&self) -> std::sync::RwLockReadGuard<'_, PropertyIndexRegistry> {
431        self.indexes
432            .read()
433            .unwrap_or_else(|poisoned| poisoned.into_inner())
434    }
435
436    fn indexes_write(&self) -> RwLockWriteGuard<'_, PropertyIndexRegistry> {
437        self.indexes
438            .write()
439            .unwrap_or_else(|poisoned| poisoned.into_inner())
440    }
441
442    fn indexes_mut(&mut self) -> &mut PropertyIndexRegistry {
443        self.indexes
444            .get_mut()
445            .unwrap_or_else(|poisoned| poisoned.into_inner())
446    }
447
448    #[inline]
449    fn active_node_property_index_count(&self) -> usize {
450        self.active_node_property_indexes.load(Ordering::Relaxed)
451    }
452
453    #[inline]
454    fn active_relationship_property_index_count(&self) -> usize {
455        self.active_relationship_property_indexes
456            .load(Ordering::Relaxed)
457    }
458
459    #[inline]
460    fn has_active_property_indexes(&self) -> bool {
461        self.active_node_property_index_count() != 0
462            || self.active_relationship_property_index_count() != 0
463    }
464
465    fn ensure_node_property_index(&self, key: &str) {
466        {
467            let indexes = self.indexes_read();
468            if indexes.node_properties.is_active(key) {
469                return;
470            }
471        }
472
473        let mut indexes = self.indexes_write();
474        if indexes.node_properties.is_active(key) {
475            return;
476        }
477
478        for (id, node) in &self.nodes {
479            if let Some(value) = node.properties.get(key) {
480                indexes.node_properties.insert_with_scopes(
481                    *id,
482                    node.labels.iter().map(String::as_str),
483                    key,
484                    value,
485                );
486            }
487        }
488        if indexes.node_properties.activate(key) {
489            self.active_node_property_indexes
490                .fetch_add(1, Ordering::Relaxed);
491        }
492    }
493
494    fn ensure_relationship_property_index(&self, key: &str) {
495        {
496            let indexes = self.indexes_read();
497            if indexes.relationship_properties.is_active(key) {
498                return;
499            }
500        }
501
502        let mut indexes = self.indexes_write();
503        if indexes.relationship_properties.is_active(key) {
504            return;
505        }
506
507        for (id, rel) in &self.relationships {
508            if let Some(value) = rel.properties.get(key) {
509                indexes.relationship_properties.insert_with_scopes(
510                    *id,
511                    [rel.rel_type.as_str()],
512                    key,
513                    value,
514                );
515            }
516        }
517        if indexes.relationship_properties.activate(key) {
518            self.active_relationship_property_indexes
519                .fetch_add(1, Ordering::Relaxed);
520        }
521    }
522
523    fn rebuild_property_indexes(&mut self) {
524        let mut indexes = PropertyIndexRegistry::default();
525
526        for (id, node) in &self.nodes {
527            for (key, value) in &node.properties {
528                if PropertyIndexKey::from_value(value).is_some() {
529                    indexes.node_properties.activate(key);
530                    indexes.node_properties.insert_with_scopes(
531                        *id,
532                        node.labels.iter().map(String::as_str),
533                        key,
534                        value,
535                    );
536                }
537            }
538        }
539
540        for (id, rel) in &self.relationships {
541            for (key, value) in &rel.properties {
542                if PropertyIndexKey::from_value(value).is_some() {
543                    indexes.relationship_properties.activate(key);
544                    indexes.relationship_properties.insert_with_scopes(
545                        *id,
546                        [rel.rel_type.as_str()],
547                        key,
548                        value,
549                    );
550                }
551            }
552        }
553
554        let node_index_count = indexes.node_properties.active_keys.len();
555        let relationship_index_count = indexes.relationship_properties.active_keys.len();
556        *self.indexes_mut() = indexes;
557        self.active_node_property_indexes
558            .store(node_index_count, Ordering::Relaxed);
559        self.active_relationship_property_indexes
560            .store(relationship_index_count, Ordering::Relaxed);
561    }
562
563    fn index_node_property_eager<'a>(
564        &mut self,
565        node_id: NodeId,
566        labels: impl IntoIterator<Item = &'a str>,
567        key: &str,
568        value: &PropertyValue,
569    ) {
570        if PropertyIndexKey::from_value(value).is_none() {
571            return;
572        }
573
574        let activated = {
575            let indexes = self.indexes_mut();
576            let activated = indexes.node_properties.activate(key);
577            indexes
578                .node_properties
579                .insert_with_scopes(node_id, labels, key, value);
580            activated
581        };
582        if activated {
583            self.active_node_property_indexes
584                .fetch_add(1, Ordering::Relaxed);
585        }
586    }
587
588    fn index_relationship_property_eager<'a>(
589        &mut self,
590        rel_id: RelationshipId,
591        scopes: impl IntoIterator<Item = &'a str>,
592        key: &str,
593        value: &PropertyValue,
594    ) {
595        if PropertyIndexKey::from_value(value).is_none() {
596            return;
597        }
598
599        let activated = {
600            let indexes = self.indexes_mut();
601            let activated = indexes.relationship_properties.activate(key);
602            indexes
603                .relationship_properties
604                .insert_with_scopes(rel_id, scopes, key, value);
605            activated
606        };
607        if activated {
608            self.active_relationship_property_indexes
609                .fetch_add(1, Ordering::Relaxed);
610        }
611    }
612
613    fn index_node_properties_eager<'a>(
614        &mut self,
615        node_id: NodeId,
616        labels: impl IntoIterator<Item = &'a str> + Clone,
617        properties: &Properties,
618    ) {
619        for (key, value) in properties {
620            self.index_node_property_eager(node_id, labels.clone(), key, value);
621        }
622    }
623
624    fn index_relationship_properties_eager<'a>(
625        &mut self,
626        rel_id: RelationshipId,
627        scopes: impl IntoIterator<Item = &'a str> + Clone,
628        properties: &Properties,
629    ) {
630        for (key, value) in properties {
631            self.index_relationship_property_eager(rel_id, scopes.clone(), key, value);
632        }
633    }
634
635    fn index_node_property_if_active<'a>(
636        &mut self,
637        node_id: NodeId,
638        labels: impl IntoIterator<Item = &'a str>,
639        key: &str,
640        value: &PropertyValue,
641    ) {
642        if self.active_node_property_index_count() == 0 {
643            return;
644        }
645        let indexes = self.indexes_mut();
646        if indexes.node_properties.is_active(key) {
647            indexes
648                .node_properties
649                .insert_with_scopes(node_id, labels, key, value);
650        }
651    }
652
653    fn index_node_properties_if_active<'a>(
654        &mut self,
655        node_id: NodeId,
656        labels: impl IntoIterator<Item = &'a str> + Clone,
657        properties: &Properties,
658    ) {
659        if self.active_node_property_index_count() == 0 {
660            return;
661        }
662        let indexes = self.indexes_mut();
663        for (key, value) in properties {
664            if indexes.node_properties.is_active(key) {
665                indexes
666                    .node_properties
667                    .insert_with_scopes(node_id, labels.clone(), key, value);
668            }
669        }
670    }
671
672    fn unindex_node_property_if_active<'a>(
673        &mut self,
674        node_id: NodeId,
675        labels: impl IntoIterator<Item = &'a str>,
676        key: &str,
677        value: &PropertyValue,
678    ) {
679        if self.active_node_property_index_count() == 0 {
680            return;
681        }
682        let indexes = self.indexes_mut();
683        if indexes.node_properties.is_active(key) {
684            indexes
685                .node_properties
686                .remove_with_scopes(node_id, labels, key, value);
687        }
688    }
689
690    fn index_node_scope_properties_if_active(
691        &mut self,
692        node_id: NodeId,
693        scope: &str,
694        properties: &Properties,
695    ) {
696        if self.active_node_property_index_count() == 0 {
697            return;
698        }
699        let indexes = self.indexes_mut();
700        for (key, value) in properties {
701            if indexes.node_properties.is_active(key) {
702                indexes
703                    .node_properties
704                    .insert_scoped(node_id, scope, key, value);
705            }
706        }
707    }
708
709    fn unindex_node_scope_properties_if_active(
710        &mut self,
711        node_id: NodeId,
712        scope: &str,
713        properties: &Properties,
714    ) {
715        if self.active_node_property_index_count() == 0 {
716            return;
717        }
718        let indexes = self.indexes_mut();
719        for (key, value) in properties {
720            if indexes.node_properties.is_active(key) {
721                indexes
722                    .node_properties
723                    .remove_scoped(node_id, scope, key, value);
724            }
725        }
726    }
727
728    fn unindex_active_node_properties<'a>(
729        &mut self,
730        node_id: NodeId,
731        labels: impl IntoIterator<Item = &'a str> + Clone,
732        properties: &Properties,
733    ) {
734        if self.active_node_property_index_count() == 0 {
735            return;
736        }
737        let indexes = self.indexes_mut();
738        for (key, value) in properties {
739            if indexes.node_properties.is_active(key) {
740                indexes
741                    .node_properties
742                    .remove_with_scopes(node_id, labels.clone(), key, value);
743            }
744        }
745    }
746
747    fn index_relationship_property_if_active<'a>(
748        &mut self,
749        rel_id: RelationshipId,
750        scopes: impl IntoIterator<Item = &'a str>,
751        key: &str,
752        value: &PropertyValue,
753    ) {
754        if self.active_relationship_property_index_count() == 0 {
755            return;
756        }
757        let indexes = self.indexes_mut();
758        if indexes.relationship_properties.is_active(key) {
759            indexes
760                .relationship_properties
761                .insert_with_scopes(rel_id, scopes, key, value);
762        }
763    }
764
765    fn index_relationship_properties_if_active<'a>(
766        &mut self,
767        rel_id: RelationshipId,
768        scopes: impl IntoIterator<Item = &'a str> + Clone,
769        properties: &Properties,
770    ) {
771        if self.active_relationship_property_index_count() == 0 {
772            return;
773        }
774        let indexes = self.indexes_mut();
775        for (key, value) in properties {
776            if indexes.relationship_properties.is_active(key) {
777                indexes.relationship_properties.insert_with_scopes(
778                    rel_id,
779                    scopes.clone(),
780                    key,
781                    value,
782                );
783            }
784        }
785    }
786
787    fn unindex_relationship_property_if_active<'a>(
788        &mut self,
789        rel_id: RelationshipId,
790        scopes: impl IntoIterator<Item = &'a str>,
791        key: &str,
792        value: &PropertyValue,
793    ) {
794        if self.active_relationship_property_index_count() == 0 {
795            return;
796        }
797        let indexes = self.indexes_mut();
798        if indexes.relationship_properties.is_active(key) {
799            indexes
800                .relationship_properties
801                .remove_with_scopes(rel_id, scopes, key, value);
802        }
803    }
804
805    fn unindex_active_relationship_properties<'a>(
806        &mut self,
807        rel_id: RelationshipId,
808        scopes: impl IntoIterator<Item = &'a str> + Clone,
809        properties: &Properties,
810    ) {
811        if self.active_relationship_property_index_count() == 0 {
812            return;
813        }
814        let indexes = self.indexes_mut();
815        for (key, value) in properties {
816            if indexes.relationship_properties.is_active(key) {
817                indexes.relationship_properties.remove_with_scopes(
818                    rel_id,
819                    scopes.clone(),
820                    key,
821                    value,
822                );
823            }
824        }
825    }
826
827    fn scan_nodes_by_property(
828        &self,
829        label: Option<&str>,
830        key: &str,
831        value: &PropertyValue,
832    ) -> Vec<NodeRecord> {
833        let candidates: Box<dyn Iterator<Item = &NodeRecord> + '_> = match label {
834            Some(label) => Box::new(
835                self.nodes_by_label
836                    .get(label)
837                    .into_iter()
838                    .flat_map(|ids| ids.iter())
839                    .filter_map(|id| self.nodes.get(id)),
840            ),
841            None => Box::new(self.nodes.values()),
842        };
843
844        candidates
845            .filter(|node| node.properties.get(key) == Some(value))
846            .cloned()
847            .collect()
848    }
849
850    fn scan_relationships_by_property(
851        &self,
852        rel_type: Option<&str>,
853        key: &str,
854        value: &PropertyValue,
855    ) -> Vec<RelationshipRecord> {
856        let candidates: Box<dyn Iterator<Item = &RelationshipRecord> + '_> = match rel_type {
857            Some(rel_type) => Box::new(
858                self.relationships_by_type
859                    .get(rel_type)
860                    .into_iter()
861                    .flat_map(|ids| ids.iter())
862                    .filter_map(|id| self.relationships.get(id)),
863            ),
864            None => Box::new(self.relationships.values()),
865        };
866
867        candidates
868            .filter(|rel| rel.properties.get(key) == Some(value))
869            .cloned()
870            .collect()
871    }
872
873    fn attach_relationship(&mut self, rel: &RelationshipRecord) {
874        self.outgoing.entry(rel.src).or_default().insert(rel.id);
875        self.incoming.entry(rel.dst).or_default().insert(rel.id);
876        self.insert_relationship_type_index(rel.id, &rel.rel_type);
877    }
878
879    fn detach_relationship_indexes(&mut self, rel: &RelationshipRecord) {
880        if let Some(ids) = self.outgoing.get_mut(&rel.src) {
881            ids.remove(&rel.id);
882            if ids.is_empty() {
883                self.outgoing.remove(&rel.src);
884            }
885        }
886
887        if let Some(ids) = self.incoming.get_mut(&rel.dst) {
888            ids.remove(&rel.id);
889            if ids.is_empty() {
890                self.incoming.remove(&rel.dst);
891            }
892        }
893
894        self.remove_relationship_type_index(rel.id, &rel.rel_type);
895    }
896
897    fn relationship_ids_for_direction(
898        &self,
899        node_id: NodeId,
900        direction: Direction,
901    ) -> Vec<RelationshipId> {
902        match direction {
903            Direction::Left => self
904                .incoming
905                .get(&node_id)
906                .map(|ids| ids.iter().copied().collect())
907                .unwrap_or_default(),
908
909            Direction::Right => self
910                .outgoing
911                .get(&node_id)
912                .map(|ids| ids.iter().copied().collect())
913                .unwrap_or_default(),
914
915            Direction::Undirected => {
916                let mut ids = BTreeSet::new();
917
918                if let Some(out) = self.outgoing.get(&node_id) {
919                    ids.extend(out.iter().copied());
920                }
921                if let Some(inc) = self.incoming.get(&node_id) {
922                    ids.extend(inc.iter().copied());
923                }
924
925                ids.into_iter().collect()
926            }
927        }
928    }
929
930    fn other_endpoint(rel: &RelationshipRecord, node_id: NodeId) -> Option<NodeId> {
931        if rel.src == node_id {
932            Some(rel.dst)
933        } else if rel.dst == node_id {
934            Some(rel.src)
935        } else {
936            None
937        }
938    }
939
940    fn has_incident_relationships(&self, node_id: NodeId) -> bool {
941        self.outgoing
942            .get(&node_id)
943            .map(|ids| !ids.is_empty())
944            .unwrap_or(false)
945            || self
946                .incoming
947                .get(&node_id)
948                .map(|ids| !ids.is_empty())
949                .unwrap_or(false)
950    }
951
952    fn incident_relationship_ids(&self, node_id: NodeId) -> BTreeSet<RelationshipId> {
953        let mut rel_ids = BTreeSet::new();
954
955        if let Some(ids) = self.outgoing.get(&node_id) {
956            rel_ids.extend(ids.iter().copied());
957        }
958        if let Some(ids) = self.incoming.get(&node_id) {
959            rel_ids.extend(ids.iter().copied());
960        }
961
962        rel_ids
963    }
964
965    /// Replay a node creation using the id captured in a durable mutation
966    /// event. This intentionally does not emit a new mutation event: callers
967    /// must invoke it before installing a recorder on the graph.
968    #[doc(hidden)]
969    pub fn replay_create_node(
970        &mut self,
971        id: NodeId,
972        labels: Vec<String>,
973        properties: Properties,
974    ) -> Result<NodeRecord, String> {
975        if self.recorder.is_some() {
976            return Err(
977                "cannot replay node creation while a mutation recorder is installed".into(),
978            );
979        }
980        if self.nodes.contains_key(&id) {
981            return Err(format!("node id {id} already exists"));
982        }
983
984        let labels = Self::normalize_labels(labels);
985        let node = NodeRecord {
986            id,
987            labels: labels.clone(),
988            properties,
989        };
990
991        for label in &labels {
992            self.insert_node_label_index(id, label);
993        }
994        self.index_node_properties_eager(
995            id,
996            node.labels.iter().map(String::as_str),
997            &node.properties,
998        );
999        self.nodes.insert(id, node.clone());
1000        self.outgoing.entry(id).or_default();
1001        self.incoming.entry(id).or_default();
1002        self.bump_next_node_id_past(id)?;
1003
1004        Ok(node)
1005    }
1006
1007    /// Replay a relationship creation using the id captured in a durable
1008    /// mutation event. This intentionally does not emit a new mutation event:
1009    /// callers must invoke it before installing a recorder on the graph.
1010    #[doc(hidden)]
1011    pub fn replay_create_relationship(
1012        &mut self,
1013        id: RelationshipId,
1014        src: NodeId,
1015        dst: NodeId,
1016        rel_type: &str,
1017        properties: Properties,
1018    ) -> Result<RelationshipRecord, String> {
1019        if self.recorder.is_some() {
1020            return Err(
1021                "cannot replay relationship creation while a mutation recorder is installed".into(),
1022            );
1023        }
1024        if self.relationships.contains_key(&id) {
1025            return Err(format!("relationship id {id} already exists"));
1026        }
1027        if !self.nodes.contains_key(&src) {
1028            return Err(format!(
1029                "relationship {id} references missing source node {src}"
1030            ));
1031        }
1032        if !self.nodes.contains_key(&dst) {
1033            return Err(format!(
1034                "relationship {id} references missing target node {dst}"
1035            ));
1036        }
1037
1038        let trimmed = rel_type.trim();
1039        if trimmed.is_empty() {
1040            return Err(format!("relationship {id} has an empty type"));
1041        }
1042
1043        let rel = RelationshipRecord {
1044            id,
1045            src,
1046            dst,
1047            rel_type: trimmed.to_string(),
1048            properties,
1049        };
1050
1051        self.attach_relationship(&rel);
1052        self.index_relationship_properties_eager(id, [rel.rel_type.as_str()], &rel.properties);
1053        self.relationships.insert(id, rel.clone());
1054        self.bump_next_rel_id_past(id)?;
1055
1056        Ok(rel)
1057    }
1058}
1059
1060impl GraphStorage for InMemoryGraph {
1061    // ---------- Required primitives ----------
1062
1063    fn contains_node(&self, id: NodeId) -> bool {
1064        self.nodes.contains_key(&id)
1065    }
1066
1067    fn node(&self, id: NodeId) -> Option<NodeRecord> {
1068        self.nodes.get(&id).cloned()
1069    }
1070
1071    fn all_node_ids(&self) -> Vec<NodeId> {
1072        self.nodes.keys().copied().collect()
1073    }
1074
1075    fn node_ids_by_label(&self, label: &str) -> Vec<NodeId> {
1076        match self.nodes_by_label.get(label) {
1077            Some(ids) => ids.iter().copied().collect(),
1078            None => Vec::new(),
1079        }
1080    }
1081
1082    fn contains_relationship(&self, id: RelationshipId) -> bool {
1083        self.relationships.contains_key(&id)
1084    }
1085
1086    fn relationship(&self, id: RelationshipId) -> Option<RelationshipRecord> {
1087        self.relationships.get(&id).cloned()
1088    }
1089
1090    fn all_rel_ids(&self) -> Vec<RelationshipId> {
1091        self.relationships.keys().copied().collect()
1092    }
1093
1094    fn rel_ids_by_type(&self, rel_type: &str) -> Vec<RelationshipId> {
1095        match self.relationships_by_type.get(rel_type) {
1096            Some(ids) => ids.iter().copied().collect(),
1097            None => Vec::new(),
1098        }
1099    }
1100
1101    fn relationship_endpoints(&self, id: RelationshipId) -> Option<(NodeId, NodeId)> {
1102        self.relationships.get(&id).map(|r| (r.src, r.dst))
1103    }
1104
1105    fn expand_ids(
1106        &self,
1107        node_id: NodeId,
1108        direction: Direction,
1109        types: &[String],
1110    ) -> Vec<(RelationshipId, NodeId)> {
1111        if !self.nodes.contains_key(&node_id) {
1112            return Vec::new();
1113        }
1114
1115        // Fast path: no type filter — just join adjacency + rel endpoints.
1116        if types.is_empty() {
1117            return self
1118                .relationship_ids_for_direction(node_id, direction)
1119                .into_iter()
1120                .filter_map(|rel_id| {
1121                    let rel = self.relationships.get(&rel_id)?;
1122                    let other_id = Self::other_endpoint(rel, node_id)?;
1123                    Some((rel_id, other_id))
1124                })
1125                .collect();
1126        }
1127
1128        // Type-filtered: borrow rel_type straight from the stored record.
1129        // For small type lists (the common case) the linear scan beats a
1130        // BTreeSet; we keep it allocation-free.
1131        self.relationship_ids_for_direction(node_id, direction)
1132            .into_iter()
1133            .filter_map(|rel_id| {
1134                let rel = self.relationships.get(&rel_id)?;
1135                if !types.iter().any(|t| t == &rel.rel_type) {
1136                    return None;
1137                }
1138                let other_id = Self::other_endpoint(rel, node_id)?;
1139                Some((rel_id, other_id))
1140            })
1141            .collect()
1142    }
1143
1144    fn all_labels(&self) -> Vec<String> {
1145        self.nodes_by_label.keys().cloned().collect()
1146    }
1147
1148    fn all_relationship_types(&self) -> Vec<String> {
1149        self.relationships_by_type.keys().cloned().collect()
1150    }
1151
1152    // ---------- Optimization hooks: zero-clone borrow access ----------
1153
1154    fn with_node<F, R>(&self, id: NodeId, f: F) -> Option<R>
1155    where
1156        F: FnOnce(&NodeRecord) -> R,
1157        Self: Sized,
1158    {
1159        self.nodes.get(&id).map(f)
1160    }
1161
1162    fn with_relationship<F, R>(&self, id: RelationshipId, f: F) -> Option<R>
1163    where
1164        F: FnOnce(&RelationshipRecord) -> R,
1165        Self: Sized,
1166    {
1167        self.relationships.get(&id).map(f)
1168    }
1169
1170    // ---------- Overrides: counts + existence ----------
1171
1172    fn node_count(&self) -> usize {
1173        self.nodes.len()
1174    }
1175
1176    fn relationship_count(&self) -> usize {
1177        self.relationships.len()
1178    }
1179
1180    fn has_node(&self, id: NodeId) -> bool {
1181        self.nodes.contains_key(&id)
1182    }
1183
1184    fn has_relationship(&self, id: RelationshipId) -> bool {
1185        self.relationships.contains_key(&id)
1186    }
1187
1188    // ---------- Overrides: record-returning scans (direct iteration) ----------
1189
1190    fn all_nodes(&self) -> Vec<NodeRecord> {
1191        self.nodes.values().cloned().collect()
1192    }
1193
1194    fn nodes_by_label(&self, label: &str) -> Vec<NodeRecord> {
1195        self.nodes_by_label
1196            .get(label)
1197            .into_iter()
1198            .flat_map(|ids| ids.iter())
1199            .filter_map(|id| self.nodes.get(id).cloned())
1200            .collect()
1201    }
1202
1203    fn all_relationships(&self) -> Vec<RelationshipRecord> {
1204        self.relationships.values().cloned().collect()
1205    }
1206
1207    fn relationships_by_type(&self, rel_type: &str) -> Vec<RelationshipRecord> {
1208        self.relationships_by_type
1209            .get(rel_type)
1210            .into_iter()
1211            .flat_map(|ids| ids.iter())
1212            .filter_map(|id| self.relationships.get(id).cloned())
1213            .collect()
1214    }
1215
1216    fn find_nodes_by_property(
1217        &self,
1218        label: Option<&str>,
1219        key: &str,
1220        value: &PropertyValue,
1221    ) -> Vec<NodeRecord>
1222    where
1223        Self: Sized,
1224    {
1225        if PropertyIndexKey::from_value(value).is_none() {
1226            return self.scan_nodes_by_property(label, key, value);
1227        }
1228
1229        self.ensure_node_property_index(key);
1230        let indexes = self.indexes_read();
1231
1232        match label {
1233            Some(label) => {
1234                let Some(ids) = indexes.node_properties.scoped_ids_for(label, key, value) else {
1235                    return Vec::new();
1236                };
1237                ids.iter()
1238                    .filter_map(|id| self.nodes.get(id).cloned())
1239                    .collect()
1240            }
1241            None => indexes
1242                .node_properties
1243                .ids_for(key, value)
1244                .into_iter()
1245                .flat_map(|ids| ids.iter())
1246                .filter_map(|id| self.nodes.get(id).cloned())
1247                .collect(),
1248        }
1249    }
1250
1251    fn find_node_ids_by_property(
1252        &self,
1253        label: Option<&str>,
1254        key: &str,
1255        value: &PropertyValue,
1256    ) -> Vec<NodeId>
1257    where
1258        Self: Sized,
1259    {
1260        if PropertyIndexKey::from_value(value).is_none() {
1261            return self
1262                .scan_nodes_by_property(label, key, value)
1263                .into_iter()
1264                .map(|n| n.id)
1265                .collect();
1266        }
1267
1268        self.ensure_node_property_index(key);
1269        let indexes = self.indexes_read();
1270
1271        match label {
1272            Some(label) => indexes
1273                .node_properties
1274                .scoped_ids_for(label, key, value)
1275                .map(|ids| ids.iter().copied().collect())
1276                .unwrap_or_default(),
1277            None => indexes
1278                .node_properties
1279                .ids_for(key, value)
1280                .map(|ids| ids.iter().copied().collect())
1281                .unwrap_or_default(),
1282        }
1283    }
1284
1285    fn find_relationships_by_property(
1286        &self,
1287        rel_type: Option<&str>,
1288        key: &str,
1289        value: &PropertyValue,
1290    ) -> Vec<RelationshipRecord>
1291    where
1292        Self: Sized,
1293    {
1294        if PropertyIndexKey::from_value(value).is_none() {
1295            return self.scan_relationships_by_property(rel_type, key, value);
1296        }
1297
1298        self.ensure_relationship_property_index(key);
1299        let indexes = self.indexes_read();
1300
1301        match rel_type {
1302            Some(rel_type) => {
1303                let Some(ids) = indexes
1304                    .relationship_properties
1305                    .scoped_ids_for(rel_type, key, value)
1306                else {
1307                    return Vec::new();
1308                };
1309                ids.iter()
1310                    .filter_map(|id| self.relationships.get(id).cloned())
1311                    .collect()
1312            }
1313            None => indexes
1314                .relationship_properties
1315                .ids_for(key, value)
1316                .into_iter()
1317                .flat_map(|ids| ids.iter())
1318                .filter_map(|id| self.relationships.get(id).cloned())
1319                .collect(),
1320        }
1321    }
1322
1323    fn find_relationship_ids_by_property(
1324        &self,
1325        rel_type: Option<&str>,
1326        key: &str,
1327        value: &PropertyValue,
1328    ) -> Vec<RelationshipId>
1329    where
1330        Self: Sized,
1331    {
1332        if PropertyIndexKey::from_value(value).is_none() {
1333            return self
1334                .scan_relationships_by_property(rel_type, key, value)
1335                .into_iter()
1336                .map(|r| r.id)
1337                .collect();
1338        }
1339
1340        self.ensure_relationship_property_index(key);
1341        let indexes = self.indexes_read();
1342
1343        match rel_type {
1344            Some(rel_type) => indexes
1345                .relationship_properties
1346                .scoped_ids_for(rel_type, key, value)
1347                .map(|ids| ids.iter().copied().collect())
1348                .unwrap_or_default(),
1349            None => indexes
1350                .relationship_properties
1351                .ids_for(key, value)
1352                .map(|ids| ids.iter().copied().collect())
1353                .unwrap_or_default(),
1354        }
1355    }
1356
1357    fn node_exists_with_label_and_property(
1358        &self,
1359        label: &str,
1360        key: &str,
1361        value: &PropertyValue,
1362    ) -> bool
1363    where
1364        Self: Sized,
1365    {
1366        if PropertyIndexKey::from_value(value).is_none() {
1367            return !self
1368                .scan_nodes_by_property(Some(label), key, value)
1369                .is_empty();
1370        }
1371
1372        self.ensure_node_property_index(key);
1373        let indexes = self.indexes_read();
1374        indexes
1375            .node_properties
1376            .scoped_ids_for(label, key, value)
1377            .map(|ids| !ids.is_empty())
1378            .unwrap_or(false)
1379    }
1380
1381    fn relationship_exists_with_type_and_property(
1382        &self,
1383        rel_type: &str,
1384        key: &str,
1385        value: &PropertyValue,
1386    ) -> bool
1387    where
1388        Self: Sized,
1389    {
1390        if PropertyIndexKey::from_value(value).is_none() {
1391            return !self
1392                .scan_relationships_by_property(Some(rel_type), key, value)
1393                .is_empty();
1394        }
1395
1396        self.ensure_relationship_property_index(key);
1397        let indexes = self.indexes_read();
1398        indexes
1399            .relationship_properties
1400            .scoped_ids_for(rel_type, key, value)
1401            .map(|ids| !ids.is_empty())
1402            .unwrap_or(false)
1403    }
1404
1405    // ---------- Overrides: traversal (direct adjacency) ----------
1406
1407    fn relationship_ids_of(&self, node_id: NodeId, direction: Direction) -> Vec<RelationshipId> {
1408        self.relationship_ids_for_direction(node_id, direction)
1409    }
1410
1411    fn outgoing_relationships(&self, node_id: NodeId) -> Vec<RelationshipRecord> {
1412        self.outgoing
1413            .get(&node_id)
1414            .into_iter()
1415            .flat_map(|ids| ids.iter())
1416            .filter_map(|id| self.relationships.get(id).cloned())
1417            .collect()
1418    }
1419
1420    fn incoming_relationships(&self, node_id: NodeId) -> Vec<RelationshipRecord> {
1421        self.incoming
1422            .get(&node_id)
1423            .into_iter()
1424            .flat_map(|ids| ids.iter())
1425            .filter_map(|id| self.relationships.get(id).cloned())
1426            .collect()
1427    }
1428
1429    fn degree(&self, node_id: NodeId, direction: Direction) -> usize {
1430        match direction {
1431            Direction::Right => self.outgoing.get(&node_id).map(|s| s.len()).unwrap_or(0),
1432            Direction::Left => self.incoming.get(&node_id).map(|s| s.len()).unwrap_or(0),
1433            Direction::Undirected => {
1434                self.outgoing.get(&node_id).map(|s| s.len()).unwrap_or(0)
1435                    + self.incoming.get(&node_id).map(|s| s.len()).unwrap_or(0)
1436            }
1437        }
1438    }
1439
1440    fn expand(
1441        &self,
1442        node_id: NodeId,
1443        direction: Direction,
1444        types: &[String],
1445    ) -> Vec<(RelationshipRecord, NodeRecord)> {
1446        if !self.nodes.contains_key(&node_id) {
1447            return Vec::new();
1448        }
1449
1450        let type_filter: Option<BTreeSet<&str>> = if types.is_empty() {
1451            None
1452        } else {
1453            Some(types.iter().map(String::as_str).collect())
1454        };
1455
1456        self.relationship_ids_for_direction(node_id, direction)
1457            .into_iter()
1458            .filter_map(|rel_id| self.relationships.get(&rel_id))
1459            .filter(|rel| {
1460                type_filter
1461                    .as_ref()
1462                    .map(|allowed| allowed.contains(rel.rel_type.as_str()))
1463                    .unwrap_or(true)
1464            })
1465            .filter_map(|rel| {
1466                let other_id = Self::other_endpoint(rel, node_id)?;
1467                let other = self.nodes.get(&other_id)?;
1468                Some((rel.clone(), other.clone()))
1469            })
1470            .collect()
1471    }
1472    // ---------- Overrides: schema introspection ----------
1473
1474    fn all_node_property_keys(&self) -> Vec<String> {
1475        let mut keys = BTreeSet::new();
1476        for node in self.nodes.values() {
1477            for key in node.properties.keys() {
1478                keys.insert(key.clone());
1479            }
1480        }
1481        keys.into_iter().collect()
1482    }
1483
1484    fn all_relationship_property_keys(&self) -> Vec<String> {
1485        let mut keys = BTreeSet::new();
1486        for rel in self.relationships.values() {
1487            for key in rel.properties.keys() {
1488                keys.insert(key.clone());
1489            }
1490        }
1491        keys.into_iter().collect()
1492    }
1493
1494    fn label_property_keys(&self, label: &str) -> Vec<String> {
1495        let mut keys = BTreeSet::new();
1496
1497        if let Some(ids) = self.nodes_by_label.get(label) {
1498            for id in ids {
1499                if let Some(node) = self.nodes.get(id) {
1500                    for key in node.properties.keys() {
1501                        keys.insert(key.clone());
1502                    }
1503                }
1504            }
1505        }
1506
1507        keys.into_iter().collect()
1508    }
1509
1510    fn rel_type_property_keys(&self, rel_type: &str) -> Vec<String> {
1511        let mut keys = BTreeSet::new();
1512
1513        if let Some(ids) = self.relationships_by_type.get(rel_type) {
1514            for id in ids {
1515                if let Some(rel) = self.relationships.get(id) {
1516                    for key in rel.properties.keys() {
1517                        keys.insert(key.clone());
1518                    }
1519                }
1520            }
1521        }
1522
1523        keys.into_iter().collect()
1524    }
1525}
1526
1527impl BorrowedGraphStorage for InMemoryGraph {
1528    fn node_ref(&self, id: NodeId) -> Option<&NodeRecord> {
1529        self.nodes.get(&id)
1530    }
1531
1532    fn relationship_ref(&self, id: RelationshipId) -> Option<&RelationshipRecord> {
1533        self.relationships.get(&id)
1534    }
1535
1536    fn node_refs(&self) -> Box<dyn Iterator<Item = &NodeRecord> + '_> {
1537        Box::new(self.nodes.values())
1538    }
1539
1540    fn node_refs_by_label(&self, label: &str) -> Box<dyn Iterator<Item = &NodeRecord> + '_> {
1541        Box::new(
1542            self.nodes_by_label
1543                .get(label)
1544                .into_iter()
1545                .flat_map(|ids| ids.iter())
1546                .filter_map(|id| self.nodes.get(id)),
1547        )
1548    }
1549
1550    fn relationship_refs(&self) -> Box<dyn Iterator<Item = &RelationshipRecord> + '_> {
1551        Box::new(self.relationships.values())
1552    }
1553
1554    fn relationship_refs_by_type(
1555        &self,
1556        rel_type: &str,
1557    ) -> Box<dyn Iterator<Item = &RelationshipRecord> + '_> {
1558        Box::new(
1559            self.relationships_by_type
1560                .get(rel_type)
1561                .into_iter()
1562                .flat_map(|ids| ids.iter())
1563                .filter_map(|id| self.relationships.get(id)),
1564        )
1565    }
1566}
1567
1568impl GraphStorageMut for InMemoryGraph {
1569    fn create_node(&mut self, labels: Vec<String>, properties: Properties) -> NodeRecord {
1570        let id = self.alloc_node_id();
1571        let labels = Self::normalize_labels(labels);
1572
1573        let node = NodeRecord {
1574            id,
1575            labels: labels.clone(),
1576            properties,
1577        };
1578
1579        for label in &labels {
1580            self.insert_node_label_index(id, label);
1581        }
1582        if self.active_node_property_index_count() != 0 {
1583            self.index_node_properties_if_active(
1584                id,
1585                node.labels.iter().map(String::as_str),
1586                &node.properties,
1587            );
1588        }
1589
1590        self.nodes.insert(id, node.clone());
1591
1592        self.outgoing.entry(id).or_default();
1593        self.incoming.entry(id).or_default();
1594
1595        self.emit(|| MutationEvent::CreateNode {
1596            id,
1597            labels: node.labels.clone(),
1598            properties: node.properties.clone(),
1599        });
1600
1601        node
1602    }
1603
1604    fn create_relationship(
1605        &mut self,
1606        src: NodeId,
1607        dst: NodeId,
1608        rel_type: &str,
1609        properties: Properties,
1610    ) -> Option<RelationshipRecord> {
1611        if !self.nodes.contains_key(&src) || !self.nodes.contains_key(&dst) {
1612            return None;
1613        }
1614
1615        let trimmed = rel_type.trim();
1616        if trimmed.is_empty() {
1617            return None;
1618        }
1619
1620        let id = self.alloc_rel_id();
1621        let rel = RelationshipRecord {
1622            id,
1623            src,
1624            dst,
1625            rel_type: trimmed.to_string(),
1626            properties,
1627        };
1628
1629        self.attach_relationship(&rel);
1630        if self.active_relationship_property_index_count() != 0 {
1631            self.index_relationship_properties_if_active(
1632                id,
1633                [rel.rel_type.as_str()],
1634                &rel.properties,
1635            );
1636        }
1637        self.relationships.insert(id, rel.clone());
1638
1639        self.emit(|| MutationEvent::CreateRelationship {
1640            id,
1641            src,
1642            dst,
1643            rel_type: rel.rel_type.clone(),
1644            properties: rel.properties.clone(),
1645        });
1646
1647        Some(rel)
1648    }
1649
1650    fn set_node_property(&mut self, node_id: NodeId, key: String, value: PropertyValue) -> bool {
1651        if !self.nodes.contains_key(&node_id) {
1652            return false;
1653        }
1654
1655        let recorder_active = self.recorder.is_some();
1656        let (stored_key, stored_value) = if recorder_active {
1657            (Some(key.clone()), Some(value.clone()))
1658        } else {
1659            (None, None)
1660        };
1661
1662        let index_active = self.active_node_property_index_count() != 0
1663            && self.indexes_mut().node_properties.is_active(&key);
1664        let (old, labels) = match self.nodes.get_mut(&node_id) {
1665            Some(node) => {
1666                let labels = if index_active {
1667                    Some(node.labels.clone())
1668                } else {
1669                    None
1670                };
1671                (node.properties.insert(key.clone(), value.clone()), labels)
1672            }
1673            None => return false,
1674        };
1675        if let Some(labels) = labels.as_ref() {
1676            if let Some(old) = old.as_ref() {
1677                self.unindex_node_property_if_active(
1678                    node_id,
1679                    labels.iter().map(String::as_str),
1680                    &key,
1681                    old,
1682                );
1683            }
1684            self.index_node_property_if_active(
1685                node_id,
1686                labels.iter().map(String::as_str),
1687                &key,
1688                &value,
1689            );
1690        }
1691
1692        self.emit(|| MutationEvent::SetNodeProperty {
1693            node_id,
1694            key: stored_key.unwrap(),
1695            value: stored_value.unwrap(),
1696        });
1697
1698        true
1699    }
1700
1701    fn remove_node_property(&mut self, node_id: NodeId, key: &str) -> bool {
1702        let removed = match self.nodes.get_mut(&node_id) {
1703            Some(node) => node.properties.remove(key),
1704            None => return false,
1705        };
1706        let Some(removed) = removed else {
1707            return false;
1708        };
1709
1710        let labels = if self.active_node_property_index_count() != 0
1711            && self.indexes_mut().node_properties.is_active(key)
1712        {
1713            self.nodes.get(&node_id).map(|node| node.labels.clone())
1714        } else {
1715            None
1716        };
1717
1718        if let Some(labels) = labels.as_ref() {
1719            self.unindex_node_property_if_active(
1720                node_id,
1721                labels.iter().map(String::as_str),
1722                key,
1723                &removed,
1724            );
1725        }
1726
1727        self.emit(|| MutationEvent::RemoveNodeProperty {
1728            node_id,
1729            key: key.to_string(),
1730        });
1731
1732        true
1733    }
1734
1735    fn add_node_label(&mut self, node_id: NodeId, label: &str) -> bool {
1736        let label = label.trim();
1737        if label.is_empty() {
1738            return false;
1739        }
1740
1741        let index_has_active_keys = self.active_node_property_index_count() != 0;
1742        let mut scoped_properties = None;
1743        let applied = match self.nodes.get_mut(&node_id) {
1744            Some(node) => {
1745                if node.labels.iter().any(|l| l == label) {
1746                    return false;
1747                }
1748
1749                node.labels.push(label.to_string());
1750                if index_has_active_keys {
1751                    scoped_properties = Some(node.properties.clone());
1752                }
1753                self.insert_node_label_index(node_id, label);
1754                true
1755            }
1756            None => false,
1757        };
1758        if applied {
1759            if let Some(properties) = scoped_properties.as_ref() {
1760                self.index_node_scope_properties_if_active(node_id, label, properties);
1761            }
1762            self.emit(|| MutationEvent::AddNodeLabel {
1763                node_id,
1764                label: label.to_string(),
1765            });
1766        }
1767        applied
1768    }
1769
1770    fn remove_node_label(&mut self, node_id: NodeId, label: &str) -> bool {
1771        let index_has_active_keys = self.active_node_property_index_count() != 0;
1772        let mut scoped_properties = None;
1773        let applied = match self.nodes.get_mut(&node_id) {
1774            Some(node) => {
1775                let original_len = node.labels.len();
1776                node.labels.retain(|l| l != label);
1777
1778                if node.labels.len() != original_len {
1779                    if index_has_active_keys {
1780                        scoped_properties = Some(node.properties.clone());
1781                    }
1782                    self.remove_node_label_index(node_id, label);
1783                    true
1784                } else {
1785                    false
1786                }
1787            }
1788            None => false,
1789        };
1790        if applied {
1791            if let Some(properties) = scoped_properties.as_ref() {
1792                self.unindex_node_scope_properties_if_active(node_id, label, properties);
1793            }
1794            self.emit(|| MutationEvent::RemoveNodeLabel {
1795                node_id,
1796                label: label.to_string(),
1797            });
1798        }
1799        applied
1800    }
1801
1802    fn set_relationship_property(
1803        &mut self,
1804        rel_id: RelationshipId,
1805        key: String,
1806        value: PropertyValue,
1807    ) -> bool {
1808        if !self.relationships.contains_key(&rel_id) {
1809            return false;
1810        }
1811
1812        let recorder_active = self.recorder.is_some();
1813        let (stored_key, stored_value) = if recorder_active {
1814            (Some(key.clone()), Some(value.clone()))
1815        } else {
1816            (None, None)
1817        };
1818
1819        let index_active = self.active_relationship_property_index_count() != 0
1820            && self.indexes_mut().relationship_properties.is_active(&key);
1821        let (old, rel_type) = match self.relationships.get_mut(&rel_id) {
1822            Some(rel) => {
1823                let rel_type = if index_active {
1824                    Some(rel.rel_type.clone())
1825                } else {
1826                    None
1827                };
1828                (rel.properties.insert(key.clone(), value.clone()), rel_type)
1829            }
1830            None => return false,
1831        };
1832        if let Some(rel_type) = rel_type.as_deref() {
1833            if let Some(old) = old.as_ref() {
1834                self.unindex_relationship_property_if_active(rel_id, [rel_type], &key, old);
1835            }
1836            self.index_relationship_property_if_active(rel_id, [rel_type], &key, &value);
1837        }
1838
1839        self.emit(|| MutationEvent::SetRelationshipProperty {
1840            rel_id,
1841            key: stored_key.unwrap(),
1842            value: stored_value.unwrap(),
1843        });
1844
1845        true
1846    }
1847
1848    fn remove_relationship_property(&mut self, rel_id: RelationshipId, key: &str) -> bool {
1849        let removed = match self.relationships.get_mut(&rel_id) {
1850            Some(rel) => rel.properties.remove(key),
1851            None => return false,
1852        };
1853        let Some(removed) = removed else {
1854            return false;
1855        };
1856
1857        let rel_type = if self.active_relationship_property_index_count() != 0
1858            && self.indexes_mut().relationship_properties.is_active(key)
1859        {
1860            self.relationships
1861                .get(&rel_id)
1862                .map(|rel| rel.rel_type.clone())
1863        } else {
1864            None
1865        };
1866
1867        if let Some(rel_type) = rel_type.as_deref() {
1868            self.unindex_relationship_property_if_active(rel_id, [rel_type], key, &removed);
1869        }
1870
1871        self.emit(|| MutationEvent::RemoveRelationshipProperty {
1872            rel_id,
1873            key: key.to_string(),
1874        });
1875
1876        true
1877    }
1878
1879    fn delete_relationship(&mut self, rel_id: RelationshipId) -> bool {
1880        let applied = match self.relationships.remove(&rel_id) {
1881            Some(rel) => {
1882                self.detach_relationship_indexes(&rel);
1883                self.unindex_active_relationship_properties(
1884                    rel_id,
1885                    [rel.rel_type.as_str()],
1886                    &rel.properties,
1887                );
1888                true
1889            }
1890            None => false,
1891        };
1892        if applied {
1893            self.emit(|| MutationEvent::DeleteRelationship { rel_id });
1894        }
1895        applied
1896    }
1897
1898    fn delete_node(&mut self, node_id: NodeId) -> bool {
1899        if !self.nodes.contains_key(&node_id) {
1900            return false;
1901        }
1902
1903        if self.has_incident_relationships(node_id) {
1904            return false;
1905        }
1906
1907        let node = match self.nodes.remove(&node_id) {
1908            Some(node) => node,
1909            None => return false,
1910        };
1911
1912        for label in &node.labels {
1913            self.remove_node_label_index(node_id, label);
1914        }
1915        self.unindex_active_node_properties(
1916            node_id,
1917            node.labels.iter().map(String::as_str),
1918            &node.properties,
1919        );
1920
1921        self.outgoing.remove(&node_id);
1922        self.incoming.remove(&node_id);
1923
1924        self.emit(|| MutationEvent::DeleteNode { node_id });
1925
1926        true
1927    }
1928
1929    fn detach_delete_node(&mut self, node_id: NodeId) -> bool {
1930        if !self.nodes.contains_key(&node_id) {
1931            return false;
1932        }
1933
1934        let rel_ids: Vec<_> = self
1935            .incident_relationship_ids(node_id)
1936            .into_iter()
1937            .collect();
1938
1939        // We deliberately fire per-relationship DeleteRelationship events
1940        // here (via `delete_relationship`) and a DetachDeleteNode event at
1941        // the end. A WAL replayer that sees DetachDeleteNode can ignore the
1942        // preceding DeleteRelationship events — or, equivalently, replay
1943        // them and the DetachDeleteNode becomes a no-op on the remaining
1944        // (now-empty) node. The emit-before-delete choice costs one extra
1945        // event per mutation but keeps the replay contract simple:
1946        // "apply every event in order".
1947        for rel_id in rel_ids {
1948            let _ = self.delete_relationship(rel_id);
1949        }
1950
1951        if self.delete_node(node_id) {
1952            self.emit(|| MutationEvent::DetachDeleteNode { node_id });
1953            true
1954        } else {
1955            false
1956        }
1957    }
1958
1959    fn clear(&mut self) {
1960        // Keep the recorder across clear so observers can see the Clear
1961        // event plus whatever follows. Matches WAL semantics where the log
1962        // is the source of truth across a truncation.
1963        let recorder = self.recorder.take();
1964        *self = Self::default();
1965        self.recorder = recorder;
1966        self.emit(|| MutationEvent::Clear);
1967    }
1968}
1969
1970// ---------------------------------------------------------------------------
1971// Snapshotable
1972// ---------------------------------------------------------------------------
1973
1974impl Snapshotable for InMemoryGraph {
1975    fn save_snapshot<W: std::io::Write>(&self, writer: W) -> Result<SnapshotMeta, SnapshotError> {
1976        let payload = SnapshotPayload {
1977            next_node_id: self.next_node_id,
1978            next_rel_id: self.next_rel_id,
1979            nodes: self.nodes.values().cloned().collect(),
1980            relationships: self.relationships.values().cloned().collect(),
1981        };
1982        write_snapshot(writer, &payload, None)
1983    }
1984
1985    fn save_checkpoint<W: std::io::Write>(
1986        &self,
1987        writer: W,
1988        wal_lsn: u64,
1989    ) -> Result<SnapshotMeta, SnapshotError> {
1990        let payload = SnapshotPayload {
1991            next_node_id: self.next_node_id,
1992            next_rel_id: self.next_rel_id,
1993            nodes: self.nodes.values().cloned().collect(),
1994            relationships: self.relationships.values().cloned().collect(),
1995        };
1996        write_snapshot(writer, &payload, Some(wal_lsn))
1997    }
1998
1999    fn load_snapshot<R: std::io::Read>(
2000        &mut self,
2001        reader: R,
2002    ) -> Result<SnapshotMeta, SnapshotError> {
2003        let (payload, meta) = read_snapshot(reader)?;
2004
2005        // Build the restored graph in a fresh local instance and only
2006        // commit it into `self` at the very end. If a panic fires mid-
2007        // rebuild (e.g. OOM on a HashMap grow) the caller's graph is
2008        // untouched — we never observe a half-populated store.
2009        let mut rebuilt = Self {
2010            next_node_id: payload.next_node_id,
2011            next_rel_id: payload.next_rel_id,
2012            ..Self::default()
2013        };
2014
2015        // Insert nodes + rebuild label index + seed adjacency slots.
2016        for node in payload.nodes {
2017            let id = node.id;
2018            let labels = node.labels.clone();
2019            rebuilt.nodes.insert(id, node);
2020            for label in &labels {
2021                rebuilt.insert_node_label_index(id, label);
2022            }
2023            rebuilt.outgoing.entry(id).or_default();
2024            rebuilt.incoming.entry(id).or_default();
2025        }
2026
2027        // Insert relationships + rebuild adjacency + type index.
2028        for rel in payload.relationships {
2029            rebuilt.attach_relationship(&rel);
2030            rebuilt.relationships.insert(rel.id, rel);
2031        }
2032        rebuilt.rebuild_property_indexes();
2033
2034        // Preserve the existing recorder across the swap — observers of the
2035        // store's identity should not be silently detached by a restore,
2036        // same policy as `clear()`.
2037        rebuilt.recorder = self.recorder.take();
2038        *self = rebuilt;
2039
2040        Ok(meta)
2041    }
2042}
2043
2044#[cfg(test)]
2045mod tests {
2046    use super::*;
2047
2048    fn props(pairs: &[(&str, PropertyValue)]) -> Properties {
2049        pairs
2050            .iter()
2051            .map(|(k, v)| ((*k).to_string(), v.clone()))
2052            .collect()
2053    }
2054
2055    #[test]
2056    fn create_and_lookup_nodes() {
2057        let mut g = InMemoryGraph::new();
2058
2059        let a = g.create_node(
2060            vec!["Person".into(), "Employee".into()],
2061            props(&[("name", PropertyValue::String("Alice".into()))]),
2062        );
2063        let b = g.create_node(
2064            vec!["Person".into()],
2065            props(&[("name", PropertyValue::String("Bob".into()))]),
2066        );
2067
2068        assert_eq!(a.id, 0);
2069        assert_eq!(b.id, 1);
2070
2071        assert_eq!(g.all_nodes().len(), 2);
2072        assert_eq!(g.nodes_by_label("Person").len(), 2);
2073        assert_eq!(g.nodes_by_label("Employee").len(), 1);
2074        assert_eq!(BorrowedGraphStorage::node_refs(&g).count(), 2);
2075        assert_eq!(
2076            BorrowedGraphStorage::node_refs_by_label(&g, "Person").count(),
2077            2
2078        );
2079        assert!(g.node_has_label(a.id, "Person"));
2080        assert_eq!(
2081            g.node_property(a.id, "name"),
2082            Some(PropertyValue::String("Alice".into()))
2083        );
2084    }
2085
2086    #[test]
2087    fn create_and_expand_relationships() {
2088        let mut g = InMemoryGraph::new();
2089
2090        let a = g.create_node(vec!["Person".into()], Properties::new());
2091        let b = g.create_node(vec!["Person".into()], Properties::new());
2092        let c = g.create_node(vec!["Company".into()], Properties::new());
2093
2094        let r1 = g
2095            .create_relationship(a.id, b.id, "KNOWS", Properties::new())
2096            .unwrap();
2097        let r2 = g
2098            .create_relationship(a.id, c.id, "WORKS_AT", Properties::new())
2099            .unwrap();
2100
2101        assert_eq!(g.all_relationships().len(), 2);
2102        assert_eq!(g.relationships_by_type("KNOWS").len(), 1);
2103        assert_eq!(BorrowedGraphStorage::relationship_refs(&g).count(), 2);
2104        assert_eq!(
2105            BorrowedGraphStorage::relationship_refs_by_type(&g, "KNOWS").count(),
2106            1
2107        );
2108        assert_eq!(g.outgoing_relationships(a.id).len(), 2);
2109        assert_eq!(g.incoming_relationships(b.id).len(), 1);
2110
2111        let knows = g.expand(a.id, Direction::Right, &[String::from("KNOWS")]);
2112        assert_eq!(knows.len(), 1);
2113        assert_eq!(knows[0].0.id, r1.id);
2114        assert_eq!(knows[0].1.id, b.id);
2115
2116        let undirected = g.expand(a.id, Direction::Undirected, &[]);
2117        assert_eq!(undirected.len(), 2);
2118
2119        assert_eq!(g.relationship(r2.id).unwrap().dst, c.id);
2120    }
2121
2122    #[test]
2123    fn incoming_and_outgoing_are_distinct() {
2124        let mut g = InMemoryGraph::new();
2125
2126        let a = g.create_node(vec!["Person".into()], Properties::new());
2127        let b = g.create_node(vec!["Person".into()], Properties::new());
2128        let c = g.create_node(vec!["Person".into()], Properties::new());
2129
2130        g.create_relationship(a.id, b.id, "KNOWS", Properties::new())
2131            .unwrap();
2132        g.create_relationship(c.id, a.id, "LIKES", Properties::new())
2133            .unwrap();
2134
2135        let outgoing = g.expand(a.id, Direction::Right, &[]);
2136        let incoming = g.expand(a.id, Direction::Left, &[]);
2137
2138        assert_eq!(outgoing.len(), 1);
2139        assert_eq!(incoming.len(), 1);
2140        assert_eq!(outgoing[0].1.id, b.id);
2141        assert_eq!(incoming[0].1.id, c.id);
2142    }
2143
2144    #[test]
2145    fn set_and_remove_properties() {
2146        let mut g = InMemoryGraph::new();
2147
2148        let n = g.create_node(vec!["Person".into()], Properties::new());
2149        assert!(g.set_node_property(n.id, "age".into(), PropertyValue::Int(42)));
2150        assert_eq!(g.node_property(n.id, "age"), Some(PropertyValue::Int(42)));
2151        assert!(g.remove_node_property(n.id, "age"));
2152        assert_eq!(g.node_property(n.id, "age"), None);
2153
2154        let m = g.create_node(vec!["Person".into()], Properties::new());
2155        let r = g
2156            .create_relationship(n.id, m.id, "KNOWS", Properties::new())
2157            .unwrap();
2158
2159        assert!(g.set_relationship_property(r.id, "since".into(), PropertyValue::Int(2020)));
2160        assert_eq!(
2161            g.relationship_property(r.id, "since"),
2162            Some(PropertyValue::Int(2020))
2163        );
2164        assert!(g.remove_relationship_property(r.id, "since"));
2165        assert_eq!(g.relationship_property(r.id, "since"), None);
2166    }
2167
2168    #[test]
2169    fn node_property_index_tracks_create_set_remove_and_delete() {
2170        let mut g = InMemoryGraph::new();
2171        let alice = g.create_node(
2172            vec!["Person".into()],
2173            props(&[("name", PropertyValue::String("Alice".into()))]),
2174        );
2175        let other_alice = g.create_node(
2176            vec!["Robot".into()],
2177            props(&[("name", PropertyValue::String("Alice".into()))]),
2178        );
2179        let bob = g.create_node(
2180            vec!["Person".into()],
2181            props(&[("name", PropertyValue::String("Bob".into()))]),
2182        );
2183
2184        let alice_value = PropertyValue::String("Alice".into());
2185        assert_eq!(
2186            g.find_nodes_by_property(Some("Person"), "name", &alice_value)
2187                .into_iter()
2188                .map(|n| n.id)
2189                .collect::<Vec<_>>(),
2190            vec![alice.id]
2191        );
2192        assert!(g.node_exists_with_label_and_property("Robot", "name", &alice_value));
2193
2194        assert!(g.set_node_property(
2195            other_alice.id,
2196            "name".into(),
2197            PropertyValue::String("Alicia".into())
2198        ));
2199        assert_eq!(
2200            g.find_nodes_by_property(None, "name", &alice_value)
2201                .into_iter()
2202                .map(|n| n.id)
2203                .collect::<Vec<_>>(),
2204            vec![alice.id]
2205        );
2206
2207        assert!(g.remove_node_property(alice.id, "name"));
2208        assert!(!g.node_exists_with_label_and_property("Person", "name", &alice_value));
2209
2210        assert!(g.delete_node(bob.id));
2211        assert!(!g.node_exists_with_label_and_property(
2212            "Person",
2213            "name",
2214            &PropertyValue::String("Bob".into())
2215        ));
2216    }
2217
2218    #[test]
2219    fn node_property_index_activates_on_lookup_and_tracks_later_create() {
2220        let mut g = InMemoryGraph::new();
2221        let first = g.create_node(
2222            vec!["Person".into()],
2223            props(&[("name", PropertyValue::String("Alice".into()))]),
2224        );
2225
2226        assert!(!g.indexes_read().node_properties.is_active("name"));
2227
2228        let alice = PropertyValue::String("Alice".into());
2229        assert_eq!(
2230            g.find_nodes_by_property(Some("Person"), "name", &alice)
2231                .into_iter()
2232                .map(|node| node.id)
2233                .collect::<Vec<_>>(),
2234            vec![first.id]
2235        );
2236        assert!(g.indexes_read().node_properties.is_active("name"));
2237
2238        let second = g.create_node(
2239            vec!["Person".into()],
2240            props(&[("name", PropertyValue::String("Alice".into()))]),
2241        );
2242        assert_eq!(
2243            g.find_nodes_by_property(Some("Person"), "name", &alice)
2244                .into_iter()
2245                .map(|node| node.id)
2246                .collect::<Vec<_>>(),
2247            vec![first.id, second.id]
2248        );
2249    }
2250
2251    #[test]
2252    fn property_indexes_activate_on_lookup_after_set_for_new_keys() {
2253        let mut g = InMemoryGraph::new();
2254        let node = g.create_node(vec!["Person".into()], Properties::new());
2255
2256        assert!(!g.indexes_read().node_properties.is_active("name"));
2257        assert!(g.set_node_property(
2258            node.id,
2259            "name".into(),
2260            PropertyValue::String("Alice".into())
2261        ));
2262        assert!(!g.indexes_read().node_properties.is_active("name"));
2263        assert_eq!(
2264            g.find_node_ids_by_property(
2265                Some("Person"),
2266                "name",
2267                &PropertyValue::String("Alice".into())
2268            ),
2269            vec![node.id]
2270        );
2271        assert!(g.indexes_read().node_properties.is_active("name"));
2272
2273        let other = g.create_node(vec!["Person".into()], Properties::new());
2274        let rel = g
2275            .create_relationship(node.id, other.id, "KNOWS", Properties::new())
2276            .unwrap();
2277        assert!(!g.indexes_read().relationship_properties.is_active("since"));
2278        assert!(g.set_relationship_property(rel.id, "since".into(), PropertyValue::Int(2020)));
2279        assert!(!g.indexes_read().relationship_properties.is_active("since"));
2280        assert_eq!(
2281            g.find_relationship_ids_by_property(Some("KNOWS"), "since", &PropertyValue::Int(2020)),
2282            vec![rel.id]
2283        );
2284        assert!(g.indexes_read().relationship_properties.is_active("since"));
2285    }
2286
2287    #[test]
2288    fn replay_create_eagerly_activates_property_indexes() {
2289        let mut g = InMemoryGraph::new();
2290        let alice = g
2291            .replay_create_node(
2292                0,
2293                vec!["Person".into()],
2294                props(&[("name", PropertyValue::String("Alice".into()))]),
2295            )
2296            .unwrap();
2297        let bob = g
2298            .replay_create_node(
2299                1,
2300                vec!["Person".into()],
2301                props(&[("name", PropertyValue::String("Bob".into()))]),
2302            )
2303            .unwrap();
2304
2305        assert!(g.indexes_read().node_properties.is_active("name"));
2306        assert_eq!(
2307            g.find_node_ids_by_property(
2308                Some("Person"),
2309                "name",
2310                &PropertyValue::String("Alice".into())
2311            ),
2312            vec![alice.id]
2313        );
2314
2315        let rel = g
2316            .replay_create_relationship(
2317                0,
2318                alice.id,
2319                bob.id,
2320                "KNOWS",
2321                props(&[("since", PropertyValue::Int(2020))]),
2322            )
2323            .unwrap();
2324
2325        assert!(g.indexes_read().relationship_properties.is_active("since"));
2326        assert_eq!(
2327            g.find_relationship_ids_by_property(Some("KNOWS"), "since", &PropertyValue::Int(2020)),
2328            vec![rel.id]
2329        );
2330    }
2331
2332    #[test]
2333    fn node_property_index_tracks_scoped_label_buckets() {
2334        let mut g = InMemoryGraph::new();
2335        let alice = g.create_node(
2336            vec!["Person".into()],
2337            props(&[("name", PropertyValue::String("Alice".into()))]),
2338        );
2339        let robot = g.create_node(
2340            vec!["Robot".into()],
2341            props(&[("name", PropertyValue::String("Alice".into()))]),
2342        );
2343        let alice_value = PropertyValue::String("Alice".into());
2344
2345        assert_eq!(
2346            g.find_node_ids_by_property(Some("Person"), "name", &alice_value),
2347            vec![alice.id]
2348        );
2349        assert_eq!(
2350            g.indexes_read()
2351                .node_properties
2352                .scoped_ids_for("Person", "name", &alice_value)
2353                .cloned()
2354                .unwrap_or_default()
2355                .into_iter()
2356                .collect::<Vec<_>>(),
2357            vec![alice.id]
2358        );
2359
2360        assert!(g.add_node_label(robot.id, "Employee"));
2361        assert_eq!(
2362            g.find_node_ids_by_property(Some("Employee"), "name", &alice_value),
2363            vec![robot.id]
2364        );
2365
2366        assert!(g.remove_node_label(alice.id, "Person"));
2367        assert!(g
2368            .find_node_ids_by_property(Some("Person"), "name", &alice_value)
2369            .is_empty());
2370        assert_eq!(
2371            g.find_node_ids_by_property(None, "name", &alice_value),
2372            vec![alice.id, robot.id]
2373        );
2374    }
2375
2376    #[test]
2377    fn relationship_property_index_tracks_create_set_remove_and_delete() {
2378        let mut g = InMemoryGraph::new();
2379        let a = g.create_node(vec!["Person".into()], Properties::new());
2380        let b = g.create_node(vec!["Person".into()], Properties::new());
2381        let c = g.create_node(vec!["Person".into()], Properties::new());
2382        let first = g
2383            .create_relationship(
2384                a.id,
2385                b.id,
2386                "KNOWS",
2387                props(&[("since", PropertyValue::Int(2020))]),
2388            )
2389            .unwrap();
2390        let second = g
2391            .create_relationship(
2392                b.id,
2393                c.id,
2394                "LIKES",
2395                props(&[("since", PropertyValue::Int(2020))]),
2396            )
2397            .unwrap();
2398
2399        let year = PropertyValue::Int(2020);
2400        assert_eq!(
2401            g.find_relationships_by_property(Some("KNOWS"), "since", &year)
2402                .into_iter()
2403                .map(|r| r.id)
2404                .collect::<Vec<_>>(),
2405            vec![first.id]
2406        );
2407        assert!(g.relationship_exists_with_type_and_property("LIKES", "since", &year));
2408
2409        assert!(g.set_relationship_property(second.id, "since".into(), PropertyValue::Int(2021)));
2410        assert_eq!(
2411            g.find_relationships_by_property(None, "since", &year)
2412                .into_iter()
2413                .map(|r| r.id)
2414                .collect::<Vec<_>>(),
2415            vec![first.id]
2416        );
2417
2418        assert!(g.remove_relationship_property(first.id, "since"));
2419        assert!(!g.relationship_exists_with_type_and_property("KNOWS", "since", &year));
2420
2421        assert!(g.delete_relationship(second.id));
2422        assert!(!g.relationship_exists_with_type_and_property(
2423            "LIKES",
2424            "since",
2425            &PropertyValue::Int(2021)
2426        ));
2427    }
2428
2429    #[test]
2430    fn property_index_falls_back_for_unhashed_values() {
2431        let mut g = InMemoryGraph::new();
2432        let date = PropertyValue::Date(crate::temporal::LoraDate::new(2026, 4, 26).unwrap());
2433        let n = g.create_node(vec!["Event".into()], props(&[("day", date.clone())]));
2434
2435        // Dates are intentionally not hash-indexed yet, so this exercises the
2436        // scan fallback path rather than the secondary index.
2437        assert_eq!(
2438            g.find_nodes_by_property(Some("Event"), "day", &date)
2439                .into_iter()
2440                .map(|node| node.id)
2441                .collect::<Vec<_>>(),
2442            vec![n.id]
2443        );
2444    }
2445
2446    #[test]
2447    fn delete_requires_detach() {
2448        let mut g = InMemoryGraph::new();
2449
2450        let a = g.create_node(vec!["Person".into()], Properties::new());
2451        let b = g.create_node(vec!["Person".into()], Properties::new());
2452        let r = g
2453            .create_relationship(a.id, b.id, "KNOWS", Properties::new())
2454            .unwrap();
2455
2456        assert!(!g.delete_node(a.id));
2457        assert!(g.delete_relationship(r.id));
2458        assert!(g.delete_node(a.id));
2459        assert!(g.node(a.id).is_none());
2460    }
2461
2462    #[test]
2463    fn detach_delete_removes_incident_relationships() {
2464        let mut g = InMemoryGraph::new();
2465
2466        let a = g.create_node(vec!["Person".into()], Properties::new());
2467        let b = g.create_node(vec!["Person".into()], Properties::new());
2468        let c = g.create_node(vec!["Person".into()], Properties::new());
2469
2470        let r1 = g
2471            .create_relationship(a.id, b.id, "KNOWS", Properties::new())
2472            .unwrap();
2473        let r2 = g
2474            .create_relationship(c.id, a.id, "LIKES", Properties::new())
2475            .unwrap();
2476
2477        assert!(g.detach_delete_node(a.id));
2478        assert!(g.node(a.id).is_none());
2479        assert!(g.relationship(r1.id).is_none());
2480        assert!(g.relationship(r2.id).is_none());
2481        assert_eq!(g.all_relationships().len(), 0);
2482    }
2483
2484    #[test]
2485    fn duplicate_labels_are_normalized_on_create() {
2486        let mut g = InMemoryGraph::new();
2487
2488        let n = g.create_node(
2489            vec!["Person".into(), "Person".into(), "Admin".into()],
2490            Properties::new(),
2491        );
2492
2493        assert_eq!(n.labels, vec!["Person".to_string(), "Admin".to_string()]);
2494        assert_eq!(g.nodes_by_label("Person").len(), 1);
2495        assert_eq!(g.nodes_by_label("Admin").len(), 1);
2496    }
2497
2498    #[test]
2499    fn empty_labels_are_ignored() {
2500        let mut g = InMemoryGraph::new();
2501
2502        let n = g.create_node(
2503            vec!["Person".into(), "".into(), "   ".into()],
2504            Properties::new(),
2505        );
2506
2507        assert_eq!(n.labels, vec!["Person".to_string()]);
2508    }
2509
2510    #[test]
2511    fn empty_relationship_type_is_rejected() {
2512        let mut g = InMemoryGraph::new();
2513
2514        let a = g.create_node(vec!["A".into()], Properties::new());
2515        let b = g.create_node(vec!["B".into()], Properties::new());
2516
2517        assert!(g
2518            .create_relationship(a.id, b.id, "", Properties::new())
2519            .is_none());
2520    }
2521
2522    #[test]
2523    fn storage_schema_helpers_work() {
2524        let mut g = InMemoryGraph::new();
2525
2526        let a = g.create_node(
2527            vec!["Person".into()],
2528            props(&[("name", PropertyValue::String("Alice".into()))]),
2529        );
2530        let b = g.create_node(
2531            vec!["Company".into()],
2532            props(&[("title", PropertyValue::String("Acme".into()))]),
2533        );
2534
2535        g.create_relationship(
2536            a.id,
2537            b.id,
2538            "WORKS_AT",
2539            props(&[("since", PropertyValue::Int(2020))]),
2540        )
2541        .unwrap();
2542
2543        assert!(g.has_label_name("Person"));
2544        assert!(g.has_relationship_type_name("WORKS_AT"));
2545        assert!(g.has_property_key("name"));
2546        assert!(g.has_property_key("since"));
2547        assert!(g.label_has_property_key("Person", "name"));
2548        assert!(g.rel_type_has_property_key("WORKS_AT", "since"));
2549    }
2550
2551    #[test]
2552    fn clear_resets_the_graph() {
2553        let mut g = InMemoryGraph::new();
2554        let a = g.create_node(vec!["Person".into()], Properties::new());
2555        let b = g.create_node(vec!["Person".into()], Properties::new());
2556        g.create_relationship(a.id, b.id, "KNOWS", Properties::new())
2557            .unwrap();
2558
2559        assert_eq!(g.node_count(), 2);
2560        assert_eq!(g.relationship_count(), 1);
2561
2562        g.clear();
2563
2564        assert_eq!(g.node_count(), 0);
2565        assert_eq!(g.relationship_count(), 0);
2566        assert_eq!(g.all_labels().len(), 0);
2567    }
2568
2569    #[test]
2570    fn snapshot_roundtrip_preserves_graph_state() {
2571        let mut original = InMemoryGraph::new();
2572        let a = original.create_node(
2573            vec!["Person".into()],
2574            props(&[("name", PropertyValue::String("Alice".into()))]),
2575        );
2576        let b = original.create_node(
2577            vec!["Person".into()],
2578            props(&[("name", PropertyValue::String("Bob".into()))]),
2579        );
2580        let r = original
2581            .create_relationship(
2582                a.id,
2583                b.id,
2584                "KNOWS",
2585                props(&[("since", PropertyValue::Int(2020))]),
2586            )
2587            .unwrap();
2588
2589        let mut buf = Vec::new();
2590        let save_meta = original.save_snapshot(&mut buf).unwrap();
2591        assert_eq!(save_meta.node_count, 2);
2592        assert_eq!(save_meta.relationship_count, 1);
2593        assert_eq!(save_meta.wal_lsn, None);
2594
2595        let mut restored = InMemoryGraph::new();
2596        let load_meta = restored.load_snapshot(&buf[..]).unwrap();
2597        assert_eq!(load_meta, save_meta);
2598        assert!(restored.indexes_read().node_properties.is_active("name"));
2599        assert!(restored
2600            .indexes_read()
2601            .relationship_properties
2602            .is_active("since"));
2603
2604        assert_eq!(restored.node_count(), 2);
2605        assert_eq!(restored.relationship_count(), 1);
2606        assert_eq!(
2607            restored.node_property(a.id, "name"),
2608            Some(PropertyValue::String("Alice".into()))
2609        );
2610        assert_eq!(
2611            restored.relationship_property(r.id, "since"),
2612            Some(PropertyValue::Int(2020))
2613        );
2614
2615        // Adjacency + label index were rebuilt on load.
2616        assert_eq!(restored.outgoing_relationships(a.id).len(), 1);
2617        assert_eq!(restored.nodes_by_label("Person").len(), 2);
2618        assert!(restored.node_exists_with_label_and_property(
2619            "Person",
2620            "name",
2621            &PropertyValue::String("Alice".into())
2622        ));
2623        assert!(restored.relationship_exists_with_type_and_property(
2624            "KNOWS",
2625            "since",
2626            &PropertyValue::Int(2020)
2627        ));
2628
2629        // Counters carry over so new IDs don't collide with pre-snapshot IDs.
2630        let c = restored.create_node(vec!["Person".into()], Properties::new());
2631        assert_eq!(c.id, b.id + 1);
2632    }
2633
2634    #[test]
2635    fn mutation_recorder_observes_every_committed_mutation() {
2636        use std::sync::Mutex;
2637
2638        #[derive(Default)]
2639        struct CapturingRecorder {
2640            events: Mutex<Vec<MutationEvent>>,
2641        }
2642
2643        impl MutationRecorder for CapturingRecorder {
2644            fn record(&self, event: &MutationEvent) {
2645                self.events.lock().unwrap().push(event.clone());
2646            }
2647        }
2648
2649        let recorder = Arc::new(CapturingRecorder::default());
2650        let mut g = InMemoryGraph::new();
2651        g.set_mutation_recorder(Some(recorder.clone() as Arc<dyn MutationRecorder>));
2652
2653        let a = g.create_node(vec!["Person".into()], Properties::new());
2654        let b = g.create_node(vec!["Person".into()], Properties::new());
2655        let r = g
2656            .create_relationship(a.id, b.id, "KNOWS", Properties::new())
2657            .unwrap();
2658        g.set_node_property(a.id, "name".into(), PropertyValue::String("Alice".into()));
2659        g.remove_node_property(a.id, "name");
2660        g.add_node_label(a.id, "Admin");
2661        g.remove_node_label(a.id, "Admin");
2662        g.set_relationship_property(r.id, "since".into(), PropertyValue::Int(2020));
2663        g.remove_relationship_property(r.id, "since");
2664        g.detach_delete_node(a.id);
2665        g.clear();
2666
2667        let events = recorder.events.lock().unwrap().clone();
2668        assert!(matches!(events[0], MutationEvent::CreateNode { .. }));
2669        assert!(matches!(events[1], MutationEvent::CreateNode { .. }));
2670        assert!(matches!(
2671            events[2],
2672            MutationEvent::CreateRelationship { .. }
2673        ));
2674        assert!(matches!(events[3], MutationEvent::SetNodeProperty { .. }));
2675        assert!(matches!(
2676            events[4],
2677            MutationEvent::RemoveNodeProperty { .. }
2678        ));
2679        assert!(matches!(events[5], MutationEvent::AddNodeLabel { .. }));
2680        assert!(matches!(events[6], MutationEvent::RemoveNodeLabel { .. }));
2681        assert!(matches!(
2682            events[7],
2683            MutationEvent::SetRelationshipProperty { .. }
2684        ));
2685        assert!(matches!(
2686            events[8],
2687            MutationEvent::RemoveRelationshipProperty { .. }
2688        ));
2689        // detach_delete_node composes three kinds of events: one
2690        // DeleteRelationship per incident edge, one DeleteNode for the node
2691        // itself, and a final DetachDeleteNode marker. A WAL replayer can
2692        // either apply every step or recognise the marker and skip forward.
2693        assert!(matches!(
2694            events[9],
2695            MutationEvent::DeleteRelationship { .. }
2696        ));
2697        assert!(matches!(events[10], MutationEvent::DeleteNode { .. }));
2698        assert!(matches!(events[11], MutationEvent::DetachDeleteNode { .. }));
2699        assert!(matches!(events.last(), Some(MutationEvent::Clear)));
2700
2701        // Failed mutations (invalid id) do not emit events.
2702        let before = recorder.events.lock().unwrap().len();
2703        assert!(!g.set_node_property(9999, "x".into(), PropertyValue::Int(0)));
2704        assert_eq!(recorder.events.lock().unwrap().len(), before);
2705    }
2706
2707    #[test]
2708    fn snapshot_load_resets_but_keeps_recorder() {
2709        use std::sync::Mutex;
2710
2711        struct CountingRecorder(Mutex<usize>);
2712        impl MutationRecorder for CountingRecorder {
2713            fn record(&self, _: &MutationEvent) {
2714                *self.0.lock().unwrap() += 1;
2715            }
2716        }
2717
2718        let counter: Arc<dyn MutationRecorder> = Arc::new(CountingRecorder(Mutex::new(0)));
2719        let mut g = InMemoryGraph::new();
2720        g.set_mutation_recorder(Some(counter));
2721        g.create_node(vec!["A".into()], Properties::new());
2722
2723        let mut buf = Vec::new();
2724        g.save_snapshot(&mut buf).unwrap();
2725
2726        // Load into the same graph — recorder should survive, store state
2727        // should be replaced by the snapshot contents.
2728        g.load_snapshot(&buf[..]).unwrap();
2729        assert!(g.mutation_recorder().is_some());
2730        assert_eq!(g.node_count(), 1);
2731
2732        // Subsequent mutations still feed the recorder.
2733        g.create_node(vec!["B".into()], Properties::new());
2734        // 1 for the initial A + 1 for the post-load B. The restore path
2735        // itself does not emit events (that's a snapshot, not a mutation).
2736    }
2737}