Skip to main content

lora_store/
memory.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::Arc;
3
4use lora_ast::Direction;
5
6use crate::snapshot::{read_snapshot, write_snapshot};
7use crate::{
8    BorrowedGraphStorage, GraphStorage, GraphStorageMut, MutationEvent, MutationRecorder, NodeId,
9    NodeRecord, Properties, PropertyValue, RelationshipId, RelationshipRecord, SnapshotError,
10    SnapshotMeta, SnapshotPayload, Snapshotable,
11};
12
13#[derive(Default)]
14pub struct InMemoryGraph {
15    next_node_id: NodeId,
16    next_rel_id: RelationshipId,
17
18    nodes: BTreeMap<NodeId, NodeRecord>,
19    relationships: BTreeMap<RelationshipId, RelationshipRecord>,
20
21    // adjacency
22    outgoing: BTreeMap<NodeId, BTreeSet<RelationshipId>>,
23    incoming: BTreeMap<NodeId, BTreeSet<RelationshipId>>,
24
25    // secondary indexes
26    nodes_by_label: BTreeMap<String, BTreeSet<NodeId>>,
27    relationships_by_type: BTreeMap<String, BTreeSet<RelationshipId>>,
28
29    /// Optional mutation observer. When `Some`, every committed mutation
30    /// fans out to this recorder *after* the in-memory state has been
31    /// updated. The recorder is not part of the graph's identity, so Clone
32    /// and snapshot restore both reset it to `None`.
33    recorder: Option<Arc<dyn MutationRecorder>>,
34}
35
36impl std::fmt::Debug for InMemoryGraph {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        f.debug_struct("InMemoryGraph")
39            .field("next_node_id", &self.next_node_id)
40            .field("next_rel_id", &self.next_rel_id)
41            .field("nodes", &self.nodes)
42            .field("relationships", &self.relationships)
43            .field("outgoing", &self.outgoing)
44            .field("incoming", &self.incoming)
45            .field("nodes_by_label", &self.nodes_by_label)
46            .field("relationships_by_type", &self.relationships_by_type)
47            .field("recorder", &self.recorder.as_ref().map(|_| "installed"))
48            .finish()
49    }
50}
51
52impl Clone for InMemoryGraph {
53    fn clone(&self) -> Self {
54        // Deliberately drop the recorder on clone: a cloned store is a
55        // separate identity; it should not silently share the observer.
56        Self {
57            next_node_id: self.next_node_id,
58            next_rel_id: self.next_rel_id,
59            nodes: self.nodes.clone(),
60            relationships: self.relationships.clone(),
61            outgoing: self.outgoing.clone(),
62            incoming: self.incoming.clone(),
63            nodes_by_label: self.nodes_by_label.clone(),
64            relationships_by_type: self.relationships_by_type.clone(),
65            recorder: None,
66        }
67    }
68}
69
70impl InMemoryGraph {
71    pub fn new() -> Self {
72        Self::default()
73    }
74
75    pub fn with_capacity_hint(_nodes: usize, _relationships: usize) -> Self {
76        // BTreeMap/BTreeSet do not support capacity reservation.
77        Self::default()
78    }
79
80    pub fn contains_node(&self, node_id: NodeId) -> bool {
81        self.nodes.contains_key(&node_id)
82    }
83
84    pub fn contains_relationship(&self, rel_id: RelationshipId) -> bool {
85        self.relationships.contains_key(&rel_id)
86    }
87
88    /// Install (or clear) the mutation recorder. Passing `None` detaches any
89    /// currently-installed recorder. The recorder observes every committed
90    /// mutation *after* it has been applied.
91    pub fn set_mutation_recorder(&mut self, recorder: Option<Arc<dyn MutationRecorder>>) {
92        self.recorder = recorder;
93    }
94
95    /// Handle to the currently-installed recorder, if any.
96    pub fn mutation_recorder(&self) -> Option<&Arc<dyn MutationRecorder>> {
97        self.recorder.as_ref()
98    }
99
100    /// Emit a mutation event only if a recorder is installed. The event is
101    /// built lazily — callers pass a closure, so when no recorder is
102    /// attached we pay only a `None` check and the cost of constructing the
103    /// event (labels/properties clones) is avoided.
104    #[inline]
105    fn emit<F: FnOnce() -> MutationEvent>(&self, build: F) {
106        if let Some(rec) = &self.recorder {
107            rec.record(&build());
108        }
109    }
110
111    fn alloc_node_id(&mut self) -> NodeId {
112        let id = self.next_node_id;
113        self.next_node_id += 1;
114        id
115    }
116
117    fn alloc_rel_id(&mut self) -> RelationshipId {
118        let id = self.next_rel_id;
119        self.next_rel_id += 1;
120        id
121    }
122
123    fn bump_next_node_id_past(&mut self, id: NodeId) -> Result<(), String> {
124        let next = id
125            .checked_add(1)
126            .ok_or_else(|| format!("node id {id} leaves no valid next node id"))?;
127        self.next_node_id = self.next_node_id.max(next);
128        Ok(())
129    }
130
131    fn bump_next_rel_id_past(&mut self, id: RelationshipId) -> Result<(), String> {
132        let next = id
133            .checked_add(1)
134            .ok_or_else(|| format!("relationship id {id} leaves no valid next relationship id"))?;
135        self.next_rel_id = self.next_rel_id.max(next);
136        Ok(())
137    }
138
139    fn normalize_labels(labels: Vec<String>) -> Vec<String> {
140        let mut seen = BTreeSet::new();
141
142        labels
143            .into_iter()
144            .map(|s| s.trim().to_string())
145            .filter(|s| !s.is_empty())
146            .filter(|s| seen.insert(s.clone()))
147            .collect()
148    }
149
150    fn insert_node_label_index(&mut self, node_id: NodeId, label: &str) {
151        self.nodes_by_label
152            .entry(label.to_string())
153            .or_default()
154            .insert(node_id);
155    }
156
157    fn remove_node_label_index(&mut self, node_id: NodeId, label: &str) {
158        if let Some(ids) = self.nodes_by_label.get_mut(label) {
159            ids.remove(&node_id);
160            if ids.is_empty() {
161                self.nodes_by_label.remove(label);
162            }
163        }
164    }
165
166    fn insert_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
167        self.relationships_by_type
168            .entry(rel_type.to_string())
169            .or_default()
170            .insert(rel_id);
171    }
172
173    fn remove_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
174        if let Some(ids) = self.relationships_by_type.get_mut(rel_type) {
175            ids.remove(&rel_id);
176            if ids.is_empty() {
177                self.relationships_by_type.remove(rel_type);
178            }
179        }
180    }
181
182    fn attach_relationship(&mut self, rel: &RelationshipRecord) {
183        self.outgoing.entry(rel.src).or_default().insert(rel.id);
184        self.incoming.entry(rel.dst).or_default().insert(rel.id);
185        self.insert_relationship_type_index(rel.id, &rel.rel_type);
186    }
187
188    fn detach_relationship_indexes(&mut self, rel: &RelationshipRecord) {
189        if let Some(ids) = self.outgoing.get_mut(&rel.src) {
190            ids.remove(&rel.id);
191            if ids.is_empty() {
192                self.outgoing.remove(&rel.src);
193            }
194        }
195
196        if let Some(ids) = self.incoming.get_mut(&rel.dst) {
197            ids.remove(&rel.id);
198            if ids.is_empty() {
199                self.incoming.remove(&rel.dst);
200            }
201        }
202
203        self.remove_relationship_type_index(rel.id, &rel.rel_type);
204    }
205
206    fn relationship_ids_for_direction(
207        &self,
208        node_id: NodeId,
209        direction: Direction,
210    ) -> Vec<RelationshipId> {
211        match direction {
212            Direction::Left => self
213                .incoming
214                .get(&node_id)
215                .map(|ids| ids.iter().copied().collect())
216                .unwrap_or_default(),
217
218            Direction::Right => self
219                .outgoing
220                .get(&node_id)
221                .map(|ids| ids.iter().copied().collect())
222                .unwrap_or_default(),
223
224            Direction::Undirected => {
225                let mut ids = BTreeSet::new();
226
227                if let Some(out) = self.outgoing.get(&node_id) {
228                    ids.extend(out.iter().copied());
229                }
230                if let Some(inc) = self.incoming.get(&node_id) {
231                    ids.extend(inc.iter().copied());
232                }
233
234                ids.into_iter().collect()
235            }
236        }
237    }
238
239    fn other_endpoint(rel: &RelationshipRecord, node_id: NodeId) -> Option<NodeId> {
240        if rel.src == node_id {
241            Some(rel.dst)
242        } else if rel.dst == node_id {
243            Some(rel.src)
244        } else {
245            None
246        }
247    }
248
249    fn has_incident_relationships(&self, node_id: NodeId) -> bool {
250        self.outgoing
251            .get(&node_id)
252            .map(|ids| !ids.is_empty())
253            .unwrap_or(false)
254            || self
255                .incoming
256                .get(&node_id)
257                .map(|ids| !ids.is_empty())
258                .unwrap_or(false)
259    }
260
261    fn incident_relationship_ids(&self, node_id: NodeId) -> BTreeSet<RelationshipId> {
262        let mut rel_ids = BTreeSet::new();
263
264        if let Some(ids) = self.outgoing.get(&node_id) {
265            rel_ids.extend(ids.iter().copied());
266        }
267        if let Some(ids) = self.incoming.get(&node_id) {
268            rel_ids.extend(ids.iter().copied());
269        }
270
271        rel_ids
272    }
273
274    /// Replay a node creation using the id captured in a durable mutation
275    /// event. This intentionally does not emit a new mutation event: callers
276    /// must invoke it before installing a recorder on the graph.
277    #[doc(hidden)]
278    pub fn replay_create_node(
279        &mut self,
280        id: NodeId,
281        labels: Vec<String>,
282        properties: Properties,
283    ) -> Result<NodeRecord, String> {
284        if self.recorder.is_some() {
285            return Err(
286                "cannot replay node creation while a mutation recorder is installed".into(),
287            );
288        }
289        if self.nodes.contains_key(&id) {
290            return Err(format!("node id {id} already exists"));
291        }
292
293        let labels = Self::normalize_labels(labels);
294        let node = NodeRecord {
295            id,
296            labels: labels.clone(),
297            properties,
298        };
299
300        self.nodes.insert(id, node.clone());
301        for label in &labels {
302            self.insert_node_label_index(id, label);
303        }
304        self.outgoing.entry(id).or_default();
305        self.incoming.entry(id).or_default();
306        self.bump_next_node_id_past(id)?;
307
308        Ok(node)
309    }
310
311    /// Replay a relationship creation using the id captured in a durable
312    /// mutation event. This intentionally does not emit a new mutation event:
313    /// callers must invoke it before installing a recorder on the graph.
314    #[doc(hidden)]
315    pub fn replay_create_relationship(
316        &mut self,
317        id: RelationshipId,
318        src: NodeId,
319        dst: NodeId,
320        rel_type: &str,
321        properties: Properties,
322    ) -> Result<RelationshipRecord, String> {
323        if self.recorder.is_some() {
324            return Err(
325                "cannot replay relationship creation while a mutation recorder is installed".into(),
326            );
327        }
328        if self.relationships.contains_key(&id) {
329            return Err(format!("relationship id {id} already exists"));
330        }
331        if !self.nodes.contains_key(&src) {
332            return Err(format!(
333                "relationship {id} references missing source node {src}"
334            ));
335        }
336        if !self.nodes.contains_key(&dst) {
337            return Err(format!(
338                "relationship {id} references missing target node {dst}"
339            ));
340        }
341
342        let trimmed = rel_type.trim();
343        if trimmed.is_empty() {
344            return Err(format!("relationship {id} has an empty type"));
345        }
346
347        let rel = RelationshipRecord {
348            id,
349            src,
350            dst,
351            rel_type: trimmed.to_string(),
352            properties,
353        };
354
355        self.attach_relationship(&rel);
356        self.relationships.insert(id, rel.clone());
357        self.bump_next_rel_id_past(id)?;
358
359        Ok(rel)
360    }
361}
362
363impl GraphStorage for InMemoryGraph {
364    // ---------- Required primitives ----------
365
366    fn contains_node(&self, id: NodeId) -> bool {
367        self.nodes.contains_key(&id)
368    }
369
370    fn node(&self, id: NodeId) -> Option<NodeRecord> {
371        self.nodes.get(&id).cloned()
372    }
373
374    fn all_node_ids(&self) -> Vec<NodeId> {
375        self.nodes.keys().copied().collect()
376    }
377
378    fn node_ids_by_label(&self, label: &str) -> Vec<NodeId> {
379        match self.nodes_by_label.get(label) {
380            Some(ids) => ids.iter().copied().collect(),
381            None => Vec::new(),
382        }
383    }
384
385    fn contains_relationship(&self, id: RelationshipId) -> bool {
386        self.relationships.contains_key(&id)
387    }
388
389    fn relationship(&self, id: RelationshipId) -> Option<RelationshipRecord> {
390        self.relationships.get(&id).cloned()
391    }
392
393    fn all_rel_ids(&self) -> Vec<RelationshipId> {
394        self.relationships.keys().copied().collect()
395    }
396
397    fn rel_ids_by_type(&self, rel_type: &str) -> Vec<RelationshipId> {
398        match self.relationships_by_type.get(rel_type) {
399            Some(ids) => ids.iter().copied().collect(),
400            None => Vec::new(),
401        }
402    }
403
404    fn relationship_endpoints(&self, id: RelationshipId) -> Option<(NodeId, NodeId)> {
405        self.relationships.get(&id).map(|r| (r.src, r.dst))
406    }
407
408    fn expand_ids(
409        &self,
410        node_id: NodeId,
411        direction: Direction,
412        types: &[String],
413    ) -> Vec<(RelationshipId, NodeId)> {
414        if !self.nodes.contains_key(&node_id) {
415            return Vec::new();
416        }
417
418        // Fast path: no type filter — just join adjacency + rel endpoints.
419        if types.is_empty() {
420            return self
421                .relationship_ids_for_direction(node_id, direction)
422                .into_iter()
423                .filter_map(|rel_id| {
424                    let rel = self.relationships.get(&rel_id)?;
425                    let other_id = Self::other_endpoint(rel, node_id)?;
426                    Some((rel_id, other_id))
427                })
428                .collect();
429        }
430
431        // Type-filtered: borrow rel_type straight from the stored record.
432        // For small type lists (the common case) the linear scan beats a
433        // BTreeSet; we keep it allocation-free.
434        self.relationship_ids_for_direction(node_id, direction)
435            .into_iter()
436            .filter_map(|rel_id| {
437                let rel = self.relationships.get(&rel_id)?;
438                if !types.iter().any(|t| t == &rel.rel_type) {
439                    return None;
440                }
441                let other_id = Self::other_endpoint(rel, node_id)?;
442                Some((rel_id, other_id))
443            })
444            .collect()
445    }
446
447    fn all_labels(&self) -> Vec<String> {
448        self.nodes_by_label.keys().cloned().collect()
449    }
450
451    fn all_relationship_types(&self) -> Vec<String> {
452        self.relationships_by_type.keys().cloned().collect()
453    }
454
455    // ---------- Optimization hooks: zero-clone borrow access ----------
456
457    fn with_node<F, R>(&self, id: NodeId, f: F) -> Option<R>
458    where
459        F: FnOnce(&NodeRecord) -> R,
460        Self: Sized,
461    {
462        self.nodes.get(&id).map(f)
463    }
464
465    fn with_relationship<F, R>(&self, id: RelationshipId, f: F) -> Option<R>
466    where
467        F: FnOnce(&RelationshipRecord) -> R,
468        Self: Sized,
469    {
470        self.relationships.get(&id).map(f)
471    }
472
473    // ---------- Overrides: counts + existence ----------
474
475    fn node_count(&self) -> usize {
476        self.nodes.len()
477    }
478
479    fn relationship_count(&self) -> usize {
480        self.relationships.len()
481    }
482
483    fn has_node(&self, id: NodeId) -> bool {
484        self.nodes.contains_key(&id)
485    }
486
487    fn has_relationship(&self, id: RelationshipId) -> bool {
488        self.relationships.contains_key(&id)
489    }
490
491    // ---------- Overrides: record-returning scans (direct iteration) ----------
492
493    fn all_nodes(&self) -> Vec<NodeRecord> {
494        self.nodes.values().cloned().collect()
495    }
496
497    fn nodes_by_label(&self, label: &str) -> Vec<NodeRecord> {
498        self.nodes_by_label
499            .get(label)
500            .into_iter()
501            .flat_map(|ids| ids.iter())
502            .filter_map(|id| self.nodes.get(id).cloned())
503            .collect()
504    }
505
506    fn all_relationships(&self) -> Vec<RelationshipRecord> {
507        self.relationships.values().cloned().collect()
508    }
509
510    fn relationships_by_type(&self, rel_type: &str) -> Vec<RelationshipRecord> {
511        self.relationships_by_type
512            .get(rel_type)
513            .into_iter()
514            .flat_map(|ids| ids.iter())
515            .filter_map(|id| self.relationships.get(id).cloned())
516            .collect()
517    }
518
519    // ---------- Overrides: traversal (direct adjacency) ----------
520
521    fn relationship_ids_of(&self, node_id: NodeId, direction: Direction) -> Vec<RelationshipId> {
522        self.relationship_ids_for_direction(node_id, direction)
523    }
524
525    fn outgoing_relationships(&self, node_id: NodeId) -> Vec<RelationshipRecord> {
526        self.outgoing
527            .get(&node_id)
528            .into_iter()
529            .flat_map(|ids| ids.iter())
530            .filter_map(|id| self.relationships.get(id).cloned())
531            .collect()
532    }
533
534    fn incoming_relationships(&self, node_id: NodeId) -> Vec<RelationshipRecord> {
535        self.incoming
536            .get(&node_id)
537            .into_iter()
538            .flat_map(|ids| ids.iter())
539            .filter_map(|id| self.relationships.get(id).cloned())
540            .collect()
541    }
542
543    fn degree(&self, node_id: NodeId, direction: Direction) -> usize {
544        match direction {
545            Direction::Right => self.outgoing.get(&node_id).map(|s| s.len()).unwrap_or(0),
546            Direction::Left => self.incoming.get(&node_id).map(|s| s.len()).unwrap_or(0),
547            Direction::Undirected => {
548                self.outgoing.get(&node_id).map(|s| s.len()).unwrap_or(0)
549                    + self.incoming.get(&node_id).map(|s| s.len()).unwrap_or(0)
550            }
551        }
552    }
553
554    fn expand(
555        &self,
556        node_id: NodeId,
557        direction: Direction,
558        types: &[String],
559    ) -> Vec<(RelationshipRecord, NodeRecord)> {
560        if !self.nodes.contains_key(&node_id) {
561            return Vec::new();
562        }
563
564        let type_filter: Option<BTreeSet<&str>> = if types.is_empty() {
565            None
566        } else {
567            Some(types.iter().map(String::as_str).collect())
568        };
569
570        self.relationship_ids_for_direction(node_id, direction)
571            .into_iter()
572            .filter_map(|rel_id| self.relationships.get(&rel_id))
573            .filter(|rel| {
574                type_filter
575                    .as_ref()
576                    .map(|allowed| allowed.contains(rel.rel_type.as_str()))
577                    .unwrap_or(true)
578            })
579            .filter_map(|rel| {
580                let other_id = Self::other_endpoint(rel, node_id)?;
581                let other = self.nodes.get(&other_id)?;
582                Some((rel.clone(), other.clone()))
583            })
584            .collect()
585    }
586
587    // ---------- Overrides: schema introspection ----------
588
589    fn all_node_property_keys(&self) -> Vec<String> {
590        let mut keys = BTreeSet::new();
591        for node in self.nodes.values() {
592            for key in node.properties.keys() {
593                keys.insert(key.clone());
594            }
595        }
596        keys.into_iter().collect()
597    }
598
599    fn all_relationship_property_keys(&self) -> Vec<String> {
600        let mut keys = BTreeSet::new();
601        for rel in self.relationships.values() {
602            for key in rel.properties.keys() {
603                keys.insert(key.clone());
604            }
605        }
606        keys.into_iter().collect()
607    }
608
609    fn label_property_keys(&self, label: &str) -> Vec<String> {
610        let mut keys = BTreeSet::new();
611
612        if let Some(ids) = self.nodes_by_label.get(label) {
613            for id in ids {
614                if let Some(node) = self.nodes.get(id) {
615                    for key in node.properties.keys() {
616                        keys.insert(key.clone());
617                    }
618                }
619            }
620        }
621
622        keys.into_iter().collect()
623    }
624
625    fn rel_type_property_keys(&self, rel_type: &str) -> Vec<String> {
626        let mut keys = BTreeSet::new();
627
628        if let Some(ids) = self.relationships_by_type.get(rel_type) {
629            for id in ids {
630                if let Some(rel) = self.relationships.get(id) {
631                    for key in rel.properties.keys() {
632                        keys.insert(key.clone());
633                    }
634                }
635            }
636        }
637
638        keys.into_iter().collect()
639    }
640}
641
642impl BorrowedGraphStorage for InMemoryGraph {
643    fn node_ref(&self, id: NodeId) -> Option<&NodeRecord> {
644        self.nodes.get(&id)
645    }
646
647    fn relationship_ref(&self, id: RelationshipId) -> Option<&RelationshipRecord> {
648        self.relationships.get(&id)
649    }
650}
651
652impl GraphStorageMut for InMemoryGraph {
653    fn create_node(&mut self, labels: Vec<String>, properties: Properties) -> NodeRecord {
654        let id = self.alloc_node_id();
655        let labels = Self::normalize_labels(labels);
656
657        let node = NodeRecord {
658            id,
659            labels: labels.clone(),
660            properties,
661        };
662
663        self.nodes.insert(id, node.clone());
664
665        for label in &labels {
666            self.insert_node_label_index(id, label);
667        }
668
669        self.outgoing.entry(id).or_default();
670        self.incoming.entry(id).or_default();
671
672        self.emit(|| MutationEvent::CreateNode {
673            id,
674            labels: node.labels.clone(),
675            properties: node.properties.clone(),
676        });
677
678        node
679    }
680
681    fn create_relationship(
682        &mut self,
683        src: NodeId,
684        dst: NodeId,
685        rel_type: &str,
686        properties: Properties,
687    ) -> Option<RelationshipRecord> {
688        if !self.nodes.contains_key(&src) || !self.nodes.contains_key(&dst) {
689            return None;
690        }
691
692        let trimmed = rel_type.trim();
693        if trimmed.is_empty() {
694            return None;
695        }
696
697        let id = self.alloc_rel_id();
698        let rel = RelationshipRecord {
699            id,
700            src,
701            dst,
702            rel_type: trimmed.to_string(),
703            properties,
704        };
705
706        self.attach_relationship(&rel);
707        self.relationships.insert(id, rel.clone());
708
709        self.emit(|| MutationEvent::CreateRelationship {
710            id,
711            src,
712            dst,
713            rel_type: rel.rel_type.clone(),
714            properties: rel.properties.clone(),
715        });
716
717        Some(rel)
718    }
719
720    fn set_node_property(&mut self, node_id: NodeId, key: String, value: PropertyValue) -> bool {
721        // Hot path: the common case is `recorder = None`. Insert by value
722        // (no key/value clones); only clone when a recorder is attached.
723        let recorder_active = self.recorder.is_some();
724        let (stored_key, stored_value) = if recorder_active {
725            (Some(key.clone()), Some(value.clone()))
726        } else {
727            (None, None)
728        };
729
730        let applied = match self.nodes.get_mut(&node_id) {
731            Some(node) => {
732                node.properties.insert(key, value);
733                true
734            }
735            None => false,
736        };
737        if applied {
738            self.emit(|| MutationEvent::SetNodeProperty {
739                node_id,
740                key: stored_key.unwrap(),
741                value: stored_value.unwrap(),
742            });
743        }
744        applied
745    }
746
747    fn remove_node_property(&mut self, node_id: NodeId, key: &str) -> bool {
748        let applied = match self.nodes.get_mut(&node_id) {
749            Some(node) => node.properties.remove(key).is_some(),
750            None => false,
751        };
752        if applied {
753            self.emit(|| MutationEvent::RemoveNodeProperty {
754                node_id,
755                key: key.to_string(),
756            });
757        }
758        applied
759    }
760
761    fn add_node_label(&mut self, node_id: NodeId, label: &str) -> bool {
762        let label = label.trim();
763        if label.is_empty() {
764            return false;
765        }
766
767        let applied = match self.nodes.get_mut(&node_id) {
768            Some(node) => {
769                if node.labels.iter().any(|l| l == label) {
770                    return false;
771                }
772
773                node.labels.push(label.to_string());
774                self.insert_node_label_index(node_id, label);
775                true
776            }
777            None => false,
778        };
779        if applied {
780            self.emit(|| MutationEvent::AddNodeLabel {
781                node_id,
782                label: label.to_string(),
783            });
784        }
785        applied
786    }
787
788    fn remove_node_label(&mut self, node_id: NodeId, label: &str) -> bool {
789        let applied = match self.nodes.get_mut(&node_id) {
790            Some(node) => {
791                let original_len = node.labels.len();
792                node.labels.retain(|l| l != label);
793
794                if node.labels.len() != original_len {
795                    self.remove_node_label_index(node_id, label);
796                    true
797                } else {
798                    false
799                }
800            }
801            None => false,
802        };
803        if applied {
804            self.emit(|| MutationEvent::RemoveNodeLabel {
805                node_id,
806                label: label.to_string(),
807            });
808        }
809        applied
810    }
811
812    fn set_relationship_property(
813        &mut self,
814        rel_id: RelationshipId,
815        key: String,
816        value: PropertyValue,
817    ) -> bool {
818        let recorder_active = self.recorder.is_some();
819        let (stored_key, stored_value) = if recorder_active {
820            (Some(key.clone()), Some(value.clone()))
821        } else {
822            (None, None)
823        };
824
825        let applied = match self.relationships.get_mut(&rel_id) {
826            Some(rel) => {
827                rel.properties.insert(key, value);
828                true
829            }
830            None => false,
831        };
832        if applied {
833            self.emit(|| MutationEvent::SetRelationshipProperty {
834                rel_id,
835                key: stored_key.unwrap(),
836                value: stored_value.unwrap(),
837            });
838        }
839        applied
840    }
841
842    fn remove_relationship_property(&mut self, rel_id: RelationshipId, key: &str) -> bool {
843        let applied = match self.relationships.get_mut(&rel_id) {
844            Some(rel) => rel.properties.remove(key).is_some(),
845            None => false,
846        };
847        if applied {
848            self.emit(|| MutationEvent::RemoveRelationshipProperty {
849                rel_id,
850                key: key.to_string(),
851            });
852        }
853        applied
854    }
855
856    fn delete_relationship(&mut self, rel_id: RelationshipId) -> bool {
857        let applied = match self.relationships.remove(&rel_id) {
858            Some(rel) => {
859                self.detach_relationship_indexes(&rel);
860                true
861            }
862            None => false,
863        };
864        if applied {
865            self.emit(|| MutationEvent::DeleteRelationship { rel_id });
866        }
867        applied
868    }
869
870    fn delete_node(&mut self, node_id: NodeId) -> bool {
871        if !self.nodes.contains_key(&node_id) {
872            return false;
873        }
874
875        if self.has_incident_relationships(node_id) {
876            return false;
877        }
878
879        let node = match self.nodes.remove(&node_id) {
880            Some(node) => node,
881            None => return false,
882        };
883
884        for label in &node.labels {
885            self.remove_node_label_index(node_id, label);
886        }
887
888        self.outgoing.remove(&node_id);
889        self.incoming.remove(&node_id);
890
891        self.emit(|| MutationEvent::DeleteNode { node_id });
892
893        true
894    }
895
896    fn detach_delete_node(&mut self, node_id: NodeId) -> bool {
897        if !self.nodes.contains_key(&node_id) {
898            return false;
899        }
900
901        let rel_ids: Vec<_> = self
902            .incident_relationship_ids(node_id)
903            .into_iter()
904            .collect();
905
906        // We deliberately fire per-relationship DeleteRelationship events
907        // here (via `delete_relationship`) and a DetachDeleteNode event at
908        // the end. A WAL replayer that sees DetachDeleteNode can ignore the
909        // preceding DeleteRelationship events — or, equivalently, replay
910        // them and the DetachDeleteNode becomes a no-op on the remaining
911        // (now-empty) node. The emit-before-delete choice costs one extra
912        // event per mutation but keeps the replay contract simple:
913        // "apply every event in order".
914        for rel_id in rel_ids {
915            let _ = self.delete_relationship(rel_id);
916        }
917
918        if self.delete_node(node_id) {
919            self.emit(|| MutationEvent::DetachDeleteNode { node_id });
920            true
921        } else {
922            false
923        }
924    }
925
926    fn clear(&mut self) {
927        // Keep the recorder across clear so observers can see the Clear
928        // event plus whatever follows. Matches WAL semantics where the log
929        // is the source of truth across a truncation.
930        let recorder = self.recorder.take();
931        *self = Self::default();
932        self.recorder = recorder;
933        self.emit(|| MutationEvent::Clear);
934    }
935}
936
937// ---------------------------------------------------------------------------
938// Snapshotable
939// ---------------------------------------------------------------------------
940
941impl Snapshotable for InMemoryGraph {
942    fn save_snapshot<W: std::io::Write>(&self, writer: W) -> Result<SnapshotMeta, SnapshotError> {
943        let payload = SnapshotPayload {
944            next_node_id: self.next_node_id,
945            next_rel_id: self.next_rel_id,
946            nodes: self.nodes.values().cloned().collect(),
947            relationships: self.relationships.values().cloned().collect(),
948        };
949        write_snapshot(writer, &payload, None)
950    }
951
952    fn save_checkpoint<W: std::io::Write>(
953        &self,
954        writer: W,
955        wal_lsn: u64,
956    ) -> Result<SnapshotMeta, SnapshotError> {
957        let payload = SnapshotPayload {
958            next_node_id: self.next_node_id,
959            next_rel_id: self.next_rel_id,
960            nodes: self.nodes.values().cloned().collect(),
961            relationships: self.relationships.values().cloned().collect(),
962        };
963        write_snapshot(writer, &payload, Some(wal_lsn))
964    }
965
966    fn load_snapshot<R: std::io::Read>(
967        &mut self,
968        reader: R,
969    ) -> Result<SnapshotMeta, SnapshotError> {
970        let (payload, meta) = read_snapshot(reader)?;
971
972        // Build the restored graph in a fresh local instance and only
973        // commit it into `self` at the very end. If a panic fires mid-
974        // rebuild (e.g. OOM on a HashMap grow) the caller's graph is
975        // untouched — we never observe a half-populated store.
976        let mut rebuilt = Self {
977            next_node_id: payload.next_node_id,
978            next_rel_id: payload.next_rel_id,
979            ..Self::default()
980        };
981
982        // Insert nodes + rebuild label index + seed adjacency slots.
983        for node in payload.nodes {
984            let id = node.id;
985            let labels = node.labels.clone();
986            rebuilt.nodes.insert(id, node);
987            for label in &labels {
988                rebuilt.insert_node_label_index(id, label);
989            }
990            rebuilt.outgoing.entry(id).or_default();
991            rebuilt.incoming.entry(id).or_default();
992        }
993
994        // Insert relationships + rebuild adjacency + type index.
995        for rel in payload.relationships {
996            rebuilt.attach_relationship(&rel);
997            rebuilt.relationships.insert(rel.id, rel);
998        }
999
1000        // Preserve the existing recorder across the swap — observers of the
1001        // store's identity should not be silently detached by a restore,
1002        // same policy as `clear()`.
1003        rebuilt.recorder = self.recorder.take();
1004        *self = rebuilt;
1005
1006        Ok(meta)
1007    }
1008}
1009
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Properties` map from `(key, value)` pairs.
    fn props(pairs: &[(&str, PropertyValue)]) -> Properties {
        pairs
            .iter()
            .map(|(k, v)| ((*k).to_string(), v.clone()))
            .collect()
    }

    #[test]
    fn create_and_lookup_nodes() {
        let mut g = InMemoryGraph::new();

        let a = g.create_node(
            vec!["Person".into(), "Employee".into()],
            props(&[("name", PropertyValue::String("Alice".into()))]),
        );
        let b = g.create_node(
            vec!["Person".into()],
            props(&[("name", PropertyValue::String("Bob".into()))]),
        );

        assert_eq!(a.id, 0);
        assert_eq!(b.id, 1);

        assert_eq!(g.all_nodes().len(), 2);
        assert_eq!(g.nodes_by_label("Person").len(), 2);
        assert_eq!(g.nodes_by_label("Employee").len(), 1);
        assert!(g.node_has_label(a.id, "Person"));
        assert_eq!(
            g.node_property(a.id, "name"),
            Some(PropertyValue::String("Alice".into()))
        );
    }

    #[test]
    fn create_and_expand_relationships() {
        let mut g = InMemoryGraph::new();

        let a = g.create_node(vec!["Person".into()], Properties::new());
        let b = g.create_node(vec!["Person".into()], Properties::new());
        let c = g.create_node(vec!["Company".into()], Properties::new());

        let r1 = g
            .create_relationship(a.id, b.id, "KNOWS", Properties::new())
            .unwrap();
        let r2 = g
            .create_relationship(a.id, c.id, "WORKS_AT", Properties::new())
            .unwrap();

        assert_eq!(g.all_relationships().len(), 2);
        assert_eq!(g.relationships_by_type("KNOWS").len(), 1);
        assert_eq!(g.outgoing_relationships(a.id).len(), 2);
        assert_eq!(g.incoming_relationships(b.id).len(), 1);

        let knows = g.expand(a.id, Direction::Right, &[String::from("KNOWS")]);
        assert_eq!(knows.len(), 1);
        assert_eq!(knows[0].0.id, r1.id);
        assert_eq!(knows[0].1.id, b.id);

        let undirected = g.expand(a.id, Direction::Undirected, &[]);
        assert_eq!(undirected.len(), 2);

        assert_eq!(g.relationship(r2.id).unwrap().dst, c.id);
    }

    #[test]
    fn incoming_and_outgoing_are_distinct() {
        let mut g = InMemoryGraph::new();

        let a = g.create_node(vec!["Person".into()], Properties::new());
        let b = g.create_node(vec!["Person".into()], Properties::new());
        let c = g.create_node(vec!["Person".into()], Properties::new());

        g.create_relationship(a.id, b.id, "KNOWS", Properties::new())
            .unwrap();
        g.create_relationship(c.id, a.id, "LIKES", Properties::new())
            .unwrap();

        let outgoing = g.expand(a.id, Direction::Right, &[]);
        let incoming = g.expand(a.id, Direction::Left, &[]);

        assert_eq!(outgoing.len(), 1);
        assert_eq!(incoming.len(), 1);
        assert_eq!(outgoing[0].1.id, b.id);
        assert_eq!(incoming[0].1.id, c.id);
    }

    #[test]
    fn set_and_remove_properties() {
        let mut g = InMemoryGraph::new();

        let n = g.create_node(vec!["Person".into()], Properties::new());
        assert!(g.set_node_property(n.id, "age".into(), PropertyValue::Int(42)));
        assert_eq!(g.node_property(n.id, "age"), Some(PropertyValue::Int(42)));
        assert!(g.remove_node_property(n.id, "age"));
        assert_eq!(g.node_property(n.id, "age"), None);

        let m = g.create_node(vec!["Person".into()], Properties::new());
        let r = g
            .create_relationship(n.id, m.id, "KNOWS", Properties::new())
            .unwrap();

        assert!(g.set_relationship_property(r.id, "since".into(), PropertyValue::Int(2020)));
        assert_eq!(
            g.relationship_property(r.id, "since"),
            Some(PropertyValue::Int(2020))
        );
        assert!(g.remove_relationship_property(r.id, "since"));
        assert_eq!(g.relationship_property(r.id, "since"), None);
    }

    #[test]
    fn delete_requires_detach() {
        let mut g = InMemoryGraph::new();

        let a = g.create_node(vec!["Person".into()], Properties::new());
        let b = g.create_node(vec!["Person".into()], Properties::new());
        let r = g
            .create_relationship(a.id, b.id, "KNOWS", Properties::new())
            .unwrap();

        assert!(!g.delete_node(a.id));
        assert!(g.delete_relationship(r.id));
        assert!(g.delete_node(a.id));
        assert!(g.node(a.id).is_none());
    }

    #[test]
    fn detach_delete_removes_incident_relationships() {
        let mut g = InMemoryGraph::new();

        let a = g.create_node(vec!["Person".into()], Properties::new());
        let b = g.create_node(vec!["Person".into()], Properties::new());
        let c = g.create_node(vec!["Person".into()], Properties::new());

        let r1 = g
            .create_relationship(a.id, b.id, "KNOWS", Properties::new())
            .unwrap();
        let r2 = g
            .create_relationship(c.id, a.id, "LIKES", Properties::new())
            .unwrap();

        assert!(g.detach_delete_node(a.id));
        assert!(g.node(a.id).is_none());
        assert!(g.relationship(r1.id).is_none());
        assert!(g.relationship(r2.id).is_none());
        assert_eq!(g.all_relationships().len(), 0);
    }

    #[test]
    fn duplicate_labels_are_normalized_on_create() {
        let mut g = InMemoryGraph::new();

        let n = g.create_node(
            vec!["Person".into(), "Person".into(), "Admin".into()],
            Properties::new(),
        );

        assert_eq!(n.labels, vec!["Person".to_string(), "Admin".to_string()]);
        assert_eq!(g.nodes_by_label("Person").len(), 1);
        assert_eq!(g.nodes_by_label("Admin").len(), 1);
    }

    #[test]
    fn empty_labels_are_ignored() {
        let mut g = InMemoryGraph::new();

        let n = g.create_node(
            vec!["Person".into(), "".into(), "   ".into()],
            Properties::new(),
        );

        assert_eq!(n.labels, vec!["Person".to_string()]);
    }

    #[test]
    fn empty_relationship_type_is_rejected() {
        let mut g = InMemoryGraph::new();

        let a = g.create_node(vec!["A".into()], Properties::new());
        let b = g.create_node(vec!["B".into()], Properties::new());

        assert!(g
            .create_relationship(a.id, b.id, "", Properties::new())
            .is_none());
    }

    #[test]
    fn storage_schema_helpers_work() {
        let mut g = InMemoryGraph::new();

        let a = g.create_node(
            vec!["Person".into()],
            props(&[("name", PropertyValue::String("Alice".into()))]),
        );
        let b = g.create_node(
            vec!["Company".into()],
            props(&[("title", PropertyValue::String("Acme".into()))]),
        );

        g.create_relationship(
            a.id,
            b.id,
            "WORKS_AT",
            props(&[("since", PropertyValue::Int(2020))]),
        )
        .unwrap();

        assert!(g.has_label_name("Person"));
        assert!(g.has_relationship_type_name("WORKS_AT"));
        assert!(g.has_property_key("name"));
        assert!(g.has_property_key("since"));
        assert!(g.label_has_property_key("Person", "name"));
        assert!(g.rel_type_has_property_key("WORKS_AT", "since"));
    }

    #[test]
    fn clear_resets_the_graph() {
        let mut g = InMemoryGraph::new();
        let a = g.create_node(vec!["Person".into()], Properties::new());
        let b = g.create_node(vec!["Person".into()], Properties::new());
        g.create_relationship(a.id, b.id, "KNOWS", Properties::new())
            .unwrap();

        assert_eq!(g.node_count(), 2);
        assert_eq!(g.relationship_count(), 1);

        g.clear();

        assert_eq!(g.node_count(), 0);
        assert_eq!(g.relationship_count(), 0);
        assert_eq!(g.all_labels().len(), 0);
    }

    #[test]
    fn snapshot_roundtrip_preserves_graph_state() {
        let mut original = InMemoryGraph::new();
        let a = original.create_node(
            vec!["Person".into()],
            props(&[("name", PropertyValue::String("Alice".into()))]),
        );
        let b = original.create_node(
            vec!["Person".into()],
            props(&[("name", PropertyValue::String("Bob".into()))]),
        );
        let r = original
            .create_relationship(
                a.id,
                b.id,
                "KNOWS",
                props(&[("since", PropertyValue::Int(2020))]),
            )
            .unwrap();

        let mut buf = Vec::new();
        let save_meta = original.save_snapshot(&mut buf).unwrap();
        assert_eq!(save_meta.node_count, 2);
        assert_eq!(save_meta.relationship_count, 1);
        assert_eq!(save_meta.wal_lsn, None);

        let mut restored = InMemoryGraph::new();
        let load_meta = restored.load_snapshot(&buf[..]).unwrap();
        assert_eq!(load_meta, save_meta);

        assert_eq!(restored.node_count(), 2);
        assert_eq!(restored.relationship_count(), 1);
        assert_eq!(
            restored.node_property(a.id, "name"),
            Some(PropertyValue::String("Alice".into()))
        );
        assert_eq!(
            restored.relationship_property(r.id, "since"),
            Some(PropertyValue::Int(2020))
        );

        // Adjacency + label index were rebuilt on load.
        assert_eq!(restored.outgoing_relationships(a.id).len(), 1);
        assert_eq!(restored.nodes_by_label("Person").len(), 2);

        // Counters carry over so new IDs don't collide with pre-snapshot IDs.
        let c = restored.create_node(vec!["Person".into()], Properties::new());
        assert_eq!(c.id, b.id + 1);
    }

    #[test]
    fn checkpoint_roundtrip_carries_wal_lsn() {
        let mut original = InMemoryGraph::new();
        let a = original.create_node(vec!["Person".into()], Properties::new());
        let b = original.create_node(vec!["Person".into()], Properties::new());
        original
            .create_relationship(a.id, b.id, "KNOWS", Properties::new())
            .unwrap();

        let mut buf = Vec::new();
        let save_meta = original.save_checkpoint(&mut buf, 42).unwrap();
        assert_eq!(save_meta.wal_lsn, Some(42));
        assert_eq!(save_meta.node_count, 2);
        assert_eq!(save_meta.relationship_count, 1);

        let mut restored = InMemoryGraph::new();
        let load_meta = restored.load_snapshot(&buf[..]).unwrap();
        assert_eq!(load_meta, save_meta);

        // Incoming adjacency is rebuilt on load as well as outgoing.
        assert_eq!(restored.incoming_relationships(b.id).len(), 1);
    }

    #[test]
    fn mutation_recorder_observes_every_committed_mutation() {
        use std::sync::Mutex;

        #[derive(Default)]
        struct CapturingRecorder {
            events: Mutex<Vec<MutationEvent>>,
        }

        impl MutationRecorder for CapturingRecorder {
            fn record(&self, event: &MutationEvent) {
                self.events.lock().unwrap().push(event.clone());
            }
        }

        let recorder = Arc::new(CapturingRecorder::default());
        let mut g = InMemoryGraph::new();
        g.set_mutation_recorder(Some(recorder.clone() as Arc<dyn MutationRecorder>));

        let a = g.create_node(vec!["Person".into()], Properties::new());
        let b = g.create_node(vec!["Person".into()], Properties::new());
        let r = g
            .create_relationship(a.id, b.id, "KNOWS", Properties::new())
            .unwrap();
        g.set_node_property(a.id, "name".into(), PropertyValue::String("Alice".into()));
        g.remove_node_property(a.id, "name");
        g.add_node_label(a.id, "Admin");
        g.remove_node_label(a.id, "Admin");
        g.set_relationship_property(r.id, "since".into(), PropertyValue::Int(2020));
        g.remove_relationship_property(r.id, "since");
        g.detach_delete_node(a.id);
        g.clear();

        let events = recorder.events.lock().unwrap().clone();
        // Exactly one event per step above, plus the two extra events that
        // detach_delete_node expands into (see below).
        assert_eq!(events.len(), 13);
        assert!(matches!(events[0], MutationEvent::CreateNode { .. }));
        assert!(matches!(events[1], MutationEvent::CreateNode { .. }));
        assert!(matches!(
            events[2],
            MutationEvent::CreateRelationship { .. }
        ));
        assert!(matches!(events[3], MutationEvent::SetNodeProperty { .. }));
        assert!(matches!(
            events[4],
            MutationEvent::RemoveNodeProperty { .. }
        ));
        assert!(matches!(events[5], MutationEvent::AddNodeLabel { .. }));
        assert!(matches!(events[6], MutationEvent::RemoveNodeLabel { .. }));
        assert!(matches!(
            events[7],
            MutationEvent::SetRelationshipProperty { .. }
        ));
        assert!(matches!(
            events[8],
            MutationEvent::RemoveRelationshipProperty { .. }
        ));
        // detach_delete_node composes three kinds of events: one
        // DeleteRelationship per incident edge, one DeleteNode for the node
        // itself, and a final DetachDeleteNode marker. A WAL replayer can
        // either apply every step or recognise the marker and skip forward.
        assert!(matches!(
            events[9],
            MutationEvent::DeleteRelationship { .. }
        ));
        assert!(matches!(events[10], MutationEvent::DeleteNode { .. }));
        assert!(matches!(events[11], MutationEvent::DetachDeleteNode { .. }));
        assert!(matches!(events.last(), Some(MutationEvent::Clear)));

        // Failed mutations (invalid id) do not emit events.
        let before = recorder.events.lock().unwrap().len();
        assert!(!g.set_node_property(9999, "x".into(), PropertyValue::Int(0)));
        assert_eq!(recorder.events.lock().unwrap().len(), before);
    }

    #[test]
    fn snapshot_load_resets_but_keeps_recorder() {
        use std::sync::Mutex;

        struct CountingRecorder(Mutex<usize>);
        impl MutationRecorder for CountingRecorder {
            fn record(&self, _: &MutationEvent) {
                *self.0.lock().unwrap() += 1;
            }
        }

        // Keep our own handle on the recorder so the event count can be
        // asserted at the end.
        let counter = Arc::new(CountingRecorder(Mutex::new(0)));
        let mut g = InMemoryGraph::new();
        g.set_mutation_recorder(Some(counter.clone() as Arc<dyn MutationRecorder>));
        g.create_node(vec!["A".into()], Properties::new());

        let mut buf = Vec::new();
        g.save_snapshot(&mut buf).unwrap();

        // Mutate after saving so the load has post-snapshot state to discard.
        g.create_node(vec!["Extra".into()], Properties::new());
        assert_eq!(g.node_count(), 2);

        // Load into the same graph. The recorder should survive, and the
        // store state should be replaced by the snapshot contents.
        g.load_snapshot(&buf[..]).unwrap();
        assert!(g.mutation_recorder().is_some());
        assert_eq!(g.node_count(), 1);
        assert_eq!(g.nodes_by_label("Extra").len(), 0);

        // Subsequent mutations still feed the recorder.
        g.create_node(vec!["B".into()], Properties::new());
        // 1 for A + 1 for Extra + 1 for the post-load B. The restore path
        // itself does not emit events (that's a snapshot, not a mutation).
        assert_eq!(*counter.0.lock().unwrap(), 3);
    }
}