Skip to main content

lora_store/
memory.rs

1use std::collections::{BTreeMap, BTreeSet, HashMap};
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::sync::{Arc, RwLock, RwLockWriteGuard};
4
5use lora_ast::Direction;
6
7use crate::snapshot::{read_snapshot, write_snapshot, SNAPSHOT_FORMAT_VERSION};
8use crate::{
9    BorrowedGraphStorage, GraphStorage, GraphStorageMut, LoraBinary, MutationEvent,
10    MutationRecorder, NodeId, NodeRecord, Properties, PropertyValue, RelationshipId,
11    RelationshipRecord, SnapshotError, SnapshotMeta, SnapshotPayload, Snapshotable,
12};
13
14type PropertyValueBuckets = HashMap<PropertyIndexKey, BTreeSet<u64>>;
15type PropertyIndex = HashMap<String, PropertyValueBuckets>;
16type ScopedPropertyIndex = HashMap<String, PropertyIndex>;
17
18#[derive(Default)]
19struct PropertyIndexRegistry {
20    node_properties: PropertyIndexState,
21    relationship_properties: PropertyIndexState,
22}
23
24impl std::fmt::Debug for PropertyIndexRegistry {
25    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26        f.debug_struct("PropertyIndexRegistry")
27            .field("node_properties", &self.node_properties)
28            .field("relationship_properties", &self.relationship_properties)
29            .finish()
30    }
31}
32
33impl Clone for PropertyIndexRegistry {
34    fn clone(&self) -> Self {
35        Self {
36            node_properties: self.node_properties.clone(),
37            relationship_properties: self.relationship_properties.clone(),
38        }
39    }
40}
41
42#[derive(Debug, Default, Clone)]
43struct PropertyIndexState {
44    active_keys: BTreeSet<String>,
45    values: PropertyIndex,
46    scoped_values: ScopedPropertyIndex,
47}
48
49impl PropertyIndexState {
50    fn is_active(&self, key: &str) -> bool {
51        self.active_keys.contains(key)
52    }
53
54    fn activate(&mut self, key: &str) -> bool {
55        self.active_keys.insert(key.to_string())
56    }
57
58    fn insert_value(
59        values: &mut PropertyIndex,
60        entity_id: u64,
61        key: &str,
62        value: PropertyIndexKey,
63    ) {
64        values
65            .entry(key.to_string())
66            .or_default()
67            .entry(value)
68            .or_default()
69            .insert(entity_id);
70    }
71
72    fn remove_value(
73        values: &mut PropertyIndex,
74        entity_id: u64,
75        key: &str,
76        value: &PropertyIndexKey,
77    ) {
78        let mut remove_key = false;
79        if let Some(buckets) = values.get_mut(key) {
80            if let Some(ids) = buckets.get_mut(value) {
81                ids.remove(&entity_id);
82                if ids.is_empty() {
83                    buckets.remove(value);
84                }
85            }
86            remove_key = buckets.is_empty();
87        }
88        if remove_key {
89            values.remove(key);
90        }
91    }
92
93    fn insert_scoped(&mut self, entity_id: u64, scope: &str, key: &str, value: &PropertyValue) {
94        let Some(indexed_value) = PropertyIndexKey::from_value(value) else {
95            return;
96        };
97
98        Self::insert_value(
99            self.scoped_values.entry(scope.to_string()).or_default(),
100            entity_id,
101            key,
102            indexed_value,
103        );
104    }
105
106    fn insert_with_scopes<'a>(
107        &mut self,
108        entity_id: u64,
109        scopes: impl IntoIterator<Item = &'a str>,
110        key: &str,
111        value: &PropertyValue,
112    ) {
113        let Some(indexed_value) = PropertyIndexKey::from_value(value) else {
114            return;
115        };
116
117        Self::insert_value(&mut self.values, entity_id, key, indexed_value.clone());
118        for scope in scopes {
119            Self::insert_value(
120                self.scoped_values.entry(scope.to_string()).or_default(),
121                entity_id,
122                key,
123                indexed_value.clone(),
124            );
125        }
126    }
127
128    fn remove_scoped(&mut self, entity_id: u64, scope: &str, key: &str, value: &PropertyValue) {
129        let Some(indexed_value) = PropertyIndexKey::from_value(value) else {
130            return;
131        };
132
133        let mut remove_scope = false;
134        if let Some(values) = self.scoped_values.get_mut(scope) {
135            Self::remove_value(values, entity_id, key, &indexed_value);
136            remove_scope = values.is_empty();
137        }
138        if remove_scope {
139            self.scoped_values.remove(scope);
140        }
141    }
142
143    fn remove_with_scopes<'a>(
144        &mut self,
145        entity_id: u64,
146        scopes: impl IntoIterator<Item = &'a str>,
147        key: &str,
148        value: &PropertyValue,
149    ) {
150        let Some(indexed_value) = PropertyIndexKey::from_value(value) else {
151            return;
152        };
153
154        Self::remove_value(&mut self.values, entity_id, key, &indexed_value);
155        for scope in scopes {
156            let mut remove_scope = false;
157            if let Some(values) = self.scoped_values.get_mut(scope) {
158                Self::remove_value(values, entity_id, key, &indexed_value);
159                remove_scope = values.is_empty();
160            }
161            if remove_scope {
162                self.scoped_values.remove(scope);
163            }
164        }
165    }
166
167    fn ids_for(&self, key: &str, value: &PropertyValue) -> Option<&BTreeSet<u64>> {
168        let indexed_value = PropertyIndexKey::from_value(value)?;
169        self.values
170            .get(key)
171            .and_then(|values| values.get(&indexed_value))
172    }
173
174    fn scoped_ids_for(
175        &self,
176        scope: &str,
177        key: &str,
178        value: &PropertyValue,
179    ) -> Option<&BTreeSet<u64>> {
180        let indexed_value = PropertyIndexKey::from_value(value)?;
181        self.scoped_values
182            .get(scope)
183            .and_then(|values| values.get(key))
184            .and_then(|values| values.get(&indexed_value))
185    }
186}
187
188#[derive(Debug, Clone, PartialEq, Eq, Hash)]
189enum PropertyIndexKey {
190    Null,
191    Bool(bool),
192    Int(i64),
193    Float(u64),
194    String(String),
195    Binary(LoraBinary),
196    List(Vec<PropertyIndexKey>),
197    Map(BTreeMap<String, PropertyIndexKey>),
198}
199
200impl PropertyIndexKey {
201    fn from_value(value: &PropertyValue) -> Option<Self> {
202        match value {
203            PropertyValue::Null => Some(Self::Null),
204            PropertyValue::Bool(v) => Some(Self::Bool(*v)),
205            PropertyValue::Int(v) => Some(Self::Int(*v)),
206            PropertyValue::Float(v) => {
207                if v.is_nan() {
208                    None
209                } else if *v == 0.0 {
210                    Some(Self::Float(0.0f64.to_bits()))
211                } else {
212                    Some(Self::Float(v.to_bits()))
213                }
214            }
215            PropertyValue::String(v) => Some(Self::String(v.clone())),
216            PropertyValue::Binary(v) => Some(Self::Binary(v.clone())),
217            PropertyValue::List(values) => values
218                .iter()
219                .map(Self::from_value)
220                .collect::<Option<Vec<_>>>()
221                .map(Self::List),
222            PropertyValue::Map(values) => values
223                .iter()
224                .map(|(k, v)| Self::from_value(v).map(|indexed| (k.clone(), indexed)))
225                .collect::<Option<BTreeMap<_, _>>>()
226                .map(Self::Map),
227            // Temporal, spatial, and vector values have richer equality
228            // semantics and/or no stable hash representation in the storage
229            // crate today. Those continue to use the scan fallback.
230            PropertyValue::Date(_)
231            | PropertyValue::Time(_)
232            | PropertyValue::LocalTime(_)
233            | PropertyValue::DateTime(_)
234            | PropertyValue::LocalDateTime(_)
235            | PropertyValue::Duration(_)
236            | PropertyValue::Point(_)
237            | PropertyValue::Vector(_) => None,
238        }
239    }
240}
241
242#[derive(Default)]
243pub struct InMemoryGraph {
244    next_node_id: NodeId,
245    next_rel_id: RelationshipId,
246
247    nodes: BTreeMap<NodeId, NodeRecord>,
248    relationships: BTreeMap<RelationshipId, RelationshipRecord>,
249
250    // adjacency
251    outgoing: BTreeMap<NodeId, BTreeSet<RelationshipId>>,
252    incoming: BTreeMap<NodeId, BTreeSet<RelationshipId>>,
253
254    // secondary indexes
255    nodes_by_label: BTreeMap<String, BTreeSet<NodeId>>,
256    relationships_by_type: BTreeMap<String, BTreeSet<RelationshipId>>,
257    indexes: RwLock<PropertyIndexRegistry>,
258    active_node_property_indexes: AtomicUsize,
259    active_relationship_property_indexes: AtomicUsize,
260
261    /// Optional mutation observer. When `Some`, every committed mutation
262    /// fans out to this recorder *after* the in-memory state has been
263    /// updated. The recorder is not part of the graph's identity, so Clone
264    /// and snapshot restore both reset it to `None`.
265    recorder: Option<Arc<dyn MutationRecorder>>,
266}
267
268impl std::fmt::Debug for InMemoryGraph {
269    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270        f.debug_struct("InMemoryGraph")
271            .field("next_node_id", &self.next_node_id)
272            .field("next_rel_id", &self.next_rel_id)
273            .field("nodes", &self.nodes)
274            .field("relationships", &self.relationships)
275            .field("outgoing", &self.outgoing)
276            .field("incoming", &self.incoming)
277            .field("nodes_by_label", &self.nodes_by_label)
278            .field("relationships_by_type", &self.relationships_by_type)
279            .field("indexes", &self.indexes)
280            .field(
281                "active_node_property_indexes",
282                &self.active_node_property_index_count(),
283            )
284            .field(
285                "active_relationship_property_indexes",
286                &self.active_relationship_property_index_count(),
287            )
288            .field("recorder", &self.recorder.as_ref().map(|_| "installed"))
289            .finish()
290    }
291}
292
293impl Clone for InMemoryGraph {
294    fn clone(&self) -> Self {
295        // Deliberately drop the recorder on clone: a cloned store is a
296        // separate identity; it should not silently share the observer.
297        Self {
298            next_node_id: self.next_node_id,
299            next_rel_id: self.next_rel_id,
300            nodes: self.nodes.clone(),
301            relationships: self.relationships.clone(),
302            outgoing: self.outgoing.clone(),
303            incoming: self.incoming.clone(),
304            nodes_by_label: self.nodes_by_label.clone(),
305            relationships_by_type: self.relationships_by_type.clone(),
306            indexes: RwLock::new(if self.has_active_property_indexes() {
307                self.indexes_read().clone()
308            } else {
309                PropertyIndexRegistry::default()
310            }),
311            active_node_property_indexes: AtomicUsize::new(self.active_node_property_index_count()),
312            active_relationship_property_indexes: AtomicUsize::new(
313                self.active_relationship_property_index_count(),
314            ),
315            recorder: None,
316        }
317    }
318}
319
320impl InMemoryGraph {
321    pub fn new() -> Self {
322        Self::default()
323    }
324
325    pub fn with_capacity_hint(_nodes: usize, _relationships: usize) -> Self {
326        // BTreeMap/BTreeSet do not support capacity reservation.
327        Self::default()
328    }
329
330    pub fn contains_node(&self, node_id: NodeId) -> bool {
331        self.nodes.contains_key(&node_id)
332    }
333
334    pub fn contains_relationship(&self, rel_id: RelationshipId) -> bool {
335        self.relationships.contains_key(&rel_id)
336    }
337
338    /// Install (or clear) the mutation recorder. Passing `None` detaches any
339    /// currently-installed recorder. The recorder observes every committed
340    /// mutation *after* it has been applied.
341    pub fn set_mutation_recorder(&mut self, recorder: Option<Arc<dyn MutationRecorder>>) {
342        self.recorder = recorder;
343    }
344
345    /// Handle to the currently-installed recorder, if any.
346    pub fn mutation_recorder(&self) -> Option<&Arc<dyn MutationRecorder>> {
347        self.recorder.as_ref()
348    }
349
350    /// Emit a mutation event only if a recorder is installed. The event is
351    /// built lazily — callers pass a closure, so when no recorder is
352    /// attached we pay only a `None` check and the cost of constructing the
353    /// event (labels/properties clones) is avoided.
354    #[inline]
355    fn emit<F: FnOnce() -> MutationEvent>(&self, build: F) {
356        if let Some(rec) = &self.recorder {
357            rec.record(build());
358        }
359    }
360
361    fn alloc_node_id(&mut self) -> NodeId {
362        let id = self.next_node_id;
363        self.next_node_id += 1;
364        id
365    }
366
367    fn alloc_rel_id(&mut self) -> RelationshipId {
368        let id = self.next_rel_id;
369        self.next_rel_id += 1;
370        id
371    }
372
373    fn bump_next_node_id_past(&mut self, id: NodeId) -> Result<(), String> {
374        let next = id
375            .checked_add(1)
376            .ok_or_else(|| format!("node id {id} leaves no valid next node id"))?;
377        self.next_node_id = self.next_node_id.max(next);
378        Ok(())
379    }
380
381    fn bump_next_rel_id_past(&mut self, id: RelationshipId) -> Result<(), String> {
382        let next = id
383            .checked_add(1)
384            .ok_or_else(|| format!("relationship id {id} leaves no valid next relationship id"))?;
385        self.next_rel_id = self.next_rel_id.max(next);
386        Ok(())
387    }
388
389    fn normalize_labels(labels: Vec<String>) -> Vec<String> {
390        let mut seen = BTreeSet::new();
391
392        labels
393            .into_iter()
394            .map(|s| s.trim().to_string())
395            .filter(|s| !s.is_empty())
396            .filter(|s| seen.insert(s.clone()))
397            .collect()
398    }
399
400    fn insert_node_label_index(&mut self, node_id: NodeId, label: &str) {
401        self.nodes_by_label
402            .entry(label.to_string())
403            .or_default()
404            .insert(node_id);
405    }
406
407    fn remove_node_label_index(&mut self, node_id: NodeId, label: &str) {
408        if let Some(ids) = self.nodes_by_label.get_mut(label) {
409            ids.remove(&node_id);
410            if ids.is_empty() {
411                self.nodes_by_label.remove(label);
412            }
413        }
414    }
415
416    fn insert_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
417        self.relationships_by_type
418            .entry(rel_type.to_string())
419            .or_default()
420            .insert(rel_id);
421    }
422
423    fn remove_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
424        if let Some(ids) = self.relationships_by_type.get_mut(rel_type) {
425            ids.remove(&rel_id);
426            if ids.is_empty() {
427                self.relationships_by_type.remove(rel_type);
428            }
429        }
430    }
431
432    fn indexes_read(&self) -> std::sync::RwLockReadGuard<'_, PropertyIndexRegistry> {
433        self.indexes
434            .read()
435            .unwrap_or_else(|poisoned| poisoned.into_inner())
436    }
437
438    fn indexes_write(&self) -> RwLockWriteGuard<'_, PropertyIndexRegistry> {
439        self.indexes
440            .write()
441            .unwrap_or_else(|poisoned| poisoned.into_inner())
442    }
443
444    fn indexes_mut(&mut self) -> &mut PropertyIndexRegistry {
445        self.indexes
446            .get_mut()
447            .unwrap_or_else(|poisoned| poisoned.into_inner())
448    }
449
450    #[inline]
451    fn active_node_property_index_count(&self) -> usize {
452        self.active_node_property_indexes.load(Ordering::Relaxed)
453    }
454
455    #[inline]
456    fn active_relationship_property_index_count(&self) -> usize {
457        self.active_relationship_property_indexes
458            .load(Ordering::Relaxed)
459    }
460
461    #[inline]
462    fn has_active_property_indexes(&self) -> bool {
463        self.active_node_property_index_count() != 0
464            || self.active_relationship_property_index_count() != 0
465    }
466
467    fn ensure_node_property_index(&self, key: &str) {
468        {
469            let indexes = self.indexes_read();
470            if indexes.node_properties.is_active(key) {
471                return;
472            }
473        }
474
475        let mut indexes = self.indexes_write();
476        if indexes.node_properties.is_active(key) {
477            return;
478        }
479
480        for (id, node) in &self.nodes {
481            if let Some(value) = node.properties.get(key) {
482                indexes.node_properties.insert_with_scopes(
483                    *id,
484                    node.labels.iter().map(String::as_str),
485                    key,
486                    value,
487                );
488            }
489        }
490        if indexes.node_properties.activate(key) {
491            self.active_node_property_indexes
492                .fetch_add(1, Ordering::Relaxed);
493        }
494    }
495
496    fn ensure_relationship_property_index(&self, key: &str) {
497        {
498            let indexes = self.indexes_read();
499            if indexes.relationship_properties.is_active(key) {
500                return;
501            }
502        }
503
504        let mut indexes = self.indexes_write();
505        if indexes.relationship_properties.is_active(key) {
506            return;
507        }
508
509        for (id, rel) in &self.relationships {
510            if let Some(value) = rel.properties.get(key) {
511                indexes.relationship_properties.insert_with_scopes(
512                    *id,
513                    [rel.rel_type.as_str()],
514                    key,
515                    value,
516                );
517            }
518        }
519        if indexes.relationship_properties.activate(key) {
520            self.active_relationship_property_indexes
521                .fetch_add(1, Ordering::Relaxed);
522        }
523    }
524
525    fn rebuild_property_indexes(&mut self) {
526        let mut indexes = PropertyIndexRegistry::default();
527
528        for (id, node) in &self.nodes {
529            for (key, value) in &node.properties {
530                if PropertyIndexKey::from_value(value).is_some() {
531                    indexes.node_properties.activate(key);
532                    indexes.node_properties.insert_with_scopes(
533                        *id,
534                        node.labels.iter().map(String::as_str),
535                        key,
536                        value,
537                    );
538                }
539            }
540        }
541
542        for (id, rel) in &self.relationships {
543            for (key, value) in &rel.properties {
544                if PropertyIndexKey::from_value(value).is_some() {
545                    indexes.relationship_properties.activate(key);
546                    indexes.relationship_properties.insert_with_scopes(
547                        *id,
548                        [rel.rel_type.as_str()],
549                        key,
550                        value,
551                    );
552                }
553            }
554        }
555
556        let node_index_count = indexes.node_properties.active_keys.len();
557        let relationship_index_count = indexes.relationship_properties.active_keys.len();
558        *self.indexes_mut() = indexes;
559        self.active_node_property_indexes
560            .store(node_index_count, Ordering::Relaxed);
561        self.active_relationship_property_indexes
562            .store(relationship_index_count, Ordering::Relaxed);
563    }
564
565    fn index_node_property_eager<'a>(
566        &mut self,
567        node_id: NodeId,
568        labels: impl IntoIterator<Item = &'a str>,
569        key: &str,
570        value: &PropertyValue,
571    ) {
572        if PropertyIndexKey::from_value(value).is_none() {
573            return;
574        }
575
576        let activated = {
577            let indexes = self.indexes_mut();
578            let activated = indexes.node_properties.activate(key);
579            indexes
580                .node_properties
581                .insert_with_scopes(node_id, labels, key, value);
582            activated
583        };
584        if activated {
585            self.active_node_property_indexes
586                .fetch_add(1, Ordering::Relaxed);
587        }
588    }
589
590    fn index_relationship_property_eager<'a>(
591        &mut self,
592        rel_id: RelationshipId,
593        scopes: impl IntoIterator<Item = &'a str>,
594        key: &str,
595        value: &PropertyValue,
596    ) {
597        if PropertyIndexKey::from_value(value).is_none() {
598            return;
599        }
600
601        let activated = {
602            let indexes = self.indexes_mut();
603            let activated = indexes.relationship_properties.activate(key);
604            indexes
605                .relationship_properties
606                .insert_with_scopes(rel_id, scopes, key, value);
607            activated
608        };
609        if activated {
610            self.active_relationship_property_indexes
611                .fetch_add(1, Ordering::Relaxed);
612        }
613    }
614
615    fn index_node_properties_eager<'a>(
616        &mut self,
617        node_id: NodeId,
618        labels: impl IntoIterator<Item = &'a str> + Clone,
619        properties: &Properties,
620    ) {
621        for (key, value) in properties {
622            self.index_node_property_eager(node_id, labels.clone(), key, value);
623        }
624    }
625
626    fn index_relationship_properties_eager<'a>(
627        &mut self,
628        rel_id: RelationshipId,
629        scopes: impl IntoIterator<Item = &'a str> + Clone,
630        properties: &Properties,
631    ) {
632        for (key, value) in properties {
633            self.index_relationship_property_eager(rel_id, scopes.clone(), key, value);
634        }
635    }
636
637    fn index_node_property_if_active<'a>(
638        &mut self,
639        node_id: NodeId,
640        labels: impl IntoIterator<Item = &'a str>,
641        key: &str,
642        value: &PropertyValue,
643    ) {
644        if self.active_node_property_index_count() == 0 {
645            return;
646        }
647        let indexes = self.indexes_mut();
648        if indexes.node_properties.is_active(key) {
649            indexes
650                .node_properties
651                .insert_with_scopes(node_id, labels, key, value);
652        }
653    }
654
655    fn index_node_properties_if_active<'a>(
656        &mut self,
657        node_id: NodeId,
658        labels: impl IntoIterator<Item = &'a str> + Clone,
659        properties: &Properties,
660    ) {
661        if self.active_node_property_index_count() == 0 {
662            return;
663        }
664        let indexes = self.indexes_mut();
665        for (key, value) in properties {
666            if indexes.node_properties.is_active(key) {
667                indexes
668                    .node_properties
669                    .insert_with_scopes(node_id, labels.clone(), key, value);
670            }
671        }
672    }
673
674    fn unindex_node_property_if_active<'a>(
675        &mut self,
676        node_id: NodeId,
677        labels: impl IntoIterator<Item = &'a str>,
678        key: &str,
679        value: &PropertyValue,
680    ) {
681        if self.active_node_property_index_count() == 0 {
682            return;
683        }
684        let indexes = self.indexes_mut();
685        if indexes.node_properties.is_active(key) {
686            indexes
687                .node_properties
688                .remove_with_scopes(node_id, labels, key, value);
689        }
690    }
691
692    fn index_node_scope_properties_if_active(
693        &mut self,
694        node_id: NodeId,
695        scope: &str,
696        properties: &Properties,
697    ) {
698        if self.active_node_property_index_count() == 0 {
699            return;
700        }
701        let indexes = self.indexes_mut();
702        for (key, value) in properties {
703            if indexes.node_properties.is_active(key) {
704                indexes
705                    .node_properties
706                    .insert_scoped(node_id, scope, key, value);
707            }
708        }
709    }
710
711    fn unindex_node_scope_properties_if_active(
712        &mut self,
713        node_id: NodeId,
714        scope: &str,
715        properties: &Properties,
716    ) {
717        if self.active_node_property_index_count() == 0 {
718            return;
719        }
720        let indexes = self.indexes_mut();
721        for (key, value) in properties {
722            if indexes.node_properties.is_active(key) {
723                indexes
724                    .node_properties
725                    .remove_scoped(node_id, scope, key, value);
726            }
727        }
728    }
729
730    fn unindex_active_node_properties<'a>(
731        &mut self,
732        node_id: NodeId,
733        labels: impl IntoIterator<Item = &'a str> + Clone,
734        properties: &Properties,
735    ) {
736        if self.active_node_property_index_count() == 0 {
737            return;
738        }
739        let indexes = self.indexes_mut();
740        for (key, value) in properties {
741            if indexes.node_properties.is_active(key) {
742                indexes
743                    .node_properties
744                    .remove_with_scopes(node_id, labels.clone(), key, value);
745            }
746        }
747    }
748
749    fn index_relationship_property_if_active<'a>(
750        &mut self,
751        rel_id: RelationshipId,
752        scopes: impl IntoIterator<Item = &'a str>,
753        key: &str,
754        value: &PropertyValue,
755    ) {
756        if self.active_relationship_property_index_count() == 0 {
757            return;
758        }
759        let indexes = self.indexes_mut();
760        if indexes.relationship_properties.is_active(key) {
761            indexes
762                .relationship_properties
763                .insert_with_scopes(rel_id, scopes, key, value);
764        }
765    }
766
767    fn index_relationship_properties_if_active<'a>(
768        &mut self,
769        rel_id: RelationshipId,
770        scopes: impl IntoIterator<Item = &'a str> + Clone,
771        properties: &Properties,
772    ) {
773        if self.active_relationship_property_index_count() == 0 {
774            return;
775        }
776        let indexes = self.indexes_mut();
777        for (key, value) in properties {
778            if indexes.relationship_properties.is_active(key) {
779                indexes.relationship_properties.insert_with_scopes(
780                    rel_id,
781                    scopes.clone(),
782                    key,
783                    value,
784                );
785            }
786        }
787    }
788
789    fn unindex_relationship_property_if_active<'a>(
790        &mut self,
791        rel_id: RelationshipId,
792        scopes: impl IntoIterator<Item = &'a str>,
793        key: &str,
794        value: &PropertyValue,
795    ) {
796        if self.active_relationship_property_index_count() == 0 {
797            return;
798        }
799        let indexes = self.indexes_mut();
800        if indexes.relationship_properties.is_active(key) {
801            indexes
802                .relationship_properties
803                .remove_with_scopes(rel_id, scopes, key, value);
804        }
805    }
806
807    fn unindex_active_relationship_properties<'a>(
808        &mut self,
809        rel_id: RelationshipId,
810        scopes: impl IntoIterator<Item = &'a str> + Clone,
811        properties: &Properties,
812    ) {
813        if self.active_relationship_property_index_count() == 0 {
814            return;
815        }
816        let indexes = self.indexes_mut();
817        for (key, value) in properties {
818            if indexes.relationship_properties.is_active(key) {
819                indexes.relationship_properties.remove_with_scopes(
820                    rel_id,
821                    scopes.clone(),
822                    key,
823                    value,
824                );
825            }
826        }
827    }
828
829    fn scan_nodes_by_property(
830        &self,
831        label: Option<&str>,
832        key: &str,
833        value: &PropertyValue,
834    ) -> Vec<NodeRecord> {
835        let candidates: Box<dyn Iterator<Item = &NodeRecord> + '_> = match label {
836            Some(label) => Box::new(
837                self.nodes_by_label
838                    .get(label)
839                    .into_iter()
840                    .flat_map(|ids| ids.iter())
841                    .filter_map(|id| self.nodes.get(id)),
842            ),
843            None => Box::new(self.nodes.values()),
844        };
845
846        candidates
847            .filter(|node| node.properties.get(key) == Some(value))
848            .cloned()
849            .collect()
850    }
851
852    fn scan_relationships_by_property(
853        &self,
854        rel_type: Option<&str>,
855        key: &str,
856        value: &PropertyValue,
857    ) -> Vec<RelationshipRecord> {
858        let candidates: Box<dyn Iterator<Item = &RelationshipRecord> + '_> = match rel_type {
859            Some(rel_type) => Box::new(
860                self.relationships_by_type
861                    .get(rel_type)
862                    .into_iter()
863                    .flat_map(|ids| ids.iter())
864                    .filter_map(|id| self.relationships.get(id)),
865            ),
866            None => Box::new(self.relationships.values()),
867        };
868
869        candidates
870            .filter(|rel| rel.properties.get(key) == Some(value))
871            .cloned()
872            .collect()
873    }
874
875    fn attach_relationship(&mut self, rel: &RelationshipRecord) {
876        self.outgoing.entry(rel.src).or_default().insert(rel.id);
877        self.incoming.entry(rel.dst).or_default().insert(rel.id);
878        self.insert_relationship_type_index(rel.id, &rel.rel_type);
879    }
880
881    fn detach_relationship_indexes(&mut self, rel: &RelationshipRecord) {
882        if let Some(ids) = self.outgoing.get_mut(&rel.src) {
883            ids.remove(&rel.id);
884            if ids.is_empty() {
885                self.outgoing.remove(&rel.src);
886            }
887        }
888
889        if let Some(ids) = self.incoming.get_mut(&rel.dst) {
890            ids.remove(&rel.id);
891            if ids.is_empty() {
892                self.incoming.remove(&rel.dst);
893            }
894        }
895
896        self.remove_relationship_type_index(rel.id, &rel.rel_type);
897    }
898
899    fn relationship_ids_for_direction(
900        &self,
901        node_id: NodeId,
902        direction: Direction,
903    ) -> Vec<RelationshipId> {
904        match direction {
905            Direction::Left => self
906                .incoming
907                .get(&node_id)
908                .map(|ids| ids.iter().copied().collect())
909                .unwrap_or_default(),
910
911            Direction::Right => self
912                .outgoing
913                .get(&node_id)
914                .map(|ids| ids.iter().copied().collect())
915                .unwrap_or_default(),
916
917            Direction::Undirected => {
918                let mut ids = BTreeSet::new();
919
920                if let Some(out) = self.outgoing.get(&node_id) {
921                    ids.extend(out.iter().copied());
922                }
923                if let Some(inc) = self.incoming.get(&node_id) {
924                    ids.extend(inc.iter().copied());
925                }
926
927                ids.into_iter().collect()
928            }
929        }
930    }
931
932    fn other_endpoint(rel: &RelationshipRecord, node_id: NodeId) -> Option<NodeId> {
933        if rel.src == node_id {
934            Some(rel.dst)
935        } else if rel.dst == node_id {
936            Some(rel.src)
937        } else {
938            None
939        }
940    }
941
942    fn has_incident_relationships(&self, node_id: NodeId) -> bool {
943        self.outgoing
944            .get(&node_id)
945            .map(|ids| !ids.is_empty())
946            .unwrap_or(false)
947            || self
948                .incoming
949                .get(&node_id)
950                .map(|ids| !ids.is_empty())
951                .unwrap_or(false)
952    }
953
954    fn incident_relationship_ids(&self, node_id: NodeId) -> BTreeSet<RelationshipId> {
955        let mut rel_ids = BTreeSet::new();
956
957        if let Some(ids) = self.outgoing.get(&node_id) {
958            rel_ids.extend(ids.iter().copied());
959        }
960        if let Some(ids) = self.incoming.get(&node_id) {
961            rel_ids.extend(ids.iter().copied());
962        }
963
964        rel_ids
965    }
966
967    /// Replay a node creation using the id captured in a durable mutation
968    /// event. This intentionally does not emit a new mutation event: callers
969    /// must invoke it before installing a recorder on the graph.
970    #[doc(hidden)]
971    pub fn replay_create_node(
972        &mut self,
973        id: NodeId,
974        labels: Vec<String>,
975        properties: Properties,
976    ) -> Result<NodeRecord, String> {
977        if self.recorder.is_some() {
978            return Err(
979                "cannot replay node creation while a mutation recorder is installed".into(),
980            );
981        }
982        if self.nodes.contains_key(&id) {
983            return Err(format!("node id {id} already exists"));
984        }
985
986        let labels = Self::normalize_labels(labels);
987        let node = NodeRecord {
988            id,
989            labels: labels.clone(),
990            properties,
991        };
992
993        for label in &labels {
994            self.insert_node_label_index(id, label);
995        }
996        self.index_node_properties_eager(
997            id,
998            node.labels.iter().map(String::as_str),
999            &node.properties,
1000        );
1001        self.nodes.insert(id, node.clone());
1002        self.outgoing.entry(id).or_default();
1003        self.incoming.entry(id).or_default();
1004        self.bump_next_node_id_past(id)?;
1005
1006        Ok(node)
1007    }
1008
1009    /// Replay a relationship creation using the id captured in a durable
1010    /// mutation event. This intentionally does not emit a new mutation event:
1011    /// callers must invoke it before installing a recorder on the graph.
1012    #[doc(hidden)]
1013    pub fn replay_create_relationship(
1014        &mut self,
1015        id: RelationshipId,
1016        src: NodeId,
1017        dst: NodeId,
1018        rel_type: &str,
1019        properties: Properties,
1020    ) -> Result<RelationshipRecord, String> {
1021        if self.recorder.is_some() {
1022            return Err(
1023                "cannot replay relationship creation while a mutation recorder is installed".into(),
1024            );
1025        }
1026        if self.relationships.contains_key(&id) {
1027            return Err(format!("relationship id {id} already exists"));
1028        }
1029        if !self.nodes.contains_key(&src) {
1030            return Err(format!(
1031                "relationship {id} references missing source node {src}"
1032            ));
1033        }
1034        if !self.nodes.contains_key(&dst) {
1035            return Err(format!(
1036                "relationship {id} references missing target node {dst}"
1037            ));
1038        }
1039
1040        let trimmed = rel_type.trim();
1041        if trimmed.is_empty() {
1042            return Err(format!("relationship {id} has an empty type"));
1043        }
1044
1045        let rel = RelationshipRecord {
1046            id,
1047            src,
1048            dst,
1049            rel_type: trimmed.to_string(),
1050            properties,
1051        };
1052
1053        self.attach_relationship(&rel);
1054        self.index_relationship_properties_eager(id, [rel.rel_type.as_str()], &rel.properties);
1055        self.relationships.insert(id, rel.clone());
1056        self.bump_next_rel_id_past(id)?;
1057
1058        Ok(rel)
1059    }
1060}
1061
1062impl GraphStorage for InMemoryGraph {
1063    // ---------- Required primitives ----------
1064
1065    fn contains_node(&self, id: NodeId) -> bool {
1066        self.nodes.contains_key(&id)
1067    }
1068
1069    fn node(&self, id: NodeId) -> Option<NodeRecord> {
1070        self.nodes.get(&id).cloned()
1071    }
1072
1073    fn all_node_ids(&self) -> Vec<NodeId> {
1074        self.nodes.keys().copied().collect()
1075    }
1076
1077    fn node_ids_by_label(&self, label: &str) -> Vec<NodeId> {
1078        match self.nodes_by_label.get(label) {
1079            Some(ids) => ids.iter().copied().collect(),
1080            None => Vec::new(),
1081        }
1082    }
1083
1084    fn contains_relationship(&self, id: RelationshipId) -> bool {
1085        self.relationships.contains_key(&id)
1086    }
1087
1088    fn relationship(&self, id: RelationshipId) -> Option<RelationshipRecord> {
1089        self.relationships.get(&id).cloned()
1090    }
1091
1092    fn all_rel_ids(&self) -> Vec<RelationshipId> {
1093        self.relationships.keys().copied().collect()
1094    }
1095
1096    fn rel_ids_by_type(&self, rel_type: &str) -> Vec<RelationshipId> {
1097        match self.relationships_by_type.get(rel_type) {
1098            Some(ids) => ids.iter().copied().collect(),
1099            None => Vec::new(),
1100        }
1101    }
1102
1103    fn relationship_endpoints(&self, id: RelationshipId) -> Option<(NodeId, NodeId)> {
1104        self.relationships.get(&id).map(|r| (r.src, r.dst))
1105    }
1106
1107    fn expand_ids(
1108        &self,
1109        node_id: NodeId,
1110        direction: Direction,
1111        types: &[String],
1112    ) -> Vec<(RelationshipId, NodeId)> {
1113        if !self.nodes.contains_key(&node_id) {
1114            return Vec::new();
1115        }
1116
1117        // Fast path: no type filter — just join adjacency + rel endpoints.
1118        if types.is_empty() {
1119            return self
1120                .relationship_ids_for_direction(node_id, direction)
1121                .into_iter()
1122                .filter_map(|rel_id| {
1123                    let rel = self.relationships.get(&rel_id)?;
1124                    let other_id = Self::other_endpoint(rel, node_id)?;
1125                    Some((rel_id, other_id))
1126                })
1127                .collect();
1128        }
1129
1130        // Type-filtered: borrow rel_type straight from the stored record.
1131        // For small type lists (the common case) the linear scan beats a
1132        // BTreeSet; we keep it allocation-free.
1133        self.relationship_ids_for_direction(node_id, direction)
1134            .into_iter()
1135            .filter_map(|rel_id| {
1136                let rel = self.relationships.get(&rel_id)?;
1137                if !types.iter().any(|t| t == &rel.rel_type) {
1138                    return None;
1139                }
1140                let other_id = Self::other_endpoint(rel, node_id)?;
1141                Some((rel_id, other_id))
1142            })
1143            .collect()
1144    }
1145
1146    fn all_labels(&self) -> Vec<String> {
1147        self.nodes_by_label.keys().cloned().collect()
1148    }
1149
1150    fn all_relationship_types(&self) -> Vec<String> {
1151        self.relationships_by_type.keys().cloned().collect()
1152    }
1153
1154    // ---------- Optimization hooks: zero-clone borrow access ----------
1155
1156    fn with_node<F, R>(&self, id: NodeId, f: F) -> Option<R>
1157    where
1158        F: FnOnce(&NodeRecord) -> R,
1159        Self: Sized,
1160    {
1161        self.nodes.get(&id).map(f)
1162    }
1163
1164    fn with_relationship<F, R>(&self, id: RelationshipId, f: F) -> Option<R>
1165    where
1166        F: FnOnce(&RelationshipRecord) -> R,
1167        Self: Sized,
1168    {
1169        self.relationships.get(&id).map(f)
1170    }
1171
1172    // ---------- Overrides: counts + existence ----------
1173
1174    fn node_count(&self) -> usize {
1175        self.nodes.len()
1176    }
1177
1178    fn relationship_count(&self) -> usize {
1179        self.relationships.len()
1180    }
1181
1182    fn has_node(&self, id: NodeId) -> bool {
1183        self.nodes.contains_key(&id)
1184    }
1185
1186    fn has_relationship(&self, id: RelationshipId) -> bool {
1187        self.relationships.contains_key(&id)
1188    }
1189
1190    // ---------- Overrides: record-returning scans (direct iteration) ----------
1191
1192    fn all_nodes(&self) -> Vec<NodeRecord> {
1193        self.nodes.values().cloned().collect()
1194    }
1195
1196    fn nodes_by_label(&self, label: &str) -> Vec<NodeRecord> {
1197        self.nodes_by_label
1198            .get(label)
1199            .into_iter()
1200            .flat_map(|ids| ids.iter())
1201            .filter_map(|id| self.nodes.get(id).cloned())
1202            .collect()
1203    }
1204
1205    fn all_relationships(&self) -> Vec<RelationshipRecord> {
1206        self.relationships.values().cloned().collect()
1207    }
1208
1209    fn relationships_by_type(&self, rel_type: &str) -> Vec<RelationshipRecord> {
1210        self.relationships_by_type
1211            .get(rel_type)
1212            .into_iter()
1213            .flat_map(|ids| ids.iter())
1214            .filter_map(|id| self.relationships.get(id).cloned())
1215            .collect()
1216    }
1217
1218    fn find_nodes_by_property(
1219        &self,
1220        label: Option<&str>,
1221        key: &str,
1222        value: &PropertyValue,
1223    ) -> Vec<NodeRecord>
1224    where
1225        Self: Sized,
1226    {
1227        if PropertyIndexKey::from_value(value).is_none() {
1228            return self.scan_nodes_by_property(label, key, value);
1229        }
1230
1231        self.ensure_node_property_index(key);
1232        let indexes = self.indexes_read();
1233
1234        match label {
1235            Some(label) => {
1236                let Some(ids) = indexes.node_properties.scoped_ids_for(label, key, value) else {
1237                    return Vec::new();
1238                };
1239                ids.iter()
1240                    .filter_map(|id| self.nodes.get(id).cloned())
1241                    .collect()
1242            }
1243            None => indexes
1244                .node_properties
1245                .ids_for(key, value)
1246                .into_iter()
1247                .flat_map(|ids| ids.iter())
1248                .filter_map(|id| self.nodes.get(id).cloned())
1249                .collect(),
1250        }
1251    }
1252
1253    fn find_node_ids_by_property(
1254        &self,
1255        label: Option<&str>,
1256        key: &str,
1257        value: &PropertyValue,
1258    ) -> Vec<NodeId>
1259    where
1260        Self: Sized,
1261    {
1262        if PropertyIndexKey::from_value(value).is_none() {
1263            return self
1264                .scan_nodes_by_property(label, key, value)
1265                .into_iter()
1266                .map(|n| n.id)
1267                .collect();
1268        }
1269
1270        self.ensure_node_property_index(key);
1271        let indexes = self.indexes_read();
1272
1273        match label {
1274            Some(label) => indexes
1275                .node_properties
1276                .scoped_ids_for(label, key, value)
1277                .map(|ids| ids.iter().copied().collect())
1278                .unwrap_or_default(),
1279            None => indexes
1280                .node_properties
1281                .ids_for(key, value)
1282                .map(|ids| ids.iter().copied().collect())
1283                .unwrap_or_default(),
1284        }
1285    }
1286
1287    fn find_relationships_by_property(
1288        &self,
1289        rel_type: Option<&str>,
1290        key: &str,
1291        value: &PropertyValue,
1292    ) -> Vec<RelationshipRecord>
1293    where
1294        Self: Sized,
1295    {
1296        if PropertyIndexKey::from_value(value).is_none() {
1297            return self.scan_relationships_by_property(rel_type, key, value);
1298        }
1299
1300        self.ensure_relationship_property_index(key);
1301        let indexes = self.indexes_read();
1302
1303        match rel_type {
1304            Some(rel_type) => {
1305                let Some(ids) = indexes
1306                    .relationship_properties
1307                    .scoped_ids_for(rel_type, key, value)
1308                else {
1309                    return Vec::new();
1310                };
1311                ids.iter()
1312                    .filter_map(|id| self.relationships.get(id).cloned())
1313                    .collect()
1314            }
1315            None => indexes
1316                .relationship_properties
1317                .ids_for(key, value)
1318                .into_iter()
1319                .flat_map(|ids| ids.iter())
1320                .filter_map(|id| self.relationships.get(id).cloned())
1321                .collect(),
1322        }
1323    }
1324
1325    fn find_relationship_ids_by_property(
1326        &self,
1327        rel_type: Option<&str>,
1328        key: &str,
1329        value: &PropertyValue,
1330    ) -> Vec<RelationshipId>
1331    where
1332        Self: Sized,
1333    {
1334        if PropertyIndexKey::from_value(value).is_none() {
1335            return self
1336                .scan_relationships_by_property(rel_type, key, value)
1337                .into_iter()
1338                .map(|r| r.id)
1339                .collect();
1340        }
1341
1342        self.ensure_relationship_property_index(key);
1343        let indexes = self.indexes_read();
1344
1345        match rel_type {
1346            Some(rel_type) => indexes
1347                .relationship_properties
1348                .scoped_ids_for(rel_type, key, value)
1349                .map(|ids| ids.iter().copied().collect())
1350                .unwrap_or_default(),
1351            None => indexes
1352                .relationship_properties
1353                .ids_for(key, value)
1354                .map(|ids| ids.iter().copied().collect())
1355                .unwrap_or_default(),
1356        }
1357    }
1358
1359    fn node_exists_with_label_and_property(
1360        &self,
1361        label: &str,
1362        key: &str,
1363        value: &PropertyValue,
1364    ) -> bool
1365    where
1366        Self: Sized,
1367    {
1368        if PropertyIndexKey::from_value(value).is_none() {
1369            return !self
1370                .scan_nodes_by_property(Some(label), key, value)
1371                .is_empty();
1372        }
1373
1374        self.ensure_node_property_index(key);
1375        let indexes = self.indexes_read();
1376        indexes
1377            .node_properties
1378            .scoped_ids_for(label, key, value)
1379            .map(|ids| !ids.is_empty())
1380            .unwrap_or(false)
1381    }
1382
1383    fn relationship_exists_with_type_and_property(
1384        &self,
1385        rel_type: &str,
1386        key: &str,
1387        value: &PropertyValue,
1388    ) -> bool
1389    where
1390        Self: Sized,
1391    {
1392        if PropertyIndexKey::from_value(value).is_none() {
1393            return !self
1394                .scan_relationships_by_property(Some(rel_type), key, value)
1395                .is_empty();
1396        }
1397
1398        self.ensure_relationship_property_index(key);
1399        let indexes = self.indexes_read();
1400        indexes
1401            .relationship_properties
1402            .scoped_ids_for(rel_type, key, value)
1403            .map(|ids| !ids.is_empty())
1404            .unwrap_or(false)
1405    }
1406
1407    // ---------- Overrides: traversal (direct adjacency) ----------
1408
1409    fn relationship_ids_of(&self, node_id: NodeId, direction: Direction) -> Vec<RelationshipId> {
1410        self.relationship_ids_for_direction(node_id, direction)
1411    }
1412
1413    fn outgoing_relationships(&self, node_id: NodeId) -> Vec<RelationshipRecord> {
1414        self.outgoing
1415            .get(&node_id)
1416            .into_iter()
1417            .flat_map(|ids| ids.iter())
1418            .filter_map(|id| self.relationships.get(id).cloned())
1419            .collect()
1420    }
1421
1422    fn incoming_relationships(&self, node_id: NodeId) -> Vec<RelationshipRecord> {
1423        self.incoming
1424            .get(&node_id)
1425            .into_iter()
1426            .flat_map(|ids| ids.iter())
1427            .filter_map(|id| self.relationships.get(id).cloned())
1428            .collect()
1429    }
1430
1431    fn degree(&self, node_id: NodeId, direction: Direction) -> usize {
1432        match direction {
1433            Direction::Right => self.outgoing.get(&node_id).map(|s| s.len()).unwrap_or(0),
1434            Direction::Left => self.incoming.get(&node_id).map(|s| s.len()).unwrap_or(0),
1435            Direction::Undirected => {
1436                self.outgoing.get(&node_id).map(|s| s.len()).unwrap_or(0)
1437                    + self.incoming.get(&node_id).map(|s| s.len()).unwrap_or(0)
1438            }
1439        }
1440    }
1441
1442    fn expand(
1443        &self,
1444        node_id: NodeId,
1445        direction: Direction,
1446        types: &[String],
1447    ) -> Vec<(RelationshipRecord, NodeRecord)> {
1448        if !self.nodes.contains_key(&node_id) {
1449            return Vec::new();
1450        }
1451
1452        let type_filter: Option<BTreeSet<&str>> = if types.is_empty() {
1453            None
1454        } else {
1455            Some(types.iter().map(String::as_str).collect())
1456        };
1457
1458        self.relationship_ids_for_direction(node_id, direction)
1459            .into_iter()
1460            .filter_map(|rel_id| self.relationships.get(&rel_id))
1461            .filter(|rel| {
1462                type_filter
1463                    .as_ref()
1464                    .map(|allowed| allowed.contains(rel.rel_type.as_str()))
1465                    .unwrap_or(true)
1466            })
1467            .filter_map(|rel| {
1468                let other_id = Self::other_endpoint(rel, node_id)?;
1469                let other = self.nodes.get(&other_id)?;
1470                Some((rel.clone(), other.clone()))
1471            })
1472            .collect()
1473    }
1474    // ---------- Overrides: schema introspection ----------
1475
1476    fn all_node_property_keys(&self) -> Vec<String> {
1477        let mut keys = BTreeSet::new();
1478        for node in self.nodes.values() {
1479            for key in node.properties.keys() {
1480                keys.insert(key.clone());
1481            }
1482        }
1483        keys.into_iter().collect()
1484    }
1485
1486    fn all_relationship_property_keys(&self) -> Vec<String> {
1487        let mut keys = BTreeSet::new();
1488        for rel in self.relationships.values() {
1489            for key in rel.properties.keys() {
1490                keys.insert(key.clone());
1491            }
1492        }
1493        keys.into_iter().collect()
1494    }
1495
1496    fn label_property_keys(&self, label: &str) -> Vec<String> {
1497        let mut keys = BTreeSet::new();
1498
1499        if let Some(ids) = self.nodes_by_label.get(label) {
1500            for id in ids {
1501                if let Some(node) = self.nodes.get(id) {
1502                    for key in node.properties.keys() {
1503                        keys.insert(key.clone());
1504                    }
1505                }
1506            }
1507        }
1508
1509        keys.into_iter().collect()
1510    }
1511
1512    fn rel_type_property_keys(&self, rel_type: &str) -> Vec<String> {
1513        let mut keys = BTreeSet::new();
1514
1515        if let Some(ids) = self.relationships_by_type.get(rel_type) {
1516            for id in ids {
1517                if let Some(rel) = self.relationships.get(id) {
1518                    for key in rel.properties.keys() {
1519                        keys.insert(key.clone());
1520                    }
1521                }
1522            }
1523        }
1524
1525        keys.into_iter().collect()
1526    }
1527}
1528
1529impl BorrowedGraphStorage for InMemoryGraph {
1530    fn node_ref(&self, id: NodeId) -> Option<&NodeRecord> {
1531        self.nodes.get(&id)
1532    }
1533
1534    fn relationship_ref(&self, id: RelationshipId) -> Option<&RelationshipRecord> {
1535        self.relationships.get(&id)
1536    }
1537
1538    fn node_refs(&self) -> Box<dyn Iterator<Item = &NodeRecord> + '_> {
1539        Box::new(self.nodes.values())
1540    }
1541
1542    fn node_refs_by_label(&self, label: &str) -> Box<dyn Iterator<Item = &NodeRecord> + '_> {
1543        Box::new(
1544            self.nodes_by_label
1545                .get(label)
1546                .into_iter()
1547                .flat_map(|ids| ids.iter())
1548                .filter_map(|id| self.nodes.get(id)),
1549        )
1550    }
1551
1552    fn relationship_refs(&self) -> Box<dyn Iterator<Item = &RelationshipRecord> + '_> {
1553        Box::new(self.relationships.values())
1554    }
1555
1556    fn relationship_refs_by_type(
1557        &self,
1558        rel_type: &str,
1559    ) -> Box<dyn Iterator<Item = &RelationshipRecord> + '_> {
1560        Box::new(
1561            self.relationships_by_type
1562                .get(rel_type)
1563                .into_iter()
1564                .flat_map(|ids| ids.iter())
1565                .filter_map(|id| self.relationships.get(id)),
1566        )
1567    }
1568}
1569
1570impl GraphStorageMut for InMemoryGraph {
1571    fn create_node(&mut self, labels: Vec<String>, properties: Properties) -> NodeRecord {
1572        let id = self.alloc_node_id();
1573        let labels = Self::normalize_labels(labels);
1574
1575        let node = NodeRecord {
1576            id,
1577            labels: labels.clone(),
1578            properties,
1579        };
1580
1581        for label in &labels {
1582            self.insert_node_label_index(id, label);
1583        }
1584        if self.active_node_property_index_count() != 0 {
1585            self.index_node_properties_if_active(
1586                id,
1587                node.labels.iter().map(String::as_str),
1588                &node.properties,
1589            );
1590        }
1591
1592        self.nodes.insert(id, node.clone());
1593
1594        self.outgoing.entry(id).or_default();
1595        self.incoming.entry(id).or_default();
1596
1597        self.emit(|| MutationEvent::CreateNode {
1598            id,
1599            labels: node.labels.clone(),
1600            properties: node.properties.clone(),
1601        });
1602
1603        node
1604    }
1605
1606    fn create_relationship(
1607        &mut self,
1608        src: NodeId,
1609        dst: NodeId,
1610        rel_type: &str,
1611        properties: Properties,
1612    ) -> Option<RelationshipRecord> {
1613        if !self.nodes.contains_key(&src) || !self.nodes.contains_key(&dst) {
1614            return None;
1615        }
1616
1617        let trimmed = rel_type.trim();
1618        if trimmed.is_empty() {
1619            return None;
1620        }
1621
1622        let id = self.alloc_rel_id();
1623        let rel = RelationshipRecord {
1624            id,
1625            src,
1626            dst,
1627            rel_type: trimmed.to_string(),
1628            properties,
1629        };
1630
1631        self.attach_relationship(&rel);
1632        if self.active_relationship_property_index_count() != 0 {
1633            self.index_relationship_properties_if_active(
1634                id,
1635                [rel.rel_type.as_str()],
1636                &rel.properties,
1637            );
1638        }
1639        self.relationships.insert(id, rel.clone());
1640
1641        self.emit(|| MutationEvent::CreateRelationship {
1642            id,
1643            src,
1644            dst,
1645            rel_type: rel.rel_type.clone(),
1646            properties: rel.properties.clone(),
1647        });
1648
1649        Some(rel)
1650    }
1651
1652    fn set_node_property(&mut self, node_id: NodeId, key: String, value: PropertyValue) -> bool {
1653        if !self.nodes.contains_key(&node_id) {
1654            return false;
1655        }
1656
1657        let recorder_active = self.recorder.is_some();
1658        let (stored_key, stored_value) = if recorder_active {
1659            (Some(key.clone()), Some(value.clone()))
1660        } else {
1661            (None, None)
1662        };
1663
1664        let index_active = self.active_node_property_index_count() != 0
1665            && self.indexes_mut().node_properties.is_active(&key);
1666        let (old, labels) = match self.nodes.get_mut(&node_id) {
1667            Some(node) => {
1668                let labels = if index_active {
1669                    Some(node.labels.clone())
1670                } else {
1671                    None
1672                };
1673                (node.properties.insert(key.clone(), value.clone()), labels)
1674            }
1675            None => return false,
1676        };
1677        if let Some(labels) = labels.as_ref() {
1678            if let Some(old) = old.as_ref() {
1679                self.unindex_node_property_if_active(
1680                    node_id,
1681                    labels.iter().map(String::as_str),
1682                    &key,
1683                    old,
1684                );
1685            }
1686            self.index_node_property_if_active(
1687                node_id,
1688                labels.iter().map(String::as_str),
1689                &key,
1690                &value,
1691            );
1692        }
1693
1694        self.emit(|| MutationEvent::SetNodeProperty {
1695            node_id,
1696            key: stored_key.unwrap(),
1697            value: stored_value.unwrap(),
1698        });
1699
1700        true
1701    }
1702
1703    fn remove_node_property(&mut self, node_id: NodeId, key: &str) -> bool {
1704        let removed = match self.nodes.get_mut(&node_id) {
1705            Some(node) => node.properties.remove(key),
1706            None => return false,
1707        };
1708        let Some(removed) = removed else {
1709            return false;
1710        };
1711
1712        let labels = if self.active_node_property_index_count() != 0
1713            && self.indexes_mut().node_properties.is_active(key)
1714        {
1715            self.nodes.get(&node_id).map(|node| node.labels.clone())
1716        } else {
1717            None
1718        };
1719
1720        if let Some(labels) = labels.as_ref() {
1721            self.unindex_node_property_if_active(
1722                node_id,
1723                labels.iter().map(String::as_str),
1724                key,
1725                &removed,
1726            );
1727        }
1728
1729        self.emit(|| MutationEvent::RemoveNodeProperty {
1730            node_id,
1731            key: key.to_string(),
1732        });
1733
1734        true
1735    }
1736
1737    fn add_node_label(&mut self, node_id: NodeId, label: &str) -> bool {
1738        let label = label.trim();
1739        if label.is_empty() {
1740            return false;
1741        }
1742
1743        let index_has_active_keys = self.active_node_property_index_count() != 0;
1744        let mut scoped_properties = None;
1745        let applied = match self.nodes.get_mut(&node_id) {
1746            Some(node) => {
1747                if node.labels.iter().any(|l| l == label) {
1748                    return false;
1749                }
1750
1751                node.labels.push(label.to_string());
1752                if index_has_active_keys {
1753                    scoped_properties = Some(node.properties.clone());
1754                }
1755                self.insert_node_label_index(node_id, label);
1756                true
1757            }
1758            None => false,
1759        };
1760        if applied {
1761            if let Some(properties) = scoped_properties.as_ref() {
1762                self.index_node_scope_properties_if_active(node_id, label, properties);
1763            }
1764            self.emit(|| MutationEvent::AddNodeLabel {
1765                node_id,
1766                label: label.to_string(),
1767            });
1768        }
1769        applied
1770    }
1771
1772    fn remove_node_label(&mut self, node_id: NodeId, label: &str) -> bool {
1773        let index_has_active_keys = self.active_node_property_index_count() != 0;
1774        let mut scoped_properties = None;
1775        let applied = match self.nodes.get_mut(&node_id) {
1776            Some(node) => {
1777                let original_len = node.labels.len();
1778                node.labels.retain(|l| l != label);
1779
1780                if node.labels.len() != original_len {
1781                    if index_has_active_keys {
1782                        scoped_properties = Some(node.properties.clone());
1783                    }
1784                    self.remove_node_label_index(node_id, label);
1785                    true
1786                } else {
1787                    false
1788                }
1789            }
1790            None => false,
1791        };
1792        if applied {
1793            if let Some(properties) = scoped_properties.as_ref() {
1794                self.unindex_node_scope_properties_if_active(node_id, label, properties);
1795            }
1796            self.emit(|| MutationEvent::RemoveNodeLabel {
1797                node_id,
1798                label: label.to_string(),
1799            });
1800        }
1801        applied
1802    }
1803
1804    fn set_relationship_property(
1805        &mut self,
1806        rel_id: RelationshipId,
1807        key: String,
1808        value: PropertyValue,
1809    ) -> bool {
1810        if !self.relationships.contains_key(&rel_id) {
1811            return false;
1812        }
1813
1814        let recorder_active = self.recorder.is_some();
1815        let (stored_key, stored_value) = if recorder_active {
1816            (Some(key.clone()), Some(value.clone()))
1817        } else {
1818            (None, None)
1819        };
1820
1821        let index_active = self.active_relationship_property_index_count() != 0
1822            && self.indexes_mut().relationship_properties.is_active(&key);
1823        let (old, rel_type) = match self.relationships.get_mut(&rel_id) {
1824            Some(rel) => {
1825                let rel_type = if index_active {
1826                    Some(rel.rel_type.clone())
1827                } else {
1828                    None
1829                };
1830                (rel.properties.insert(key.clone(), value.clone()), rel_type)
1831            }
1832            None => return false,
1833        };
1834        if let Some(rel_type) = rel_type.as_deref() {
1835            if let Some(old) = old.as_ref() {
1836                self.unindex_relationship_property_if_active(rel_id, [rel_type], &key, old);
1837            }
1838            self.index_relationship_property_if_active(rel_id, [rel_type], &key, &value);
1839        }
1840
1841        self.emit(|| MutationEvent::SetRelationshipProperty {
1842            rel_id,
1843            key: stored_key.unwrap(),
1844            value: stored_value.unwrap(),
1845        });
1846
1847        true
1848    }
1849
1850    fn remove_relationship_property(&mut self, rel_id: RelationshipId, key: &str) -> bool {
1851        let removed = match self.relationships.get_mut(&rel_id) {
1852            Some(rel) => rel.properties.remove(key),
1853            None => return false,
1854        };
1855        let Some(removed) = removed else {
1856            return false;
1857        };
1858
1859        let rel_type = if self.active_relationship_property_index_count() != 0
1860            && self.indexes_mut().relationship_properties.is_active(key)
1861        {
1862            self.relationships
1863                .get(&rel_id)
1864                .map(|rel| rel.rel_type.clone())
1865        } else {
1866            None
1867        };
1868
1869        if let Some(rel_type) = rel_type.as_deref() {
1870            self.unindex_relationship_property_if_active(rel_id, [rel_type], key, &removed);
1871        }
1872
1873        self.emit(|| MutationEvent::RemoveRelationshipProperty {
1874            rel_id,
1875            key: key.to_string(),
1876        });
1877
1878        true
1879    }
1880
1881    fn delete_relationship(&mut self, rel_id: RelationshipId) -> bool {
1882        let applied = match self.relationships.remove(&rel_id) {
1883            Some(rel) => {
1884                self.detach_relationship_indexes(&rel);
1885                self.unindex_active_relationship_properties(
1886                    rel_id,
1887                    [rel.rel_type.as_str()],
1888                    &rel.properties,
1889                );
1890                true
1891            }
1892            None => false,
1893        };
1894        if applied {
1895            self.emit(|| MutationEvent::DeleteRelationship { rel_id });
1896        }
1897        applied
1898    }
1899
1900    fn delete_node(&mut self, node_id: NodeId) -> bool {
1901        if !self.nodes.contains_key(&node_id) {
1902            return false;
1903        }
1904
1905        if self.has_incident_relationships(node_id) {
1906            return false;
1907        }
1908
1909        let node = match self.nodes.remove(&node_id) {
1910            Some(node) => node,
1911            None => return false,
1912        };
1913
1914        for label in &node.labels {
1915            self.remove_node_label_index(node_id, label);
1916        }
1917        self.unindex_active_node_properties(
1918            node_id,
1919            node.labels.iter().map(String::as_str),
1920            &node.properties,
1921        );
1922
1923        self.outgoing.remove(&node_id);
1924        self.incoming.remove(&node_id);
1925
1926        self.emit(|| MutationEvent::DeleteNode { node_id });
1927
1928        true
1929    }
1930
1931    fn detach_delete_node(&mut self, node_id: NodeId) -> bool {
1932        if !self.nodes.contains_key(&node_id) {
1933            return false;
1934        }
1935
1936        let rel_ids: Vec<_> = self
1937            .incident_relationship_ids(node_id)
1938            .into_iter()
1939            .collect();
1940
1941        // We deliberately fire per-relationship DeleteRelationship events
1942        // here (via `delete_relationship`) and a DetachDeleteNode event at
1943        // the end. A WAL replayer that sees DetachDeleteNode can ignore the
1944        // preceding DeleteRelationship events — or, equivalently, replay
1945        // them and the DetachDeleteNode becomes a no-op on the remaining
1946        // (now-empty) node. The emit-before-delete choice costs one extra
1947        // event per mutation but keeps the replay contract simple:
1948        // "apply every event in order".
1949        for rel_id in rel_ids {
1950            let _ = self.delete_relationship(rel_id);
1951        }
1952
1953        if self.delete_node(node_id) {
1954            self.emit(|| MutationEvent::DetachDeleteNode { node_id });
1955            true
1956        } else {
1957            false
1958        }
1959    }
1960
1961    fn clear(&mut self) {
1962        // Keep the recorder across clear so observers can see the Clear
1963        // event plus whatever follows. Matches WAL semantics where the log
1964        // is the source of truth across a truncation.
1965        let recorder = self.recorder.take();
1966        *self = Self::default();
1967        self.recorder = recorder;
1968        self.emit(|| MutationEvent::Clear);
1969    }
1970}
1971
1972// ---------------------------------------------------------------------------
1973// Snapshotable
1974// ---------------------------------------------------------------------------
1975
1976impl InMemoryGraph {
1977    /// Return the portable graph-state payload without encoding it into the
1978    /// legacy `LORASNAP` file format.
1979    pub fn snapshot_payload(&self) -> SnapshotPayload {
1980        SnapshotPayload {
1981            next_node_id: self.next_node_id,
1982            next_rel_id: self.next_rel_id,
1983            nodes: self.nodes.values().cloned().collect(),
1984            relationships: self.relationships.values().cloned().collect(),
1985        }
1986    }
1987
1988    /// Replace the graph from a portable graph-state payload, preserving the
1989    /// currently installed mutation recorder across the swap.
1990    pub fn load_snapshot_payload(
1991        &mut self,
1992        payload: SnapshotPayload,
1993    ) -> Result<SnapshotMeta, SnapshotError> {
1994        let meta = SnapshotMeta {
1995            format_version: SNAPSHOT_FORMAT_VERSION,
1996            node_count: payload.nodes.len(),
1997            relationship_count: payload.relationships.len(),
1998            wal_lsn: None,
1999        };
2000
2001        // Build the restored graph in a fresh local instance and only
2002        // commit it into `self` at the very end. If a panic fires mid-
2003        // rebuild (e.g. OOM on a HashMap grow) the caller's graph is
2004        // untouched — we never observe a half-populated store.
2005        let mut rebuilt = Self {
2006            next_node_id: payload.next_node_id,
2007            next_rel_id: payload.next_rel_id,
2008            ..Self::default()
2009        };
2010
2011        // Insert nodes + rebuild label index + seed adjacency slots.
2012        for node in payload.nodes {
2013            let id = node.id;
2014            let labels = node.labels.clone();
2015            rebuilt.nodes.insert(id, node);
2016            for label in &labels {
2017                rebuilt.insert_node_label_index(id, label);
2018            }
2019            rebuilt.outgoing.entry(id).or_default();
2020            rebuilt.incoming.entry(id).or_default();
2021        }
2022
2023        // Insert relationships + rebuild adjacency + type index.
2024        for rel in payload.relationships {
2025            rebuilt.attach_relationship(&rel);
2026            rebuilt.relationships.insert(rel.id, rel);
2027        }
2028        rebuilt.rebuild_property_indexes();
2029
2030        // Preserve the existing recorder across the swap — observers of the
2031        // store's identity should not be silently detached by a restore,
2032        // same policy as `clear()`.
2033        rebuilt.recorder = self.recorder.take();
2034        *self = rebuilt;
2035
2036        Ok(meta)
2037    }
2038}
2039
2040impl Snapshotable for InMemoryGraph {
2041    fn save_snapshot<W: std::io::Write>(&self, writer: W) -> Result<SnapshotMeta, SnapshotError> {
2042        let payload = self.snapshot_payload();
2043        write_snapshot(writer, &payload, None)
2044    }
2045
2046    fn save_checkpoint<W: std::io::Write>(
2047        &self,
2048        writer: W,
2049        wal_lsn: u64,
2050    ) -> Result<SnapshotMeta, SnapshotError> {
2051        let payload = self.snapshot_payload();
2052        write_snapshot(writer, &payload, Some(wal_lsn))
2053    }
2054
2055    fn load_snapshot<R: std::io::Read>(
2056        &mut self,
2057        reader: R,
2058    ) -> Result<SnapshotMeta, SnapshotError> {
2059        let (payload, meta) = read_snapshot(reader)?;
2060        self.load_snapshot_payload(payload)?;
2061        Ok(meta)
2062    }
2063}
2064
2065#[cfg(test)]
2066mod tests {
2067    use super::*;
2068
2069    fn props(pairs: &[(&str, PropertyValue)]) -> Properties {
2070        pairs
2071            .iter()
2072            .map(|(k, v)| ((*k).to_string(), v.clone()))
2073            .collect()
2074    }
2075
2076    #[test]
2077    fn create_and_lookup_nodes() {
2078        let mut g = InMemoryGraph::new();
2079
2080        let a = g.create_node(
2081            vec!["Person".into(), "Employee".into()],
2082            props(&[("name", PropertyValue::String("Alice".into()))]),
2083        );
2084        let b = g.create_node(
2085            vec!["Person".into()],
2086            props(&[("name", PropertyValue::String("Bob".into()))]),
2087        );
2088
2089        assert_eq!(a.id, 0);
2090        assert_eq!(b.id, 1);
2091
2092        assert_eq!(g.all_nodes().len(), 2);
2093        assert_eq!(g.nodes_by_label("Person").len(), 2);
2094        assert_eq!(g.nodes_by_label("Employee").len(), 1);
2095        assert_eq!(BorrowedGraphStorage::node_refs(&g).count(), 2);
2096        assert_eq!(
2097            BorrowedGraphStorage::node_refs_by_label(&g, "Person").count(),
2098            2
2099        );
2100        assert!(g.node_has_label(a.id, "Person"));
2101        assert_eq!(
2102            g.node_property(a.id, "name"),
2103            Some(PropertyValue::String("Alice".into()))
2104        );
2105    }
2106
2107    #[test]
2108    fn create_and_expand_relationships() {
2109        let mut g = InMemoryGraph::new();
2110
2111        let a = g.create_node(vec!["Person".into()], Properties::new());
2112        let b = g.create_node(vec!["Person".into()], Properties::new());
2113        let c = g.create_node(vec!["Company".into()], Properties::new());
2114
2115        let r1 = g
2116            .create_relationship(a.id, b.id, "KNOWS", Properties::new())
2117            .unwrap();
2118        let r2 = g
2119            .create_relationship(a.id, c.id, "WORKS_AT", Properties::new())
2120            .unwrap();
2121
2122        assert_eq!(g.all_relationships().len(), 2);
2123        assert_eq!(g.relationships_by_type("KNOWS").len(), 1);
2124        assert_eq!(BorrowedGraphStorage::relationship_refs(&g).count(), 2);
2125        assert_eq!(
2126            BorrowedGraphStorage::relationship_refs_by_type(&g, "KNOWS").count(),
2127            1
2128        );
2129        assert_eq!(g.outgoing_relationships(a.id).len(), 2);
2130        assert_eq!(g.incoming_relationships(b.id).len(), 1);
2131
2132        let knows = g.expand(a.id, Direction::Right, &[String::from("KNOWS")]);
2133        assert_eq!(knows.len(), 1);
2134        assert_eq!(knows[0].0.id, r1.id);
2135        assert_eq!(knows[0].1.id, b.id);
2136
2137        let undirected = g.expand(a.id, Direction::Undirected, &[]);
2138        assert_eq!(undirected.len(), 2);
2139
2140        assert_eq!(g.relationship(r2.id).unwrap().dst, c.id);
2141    }
2142
2143    #[test]
2144    fn incoming_and_outgoing_are_distinct() {
2145        let mut g = InMemoryGraph::new();
2146
2147        let a = g.create_node(vec!["Person".into()], Properties::new());
2148        let b = g.create_node(vec!["Person".into()], Properties::new());
2149        let c = g.create_node(vec!["Person".into()], Properties::new());
2150
2151        g.create_relationship(a.id, b.id, "KNOWS", Properties::new())
2152            .unwrap();
2153        g.create_relationship(c.id, a.id, "LIKES", Properties::new())
2154            .unwrap();
2155
2156        let outgoing = g.expand(a.id, Direction::Right, &[]);
2157        let incoming = g.expand(a.id, Direction::Left, &[]);
2158
2159        assert_eq!(outgoing.len(), 1);
2160        assert_eq!(incoming.len(), 1);
2161        assert_eq!(outgoing[0].1.id, b.id);
2162        assert_eq!(incoming[0].1.id, c.id);
2163    }
2164
2165    #[test]
2166    fn set_and_remove_properties() {
2167        let mut g = InMemoryGraph::new();
2168
2169        let n = g.create_node(vec!["Person".into()], Properties::new());
2170        assert!(g.set_node_property(n.id, "age".into(), PropertyValue::Int(42)));
2171        assert_eq!(g.node_property(n.id, "age"), Some(PropertyValue::Int(42)));
2172        assert!(g.remove_node_property(n.id, "age"));
2173        assert_eq!(g.node_property(n.id, "age"), None);
2174
2175        let m = g.create_node(vec!["Person".into()], Properties::new());
2176        let r = g
2177            .create_relationship(n.id, m.id, "KNOWS", Properties::new())
2178            .unwrap();
2179
2180        assert!(g.set_relationship_property(r.id, "since".into(), PropertyValue::Int(2020)));
2181        assert_eq!(
2182            g.relationship_property(r.id, "since"),
2183            Some(PropertyValue::Int(2020))
2184        );
2185        assert!(g.remove_relationship_property(r.id, "since"));
2186        assert_eq!(g.relationship_property(r.id, "since"), None);
2187    }
2188
2189    #[test]
2190    fn node_property_index_tracks_create_set_remove_and_delete() {
2191        let mut g = InMemoryGraph::new();
2192        let alice = g.create_node(
2193            vec!["Person".into()],
2194            props(&[("name", PropertyValue::String("Alice".into()))]),
2195        );
2196        let other_alice = g.create_node(
2197            vec!["Robot".into()],
2198            props(&[("name", PropertyValue::String("Alice".into()))]),
2199        );
2200        let bob = g.create_node(
2201            vec!["Person".into()],
2202            props(&[("name", PropertyValue::String("Bob".into()))]),
2203        );
2204
2205        let alice_value = PropertyValue::String("Alice".into());
2206        assert_eq!(
2207            g.find_nodes_by_property(Some("Person"), "name", &alice_value)
2208                .into_iter()
2209                .map(|n| n.id)
2210                .collect::<Vec<_>>(),
2211            vec![alice.id]
2212        );
2213        assert!(g.node_exists_with_label_and_property("Robot", "name", &alice_value));
2214
2215        assert!(g.set_node_property(
2216            other_alice.id,
2217            "name".into(),
2218            PropertyValue::String("Alicia".into())
2219        ));
2220        assert_eq!(
2221            g.find_nodes_by_property(None, "name", &alice_value)
2222                .into_iter()
2223                .map(|n| n.id)
2224                .collect::<Vec<_>>(),
2225            vec![alice.id]
2226        );
2227
2228        assert!(g.remove_node_property(alice.id, "name"));
2229        assert!(!g.node_exists_with_label_and_property("Person", "name", &alice_value));
2230
2231        assert!(g.delete_node(bob.id));
2232        assert!(!g.node_exists_with_label_and_property(
2233            "Person",
2234            "name",
2235            &PropertyValue::String("Bob".into())
2236        ));
2237    }
2238
2239    #[test]
2240    fn node_property_index_activates_on_lookup_and_tracks_later_create() {
2241        let mut g = InMemoryGraph::new();
2242        let first = g.create_node(
2243            vec!["Person".into()],
2244            props(&[("name", PropertyValue::String("Alice".into()))]),
2245        );
2246
2247        assert!(!g.indexes_read().node_properties.is_active("name"));
2248
2249        let alice = PropertyValue::String("Alice".into());
2250        assert_eq!(
2251            g.find_nodes_by_property(Some("Person"), "name", &alice)
2252                .into_iter()
2253                .map(|node| node.id)
2254                .collect::<Vec<_>>(),
2255            vec![first.id]
2256        );
2257        assert!(g.indexes_read().node_properties.is_active("name"));
2258
2259        let second = g.create_node(
2260            vec!["Person".into()],
2261            props(&[("name", PropertyValue::String("Alice".into()))]),
2262        );
2263        assert_eq!(
2264            g.find_nodes_by_property(Some("Person"), "name", &alice)
2265                .into_iter()
2266                .map(|node| node.id)
2267                .collect::<Vec<_>>(),
2268            vec![first.id, second.id]
2269        );
2270    }
2271
2272    #[test]
2273    fn property_indexes_activate_on_lookup_after_set_for_new_keys() {
2274        let mut g = InMemoryGraph::new();
2275        let node = g.create_node(vec!["Person".into()], Properties::new());
2276
2277        assert!(!g.indexes_read().node_properties.is_active("name"));
2278        assert!(g.set_node_property(
2279            node.id,
2280            "name".into(),
2281            PropertyValue::String("Alice".into())
2282        ));
2283        assert!(!g.indexes_read().node_properties.is_active("name"));
2284        assert_eq!(
2285            g.find_node_ids_by_property(
2286                Some("Person"),
2287                "name",
2288                &PropertyValue::String("Alice".into())
2289            ),
2290            vec![node.id]
2291        );
2292        assert!(g.indexes_read().node_properties.is_active("name"));
2293
2294        let other = g.create_node(vec!["Person".into()], Properties::new());
2295        let rel = g
2296            .create_relationship(node.id, other.id, "KNOWS", Properties::new())
2297            .unwrap();
2298        assert!(!g.indexes_read().relationship_properties.is_active("since"));
2299        assert!(g.set_relationship_property(rel.id, "since".into(), PropertyValue::Int(2020)));
2300        assert!(!g.indexes_read().relationship_properties.is_active("since"));
2301        assert_eq!(
2302            g.find_relationship_ids_by_property(Some("KNOWS"), "since", &PropertyValue::Int(2020)),
2303            vec![rel.id]
2304        );
2305        assert!(g.indexes_read().relationship_properties.is_active("since"));
2306    }
2307
2308    #[test]
2309    fn replay_create_eagerly_activates_property_indexes() {
2310        let mut g = InMemoryGraph::new();
2311        let alice = g
2312            .replay_create_node(
2313                0,
2314                vec!["Person".into()],
2315                props(&[("name", PropertyValue::String("Alice".into()))]),
2316            )
2317            .unwrap();
2318        let bob = g
2319            .replay_create_node(
2320                1,
2321                vec!["Person".into()],
2322                props(&[("name", PropertyValue::String("Bob".into()))]),
2323            )
2324            .unwrap();
2325
2326        assert!(g.indexes_read().node_properties.is_active("name"));
2327        assert_eq!(
2328            g.find_node_ids_by_property(
2329                Some("Person"),
2330                "name",
2331                &PropertyValue::String("Alice".into())
2332            ),
2333            vec![alice.id]
2334        );
2335
2336        let rel = g
2337            .replay_create_relationship(
2338                0,
2339                alice.id,
2340                bob.id,
2341                "KNOWS",
2342                props(&[("since", PropertyValue::Int(2020))]),
2343            )
2344            .unwrap();
2345
2346        assert!(g.indexes_read().relationship_properties.is_active("since"));
2347        assert_eq!(
2348            g.find_relationship_ids_by_property(Some("KNOWS"), "since", &PropertyValue::Int(2020)),
2349            vec![rel.id]
2350        );
2351    }
2352
2353    #[test]
2354    fn node_property_index_tracks_scoped_label_buckets() {
2355        let mut g = InMemoryGraph::new();
2356        let alice = g.create_node(
2357            vec!["Person".into()],
2358            props(&[("name", PropertyValue::String("Alice".into()))]),
2359        );
2360        let robot = g.create_node(
2361            vec!["Robot".into()],
2362            props(&[("name", PropertyValue::String("Alice".into()))]),
2363        );
2364        let alice_value = PropertyValue::String("Alice".into());
2365
2366        assert_eq!(
2367            g.find_node_ids_by_property(Some("Person"), "name", &alice_value),
2368            vec![alice.id]
2369        );
2370        assert_eq!(
2371            g.indexes_read()
2372                .node_properties
2373                .scoped_ids_for("Person", "name", &alice_value)
2374                .cloned()
2375                .unwrap_or_default()
2376                .into_iter()
2377                .collect::<Vec<_>>(),
2378            vec![alice.id]
2379        );
2380
2381        assert!(g.add_node_label(robot.id, "Employee"));
2382        assert_eq!(
2383            g.find_node_ids_by_property(Some("Employee"), "name", &alice_value),
2384            vec![robot.id]
2385        );
2386
2387        assert!(g.remove_node_label(alice.id, "Person"));
2388        assert!(g
2389            .find_node_ids_by_property(Some("Person"), "name", &alice_value)
2390            .is_empty());
2391        assert_eq!(
2392            g.find_node_ids_by_property(None, "name", &alice_value),
2393            vec![alice.id, robot.id]
2394        );
2395    }
2396
2397    #[test]
2398    fn relationship_property_index_tracks_create_set_remove_and_delete() {
2399        let mut g = InMemoryGraph::new();
2400        let a = g.create_node(vec!["Person".into()], Properties::new());
2401        let b = g.create_node(vec!["Person".into()], Properties::new());
2402        let c = g.create_node(vec!["Person".into()], Properties::new());
2403        let first = g
2404            .create_relationship(
2405                a.id,
2406                b.id,
2407                "KNOWS",
2408                props(&[("since", PropertyValue::Int(2020))]),
2409            )
2410            .unwrap();
2411        let second = g
2412            .create_relationship(
2413                b.id,
2414                c.id,
2415                "LIKES",
2416                props(&[("since", PropertyValue::Int(2020))]),
2417            )
2418            .unwrap();
2419
2420        let year = PropertyValue::Int(2020);
2421        assert_eq!(
2422            g.find_relationships_by_property(Some("KNOWS"), "since", &year)
2423                .into_iter()
2424                .map(|r| r.id)
2425                .collect::<Vec<_>>(),
2426            vec![first.id]
2427        );
2428        assert!(g.relationship_exists_with_type_and_property("LIKES", "since", &year));
2429
2430        assert!(g.set_relationship_property(second.id, "since".into(), PropertyValue::Int(2021)));
2431        assert_eq!(
2432            g.find_relationships_by_property(None, "since", &year)
2433                .into_iter()
2434                .map(|r| r.id)
2435                .collect::<Vec<_>>(),
2436            vec![first.id]
2437        );
2438
2439        assert!(g.remove_relationship_property(first.id, "since"));
2440        assert!(!g.relationship_exists_with_type_and_property("KNOWS", "since", &year));
2441
2442        assert!(g.delete_relationship(second.id));
2443        assert!(!g.relationship_exists_with_type_and_property(
2444            "LIKES",
2445            "since",
2446            &PropertyValue::Int(2021)
2447        ));
2448    }
2449
2450    #[test]
2451    fn property_index_falls_back_for_unhashed_values() {
2452        let mut g = InMemoryGraph::new();
2453        let date = PropertyValue::Date(crate::temporal::LoraDate::new(2026, 4, 26).unwrap());
2454        let n = g.create_node(vec!["Event".into()], props(&[("day", date.clone())]));
2455
2456        // Dates are intentionally not hash-indexed yet, so this exercises the
2457        // scan fallback path rather than the secondary index.
2458        assert_eq!(
2459            g.find_nodes_by_property(Some("Event"), "day", &date)
2460                .into_iter()
2461                .map(|node| node.id)
2462                .collect::<Vec<_>>(),
2463            vec![n.id]
2464        );
2465    }
2466
2467    #[test]
2468    fn delete_requires_detach() {
2469        let mut g = InMemoryGraph::new();
2470
2471        let a = g.create_node(vec!["Person".into()], Properties::new());
2472        let b = g.create_node(vec!["Person".into()], Properties::new());
2473        let r = g
2474            .create_relationship(a.id, b.id, "KNOWS", Properties::new())
2475            .unwrap();
2476
2477        assert!(!g.delete_node(a.id));
2478        assert!(g.delete_relationship(r.id));
2479        assert!(g.delete_node(a.id));
2480        assert!(g.node(a.id).is_none());
2481    }
2482
2483    #[test]
2484    fn detach_delete_removes_incident_relationships() {
2485        let mut g = InMemoryGraph::new();
2486
2487        let a = g.create_node(vec!["Person".into()], Properties::new());
2488        let b = g.create_node(vec!["Person".into()], Properties::new());
2489        let c = g.create_node(vec!["Person".into()], Properties::new());
2490
2491        let r1 = g
2492            .create_relationship(a.id, b.id, "KNOWS", Properties::new())
2493            .unwrap();
2494        let r2 = g
2495            .create_relationship(c.id, a.id, "LIKES", Properties::new())
2496            .unwrap();
2497
2498        assert!(g.detach_delete_node(a.id));
2499        assert!(g.node(a.id).is_none());
2500        assert!(g.relationship(r1.id).is_none());
2501        assert!(g.relationship(r2.id).is_none());
2502        assert_eq!(g.all_relationships().len(), 0);
2503    }
2504
2505    #[test]
2506    fn duplicate_labels_are_normalized_on_create() {
2507        let mut g = InMemoryGraph::new();
2508
2509        let n = g.create_node(
2510            vec!["Person".into(), "Person".into(), "Admin".into()],
2511            Properties::new(),
2512        );
2513
2514        assert_eq!(n.labels, vec!["Person".to_string(), "Admin".to_string()]);
2515        assert_eq!(g.nodes_by_label("Person").len(), 1);
2516        assert_eq!(g.nodes_by_label("Admin").len(), 1);
2517    }
2518
2519    #[test]
2520    fn empty_labels_are_ignored() {
2521        let mut g = InMemoryGraph::new();
2522
2523        let n = g.create_node(
2524            vec!["Person".into(), "".into(), "   ".into()],
2525            Properties::new(),
2526        );
2527
2528        assert_eq!(n.labels, vec!["Person".to_string()]);
2529    }
2530
2531    #[test]
2532    fn empty_relationship_type_is_rejected() {
2533        let mut g = InMemoryGraph::new();
2534
2535        let a = g.create_node(vec!["A".into()], Properties::new());
2536        let b = g.create_node(vec!["B".into()], Properties::new());
2537
2538        assert!(g
2539            .create_relationship(a.id, b.id, "", Properties::new())
2540            .is_none());
2541    }
2542
2543    #[test]
2544    fn storage_schema_helpers_work() {
2545        let mut g = InMemoryGraph::new();
2546
2547        let a = g.create_node(
2548            vec!["Person".into()],
2549            props(&[("name", PropertyValue::String("Alice".into()))]),
2550        );
2551        let b = g.create_node(
2552            vec!["Company".into()],
2553            props(&[("title", PropertyValue::String("Acme".into()))]),
2554        );
2555
2556        g.create_relationship(
2557            a.id,
2558            b.id,
2559            "WORKS_AT",
2560            props(&[("since", PropertyValue::Int(2020))]),
2561        )
2562        .unwrap();
2563
2564        assert!(g.has_label_name("Person"));
2565        assert!(g.has_relationship_type_name("WORKS_AT"));
2566        assert!(g.has_property_key("name"));
2567        assert!(g.has_property_key("since"));
2568        assert!(g.label_has_property_key("Person", "name"));
2569        assert!(g.rel_type_has_property_key("WORKS_AT", "since"));
2570    }
2571
2572    #[test]
2573    fn clear_resets_the_graph() {
2574        let mut g = InMemoryGraph::new();
2575        let a = g.create_node(vec!["Person".into()], Properties::new());
2576        let b = g.create_node(vec!["Person".into()], Properties::new());
2577        g.create_relationship(a.id, b.id, "KNOWS", Properties::new())
2578            .unwrap();
2579
2580        assert_eq!(g.node_count(), 2);
2581        assert_eq!(g.relationship_count(), 1);
2582
2583        g.clear();
2584
2585        assert_eq!(g.node_count(), 0);
2586        assert_eq!(g.relationship_count(), 0);
2587        assert_eq!(g.all_labels().len(), 0);
2588    }
2589
2590    #[test]
2591    fn snapshot_roundtrip_preserves_graph_state() {
2592        let mut original = InMemoryGraph::new();
2593        let a = original.create_node(
2594            vec!["Person".into()],
2595            props(&[("name", PropertyValue::String("Alice".into()))]),
2596        );
2597        let b = original.create_node(
2598            vec!["Person".into()],
2599            props(&[("name", PropertyValue::String("Bob".into()))]),
2600        );
2601        let r = original
2602            .create_relationship(
2603                a.id,
2604                b.id,
2605                "KNOWS",
2606                props(&[("since", PropertyValue::Int(2020))]),
2607            )
2608            .unwrap();
2609
2610        let mut buf = Vec::new();
2611        let save_meta = original.save_snapshot(&mut buf).unwrap();
2612        assert_eq!(save_meta.node_count, 2);
2613        assert_eq!(save_meta.relationship_count, 1);
2614        assert_eq!(save_meta.wal_lsn, None);
2615
2616        let mut restored = InMemoryGraph::new();
2617        let load_meta = restored.load_snapshot(&buf[..]).unwrap();
2618        assert_eq!(load_meta, save_meta);
2619        assert!(restored.indexes_read().node_properties.is_active("name"));
2620        assert!(restored
2621            .indexes_read()
2622            .relationship_properties
2623            .is_active("since"));
2624
2625        assert_eq!(restored.node_count(), 2);
2626        assert_eq!(restored.relationship_count(), 1);
2627        assert_eq!(
2628            restored.node_property(a.id, "name"),
2629            Some(PropertyValue::String("Alice".into()))
2630        );
2631        assert_eq!(
2632            restored.relationship_property(r.id, "since"),
2633            Some(PropertyValue::Int(2020))
2634        );
2635
2636        // Adjacency + label index were rebuilt on load.
2637        assert_eq!(restored.outgoing_relationships(a.id).len(), 1);
2638        assert_eq!(restored.nodes_by_label("Person").len(), 2);
2639        assert!(restored.node_exists_with_label_and_property(
2640            "Person",
2641            "name",
2642            &PropertyValue::String("Alice".into())
2643        ));
2644        assert!(restored.relationship_exists_with_type_and_property(
2645            "KNOWS",
2646            "since",
2647            &PropertyValue::Int(2020)
2648        ));
2649
2650        // Counters carry over so new IDs don't collide with pre-snapshot IDs.
2651        let c = restored.create_node(vec!["Person".into()], Properties::new());
2652        assert_eq!(c.id, b.id + 1);
2653    }
2654
2655    #[test]
2656    fn mutation_recorder_observes_every_committed_mutation() {
2657        use std::sync::Mutex;
2658
2659        #[derive(Default)]
2660        struct CapturingRecorder {
2661            events: Mutex<Vec<MutationEvent>>,
2662        }
2663
2664        impl MutationRecorder for CapturingRecorder {
2665            fn record(&self, event: MutationEvent) {
2666                self.events.lock().unwrap().push(event);
2667            }
2668        }
2669
2670        let recorder = Arc::new(CapturingRecorder::default());
2671        let mut g = InMemoryGraph::new();
2672        g.set_mutation_recorder(Some(recorder.clone() as Arc<dyn MutationRecorder>));
2673
2674        let a = g.create_node(vec!["Person".into()], Properties::new());
2675        let b = g.create_node(vec!["Person".into()], Properties::new());
2676        let r = g
2677            .create_relationship(a.id, b.id, "KNOWS", Properties::new())
2678            .unwrap();
2679        g.set_node_property(a.id, "name".into(), PropertyValue::String("Alice".into()));
2680        g.remove_node_property(a.id, "name");
2681        g.add_node_label(a.id, "Admin");
2682        g.remove_node_label(a.id, "Admin");
2683        g.set_relationship_property(r.id, "since".into(), PropertyValue::Int(2020));
2684        g.remove_relationship_property(r.id, "since");
2685        g.detach_delete_node(a.id);
2686        g.clear();
2687
2688        let events = recorder.events.lock().unwrap().clone();
2689        assert!(matches!(events[0], MutationEvent::CreateNode { .. }));
2690        assert!(matches!(events[1], MutationEvent::CreateNode { .. }));
2691        assert!(matches!(
2692            events[2],
2693            MutationEvent::CreateRelationship { .. }
2694        ));
2695        assert!(matches!(events[3], MutationEvent::SetNodeProperty { .. }));
2696        assert!(matches!(
2697            events[4],
2698            MutationEvent::RemoveNodeProperty { .. }
2699        ));
2700        assert!(matches!(events[5], MutationEvent::AddNodeLabel { .. }));
2701        assert!(matches!(events[6], MutationEvent::RemoveNodeLabel { .. }));
2702        assert!(matches!(
2703            events[7],
2704            MutationEvent::SetRelationshipProperty { .. }
2705        ));
2706        assert!(matches!(
2707            events[8],
2708            MutationEvent::RemoveRelationshipProperty { .. }
2709        ));
2710        // detach_delete_node composes three kinds of events: one
2711        // DeleteRelationship per incident edge, one DeleteNode for the node
2712        // itself, and a final DetachDeleteNode marker. A WAL replayer can
2713        // either apply every step or recognise the marker and skip forward.
2714        assert!(matches!(
2715            events[9],
2716            MutationEvent::DeleteRelationship { .. }
2717        ));
2718        assert!(matches!(events[10], MutationEvent::DeleteNode { .. }));
2719        assert!(matches!(events[11], MutationEvent::DetachDeleteNode { .. }));
2720        assert!(matches!(events.last(), Some(MutationEvent::Clear)));
2721
2722        // Failed mutations (invalid id) do not emit events.
2723        let before = recorder.events.lock().unwrap().len();
2724        assert!(!g.set_node_property(9999, "x".into(), PropertyValue::Int(0)));
2725        assert_eq!(recorder.events.lock().unwrap().len(), before);
2726    }
2727
2728    #[test]
2729    fn snapshot_load_resets_but_keeps_recorder() {
2730        use std::sync::Mutex;
2731
2732        struct CountingRecorder(Mutex<usize>);
2733        impl MutationRecorder for CountingRecorder {
2734            fn record(&self, _: MutationEvent) {
2735                *self.0.lock().unwrap() += 1;
2736            }
2737        }
2738
2739        let counter: Arc<dyn MutationRecorder> = Arc::new(CountingRecorder(Mutex::new(0)));
2740        let mut g = InMemoryGraph::new();
2741        g.set_mutation_recorder(Some(counter));
2742        g.create_node(vec!["A".into()], Properties::new());
2743
2744        let mut buf = Vec::new();
2745        g.save_snapshot(&mut buf).unwrap();
2746
2747        // Load into the same graph — recorder should survive, store state
2748        // should be replaced by the snapshot contents.
2749        g.load_snapshot(&buf[..]).unwrap();
2750        assert!(g.mutation_recorder().is_some());
2751        assert_eq!(g.node_count(), 1);
2752
2753        // Subsequent mutations still feed the recorder.
2754        g.create_node(vec!["B".into()], Properties::new());
2755        // 1 for the initial A + 1 for the post-load B. The restore path
2756        // itself does not emit events (that's a snapshot, not a mutation).
2757    }
2758}