Skip to main content

lora_store/
memory.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::Arc;
3
4use lora_ast::Direction;
5
6use crate::snapshot::{read_snapshot, write_snapshot};
7use crate::{
8    BorrowedGraphStorage, GraphStorage, GraphStorageMut, MutationEvent, MutationRecorder, NodeId,
9    NodeRecord, Properties, PropertyValue, RelationshipId, RelationshipRecord, SnapshotError,
10    SnapshotMeta, SnapshotPayload, Snapshotable,
11};
12
13#[derive(Default)]
14pub struct InMemoryGraph {
15    next_node_id: NodeId,
16    next_rel_id: RelationshipId,
17
18    nodes: BTreeMap<NodeId, NodeRecord>,
19    relationships: BTreeMap<RelationshipId, RelationshipRecord>,
20
21    // adjacency
22    outgoing: BTreeMap<NodeId, BTreeSet<RelationshipId>>,
23    incoming: BTreeMap<NodeId, BTreeSet<RelationshipId>>,
24
25    // secondary indexes
26    nodes_by_label: BTreeMap<String, BTreeSet<NodeId>>,
27    relationships_by_type: BTreeMap<String, BTreeSet<RelationshipId>>,
28
29    /// Optional mutation observer. When `Some`, every committed mutation
30    /// fans out to this recorder *after* the in-memory state has been
31    /// updated. The recorder is not part of the graph's identity, so Clone
32    /// and snapshot restore both reset it to `None`.
33    recorder: Option<Arc<dyn MutationRecorder>>,
34}
35
36impl std::fmt::Debug for InMemoryGraph {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        f.debug_struct("InMemoryGraph")
39            .field("next_node_id", &self.next_node_id)
40            .field("next_rel_id", &self.next_rel_id)
41            .field("nodes", &self.nodes)
42            .field("relationships", &self.relationships)
43            .field("outgoing", &self.outgoing)
44            .field("incoming", &self.incoming)
45            .field("nodes_by_label", &self.nodes_by_label)
46            .field("relationships_by_type", &self.relationships_by_type)
47            .field("recorder", &self.recorder.as_ref().map(|_| "installed"))
48            .finish()
49    }
50}
51
52impl Clone for InMemoryGraph {
53    fn clone(&self) -> Self {
54        // Deliberately drop the recorder on clone: a cloned store is a
55        // separate identity; it should not silently share the observer.
56        Self {
57            next_node_id: self.next_node_id,
58            next_rel_id: self.next_rel_id,
59            nodes: self.nodes.clone(),
60            relationships: self.relationships.clone(),
61            outgoing: self.outgoing.clone(),
62            incoming: self.incoming.clone(),
63            nodes_by_label: self.nodes_by_label.clone(),
64            relationships_by_type: self.relationships_by_type.clone(),
65            recorder: None,
66        }
67    }
68}
69
70impl InMemoryGraph {
71    pub fn new() -> Self {
72        Self::default()
73    }
74
75    pub fn with_capacity_hint(_nodes: usize, _relationships: usize) -> Self {
76        // BTreeMap/BTreeSet do not support capacity reservation.
77        Self::default()
78    }
79
80    pub fn contains_node(&self, node_id: NodeId) -> bool {
81        self.nodes.contains_key(&node_id)
82    }
83
84    pub fn contains_relationship(&self, rel_id: RelationshipId) -> bool {
85        self.relationships.contains_key(&rel_id)
86    }
87
88    /// Install (or clear) the mutation recorder. Passing `None` detaches any
89    /// currently-installed recorder. The recorder observes every committed
90    /// mutation *after* it has been applied.
91    pub fn set_mutation_recorder(&mut self, recorder: Option<Arc<dyn MutationRecorder>>) {
92        self.recorder = recorder;
93    }
94
95    /// Handle to the currently-installed recorder, if any.
96    pub fn mutation_recorder(&self) -> Option<&Arc<dyn MutationRecorder>> {
97        self.recorder.as_ref()
98    }
99
100    /// Emit a mutation event only if a recorder is installed. The event is
101    /// built lazily — callers pass a closure, so when no recorder is
102    /// attached we pay only a `None` check and the cost of constructing the
103    /// event (labels/properties clones) is avoided.
104    #[inline]
105    fn emit<F: FnOnce() -> MutationEvent>(&self, build: F) {
106        if let Some(rec) = &self.recorder {
107            rec.record(&build());
108        }
109    }
110
111    fn alloc_node_id(&mut self) -> NodeId {
112        let id = self.next_node_id;
113        self.next_node_id += 1;
114        id
115    }
116
117    fn alloc_rel_id(&mut self) -> RelationshipId {
118        let id = self.next_rel_id;
119        self.next_rel_id += 1;
120        id
121    }
122
123    fn normalize_labels(labels: Vec<String>) -> Vec<String> {
124        let mut seen = BTreeSet::new();
125
126        labels
127            .into_iter()
128            .map(|s| s.trim().to_string())
129            .filter(|s| !s.is_empty())
130            .filter(|s| seen.insert(s.clone()))
131            .collect()
132    }
133
134    fn insert_node_label_index(&mut self, node_id: NodeId, label: &str) {
135        self.nodes_by_label
136            .entry(label.to_string())
137            .or_default()
138            .insert(node_id);
139    }
140
141    fn remove_node_label_index(&mut self, node_id: NodeId, label: &str) {
142        if let Some(ids) = self.nodes_by_label.get_mut(label) {
143            ids.remove(&node_id);
144            if ids.is_empty() {
145                self.nodes_by_label.remove(label);
146            }
147        }
148    }
149
150    fn insert_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
151        self.relationships_by_type
152            .entry(rel_type.to_string())
153            .or_default()
154            .insert(rel_id);
155    }
156
157    fn remove_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
158        if let Some(ids) = self.relationships_by_type.get_mut(rel_type) {
159            ids.remove(&rel_id);
160            if ids.is_empty() {
161                self.relationships_by_type.remove(rel_type);
162            }
163        }
164    }
165
166    fn attach_relationship(&mut self, rel: &RelationshipRecord) {
167        self.outgoing.entry(rel.src).or_default().insert(rel.id);
168        self.incoming.entry(rel.dst).or_default().insert(rel.id);
169        self.insert_relationship_type_index(rel.id, &rel.rel_type);
170    }
171
172    fn detach_relationship_indexes(&mut self, rel: &RelationshipRecord) {
173        if let Some(ids) = self.outgoing.get_mut(&rel.src) {
174            ids.remove(&rel.id);
175            if ids.is_empty() {
176                self.outgoing.remove(&rel.src);
177            }
178        }
179
180        if let Some(ids) = self.incoming.get_mut(&rel.dst) {
181            ids.remove(&rel.id);
182            if ids.is_empty() {
183                self.incoming.remove(&rel.dst);
184            }
185        }
186
187        self.remove_relationship_type_index(rel.id, &rel.rel_type);
188    }
189
190    fn relationship_ids_for_direction(
191        &self,
192        node_id: NodeId,
193        direction: Direction,
194    ) -> Vec<RelationshipId> {
195        match direction {
196            Direction::Left => self
197                .incoming
198                .get(&node_id)
199                .map(|ids| ids.iter().copied().collect())
200                .unwrap_or_default(),
201
202            Direction::Right => self
203                .outgoing
204                .get(&node_id)
205                .map(|ids| ids.iter().copied().collect())
206                .unwrap_or_default(),
207
208            Direction::Undirected => {
209                let mut ids = BTreeSet::new();
210
211                if let Some(out) = self.outgoing.get(&node_id) {
212                    ids.extend(out.iter().copied());
213                }
214                if let Some(inc) = self.incoming.get(&node_id) {
215                    ids.extend(inc.iter().copied());
216                }
217
218                ids.into_iter().collect()
219            }
220        }
221    }
222
223    fn other_endpoint(rel: &RelationshipRecord, node_id: NodeId) -> Option<NodeId> {
224        if rel.src == node_id {
225            Some(rel.dst)
226        } else if rel.dst == node_id {
227            Some(rel.src)
228        } else {
229            None
230        }
231    }
232
233    fn has_incident_relationships(&self, node_id: NodeId) -> bool {
234        self.outgoing
235            .get(&node_id)
236            .map(|ids| !ids.is_empty())
237            .unwrap_or(false)
238            || self
239                .incoming
240                .get(&node_id)
241                .map(|ids| !ids.is_empty())
242                .unwrap_or(false)
243    }
244
245    fn incident_relationship_ids(&self, node_id: NodeId) -> BTreeSet<RelationshipId> {
246        let mut rel_ids = BTreeSet::new();
247
248        if let Some(ids) = self.outgoing.get(&node_id) {
249            rel_ids.extend(ids.iter().copied());
250        }
251        if let Some(ids) = self.incoming.get(&node_id) {
252            rel_ids.extend(ids.iter().copied());
253        }
254
255        rel_ids
256    }
257}
258
259impl GraphStorage for InMemoryGraph {
260    // ---------- Required primitives ----------
261
262    fn contains_node(&self, id: NodeId) -> bool {
263        self.nodes.contains_key(&id)
264    }
265
266    fn node(&self, id: NodeId) -> Option<NodeRecord> {
267        self.nodes.get(&id).cloned()
268    }
269
270    fn all_node_ids(&self) -> Vec<NodeId> {
271        self.nodes.keys().copied().collect()
272    }
273
274    fn node_ids_by_label(&self, label: &str) -> Vec<NodeId> {
275        match self.nodes_by_label.get(label) {
276            Some(ids) => ids.iter().copied().collect(),
277            None => Vec::new(),
278        }
279    }
280
281    fn contains_relationship(&self, id: RelationshipId) -> bool {
282        self.relationships.contains_key(&id)
283    }
284
285    fn relationship(&self, id: RelationshipId) -> Option<RelationshipRecord> {
286        self.relationships.get(&id).cloned()
287    }
288
289    fn all_rel_ids(&self) -> Vec<RelationshipId> {
290        self.relationships.keys().copied().collect()
291    }
292
293    fn rel_ids_by_type(&self, rel_type: &str) -> Vec<RelationshipId> {
294        match self.relationships_by_type.get(rel_type) {
295            Some(ids) => ids.iter().copied().collect(),
296            None => Vec::new(),
297        }
298    }
299
300    fn relationship_endpoints(&self, id: RelationshipId) -> Option<(NodeId, NodeId)> {
301        self.relationships.get(&id).map(|r| (r.src, r.dst))
302    }
303
304    fn expand_ids(
305        &self,
306        node_id: NodeId,
307        direction: Direction,
308        types: &[String],
309    ) -> Vec<(RelationshipId, NodeId)> {
310        if !self.nodes.contains_key(&node_id) {
311            return Vec::new();
312        }
313
314        // Fast path: no type filter — just join adjacency + rel endpoints.
315        if types.is_empty() {
316            return self
317                .relationship_ids_for_direction(node_id, direction)
318                .into_iter()
319                .filter_map(|rel_id| {
320                    let rel = self.relationships.get(&rel_id)?;
321                    let other_id = Self::other_endpoint(rel, node_id)?;
322                    Some((rel_id, other_id))
323                })
324                .collect();
325        }
326
327        // Type-filtered: borrow rel_type straight from the stored record.
328        // For small type lists (the common case) the linear scan beats a
329        // BTreeSet; we keep it allocation-free.
330        self.relationship_ids_for_direction(node_id, direction)
331            .into_iter()
332            .filter_map(|rel_id| {
333                let rel = self.relationships.get(&rel_id)?;
334                if !types.iter().any(|t| t == &rel.rel_type) {
335                    return None;
336                }
337                let other_id = Self::other_endpoint(rel, node_id)?;
338                Some((rel_id, other_id))
339            })
340            .collect()
341    }
342
343    fn all_labels(&self) -> Vec<String> {
344        self.nodes_by_label.keys().cloned().collect()
345    }
346
347    fn all_relationship_types(&self) -> Vec<String> {
348        self.relationships_by_type.keys().cloned().collect()
349    }
350
351    // ---------- Optimization hooks: zero-clone borrow access ----------
352
353    fn with_node<F, R>(&self, id: NodeId, f: F) -> Option<R>
354    where
355        F: FnOnce(&NodeRecord) -> R,
356        Self: Sized,
357    {
358        self.nodes.get(&id).map(f)
359    }
360
361    fn with_relationship<F, R>(&self, id: RelationshipId, f: F) -> Option<R>
362    where
363        F: FnOnce(&RelationshipRecord) -> R,
364        Self: Sized,
365    {
366        self.relationships.get(&id).map(f)
367    }
368
369    // ---------- Overrides: counts + existence ----------
370
371    fn node_count(&self) -> usize {
372        self.nodes.len()
373    }
374
375    fn relationship_count(&self) -> usize {
376        self.relationships.len()
377    }
378
379    fn has_node(&self, id: NodeId) -> bool {
380        self.nodes.contains_key(&id)
381    }
382
383    fn has_relationship(&self, id: RelationshipId) -> bool {
384        self.relationships.contains_key(&id)
385    }
386
387    // ---------- Overrides: record-returning scans (direct iteration) ----------
388
389    fn all_nodes(&self) -> Vec<NodeRecord> {
390        self.nodes.values().cloned().collect()
391    }
392
393    fn nodes_by_label(&self, label: &str) -> Vec<NodeRecord> {
394        self.nodes_by_label
395            .get(label)
396            .into_iter()
397            .flat_map(|ids| ids.iter())
398            .filter_map(|id| self.nodes.get(id).cloned())
399            .collect()
400    }
401
402    fn all_relationships(&self) -> Vec<RelationshipRecord> {
403        self.relationships.values().cloned().collect()
404    }
405
406    fn relationships_by_type(&self, rel_type: &str) -> Vec<RelationshipRecord> {
407        self.relationships_by_type
408            .get(rel_type)
409            .into_iter()
410            .flat_map(|ids| ids.iter())
411            .filter_map(|id| self.relationships.get(id).cloned())
412            .collect()
413    }
414
415    // ---------- Overrides: traversal (direct adjacency) ----------
416
417    fn relationship_ids_of(&self, node_id: NodeId, direction: Direction) -> Vec<RelationshipId> {
418        self.relationship_ids_for_direction(node_id, direction)
419    }
420
421    fn outgoing_relationships(&self, node_id: NodeId) -> Vec<RelationshipRecord> {
422        self.outgoing
423            .get(&node_id)
424            .into_iter()
425            .flat_map(|ids| ids.iter())
426            .filter_map(|id| self.relationships.get(id).cloned())
427            .collect()
428    }
429
430    fn incoming_relationships(&self, node_id: NodeId) -> Vec<RelationshipRecord> {
431        self.incoming
432            .get(&node_id)
433            .into_iter()
434            .flat_map(|ids| ids.iter())
435            .filter_map(|id| self.relationships.get(id).cloned())
436            .collect()
437    }
438
439    fn degree(&self, node_id: NodeId, direction: Direction) -> usize {
440        match direction {
441            Direction::Right => self.outgoing.get(&node_id).map(|s| s.len()).unwrap_or(0),
442            Direction::Left => self.incoming.get(&node_id).map(|s| s.len()).unwrap_or(0),
443            Direction::Undirected => {
444                self.outgoing.get(&node_id).map(|s| s.len()).unwrap_or(0)
445                    + self.incoming.get(&node_id).map(|s| s.len()).unwrap_or(0)
446            }
447        }
448    }
449
450    fn expand(
451        &self,
452        node_id: NodeId,
453        direction: Direction,
454        types: &[String],
455    ) -> Vec<(RelationshipRecord, NodeRecord)> {
456        if !self.nodes.contains_key(&node_id) {
457            return Vec::new();
458        }
459
460        let type_filter: Option<BTreeSet<&str>> = if types.is_empty() {
461            None
462        } else {
463            Some(types.iter().map(String::as_str).collect())
464        };
465
466        self.relationship_ids_for_direction(node_id, direction)
467            .into_iter()
468            .filter_map(|rel_id| self.relationships.get(&rel_id))
469            .filter(|rel| {
470                type_filter
471                    .as_ref()
472                    .map(|allowed| allowed.contains(rel.rel_type.as_str()))
473                    .unwrap_or(true)
474            })
475            .filter_map(|rel| {
476                let other_id = Self::other_endpoint(rel, node_id)?;
477                let other = self.nodes.get(&other_id)?;
478                Some((rel.clone(), other.clone()))
479            })
480            .collect()
481    }
482
483    // ---------- Overrides: schema introspection ----------
484
485    fn all_node_property_keys(&self) -> Vec<String> {
486        let mut keys = BTreeSet::new();
487        for node in self.nodes.values() {
488            for key in node.properties.keys() {
489                keys.insert(key.clone());
490            }
491        }
492        keys.into_iter().collect()
493    }
494
495    fn all_relationship_property_keys(&self) -> Vec<String> {
496        let mut keys = BTreeSet::new();
497        for rel in self.relationships.values() {
498            for key in rel.properties.keys() {
499                keys.insert(key.clone());
500            }
501        }
502        keys.into_iter().collect()
503    }
504
505    fn label_property_keys(&self, label: &str) -> Vec<String> {
506        let mut keys = BTreeSet::new();
507
508        if let Some(ids) = self.nodes_by_label.get(label) {
509            for id in ids {
510                if let Some(node) = self.nodes.get(id) {
511                    for key in node.properties.keys() {
512                        keys.insert(key.clone());
513                    }
514                }
515            }
516        }
517
518        keys.into_iter().collect()
519    }
520
521    fn rel_type_property_keys(&self, rel_type: &str) -> Vec<String> {
522        let mut keys = BTreeSet::new();
523
524        if let Some(ids) = self.relationships_by_type.get(rel_type) {
525            for id in ids {
526                if let Some(rel) = self.relationships.get(id) {
527                    for key in rel.properties.keys() {
528                        keys.insert(key.clone());
529                    }
530                }
531            }
532        }
533
534        keys.into_iter().collect()
535    }
536}
537
538impl BorrowedGraphStorage for InMemoryGraph {
539    fn node_ref(&self, id: NodeId) -> Option<&NodeRecord> {
540        self.nodes.get(&id)
541    }
542
543    fn relationship_ref(&self, id: RelationshipId) -> Option<&RelationshipRecord> {
544        self.relationships.get(&id)
545    }
546}
547
548impl GraphStorageMut for InMemoryGraph {
549    fn create_node(&mut self, labels: Vec<String>, properties: Properties) -> NodeRecord {
550        let id = self.alloc_node_id();
551        let labels = Self::normalize_labels(labels);
552
553        let node = NodeRecord {
554            id,
555            labels: labels.clone(),
556            properties,
557        };
558
559        self.nodes.insert(id, node.clone());
560
561        for label in &labels {
562            self.insert_node_label_index(id, label);
563        }
564
565        self.outgoing.entry(id).or_default();
566        self.incoming.entry(id).or_default();
567
568        self.emit(|| MutationEvent::CreateNode {
569            id,
570            labels: node.labels.clone(),
571            properties: node.properties.clone(),
572        });
573
574        node
575    }
576
577    fn create_relationship(
578        &mut self,
579        src: NodeId,
580        dst: NodeId,
581        rel_type: &str,
582        properties: Properties,
583    ) -> Option<RelationshipRecord> {
584        if !self.nodes.contains_key(&src) || !self.nodes.contains_key(&dst) {
585            return None;
586        }
587
588        let trimmed = rel_type.trim();
589        if trimmed.is_empty() {
590            return None;
591        }
592
593        let id = self.alloc_rel_id();
594        let rel = RelationshipRecord {
595            id,
596            src,
597            dst,
598            rel_type: trimmed.to_string(),
599            properties,
600        };
601
602        self.attach_relationship(&rel);
603        self.relationships.insert(id, rel.clone());
604
605        self.emit(|| MutationEvent::CreateRelationship {
606            id,
607            src,
608            dst,
609            rel_type: rel.rel_type.clone(),
610            properties: rel.properties.clone(),
611        });
612
613        Some(rel)
614    }
615
616    fn set_node_property(&mut self, node_id: NodeId, key: String, value: PropertyValue) -> bool {
617        // Hot path: the common case is `recorder = None`. Insert by value
618        // (no key/value clones); only clone when a recorder is attached.
619        let recorder_active = self.recorder.is_some();
620        let (stored_key, stored_value) = if recorder_active {
621            (Some(key.clone()), Some(value.clone()))
622        } else {
623            (None, None)
624        };
625
626        let applied = match self.nodes.get_mut(&node_id) {
627            Some(node) => {
628                node.properties.insert(key, value);
629                true
630            }
631            None => false,
632        };
633        if applied {
634            self.emit(|| MutationEvent::SetNodeProperty {
635                node_id,
636                key: stored_key.unwrap(),
637                value: stored_value.unwrap(),
638            });
639        }
640        applied
641    }
642
643    fn remove_node_property(&mut self, node_id: NodeId, key: &str) -> bool {
644        let applied = match self.nodes.get_mut(&node_id) {
645            Some(node) => node.properties.remove(key).is_some(),
646            None => false,
647        };
648        if applied {
649            self.emit(|| MutationEvent::RemoveNodeProperty {
650                node_id,
651                key: key.to_string(),
652            });
653        }
654        applied
655    }
656
657    fn add_node_label(&mut self, node_id: NodeId, label: &str) -> bool {
658        let label = label.trim();
659        if label.is_empty() {
660            return false;
661        }
662
663        let applied = match self.nodes.get_mut(&node_id) {
664            Some(node) => {
665                if node.labels.iter().any(|l| l == label) {
666                    return false;
667                }
668
669                node.labels.push(label.to_string());
670                self.insert_node_label_index(node_id, label);
671                true
672            }
673            None => false,
674        };
675        if applied {
676            self.emit(|| MutationEvent::AddNodeLabel {
677                node_id,
678                label: label.to_string(),
679            });
680        }
681        applied
682    }
683
684    fn remove_node_label(&mut self, node_id: NodeId, label: &str) -> bool {
685        let applied = match self.nodes.get_mut(&node_id) {
686            Some(node) => {
687                let original_len = node.labels.len();
688                node.labels.retain(|l| l != label);
689
690                if node.labels.len() != original_len {
691                    self.remove_node_label_index(node_id, label);
692                    true
693                } else {
694                    false
695                }
696            }
697            None => false,
698        };
699        if applied {
700            self.emit(|| MutationEvent::RemoveNodeLabel {
701                node_id,
702                label: label.to_string(),
703            });
704        }
705        applied
706    }
707
708    fn set_relationship_property(
709        &mut self,
710        rel_id: RelationshipId,
711        key: String,
712        value: PropertyValue,
713    ) -> bool {
714        let recorder_active = self.recorder.is_some();
715        let (stored_key, stored_value) = if recorder_active {
716            (Some(key.clone()), Some(value.clone()))
717        } else {
718            (None, None)
719        };
720
721        let applied = match self.relationships.get_mut(&rel_id) {
722            Some(rel) => {
723                rel.properties.insert(key, value);
724                true
725            }
726            None => false,
727        };
728        if applied {
729            self.emit(|| MutationEvent::SetRelationshipProperty {
730                rel_id,
731                key: stored_key.unwrap(),
732                value: stored_value.unwrap(),
733            });
734        }
735        applied
736    }
737
738    fn remove_relationship_property(&mut self, rel_id: RelationshipId, key: &str) -> bool {
739        let applied = match self.relationships.get_mut(&rel_id) {
740            Some(rel) => rel.properties.remove(key).is_some(),
741            None => false,
742        };
743        if applied {
744            self.emit(|| MutationEvent::RemoveRelationshipProperty {
745                rel_id,
746                key: key.to_string(),
747            });
748        }
749        applied
750    }
751
752    fn delete_relationship(&mut self, rel_id: RelationshipId) -> bool {
753        let applied = match self.relationships.remove(&rel_id) {
754            Some(rel) => {
755                self.detach_relationship_indexes(&rel);
756                true
757            }
758            None => false,
759        };
760        if applied {
761            self.emit(|| MutationEvent::DeleteRelationship { rel_id });
762        }
763        applied
764    }
765
766    fn delete_node(&mut self, node_id: NodeId) -> bool {
767        if !self.nodes.contains_key(&node_id) {
768            return false;
769        }
770
771        if self.has_incident_relationships(node_id) {
772            return false;
773        }
774
775        let node = match self.nodes.remove(&node_id) {
776            Some(node) => node,
777            None => return false,
778        };
779
780        for label in &node.labels {
781            self.remove_node_label_index(node_id, label);
782        }
783
784        self.outgoing.remove(&node_id);
785        self.incoming.remove(&node_id);
786
787        self.emit(|| MutationEvent::DeleteNode { node_id });
788
789        true
790    }
791
792    fn detach_delete_node(&mut self, node_id: NodeId) -> bool {
793        if !self.nodes.contains_key(&node_id) {
794            return false;
795        }
796
797        let rel_ids: Vec<_> = self
798            .incident_relationship_ids(node_id)
799            .into_iter()
800            .collect();
801
802        // We deliberately fire per-relationship DeleteRelationship events
803        // here (via `delete_relationship`) and a DetachDeleteNode event at
804        // the end. A WAL replayer that sees DetachDeleteNode can ignore the
805        // preceding DeleteRelationship events — or, equivalently, replay
806        // them and the DetachDeleteNode becomes a no-op on the remaining
807        // (now-empty) node. The emit-before-delete choice costs one extra
808        // event per mutation but keeps the replay contract simple:
809        // "apply every event in order".
810        for rel_id in rel_ids {
811            let _ = self.delete_relationship(rel_id);
812        }
813
814        if self.delete_node(node_id) {
815            self.emit(|| MutationEvent::DetachDeleteNode { node_id });
816            true
817        } else {
818            false
819        }
820    }
821
822    fn clear(&mut self) {
823        // Keep the recorder across clear so observers can see the Clear
824        // event plus whatever follows. Matches WAL semantics where the log
825        // is the source of truth across a truncation.
826        let recorder = self.recorder.take();
827        *self = Self::default();
828        self.recorder = recorder;
829        self.emit(|| MutationEvent::Clear);
830    }
831}
832
833// ---------------------------------------------------------------------------
834// Snapshotable
835// ---------------------------------------------------------------------------
836
837impl Snapshotable for InMemoryGraph {
838    fn save_snapshot<W: std::io::Write>(&self, writer: W) -> Result<SnapshotMeta, SnapshotError> {
839        let payload = SnapshotPayload {
840            next_node_id: self.next_node_id,
841            next_rel_id: self.next_rel_id,
842            nodes: self.nodes.values().cloned().collect(),
843            relationships: self.relationships.values().cloned().collect(),
844        };
845        write_snapshot(writer, &payload, None)
846    }
847
848    fn load_snapshot<R: std::io::Read>(
849        &mut self,
850        reader: R,
851    ) -> Result<SnapshotMeta, SnapshotError> {
852        let (payload, meta) = read_snapshot(reader)?;
853
854        // Build the restored graph in a fresh local instance and only
855        // commit it into `self` at the very end. If a panic fires mid-
856        // rebuild (e.g. OOM on a HashMap grow) the caller's graph is
857        // untouched — we never observe a half-populated store.
858        let mut rebuilt = Self {
859            next_node_id: payload.next_node_id,
860            next_rel_id: payload.next_rel_id,
861            ..Self::default()
862        };
863
864        // Insert nodes + rebuild label index + seed adjacency slots.
865        for node in payload.nodes {
866            let id = node.id;
867            let labels = node.labels.clone();
868            rebuilt.nodes.insert(id, node);
869            for label in &labels {
870                rebuilt.insert_node_label_index(id, label);
871            }
872            rebuilt.outgoing.entry(id).or_default();
873            rebuilt.incoming.entry(id).or_default();
874        }
875
876        // Insert relationships + rebuild adjacency + type index.
877        for rel in payload.relationships {
878            rebuilt.attach_relationship(&rel);
879            rebuilt.relationships.insert(rel.id, rel);
880        }
881
882        // Preserve the existing recorder across the swap — observers of the
883        // store's identity should not be silently detached by a restore,
884        // same policy as `clear()`.
885        rebuilt.recorder = self.recorder.take();
886        *self = rebuilt;
887
888        Ok(meta)
889    }
890}
891
892#[cfg(test)]
893mod tests {
894    use super::*;
895
896    fn props(pairs: &[(&str, PropertyValue)]) -> Properties {
897        pairs
898            .iter()
899            .map(|(k, v)| ((*k).to_string(), v.clone()))
900            .collect()
901    }
902
903    #[test]
904    fn create_and_lookup_nodes() {
905        let mut g = InMemoryGraph::new();
906
907        let a = g.create_node(
908            vec!["Person".into(), "Employee".into()],
909            props(&[("name", PropertyValue::String("Alice".into()))]),
910        );
911        let b = g.create_node(
912            vec!["Person".into()],
913            props(&[("name", PropertyValue::String("Bob".into()))]),
914        );
915
916        assert_eq!(a.id, 0);
917        assert_eq!(b.id, 1);
918
919        assert_eq!(g.all_nodes().len(), 2);
920        assert_eq!(g.nodes_by_label("Person").len(), 2);
921        assert_eq!(g.nodes_by_label("Employee").len(), 1);
922        assert!(g.node_has_label(a.id, "Person"));
923        assert_eq!(
924            g.node_property(a.id, "name"),
925            Some(PropertyValue::String("Alice".into()))
926        );
927    }
928
929    #[test]
930    fn create_and_expand_relationships() {
931        let mut g = InMemoryGraph::new();
932
933        let a = g.create_node(vec!["Person".into()], Properties::new());
934        let b = g.create_node(vec!["Person".into()], Properties::new());
935        let c = g.create_node(vec!["Company".into()], Properties::new());
936
937        let r1 = g
938            .create_relationship(a.id, b.id, "KNOWS", Properties::new())
939            .unwrap();
940        let r2 = g
941            .create_relationship(a.id, c.id, "WORKS_AT", Properties::new())
942            .unwrap();
943
944        assert_eq!(g.all_relationships().len(), 2);
945        assert_eq!(g.relationships_by_type("KNOWS").len(), 1);
946        assert_eq!(g.outgoing_relationships(a.id).len(), 2);
947        assert_eq!(g.incoming_relationships(b.id).len(), 1);
948
949        let knows = g.expand(a.id, Direction::Right, &[String::from("KNOWS")]);
950        assert_eq!(knows.len(), 1);
951        assert_eq!(knows[0].0.id, r1.id);
952        assert_eq!(knows[0].1.id, b.id);
953
954        let undirected = g.expand(a.id, Direction::Undirected, &[]);
955        assert_eq!(undirected.len(), 2);
956
957        assert_eq!(g.relationship(r2.id).unwrap().dst, c.id);
958    }
959
960    #[test]
961    fn incoming_and_outgoing_are_distinct() {
962        let mut g = InMemoryGraph::new();
963
964        let a = g.create_node(vec!["Person".into()], Properties::new());
965        let b = g.create_node(vec!["Person".into()], Properties::new());
966        let c = g.create_node(vec!["Person".into()], Properties::new());
967
968        g.create_relationship(a.id, b.id, "KNOWS", Properties::new())
969            .unwrap();
970        g.create_relationship(c.id, a.id, "LIKES", Properties::new())
971            .unwrap();
972
973        let outgoing = g.expand(a.id, Direction::Right, &[]);
974        let incoming = g.expand(a.id, Direction::Left, &[]);
975
976        assert_eq!(outgoing.len(), 1);
977        assert_eq!(incoming.len(), 1);
978        assert_eq!(outgoing[0].1.id, b.id);
979        assert_eq!(incoming[0].1.id, c.id);
980    }
981
982    #[test]
983    fn set_and_remove_properties() {
984        let mut g = InMemoryGraph::new();
985
986        let n = g.create_node(vec!["Person".into()], Properties::new());
987        assert!(g.set_node_property(n.id, "age".into(), PropertyValue::Int(42)));
988        assert_eq!(g.node_property(n.id, "age"), Some(PropertyValue::Int(42)));
989        assert!(g.remove_node_property(n.id, "age"));
990        assert_eq!(g.node_property(n.id, "age"), None);
991
992        let m = g.create_node(vec!["Person".into()], Properties::new());
993        let r = g
994            .create_relationship(n.id, m.id, "KNOWS", Properties::new())
995            .unwrap();
996
997        assert!(g.set_relationship_property(r.id, "since".into(), PropertyValue::Int(2020)));
998        assert_eq!(
999            g.relationship_property(r.id, "since"),
1000            Some(PropertyValue::Int(2020))
1001        );
1002        assert!(g.remove_relationship_property(r.id, "since"));
1003        assert_eq!(g.relationship_property(r.id, "since"), None);
1004    }
1005
1006    #[test]
1007    fn delete_requires_detach() {
1008        let mut g = InMemoryGraph::new();
1009
1010        let a = g.create_node(vec!["Person".into()], Properties::new());
1011        let b = g.create_node(vec!["Person".into()], Properties::new());
1012        let r = g
1013            .create_relationship(a.id, b.id, "KNOWS", Properties::new())
1014            .unwrap();
1015
1016        assert!(!g.delete_node(a.id));
1017        assert!(g.delete_relationship(r.id));
1018        assert!(g.delete_node(a.id));
1019        assert!(g.node(a.id).is_none());
1020    }
1021
1022    #[test]
1023    fn detach_delete_removes_incident_relationships() {
1024        let mut g = InMemoryGraph::new();
1025
1026        let a = g.create_node(vec!["Person".into()], Properties::new());
1027        let b = g.create_node(vec!["Person".into()], Properties::new());
1028        let c = g.create_node(vec!["Person".into()], Properties::new());
1029
1030        let r1 = g
1031            .create_relationship(a.id, b.id, "KNOWS", Properties::new())
1032            .unwrap();
1033        let r2 = g
1034            .create_relationship(c.id, a.id, "LIKES", Properties::new())
1035            .unwrap();
1036
1037        assert!(g.detach_delete_node(a.id));
1038        assert!(g.node(a.id).is_none());
1039        assert!(g.relationship(r1.id).is_none());
1040        assert!(g.relationship(r2.id).is_none());
1041        assert_eq!(g.all_relationships().len(), 0);
1042    }
1043
1044    #[test]
1045    fn duplicate_labels_are_normalized_on_create() {
1046        let mut g = InMemoryGraph::new();
1047
1048        let n = g.create_node(
1049            vec!["Person".into(), "Person".into(), "Admin".into()],
1050            Properties::new(),
1051        );
1052
1053        assert_eq!(n.labels, vec!["Person".to_string(), "Admin".to_string()]);
1054        assert_eq!(g.nodes_by_label("Person").len(), 1);
1055        assert_eq!(g.nodes_by_label("Admin").len(), 1);
1056    }
1057
1058    #[test]
1059    fn empty_labels_are_ignored() {
1060        let mut g = InMemoryGraph::new();
1061
1062        let n = g.create_node(
1063            vec!["Person".into(), "".into(), "   ".into()],
1064            Properties::new(),
1065        );
1066
1067        assert_eq!(n.labels, vec!["Person".to_string()]);
1068    }
1069
1070    #[test]
1071    fn empty_relationship_type_is_rejected() {
1072        let mut g = InMemoryGraph::new();
1073
1074        let a = g.create_node(vec!["A".into()], Properties::new());
1075        let b = g.create_node(vec!["B".into()], Properties::new());
1076
1077        assert!(g
1078            .create_relationship(a.id, b.id, "", Properties::new())
1079            .is_none());
1080    }
1081
1082    #[test]
1083    fn storage_schema_helpers_work() {
1084        let mut g = InMemoryGraph::new();
1085
1086        let a = g.create_node(
1087            vec!["Person".into()],
1088            props(&[("name", PropertyValue::String("Alice".into()))]),
1089        );
1090        let b = g.create_node(
1091            vec!["Company".into()],
1092            props(&[("title", PropertyValue::String("Acme".into()))]),
1093        );
1094
1095        g.create_relationship(
1096            a.id,
1097            b.id,
1098            "WORKS_AT",
1099            props(&[("since", PropertyValue::Int(2020))]),
1100        )
1101        .unwrap();
1102
1103        assert!(g.has_label_name("Person"));
1104        assert!(g.has_relationship_type_name("WORKS_AT"));
1105        assert!(g.has_property_key("name"));
1106        assert!(g.has_property_key("since"));
1107        assert!(g.label_has_property_key("Person", "name"));
1108        assert!(g.rel_type_has_property_key("WORKS_AT", "since"));
1109    }
1110
1111    #[test]
1112    fn clear_resets_the_graph() {
1113        let mut g = InMemoryGraph::new();
1114        let a = g.create_node(vec!["Person".into()], Properties::new());
1115        let b = g.create_node(vec!["Person".into()], Properties::new());
1116        g.create_relationship(a.id, b.id, "KNOWS", Properties::new())
1117            .unwrap();
1118
1119        assert_eq!(g.node_count(), 2);
1120        assert_eq!(g.relationship_count(), 1);
1121
1122        g.clear();
1123
1124        assert_eq!(g.node_count(), 0);
1125        assert_eq!(g.relationship_count(), 0);
1126        assert_eq!(g.all_labels().len(), 0);
1127    }
1128
1129    #[test]
1130    fn snapshot_roundtrip_preserves_graph_state() {
1131        let mut original = InMemoryGraph::new();
1132        let a = original.create_node(
1133            vec!["Person".into()],
1134            props(&[("name", PropertyValue::String("Alice".into()))]),
1135        );
1136        let b = original.create_node(
1137            vec!["Person".into()],
1138            props(&[("name", PropertyValue::String("Bob".into()))]),
1139        );
1140        let r = original
1141            .create_relationship(
1142                a.id,
1143                b.id,
1144                "KNOWS",
1145                props(&[("since", PropertyValue::Int(2020))]),
1146            )
1147            .unwrap();
1148
1149        let mut buf = Vec::new();
1150        let save_meta = original.save_snapshot(&mut buf).unwrap();
1151        assert_eq!(save_meta.node_count, 2);
1152        assert_eq!(save_meta.relationship_count, 1);
1153        assert_eq!(save_meta.wal_lsn, None);
1154
1155        let mut restored = InMemoryGraph::new();
1156        let load_meta = restored.load_snapshot(&buf[..]).unwrap();
1157        assert_eq!(load_meta, save_meta);
1158
1159        assert_eq!(restored.node_count(), 2);
1160        assert_eq!(restored.relationship_count(), 1);
1161        assert_eq!(
1162            restored.node_property(a.id, "name"),
1163            Some(PropertyValue::String("Alice".into()))
1164        );
1165        assert_eq!(
1166            restored.relationship_property(r.id, "since"),
1167            Some(PropertyValue::Int(2020))
1168        );
1169
1170        // Adjacency + label index were rebuilt on load.
1171        assert_eq!(restored.outgoing_relationships(a.id).len(), 1);
1172        assert_eq!(restored.nodes_by_label("Person").len(), 2);
1173
1174        // Counters carry over so new IDs don't collide with pre-snapshot IDs.
1175        let c = restored.create_node(vec!["Person".into()], Properties::new());
1176        assert_eq!(c.id, b.id + 1);
1177    }
1178
1179    #[test]
1180    fn mutation_recorder_observes_every_committed_mutation() {
1181        use std::sync::Mutex;
1182
1183        #[derive(Default)]
1184        struct CapturingRecorder {
1185            events: Mutex<Vec<MutationEvent>>,
1186        }
1187
1188        impl MutationRecorder for CapturingRecorder {
1189            fn record(&self, event: &MutationEvent) {
1190                self.events.lock().unwrap().push(event.clone());
1191            }
1192        }
1193
1194        let recorder = Arc::new(CapturingRecorder::default());
1195        let mut g = InMemoryGraph::new();
1196        g.set_mutation_recorder(Some(recorder.clone() as Arc<dyn MutationRecorder>));
1197
1198        let a = g.create_node(vec!["Person".into()], Properties::new());
1199        let b = g.create_node(vec!["Person".into()], Properties::new());
1200        let r = g
1201            .create_relationship(a.id, b.id, "KNOWS", Properties::new())
1202            .unwrap();
1203        g.set_node_property(a.id, "name".into(), PropertyValue::String("Alice".into()));
1204        g.remove_node_property(a.id, "name");
1205        g.add_node_label(a.id, "Admin");
1206        g.remove_node_label(a.id, "Admin");
1207        g.set_relationship_property(r.id, "since".into(), PropertyValue::Int(2020));
1208        g.remove_relationship_property(r.id, "since");
1209        g.detach_delete_node(a.id);
1210        g.clear();
1211
1212        let events = recorder.events.lock().unwrap().clone();
1213        assert!(matches!(events[0], MutationEvent::CreateNode { .. }));
1214        assert!(matches!(events[1], MutationEvent::CreateNode { .. }));
1215        assert!(matches!(
1216            events[2],
1217            MutationEvent::CreateRelationship { .. }
1218        ));
1219        assert!(matches!(events[3], MutationEvent::SetNodeProperty { .. }));
1220        assert!(matches!(
1221            events[4],
1222            MutationEvent::RemoveNodeProperty { .. }
1223        ));
1224        assert!(matches!(events[5], MutationEvent::AddNodeLabel { .. }));
1225        assert!(matches!(events[6], MutationEvent::RemoveNodeLabel { .. }));
1226        assert!(matches!(
1227            events[7],
1228            MutationEvent::SetRelationshipProperty { .. }
1229        ));
1230        assert!(matches!(
1231            events[8],
1232            MutationEvent::RemoveRelationshipProperty { .. }
1233        ));
1234        // detach_delete_node composes three kinds of events: one
1235        // DeleteRelationship per incident edge, one DeleteNode for the node
1236        // itself, and a final DetachDeleteNode marker. A WAL replayer can
1237        // either apply every step or recognise the marker and skip forward.
1238        assert!(matches!(
1239            events[9],
1240            MutationEvent::DeleteRelationship { .. }
1241        ));
1242        assert!(matches!(events[10], MutationEvent::DeleteNode { .. }));
1243        assert!(matches!(events[11], MutationEvent::DetachDeleteNode { .. }));
1244        assert!(matches!(events.last(), Some(MutationEvent::Clear)));
1245
1246        // Failed mutations (invalid id) do not emit events.
1247        let before = recorder.events.lock().unwrap().len();
1248        assert!(!g.set_node_property(9999, "x".into(), PropertyValue::Int(0)));
1249        assert_eq!(recorder.events.lock().unwrap().len(), before);
1250    }
1251
1252    #[test]
1253    fn snapshot_load_resets_but_keeps_recorder() {
1254        use std::sync::Mutex;
1255
1256        struct CountingRecorder(Mutex<usize>);
1257        impl MutationRecorder for CountingRecorder {
1258            fn record(&self, _: &MutationEvent) {
1259                *self.0.lock().unwrap() += 1;
1260            }
1261        }
1262
1263        let counter: Arc<dyn MutationRecorder> = Arc::new(CountingRecorder(Mutex::new(0)));
1264        let mut g = InMemoryGraph::new();
1265        g.set_mutation_recorder(Some(counter));
1266        g.create_node(vec!["A".into()], Properties::new());
1267
1268        let mut buf = Vec::new();
1269        g.save_snapshot(&mut buf).unwrap();
1270
1271        // Load into the same graph — recorder should survive, store state
1272        // should be replaced by the snapshot contents.
1273        g.load_snapshot(&buf[..]).unwrap();
1274        assert!(g.mutation_recorder().is_some());
1275        assert_eq!(g.node_count(), 1);
1276
1277        // Subsequent mutations still feed the recorder.
1278        g.create_node(vec!["B".into()], Properties::new());
1279        // 1 for the initial A + 1 for the post-load B. The restore path
1280        // itself does not emit events (that's a snapshot, not a mutation).
1281    }
1282}