1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::Arc;
3
4use lora_ast::Direction;
5
6use crate::snapshot::{read_snapshot, write_snapshot};
7use crate::{
8 BorrowedGraphStorage, GraphStorage, GraphStorageMut, MutationEvent, MutationRecorder, NodeId,
9 NodeRecord, Properties, PropertyValue, RelationshipId, RelationshipRecord, SnapshotError,
10 SnapshotMeta, SnapshotPayload, Snapshotable,
11};
12
13#[derive(Default)]
14pub struct InMemoryGraph {
15 next_node_id: NodeId,
16 next_rel_id: RelationshipId,
17
18 nodes: BTreeMap<NodeId, NodeRecord>,
19 relationships: BTreeMap<RelationshipId, RelationshipRecord>,
20
21 outgoing: BTreeMap<NodeId, BTreeSet<RelationshipId>>,
23 incoming: BTreeMap<NodeId, BTreeSet<RelationshipId>>,
24
25 nodes_by_label: BTreeMap<String, BTreeSet<NodeId>>,
27 relationships_by_type: BTreeMap<String, BTreeSet<RelationshipId>>,
28
29 recorder: Option<Arc<dyn MutationRecorder>>,
34}
35
36impl std::fmt::Debug for InMemoryGraph {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 f.debug_struct("InMemoryGraph")
39 .field("next_node_id", &self.next_node_id)
40 .field("next_rel_id", &self.next_rel_id)
41 .field("nodes", &self.nodes)
42 .field("relationships", &self.relationships)
43 .field("outgoing", &self.outgoing)
44 .field("incoming", &self.incoming)
45 .field("nodes_by_label", &self.nodes_by_label)
46 .field("relationships_by_type", &self.relationships_by_type)
47 .field("recorder", &self.recorder.as_ref().map(|_| "installed"))
48 .finish()
49 }
50}
51
52impl Clone for InMemoryGraph {
53 fn clone(&self) -> Self {
54 Self {
57 next_node_id: self.next_node_id,
58 next_rel_id: self.next_rel_id,
59 nodes: self.nodes.clone(),
60 relationships: self.relationships.clone(),
61 outgoing: self.outgoing.clone(),
62 incoming: self.incoming.clone(),
63 nodes_by_label: self.nodes_by_label.clone(),
64 relationships_by_type: self.relationships_by_type.clone(),
65 recorder: None,
66 }
67 }
68}
69
70impl InMemoryGraph {
71 pub fn new() -> Self {
72 Self::default()
73 }
74
75 pub fn with_capacity_hint(_nodes: usize, _relationships: usize) -> Self {
76 Self::default()
78 }
79
80 pub fn contains_node(&self, node_id: NodeId) -> bool {
81 self.nodes.contains_key(&node_id)
82 }
83
84 pub fn contains_relationship(&self, rel_id: RelationshipId) -> bool {
85 self.relationships.contains_key(&rel_id)
86 }
87
88 pub fn set_mutation_recorder(&mut self, recorder: Option<Arc<dyn MutationRecorder>>) {
92 self.recorder = recorder;
93 }
94
95 pub fn mutation_recorder(&self) -> Option<&Arc<dyn MutationRecorder>> {
97 self.recorder.as_ref()
98 }
99
100 #[inline]
105 fn emit<F: FnOnce() -> MutationEvent>(&self, build: F) {
106 if let Some(rec) = &self.recorder {
107 rec.record(&build());
108 }
109 }
110
111 fn alloc_node_id(&mut self) -> NodeId {
112 let id = self.next_node_id;
113 self.next_node_id += 1;
114 id
115 }
116
117 fn alloc_rel_id(&mut self) -> RelationshipId {
118 let id = self.next_rel_id;
119 self.next_rel_id += 1;
120 id
121 }
122
123 fn bump_next_node_id_past(&mut self, id: NodeId) -> Result<(), String> {
124 let next = id
125 .checked_add(1)
126 .ok_or_else(|| format!("node id {id} leaves no valid next node id"))?;
127 self.next_node_id = self.next_node_id.max(next);
128 Ok(())
129 }
130
131 fn bump_next_rel_id_past(&mut self, id: RelationshipId) -> Result<(), String> {
132 let next = id
133 .checked_add(1)
134 .ok_or_else(|| format!("relationship id {id} leaves no valid next relationship id"))?;
135 self.next_rel_id = self.next_rel_id.max(next);
136 Ok(())
137 }
138
139 fn normalize_labels(labels: Vec<String>) -> Vec<String> {
140 let mut seen = BTreeSet::new();
141
142 labels
143 .into_iter()
144 .map(|s| s.trim().to_string())
145 .filter(|s| !s.is_empty())
146 .filter(|s| seen.insert(s.clone()))
147 .collect()
148 }
149
150 fn insert_node_label_index(&mut self, node_id: NodeId, label: &str) {
151 self.nodes_by_label
152 .entry(label.to_string())
153 .or_default()
154 .insert(node_id);
155 }
156
157 fn remove_node_label_index(&mut self, node_id: NodeId, label: &str) {
158 if let Some(ids) = self.nodes_by_label.get_mut(label) {
159 ids.remove(&node_id);
160 if ids.is_empty() {
161 self.nodes_by_label.remove(label);
162 }
163 }
164 }
165
166 fn insert_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
167 self.relationships_by_type
168 .entry(rel_type.to_string())
169 .or_default()
170 .insert(rel_id);
171 }
172
173 fn remove_relationship_type_index(&mut self, rel_id: RelationshipId, rel_type: &str) {
174 if let Some(ids) = self.relationships_by_type.get_mut(rel_type) {
175 ids.remove(&rel_id);
176 if ids.is_empty() {
177 self.relationships_by_type.remove(rel_type);
178 }
179 }
180 }
181
182 fn attach_relationship(&mut self, rel: &RelationshipRecord) {
183 self.outgoing.entry(rel.src).or_default().insert(rel.id);
184 self.incoming.entry(rel.dst).or_default().insert(rel.id);
185 self.insert_relationship_type_index(rel.id, &rel.rel_type);
186 }
187
188 fn detach_relationship_indexes(&mut self, rel: &RelationshipRecord) {
189 if let Some(ids) = self.outgoing.get_mut(&rel.src) {
190 ids.remove(&rel.id);
191 if ids.is_empty() {
192 self.outgoing.remove(&rel.src);
193 }
194 }
195
196 if let Some(ids) = self.incoming.get_mut(&rel.dst) {
197 ids.remove(&rel.id);
198 if ids.is_empty() {
199 self.incoming.remove(&rel.dst);
200 }
201 }
202
203 self.remove_relationship_type_index(rel.id, &rel.rel_type);
204 }
205
206 fn relationship_ids_for_direction(
207 &self,
208 node_id: NodeId,
209 direction: Direction,
210 ) -> Vec<RelationshipId> {
211 match direction {
212 Direction::Left => self
213 .incoming
214 .get(&node_id)
215 .map(|ids| ids.iter().copied().collect())
216 .unwrap_or_default(),
217
218 Direction::Right => self
219 .outgoing
220 .get(&node_id)
221 .map(|ids| ids.iter().copied().collect())
222 .unwrap_or_default(),
223
224 Direction::Undirected => {
225 let mut ids = BTreeSet::new();
226
227 if let Some(out) = self.outgoing.get(&node_id) {
228 ids.extend(out.iter().copied());
229 }
230 if let Some(inc) = self.incoming.get(&node_id) {
231 ids.extend(inc.iter().copied());
232 }
233
234 ids.into_iter().collect()
235 }
236 }
237 }
238
239 fn other_endpoint(rel: &RelationshipRecord, node_id: NodeId) -> Option<NodeId> {
240 if rel.src == node_id {
241 Some(rel.dst)
242 } else if rel.dst == node_id {
243 Some(rel.src)
244 } else {
245 None
246 }
247 }
248
249 fn has_incident_relationships(&self, node_id: NodeId) -> bool {
250 self.outgoing
251 .get(&node_id)
252 .map(|ids| !ids.is_empty())
253 .unwrap_or(false)
254 || self
255 .incoming
256 .get(&node_id)
257 .map(|ids| !ids.is_empty())
258 .unwrap_or(false)
259 }
260
261 fn incident_relationship_ids(&self, node_id: NodeId) -> BTreeSet<RelationshipId> {
262 let mut rel_ids = BTreeSet::new();
263
264 if let Some(ids) = self.outgoing.get(&node_id) {
265 rel_ids.extend(ids.iter().copied());
266 }
267 if let Some(ids) = self.incoming.get(&node_id) {
268 rel_ids.extend(ids.iter().copied());
269 }
270
271 rel_ids
272 }
273
274 #[doc(hidden)]
278 pub fn replay_create_node(
279 &mut self,
280 id: NodeId,
281 labels: Vec<String>,
282 properties: Properties,
283 ) -> Result<NodeRecord, String> {
284 if self.recorder.is_some() {
285 return Err(
286 "cannot replay node creation while a mutation recorder is installed".into(),
287 );
288 }
289 if self.nodes.contains_key(&id) {
290 return Err(format!("node id {id} already exists"));
291 }
292
293 let labels = Self::normalize_labels(labels);
294 let node = NodeRecord {
295 id,
296 labels: labels.clone(),
297 properties,
298 };
299
300 self.nodes.insert(id, node.clone());
301 for label in &labels {
302 self.insert_node_label_index(id, label);
303 }
304 self.outgoing.entry(id).or_default();
305 self.incoming.entry(id).or_default();
306 self.bump_next_node_id_past(id)?;
307
308 Ok(node)
309 }
310
311 #[doc(hidden)]
315 pub fn replay_create_relationship(
316 &mut self,
317 id: RelationshipId,
318 src: NodeId,
319 dst: NodeId,
320 rel_type: &str,
321 properties: Properties,
322 ) -> Result<RelationshipRecord, String> {
323 if self.recorder.is_some() {
324 return Err(
325 "cannot replay relationship creation while a mutation recorder is installed".into(),
326 );
327 }
328 if self.relationships.contains_key(&id) {
329 return Err(format!("relationship id {id} already exists"));
330 }
331 if !self.nodes.contains_key(&src) {
332 return Err(format!(
333 "relationship {id} references missing source node {src}"
334 ));
335 }
336 if !self.nodes.contains_key(&dst) {
337 return Err(format!(
338 "relationship {id} references missing target node {dst}"
339 ));
340 }
341
342 let trimmed = rel_type.trim();
343 if trimmed.is_empty() {
344 return Err(format!("relationship {id} has an empty type"));
345 }
346
347 let rel = RelationshipRecord {
348 id,
349 src,
350 dst,
351 rel_type: trimmed.to_string(),
352 properties,
353 };
354
355 self.attach_relationship(&rel);
356 self.relationships.insert(id, rel.clone());
357 self.bump_next_rel_id_past(id)?;
358
359 Ok(rel)
360 }
361}
362
363impl GraphStorage for InMemoryGraph {
364 fn contains_node(&self, id: NodeId) -> bool {
367 self.nodes.contains_key(&id)
368 }
369
370 fn node(&self, id: NodeId) -> Option<NodeRecord> {
371 self.nodes.get(&id).cloned()
372 }
373
374 fn all_node_ids(&self) -> Vec<NodeId> {
375 self.nodes.keys().copied().collect()
376 }
377
378 fn node_ids_by_label(&self, label: &str) -> Vec<NodeId> {
379 match self.nodes_by_label.get(label) {
380 Some(ids) => ids.iter().copied().collect(),
381 None => Vec::new(),
382 }
383 }
384
385 fn contains_relationship(&self, id: RelationshipId) -> bool {
386 self.relationships.contains_key(&id)
387 }
388
389 fn relationship(&self, id: RelationshipId) -> Option<RelationshipRecord> {
390 self.relationships.get(&id).cloned()
391 }
392
393 fn all_rel_ids(&self) -> Vec<RelationshipId> {
394 self.relationships.keys().copied().collect()
395 }
396
397 fn rel_ids_by_type(&self, rel_type: &str) -> Vec<RelationshipId> {
398 match self.relationships_by_type.get(rel_type) {
399 Some(ids) => ids.iter().copied().collect(),
400 None => Vec::new(),
401 }
402 }
403
404 fn relationship_endpoints(&self, id: RelationshipId) -> Option<(NodeId, NodeId)> {
405 self.relationships.get(&id).map(|r| (r.src, r.dst))
406 }
407
408 fn expand_ids(
409 &self,
410 node_id: NodeId,
411 direction: Direction,
412 types: &[String],
413 ) -> Vec<(RelationshipId, NodeId)> {
414 if !self.nodes.contains_key(&node_id) {
415 return Vec::new();
416 }
417
418 if types.is_empty() {
420 return self
421 .relationship_ids_for_direction(node_id, direction)
422 .into_iter()
423 .filter_map(|rel_id| {
424 let rel = self.relationships.get(&rel_id)?;
425 let other_id = Self::other_endpoint(rel, node_id)?;
426 Some((rel_id, other_id))
427 })
428 .collect();
429 }
430
431 self.relationship_ids_for_direction(node_id, direction)
435 .into_iter()
436 .filter_map(|rel_id| {
437 let rel = self.relationships.get(&rel_id)?;
438 if !types.iter().any(|t| t == &rel.rel_type) {
439 return None;
440 }
441 let other_id = Self::other_endpoint(rel, node_id)?;
442 Some((rel_id, other_id))
443 })
444 .collect()
445 }
446
447 fn all_labels(&self) -> Vec<String> {
448 self.nodes_by_label.keys().cloned().collect()
449 }
450
451 fn all_relationship_types(&self) -> Vec<String> {
452 self.relationships_by_type.keys().cloned().collect()
453 }
454
455 fn with_node<F, R>(&self, id: NodeId, f: F) -> Option<R>
458 where
459 F: FnOnce(&NodeRecord) -> R,
460 Self: Sized,
461 {
462 self.nodes.get(&id).map(f)
463 }
464
465 fn with_relationship<F, R>(&self, id: RelationshipId, f: F) -> Option<R>
466 where
467 F: FnOnce(&RelationshipRecord) -> R,
468 Self: Sized,
469 {
470 self.relationships.get(&id).map(f)
471 }
472
473 fn node_count(&self) -> usize {
476 self.nodes.len()
477 }
478
479 fn relationship_count(&self) -> usize {
480 self.relationships.len()
481 }
482
483 fn has_node(&self, id: NodeId) -> bool {
484 self.nodes.contains_key(&id)
485 }
486
487 fn has_relationship(&self, id: RelationshipId) -> bool {
488 self.relationships.contains_key(&id)
489 }
490
491 fn all_nodes(&self) -> Vec<NodeRecord> {
494 self.nodes.values().cloned().collect()
495 }
496
497 fn nodes_by_label(&self, label: &str) -> Vec<NodeRecord> {
498 self.nodes_by_label
499 .get(label)
500 .into_iter()
501 .flat_map(|ids| ids.iter())
502 .filter_map(|id| self.nodes.get(id).cloned())
503 .collect()
504 }
505
506 fn all_relationships(&self) -> Vec<RelationshipRecord> {
507 self.relationships.values().cloned().collect()
508 }
509
510 fn relationships_by_type(&self, rel_type: &str) -> Vec<RelationshipRecord> {
511 self.relationships_by_type
512 .get(rel_type)
513 .into_iter()
514 .flat_map(|ids| ids.iter())
515 .filter_map(|id| self.relationships.get(id).cloned())
516 .collect()
517 }
518
519 fn relationship_ids_of(&self, node_id: NodeId, direction: Direction) -> Vec<RelationshipId> {
522 self.relationship_ids_for_direction(node_id, direction)
523 }
524
525 fn outgoing_relationships(&self, node_id: NodeId) -> Vec<RelationshipRecord> {
526 self.outgoing
527 .get(&node_id)
528 .into_iter()
529 .flat_map(|ids| ids.iter())
530 .filter_map(|id| self.relationships.get(id).cloned())
531 .collect()
532 }
533
534 fn incoming_relationships(&self, node_id: NodeId) -> Vec<RelationshipRecord> {
535 self.incoming
536 .get(&node_id)
537 .into_iter()
538 .flat_map(|ids| ids.iter())
539 .filter_map(|id| self.relationships.get(id).cloned())
540 .collect()
541 }
542
543 fn degree(&self, node_id: NodeId, direction: Direction) -> usize {
544 match direction {
545 Direction::Right => self.outgoing.get(&node_id).map(|s| s.len()).unwrap_or(0),
546 Direction::Left => self.incoming.get(&node_id).map(|s| s.len()).unwrap_or(0),
547 Direction::Undirected => {
548 self.outgoing.get(&node_id).map(|s| s.len()).unwrap_or(0)
549 + self.incoming.get(&node_id).map(|s| s.len()).unwrap_or(0)
550 }
551 }
552 }
553
554 fn expand(
555 &self,
556 node_id: NodeId,
557 direction: Direction,
558 types: &[String],
559 ) -> Vec<(RelationshipRecord, NodeRecord)> {
560 if !self.nodes.contains_key(&node_id) {
561 return Vec::new();
562 }
563
564 let type_filter: Option<BTreeSet<&str>> = if types.is_empty() {
565 None
566 } else {
567 Some(types.iter().map(String::as_str).collect())
568 };
569
570 self.relationship_ids_for_direction(node_id, direction)
571 .into_iter()
572 .filter_map(|rel_id| self.relationships.get(&rel_id))
573 .filter(|rel| {
574 type_filter
575 .as_ref()
576 .map(|allowed| allowed.contains(rel.rel_type.as_str()))
577 .unwrap_or(true)
578 })
579 .filter_map(|rel| {
580 let other_id = Self::other_endpoint(rel, node_id)?;
581 let other = self.nodes.get(&other_id)?;
582 Some((rel.clone(), other.clone()))
583 })
584 .collect()
585 }
586
587 fn all_node_property_keys(&self) -> Vec<String> {
590 let mut keys = BTreeSet::new();
591 for node in self.nodes.values() {
592 for key in node.properties.keys() {
593 keys.insert(key.clone());
594 }
595 }
596 keys.into_iter().collect()
597 }
598
599 fn all_relationship_property_keys(&self) -> Vec<String> {
600 let mut keys = BTreeSet::new();
601 for rel in self.relationships.values() {
602 for key in rel.properties.keys() {
603 keys.insert(key.clone());
604 }
605 }
606 keys.into_iter().collect()
607 }
608
609 fn label_property_keys(&self, label: &str) -> Vec<String> {
610 let mut keys = BTreeSet::new();
611
612 if let Some(ids) = self.nodes_by_label.get(label) {
613 for id in ids {
614 if let Some(node) = self.nodes.get(id) {
615 for key in node.properties.keys() {
616 keys.insert(key.clone());
617 }
618 }
619 }
620 }
621
622 keys.into_iter().collect()
623 }
624
625 fn rel_type_property_keys(&self, rel_type: &str) -> Vec<String> {
626 let mut keys = BTreeSet::new();
627
628 if let Some(ids) = self.relationships_by_type.get(rel_type) {
629 for id in ids {
630 if let Some(rel) = self.relationships.get(id) {
631 for key in rel.properties.keys() {
632 keys.insert(key.clone());
633 }
634 }
635 }
636 }
637
638 keys.into_iter().collect()
639 }
640}
641
642impl BorrowedGraphStorage for InMemoryGraph {
643 fn node_ref(&self, id: NodeId) -> Option<&NodeRecord> {
644 self.nodes.get(&id)
645 }
646
647 fn relationship_ref(&self, id: RelationshipId) -> Option<&RelationshipRecord> {
648 self.relationships.get(&id)
649 }
650}
651
652impl GraphStorageMut for InMemoryGraph {
653 fn create_node(&mut self, labels: Vec<String>, properties: Properties) -> NodeRecord {
654 let id = self.alloc_node_id();
655 let labels = Self::normalize_labels(labels);
656
657 let node = NodeRecord {
658 id,
659 labels: labels.clone(),
660 properties,
661 };
662
663 self.nodes.insert(id, node.clone());
664
665 for label in &labels {
666 self.insert_node_label_index(id, label);
667 }
668
669 self.outgoing.entry(id).or_default();
670 self.incoming.entry(id).or_default();
671
672 self.emit(|| MutationEvent::CreateNode {
673 id,
674 labels: node.labels.clone(),
675 properties: node.properties.clone(),
676 });
677
678 node
679 }
680
681 fn create_relationship(
682 &mut self,
683 src: NodeId,
684 dst: NodeId,
685 rel_type: &str,
686 properties: Properties,
687 ) -> Option<RelationshipRecord> {
688 if !self.nodes.contains_key(&src) || !self.nodes.contains_key(&dst) {
689 return None;
690 }
691
692 let trimmed = rel_type.trim();
693 if trimmed.is_empty() {
694 return None;
695 }
696
697 let id = self.alloc_rel_id();
698 let rel = RelationshipRecord {
699 id,
700 src,
701 dst,
702 rel_type: trimmed.to_string(),
703 properties,
704 };
705
706 self.attach_relationship(&rel);
707 self.relationships.insert(id, rel.clone());
708
709 self.emit(|| MutationEvent::CreateRelationship {
710 id,
711 src,
712 dst,
713 rel_type: rel.rel_type.clone(),
714 properties: rel.properties.clone(),
715 });
716
717 Some(rel)
718 }
719
720 fn set_node_property(&mut self, node_id: NodeId, key: String, value: PropertyValue) -> bool {
721 let recorder_active = self.recorder.is_some();
724 let (stored_key, stored_value) = if recorder_active {
725 (Some(key.clone()), Some(value.clone()))
726 } else {
727 (None, None)
728 };
729
730 let applied = match self.nodes.get_mut(&node_id) {
731 Some(node) => {
732 node.properties.insert(key, value);
733 true
734 }
735 None => false,
736 };
737 if applied {
738 self.emit(|| MutationEvent::SetNodeProperty {
739 node_id,
740 key: stored_key.unwrap(),
741 value: stored_value.unwrap(),
742 });
743 }
744 applied
745 }
746
747 fn remove_node_property(&mut self, node_id: NodeId, key: &str) -> bool {
748 let applied = match self.nodes.get_mut(&node_id) {
749 Some(node) => node.properties.remove(key).is_some(),
750 None => false,
751 };
752 if applied {
753 self.emit(|| MutationEvent::RemoveNodeProperty {
754 node_id,
755 key: key.to_string(),
756 });
757 }
758 applied
759 }
760
761 fn add_node_label(&mut self, node_id: NodeId, label: &str) -> bool {
762 let label = label.trim();
763 if label.is_empty() {
764 return false;
765 }
766
767 let applied = match self.nodes.get_mut(&node_id) {
768 Some(node) => {
769 if node.labels.iter().any(|l| l == label) {
770 return false;
771 }
772
773 node.labels.push(label.to_string());
774 self.insert_node_label_index(node_id, label);
775 true
776 }
777 None => false,
778 };
779 if applied {
780 self.emit(|| MutationEvent::AddNodeLabel {
781 node_id,
782 label: label.to_string(),
783 });
784 }
785 applied
786 }
787
788 fn remove_node_label(&mut self, node_id: NodeId, label: &str) -> bool {
789 let applied = match self.nodes.get_mut(&node_id) {
790 Some(node) => {
791 let original_len = node.labels.len();
792 node.labels.retain(|l| l != label);
793
794 if node.labels.len() != original_len {
795 self.remove_node_label_index(node_id, label);
796 true
797 } else {
798 false
799 }
800 }
801 None => false,
802 };
803 if applied {
804 self.emit(|| MutationEvent::RemoveNodeLabel {
805 node_id,
806 label: label.to_string(),
807 });
808 }
809 applied
810 }
811
812 fn set_relationship_property(
813 &mut self,
814 rel_id: RelationshipId,
815 key: String,
816 value: PropertyValue,
817 ) -> bool {
818 let recorder_active = self.recorder.is_some();
819 let (stored_key, stored_value) = if recorder_active {
820 (Some(key.clone()), Some(value.clone()))
821 } else {
822 (None, None)
823 };
824
825 let applied = match self.relationships.get_mut(&rel_id) {
826 Some(rel) => {
827 rel.properties.insert(key, value);
828 true
829 }
830 None => false,
831 };
832 if applied {
833 self.emit(|| MutationEvent::SetRelationshipProperty {
834 rel_id,
835 key: stored_key.unwrap(),
836 value: stored_value.unwrap(),
837 });
838 }
839 applied
840 }
841
842 fn remove_relationship_property(&mut self, rel_id: RelationshipId, key: &str) -> bool {
843 let applied = match self.relationships.get_mut(&rel_id) {
844 Some(rel) => rel.properties.remove(key).is_some(),
845 None => false,
846 };
847 if applied {
848 self.emit(|| MutationEvent::RemoveRelationshipProperty {
849 rel_id,
850 key: key.to_string(),
851 });
852 }
853 applied
854 }
855
856 fn delete_relationship(&mut self, rel_id: RelationshipId) -> bool {
857 let applied = match self.relationships.remove(&rel_id) {
858 Some(rel) => {
859 self.detach_relationship_indexes(&rel);
860 true
861 }
862 None => false,
863 };
864 if applied {
865 self.emit(|| MutationEvent::DeleteRelationship { rel_id });
866 }
867 applied
868 }
869
870 fn delete_node(&mut self, node_id: NodeId) -> bool {
871 if !self.nodes.contains_key(&node_id) {
872 return false;
873 }
874
875 if self.has_incident_relationships(node_id) {
876 return false;
877 }
878
879 let node = match self.nodes.remove(&node_id) {
880 Some(node) => node,
881 None => return false,
882 };
883
884 for label in &node.labels {
885 self.remove_node_label_index(node_id, label);
886 }
887
888 self.outgoing.remove(&node_id);
889 self.incoming.remove(&node_id);
890
891 self.emit(|| MutationEvent::DeleteNode { node_id });
892
893 true
894 }
895
896 fn detach_delete_node(&mut self, node_id: NodeId) -> bool {
897 if !self.nodes.contains_key(&node_id) {
898 return false;
899 }
900
901 let rel_ids: Vec<_> = self
902 .incident_relationship_ids(node_id)
903 .into_iter()
904 .collect();
905
906 for rel_id in rel_ids {
915 let _ = self.delete_relationship(rel_id);
916 }
917
918 if self.delete_node(node_id) {
919 self.emit(|| MutationEvent::DetachDeleteNode { node_id });
920 true
921 } else {
922 false
923 }
924 }
925
926 fn clear(&mut self) {
927 let recorder = self.recorder.take();
931 *self = Self::default();
932 self.recorder = recorder;
933 self.emit(|| MutationEvent::Clear);
934 }
935}
936
937impl Snapshotable for InMemoryGraph {
942 fn save_snapshot<W: std::io::Write>(&self, writer: W) -> Result<SnapshotMeta, SnapshotError> {
943 let payload = SnapshotPayload {
944 next_node_id: self.next_node_id,
945 next_rel_id: self.next_rel_id,
946 nodes: self.nodes.values().cloned().collect(),
947 relationships: self.relationships.values().cloned().collect(),
948 };
949 write_snapshot(writer, &payload, None)
950 }
951
952 fn save_checkpoint<W: std::io::Write>(
953 &self,
954 writer: W,
955 wal_lsn: u64,
956 ) -> Result<SnapshotMeta, SnapshotError> {
957 let payload = SnapshotPayload {
958 next_node_id: self.next_node_id,
959 next_rel_id: self.next_rel_id,
960 nodes: self.nodes.values().cloned().collect(),
961 relationships: self.relationships.values().cloned().collect(),
962 };
963 write_snapshot(writer, &payload, Some(wal_lsn))
964 }
965
966 fn load_snapshot<R: std::io::Read>(
967 &mut self,
968 reader: R,
969 ) -> Result<SnapshotMeta, SnapshotError> {
970 let (payload, meta) = read_snapshot(reader)?;
971
972 let mut rebuilt = Self {
977 next_node_id: payload.next_node_id,
978 next_rel_id: payload.next_rel_id,
979 ..Self::default()
980 };
981
982 for node in payload.nodes {
984 let id = node.id;
985 let labels = node.labels.clone();
986 rebuilt.nodes.insert(id, node);
987 for label in &labels {
988 rebuilt.insert_node_label_index(id, label);
989 }
990 rebuilt.outgoing.entry(id).or_default();
991 rebuilt.incoming.entry(id).or_default();
992 }
993
994 for rel in payload.relationships {
996 rebuilt.attach_relationship(&rel);
997 rebuilt.relationships.insert(rel.id, rel);
998 }
999
1000 rebuilt.recorder = self.recorder.take();
1004 *self = rebuilt;
1005
1006 Ok(meta)
1007 }
1008}
1009
1010#[cfg(test)]
1011mod tests {
1012 use super::*;
1013
1014 fn props(pairs: &[(&str, PropertyValue)]) -> Properties {
1015 pairs
1016 .iter()
1017 .map(|(k, v)| ((*k).to_string(), v.clone()))
1018 .collect()
1019 }
1020
1021 #[test]
1022 fn create_and_lookup_nodes() {
1023 let mut g = InMemoryGraph::new();
1024
1025 let a = g.create_node(
1026 vec!["Person".into(), "Employee".into()],
1027 props(&[("name", PropertyValue::String("Alice".into()))]),
1028 );
1029 let b = g.create_node(
1030 vec!["Person".into()],
1031 props(&[("name", PropertyValue::String("Bob".into()))]),
1032 );
1033
1034 assert_eq!(a.id, 0);
1035 assert_eq!(b.id, 1);
1036
1037 assert_eq!(g.all_nodes().len(), 2);
1038 assert_eq!(g.nodes_by_label("Person").len(), 2);
1039 assert_eq!(g.nodes_by_label("Employee").len(), 1);
1040 assert!(g.node_has_label(a.id, "Person"));
1041 assert_eq!(
1042 g.node_property(a.id, "name"),
1043 Some(PropertyValue::String("Alice".into()))
1044 );
1045 }
1046
1047 #[test]
1048 fn create_and_expand_relationships() {
1049 let mut g = InMemoryGraph::new();
1050
1051 let a = g.create_node(vec!["Person".into()], Properties::new());
1052 let b = g.create_node(vec!["Person".into()], Properties::new());
1053 let c = g.create_node(vec!["Company".into()], Properties::new());
1054
1055 let r1 = g
1056 .create_relationship(a.id, b.id, "KNOWS", Properties::new())
1057 .unwrap();
1058 let r2 = g
1059 .create_relationship(a.id, c.id, "WORKS_AT", Properties::new())
1060 .unwrap();
1061
1062 assert_eq!(g.all_relationships().len(), 2);
1063 assert_eq!(g.relationships_by_type("KNOWS").len(), 1);
1064 assert_eq!(g.outgoing_relationships(a.id).len(), 2);
1065 assert_eq!(g.incoming_relationships(b.id).len(), 1);
1066
1067 let knows = g.expand(a.id, Direction::Right, &[String::from("KNOWS")]);
1068 assert_eq!(knows.len(), 1);
1069 assert_eq!(knows[0].0.id, r1.id);
1070 assert_eq!(knows[0].1.id, b.id);
1071
1072 let undirected = g.expand(a.id, Direction::Undirected, &[]);
1073 assert_eq!(undirected.len(), 2);
1074
1075 assert_eq!(g.relationship(r2.id).unwrap().dst, c.id);
1076 }
1077
1078 #[test]
1079 fn incoming_and_outgoing_are_distinct() {
1080 let mut g = InMemoryGraph::new();
1081
1082 let a = g.create_node(vec!["Person".into()], Properties::new());
1083 let b = g.create_node(vec!["Person".into()], Properties::new());
1084 let c = g.create_node(vec!["Person".into()], Properties::new());
1085
1086 g.create_relationship(a.id, b.id, "KNOWS", Properties::new())
1087 .unwrap();
1088 g.create_relationship(c.id, a.id, "LIKES", Properties::new())
1089 .unwrap();
1090
1091 let outgoing = g.expand(a.id, Direction::Right, &[]);
1092 let incoming = g.expand(a.id, Direction::Left, &[]);
1093
1094 assert_eq!(outgoing.len(), 1);
1095 assert_eq!(incoming.len(), 1);
1096 assert_eq!(outgoing[0].1.id, b.id);
1097 assert_eq!(incoming[0].1.id, c.id);
1098 }
1099
1100 #[test]
1101 fn set_and_remove_properties() {
1102 let mut g = InMemoryGraph::new();
1103
1104 let n = g.create_node(vec!["Person".into()], Properties::new());
1105 assert!(g.set_node_property(n.id, "age".into(), PropertyValue::Int(42)));
1106 assert_eq!(g.node_property(n.id, "age"), Some(PropertyValue::Int(42)));
1107 assert!(g.remove_node_property(n.id, "age"));
1108 assert_eq!(g.node_property(n.id, "age"), None);
1109
1110 let m = g.create_node(vec!["Person".into()], Properties::new());
1111 let r = g
1112 .create_relationship(n.id, m.id, "KNOWS", Properties::new())
1113 .unwrap();
1114
1115 assert!(g.set_relationship_property(r.id, "since".into(), PropertyValue::Int(2020)));
1116 assert_eq!(
1117 g.relationship_property(r.id, "since"),
1118 Some(PropertyValue::Int(2020))
1119 );
1120 assert!(g.remove_relationship_property(r.id, "since"));
1121 assert_eq!(g.relationship_property(r.id, "since"), None);
1122 }
1123
1124 #[test]
1125 fn delete_requires_detach() {
1126 let mut g = InMemoryGraph::new();
1127
1128 let a = g.create_node(vec!["Person".into()], Properties::new());
1129 let b = g.create_node(vec!["Person".into()], Properties::new());
1130 let r = g
1131 .create_relationship(a.id, b.id, "KNOWS", Properties::new())
1132 .unwrap();
1133
1134 assert!(!g.delete_node(a.id));
1135 assert!(g.delete_relationship(r.id));
1136 assert!(g.delete_node(a.id));
1137 assert!(g.node(a.id).is_none());
1138 }
1139
1140 #[test]
1141 fn detach_delete_removes_incident_relationships() {
1142 let mut g = InMemoryGraph::new();
1143
1144 let a = g.create_node(vec!["Person".into()], Properties::new());
1145 let b = g.create_node(vec!["Person".into()], Properties::new());
1146 let c = g.create_node(vec!["Person".into()], Properties::new());
1147
1148 let r1 = g
1149 .create_relationship(a.id, b.id, "KNOWS", Properties::new())
1150 .unwrap();
1151 let r2 = g
1152 .create_relationship(c.id, a.id, "LIKES", Properties::new())
1153 .unwrap();
1154
1155 assert!(g.detach_delete_node(a.id));
1156 assert!(g.node(a.id).is_none());
1157 assert!(g.relationship(r1.id).is_none());
1158 assert!(g.relationship(r2.id).is_none());
1159 assert_eq!(g.all_relationships().len(), 0);
1160 }
1161
1162 #[test]
1163 fn duplicate_labels_are_normalized_on_create() {
1164 let mut g = InMemoryGraph::new();
1165
1166 let n = g.create_node(
1167 vec!["Person".into(), "Person".into(), "Admin".into()],
1168 Properties::new(),
1169 );
1170
1171 assert_eq!(n.labels, vec!["Person".to_string(), "Admin".to_string()]);
1172 assert_eq!(g.nodes_by_label("Person").len(), 1);
1173 assert_eq!(g.nodes_by_label("Admin").len(), 1);
1174 }
1175
1176 #[test]
1177 fn empty_labels_are_ignored() {
1178 let mut g = InMemoryGraph::new();
1179
1180 let n = g.create_node(
1181 vec!["Person".into(), "".into(), " ".into()],
1182 Properties::new(),
1183 );
1184
1185 assert_eq!(n.labels, vec!["Person".to_string()]);
1186 }
1187
1188 #[test]
1189 fn empty_relationship_type_is_rejected() {
1190 let mut g = InMemoryGraph::new();
1191
1192 let a = g.create_node(vec!["A".into()], Properties::new());
1193 let b = g.create_node(vec!["B".into()], Properties::new());
1194
1195 assert!(g
1196 .create_relationship(a.id, b.id, "", Properties::new())
1197 .is_none());
1198 }
1199
1200 #[test]
1201 fn storage_schema_helpers_work() {
1202 let mut g = InMemoryGraph::new();
1203
1204 let a = g.create_node(
1205 vec!["Person".into()],
1206 props(&[("name", PropertyValue::String("Alice".into()))]),
1207 );
1208 let b = g.create_node(
1209 vec!["Company".into()],
1210 props(&[("title", PropertyValue::String("Acme".into()))]),
1211 );
1212
1213 g.create_relationship(
1214 a.id,
1215 b.id,
1216 "WORKS_AT",
1217 props(&[("since", PropertyValue::Int(2020))]),
1218 )
1219 .unwrap();
1220
1221 assert!(g.has_label_name("Person"));
1222 assert!(g.has_relationship_type_name("WORKS_AT"));
1223 assert!(g.has_property_key("name"));
1224 assert!(g.has_property_key("since"));
1225 assert!(g.label_has_property_key("Person", "name"));
1226 assert!(g.rel_type_has_property_key("WORKS_AT", "since"));
1227 }
1228
1229 #[test]
1230 fn clear_resets_the_graph() {
1231 let mut g = InMemoryGraph::new();
1232 let a = g.create_node(vec!["Person".into()], Properties::new());
1233 let b = g.create_node(vec!["Person".into()], Properties::new());
1234 g.create_relationship(a.id, b.id, "KNOWS", Properties::new())
1235 .unwrap();
1236
1237 assert_eq!(g.node_count(), 2);
1238 assert_eq!(g.relationship_count(), 1);
1239
1240 g.clear();
1241
1242 assert_eq!(g.node_count(), 0);
1243 assert_eq!(g.relationship_count(), 0);
1244 assert_eq!(g.all_labels().len(), 0);
1245 }
1246
1247 #[test]
1248 fn snapshot_roundtrip_preserves_graph_state() {
1249 let mut original = InMemoryGraph::new();
1250 let a = original.create_node(
1251 vec!["Person".into()],
1252 props(&[("name", PropertyValue::String("Alice".into()))]),
1253 );
1254 let b = original.create_node(
1255 vec!["Person".into()],
1256 props(&[("name", PropertyValue::String("Bob".into()))]),
1257 );
1258 let r = original
1259 .create_relationship(
1260 a.id,
1261 b.id,
1262 "KNOWS",
1263 props(&[("since", PropertyValue::Int(2020))]),
1264 )
1265 .unwrap();
1266
1267 let mut buf = Vec::new();
1268 let save_meta = original.save_snapshot(&mut buf).unwrap();
1269 assert_eq!(save_meta.node_count, 2);
1270 assert_eq!(save_meta.relationship_count, 1);
1271 assert_eq!(save_meta.wal_lsn, None);
1272
1273 let mut restored = InMemoryGraph::new();
1274 let load_meta = restored.load_snapshot(&buf[..]).unwrap();
1275 assert_eq!(load_meta, save_meta);
1276
1277 assert_eq!(restored.node_count(), 2);
1278 assert_eq!(restored.relationship_count(), 1);
1279 assert_eq!(
1280 restored.node_property(a.id, "name"),
1281 Some(PropertyValue::String("Alice".into()))
1282 );
1283 assert_eq!(
1284 restored.relationship_property(r.id, "since"),
1285 Some(PropertyValue::Int(2020))
1286 );
1287
1288 assert_eq!(restored.outgoing_relationships(a.id).len(), 1);
1290 assert_eq!(restored.nodes_by_label("Person").len(), 2);
1291
1292 let c = restored.create_node(vec!["Person".into()], Properties::new());
1294 assert_eq!(c.id, b.id + 1);
1295 }
1296
1297 #[test]
1298 fn mutation_recorder_observes_every_committed_mutation() {
1299 use std::sync::Mutex;
1300
1301 #[derive(Default)]
1302 struct CapturingRecorder {
1303 events: Mutex<Vec<MutationEvent>>,
1304 }
1305
1306 impl MutationRecorder for CapturingRecorder {
1307 fn record(&self, event: &MutationEvent) {
1308 self.events.lock().unwrap().push(event.clone());
1309 }
1310 }
1311
1312 let recorder = Arc::new(CapturingRecorder::default());
1313 let mut g = InMemoryGraph::new();
1314 g.set_mutation_recorder(Some(recorder.clone() as Arc<dyn MutationRecorder>));
1315
1316 let a = g.create_node(vec!["Person".into()], Properties::new());
1317 let b = g.create_node(vec!["Person".into()], Properties::new());
1318 let r = g
1319 .create_relationship(a.id, b.id, "KNOWS", Properties::new())
1320 .unwrap();
1321 g.set_node_property(a.id, "name".into(), PropertyValue::String("Alice".into()));
1322 g.remove_node_property(a.id, "name");
1323 g.add_node_label(a.id, "Admin");
1324 g.remove_node_label(a.id, "Admin");
1325 g.set_relationship_property(r.id, "since".into(), PropertyValue::Int(2020));
1326 g.remove_relationship_property(r.id, "since");
1327 g.detach_delete_node(a.id);
1328 g.clear();
1329
1330 let events = recorder.events.lock().unwrap().clone();
1331 assert!(matches!(events[0], MutationEvent::CreateNode { .. }));
1332 assert!(matches!(events[1], MutationEvent::CreateNode { .. }));
1333 assert!(matches!(
1334 events[2],
1335 MutationEvent::CreateRelationship { .. }
1336 ));
1337 assert!(matches!(events[3], MutationEvent::SetNodeProperty { .. }));
1338 assert!(matches!(
1339 events[4],
1340 MutationEvent::RemoveNodeProperty { .. }
1341 ));
1342 assert!(matches!(events[5], MutationEvent::AddNodeLabel { .. }));
1343 assert!(matches!(events[6], MutationEvent::RemoveNodeLabel { .. }));
1344 assert!(matches!(
1345 events[7],
1346 MutationEvent::SetRelationshipProperty { .. }
1347 ));
1348 assert!(matches!(
1349 events[8],
1350 MutationEvent::RemoveRelationshipProperty { .. }
1351 ));
1352 assert!(matches!(
1357 events[9],
1358 MutationEvent::DeleteRelationship { .. }
1359 ));
1360 assert!(matches!(events[10], MutationEvent::DeleteNode { .. }));
1361 assert!(matches!(events[11], MutationEvent::DetachDeleteNode { .. }));
1362 assert!(matches!(events.last(), Some(MutationEvent::Clear)));
1363
1364 let before = recorder.events.lock().unwrap().len();
1366 assert!(!g.set_node_property(9999, "x".into(), PropertyValue::Int(0)));
1367 assert_eq!(recorder.events.lock().unwrap().len(), before);
1368 }
1369
1370 #[test]
1371 fn snapshot_load_resets_but_keeps_recorder() {
1372 use std::sync::Mutex;
1373
1374 struct CountingRecorder(Mutex<usize>);
1375 impl MutationRecorder for CountingRecorder {
1376 fn record(&self, _: &MutationEvent) {
1377 *self.0.lock().unwrap() += 1;
1378 }
1379 }
1380
1381 let counter: Arc<dyn MutationRecorder> = Arc::new(CountingRecorder(Mutex::new(0)));
1382 let mut g = InMemoryGraph::new();
1383 g.set_mutation_recorder(Some(counter));
1384 g.create_node(vec!["A".into()], Properties::new());
1385
1386 let mut buf = Vec::new();
1387 g.save_snapshot(&mut buf).unwrap();
1388
1389 g.load_snapshot(&buf[..]).unwrap();
1392 assert!(g.mutation_recorder().is_some());
1393 assert_eq!(g.node_count(), 1);
1394
1395 g.create_node(vec!["B".into()], Properties::new());
1397 }
1400}