1#[cfg(feature = "wal")]
4use std::path::Path;
5
6use grafeo_common::types::{EdgeId, NodeId, Value};
7use grafeo_common::utils::error::{Error, Result};
8use hashbrown::HashSet;
9
10use crate::config::Config;
11
12#[cfg(feature = "wal")]
13use grafeo_adapters::storage::wal::WalRecord;
14
15#[derive(serde::Serialize, serde::Deserialize)]
17struct Snapshot {
18 version: u8,
19 nodes: Vec<SnapshotNode>,
20 edges: Vec<SnapshotEdge>,
21}
22
23#[derive(serde::Serialize, serde::Deserialize)]
24struct SnapshotNode {
25 id: NodeId,
26 labels: Vec<String>,
27 properties: Vec<(String, Value)>,
28}
29
30#[derive(serde::Serialize, serde::Deserialize)]
31struct SnapshotEdge {
32 id: EdgeId,
33 src: NodeId,
34 dst: NodeId,
35 edge_type: String,
36 properties: Vec<(String, Value)>,
37}
38
39impl super::GrafeoDB {
40 #[cfg(feature = "wal")]
57 pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
58 let path = path.as_ref();
59
60 let target_config = Config::persistent(path);
62 let target = Self::with_config(target_config)?;
63
64 for node in self.store.all_nodes() {
66 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
67 target.store.create_node_with_id(node.id, &label_refs);
68
69 target.log_wal(&WalRecord::CreateNode {
71 id: node.id,
72 labels: node.labels.iter().map(|s| s.to_string()).collect(),
73 })?;
74
75 for (key, value) in node.properties {
77 target
78 .store
79 .set_node_property(node.id, key.as_str(), value.clone());
80 target.log_wal(&WalRecord::SetNodeProperty {
81 id: node.id,
82 key: key.to_string(),
83 value,
84 })?;
85 }
86 }
87
88 for edge in self.store.all_edges() {
90 target
91 .store
92 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
93
94 target.log_wal(&WalRecord::CreateEdge {
96 id: edge.id,
97 src: edge.src,
98 dst: edge.dst,
99 edge_type: edge.edge_type.to_string(),
100 })?;
101
102 for (key, value) in edge.properties {
104 target
105 .store
106 .set_edge_property(edge.id, key.as_str(), value.clone());
107 target.log_wal(&WalRecord::SetEdgeProperty {
108 id: edge.id,
109 key: key.to_string(),
110 value,
111 })?;
112 }
113 }
114
115 target.close()?;
117
118 Ok(())
119 }
120
121 pub fn to_memory(&self) -> Result<Self> {
132 let config = Config::in_memory();
133 let target = Self::with_config(config)?;
134
135 for node in self.store.all_nodes() {
137 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
138 target.store.create_node_with_id(node.id, &label_refs);
139
140 for (key, value) in node.properties {
142 target.store.set_node_property(node.id, key.as_str(), value);
143 }
144 }
145
146 for edge in self.store.all_edges() {
148 target
149 .store
150 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
151
152 for (key, value) in edge.properties {
154 target.store.set_edge_property(edge.id, key.as_str(), value);
155 }
156 }
157
158 Ok(target)
159 }
160
161 #[cfg(feature = "wal")]
170 pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
171 let source = Self::open(path)?;
173
174 let target = source.to_memory()?;
176
177 source.close()?;
179
180 Ok(target)
181 }
182
183 pub fn export_snapshot(&self) -> Result<Vec<u8>> {
196 let nodes: Vec<SnapshotNode> = self
197 .store
198 .all_nodes()
199 .map(|n| SnapshotNode {
200 id: n.id,
201 labels: n.labels.iter().map(|l| l.to_string()).collect(),
202 properties: n
203 .properties
204 .into_iter()
205 .map(|(k, v)| (k.to_string(), v))
206 .collect(),
207 })
208 .collect();
209
210 let edges: Vec<SnapshotEdge> = self
211 .store
212 .all_edges()
213 .map(|e| SnapshotEdge {
214 id: e.id,
215 src: e.src,
216 dst: e.dst,
217 edge_type: e.edge_type.to_string(),
218 properties: e
219 .properties
220 .into_iter()
221 .map(|(k, v)| (k.to_string(), v))
222 .collect(),
223 })
224 .collect();
225
226 let snapshot = Snapshot {
227 version: 1,
228 nodes,
229 edges,
230 };
231
232 let config = bincode::config::standard();
233 bincode::serde::encode_to_vec(&snapshot, config)
234 .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
235 }
236
237 pub fn import_snapshot(data: &[u8]) -> Result<Self> {
251 let config = bincode::config::standard();
252 let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
253 .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
254
255 if snapshot.version != 1 {
256 return Err(Error::Internal(format!(
257 "unsupported snapshot version: {}",
258 snapshot.version
259 )));
260 }
261
262 let mut node_ids = HashSet::with_capacity(snapshot.nodes.len());
264 for node in &snapshot.nodes {
265 if !node_ids.insert(node.id) {
266 return Err(Error::Internal(format!(
267 "snapshot contains duplicate node ID {}",
268 node.id
269 )));
270 }
271 }
272
273 let mut edge_ids = HashSet::with_capacity(snapshot.edges.len());
275 for edge in &snapshot.edges {
276 if !edge_ids.insert(edge.id) {
277 return Err(Error::Internal(format!(
278 "snapshot contains duplicate edge ID {}",
279 edge.id
280 )));
281 }
282 if !node_ids.contains(&edge.src) {
283 return Err(Error::Internal(format!(
284 "snapshot edge {} references non-existent source node {}",
285 edge.id, edge.src
286 )));
287 }
288 if !node_ids.contains(&edge.dst) {
289 return Err(Error::Internal(format!(
290 "snapshot edge {} references non-existent destination node {}",
291 edge.id, edge.dst
292 )));
293 }
294 }
295
296 let db = Self::new_in_memory();
298
299 for node in snapshot.nodes {
300 let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
301 db.store.create_node_with_id(node.id, &label_refs);
302 for (key, value) in node.properties {
303 db.store.set_node_property(node.id, &key, value);
304 }
305 }
306
307 for edge in snapshot.edges {
308 db.store
309 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
310 for (key, value) in edge.properties {
311 db.store.set_edge_property(edge.id, &key, value);
312 }
313 }
314
315 Ok(db)
316 }
317
318 pub fn restore_snapshot(&self, data: &[u8]) -> Result<()> {
333 let config = bincode::config::standard();
334 let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
335 .map_err(|e| Error::Internal(format!("snapshot restore failed: {e}")))?;
336
337 if snapshot.version != 1 {
338 return Err(Error::Internal(format!(
339 "unsupported snapshot version: {}",
340 snapshot.version
341 )));
342 }
343
344 let mut node_ids = HashSet::with_capacity(snapshot.nodes.len());
346 for node in &snapshot.nodes {
347 if !node_ids.insert(node.id) {
348 return Err(Error::Internal(format!(
349 "snapshot contains duplicate node ID {}",
350 node.id
351 )));
352 }
353 }
354
355 let mut edge_ids = HashSet::with_capacity(snapshot.edges.len());
357 for edge in &snapshot.edges {
358 if !edge_ids.insert(edge.id) {
359 return Err(Error::Internal(format!(
360 "snapshot contains duplicate edge ID {}",
361 edge.id
362 )));
363 }
364 if !node_ids.contains(&edge.src) {
365 return Err(Error::Internal(format!(
366 "snapshot edge {} references non-existent source node {}",
367 edge.id, edge.src
368 )));
369 }
370 if !node_ids.contains(&edge.dst) {
371 return Err(Error::Internal(format!(
372 "snapshot edge {} references non-existent destination node {}",
373 edge.id, edge.dst
374 )));
375 }
376 }
377
378 self.store.clear();
380
381 for node in snapshot.nodes {
382 let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
383 self.store.create_node_with_id(node.id, &label_refs);
384 for (key, value) in node.properties {
385 self.store.set_node_property(node.id, &key, value);
386 }
387 }
388
389 for edge in snapshot.edges {
390 self.store
391 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
392 for (key, value) in edge.properties {
393 self.store.set_edge_property(edge.id, &key, value);
394 }
395 }
396
397 Ok(())
398 }
399
400 pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
408 self.store.all_nodes()
409 }
410
411 pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
415 self.store.all_edges()
416 }
417}
418
419#[cfg(test)]
420mod tests {
421 use grafeo_common::types::{EdgeId, NodeId, Value};
422
423 use super::super::GrafeoDB;
424 use super::{Snapshot, SnapshotEdge, SnapshotNode};
425
426 #[test]
427 fn test_restore_snapshot_basic() {
428 let db = GrafeoDB::new_in_memory();
429 let session = db.session();
430
431 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
433 session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
434
435 let snapshot = db.export_snapshot().unwrap();
436
437 session
439 .execute("INSERT (:Person {name: 'Vincent'})")
440 .unwrap();
441 assert_eq!(db.store.node_count(), 3);
442
443 db.restore_snapshot(&snapshot).unwrap();
445
446 assert_eq!(db.store.node_count(), 2);
447 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
448 assert_eq!(result.rows.len(), 2);
449 }
450
451 #[test]
452 fn test_restore_snapshot_validation_failure() {
453 let db = GrafeoDB::new_in_memory();
454 let session = db.session();
455
456 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
457
458 let result = db.restore_snapshot(b"garbage");
460 assert!(result.is_err());
461
462 assert_eq!(db.store.node_count(), 1);
464 }
465
466 #[test]
467 fn test_restore_snapshot_empty_db() {
468 let db = GrafeoDB::new_in_memory();
469
470 let empty_snapshot = db.export_snapshot().unwrap();
472
473 let session = db.session();
474 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
475 assert_eq!(db.store.node_count(), 1);
476
477 db.restore_snapshot(&empty_snapshot).unwrap();
478 assert_eq!(db.store.node_count(), 0);
479 }
480
481 #[test]
482 fn test_restore_snapshot_with_edges() {
483 let db = GrafeoDB::new_in_memory();
484 let session = db.session();
485
486 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
487 session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
488 session
489 .execute(
490 "MATCH (a:Person {name: 'Alix'}), (b:Person {name: 'Gus'}) INSERT (a)-[:KNOWS]->(b)",
491 )
492 .unwrap();
493
494 let snapshot = db.export_snapshot().unwrap();
495 assert_eq!(db.store.edge_count(), 1);
496
497 session
499 .execute("INSERT (:Person {name: 'Vincent'})")
500 .unwrap();
501
502 db.restore_snapshot(&snapshot).unwrap();
504 assert_eq!(db.store.node_count(), 2);
505 assert_eq!(db.store.edge_count(), 1);
506 }
507
508 #[test]
509 fn test_restore_snapshot_preserves_sessions() {
510 let db = GrafeoDB::new_in_memory();
511 let session = db.session();
512
513 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
514 let snapshot = db.export_snapshot().unwrap();
515
516 session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
518
519 db.restore_snapshot(&snapshot).unwrap();
521
522 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
524 assert_eq!(result.rows.len(), 1);
525 }
526
527 #[test]
528 fn test_export_import_roundtrip() {
529 let db = GrafeoDB::new_in_memory();
530 let session = db.session();
531
532 session
533 .execute("INSERT (:Person {name: 'Alix', age: 30})")
534 .unwrap();
535
536 let snapshot = db.export_snapshot().unwrap();
537 let db2 = GrafeoDB::import_snapshot(&snapshot).unwrap();
538 let session2 = db2.session();
539
540 let result = session2.execute("MATCH (n:Person) RETURN n.name").unwrap();
541 assert_eq!(result.rows.len(), 1);
542 }
543
544 #[test]
547 fn test_to_memory_empty() {
548 let db = GrafeoDB::new_in_memory();
549 let copy = db.to_memory().unwrap();
550 assert_eq!(copy.store.node_count(), 0);
551 assert_eq!(copy.store.edge_count(), 0);
552 }
553
554 #[test]
555 fn test_to_memory_copies_nodes_and_properties() {
556 let db = GrafeoDB::new_in_memory();
557 let session = db.session();
558 session
559 .execute("INSERT (:Person {name: 'Alix', age: 30})")
560 .unwrap();
561 session
562 .execute("INSERT (:Person {name: 'Gus', age: 25})")
563 .unwrap();
564
565 let copy = db.to_memory().unwrap();
566 assert_eq!(copy.store.node_count(), 2);
567
568 let s2 = copy.session();
569 let result = s2
570 .execute("MATCH (p:Person) RETURN p.name ORDER BY p.name")
571 .unwrap();
572 assert_eq!(result.rows.len(), 2);
573 assert_eq!(result.rows[0][0], Value::String("Alix".into()));
574 assert_eq!(result.rows[1][0], Value::String("Gus".into()));
575 }
576
577 #[test]
578 fn test_to_memory_copies_edges_and_properties() {
579 let db = GrafeoDB::new_in_memory();
580 let a = db.create_node(&["Person"]);
581 db.set_node_property(a, "name", "Alix".into());
582 let b = db.create_node(&["Person"]);
583 db.set_node_property(b, "name", "Gus".into());
584 let edge = db.create_edge(a, b, "KNOWS");
585 db.set_edge_property(edge, "since", Value::Int64(2020));
586
587 let copy = db.to_memory().unwrap();
588 assert_eq!(copy.store.node_count(), 2);
589 assert_eq!(copy.store.edge_count(), 1);
590
591 let s2 = copy.session();
592 let result = s2.execute("MATCH ()-[e:KNOWS]->() RETURN e.since").unwrap();
593 assert_eq!(result.rows[0][0], Value::Int64(2020));
594 }
595
596 #[test]
597 fn test_to_memory_is_independent() {
598 let db = GrafeoDB::new_in_memory();
599 let session = db.session();
600 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
601
602 let copy = db.to_memory().unwrap();
603
604 session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
606 assert_eq!(db.store.node_count(), 2);
607 assert_eq!(copy.store.node_count(), 1);
608 }
609
610 #[test]
613 fn test_iter_nodes_empty() {
614 let db = GrafeoDB::new_in_memory();
615 assert_eq!(db.iter_nodes().count(), 0);
616 }
617
618 #[test]
619 fn test_iter_nodes_returns_all() {
620 let db = GrafeoDB::new_in_memory();
621 let id1 = db.create_node(&["Person"]);
622 db.set_node_property(id1, "name", "Alix".into());
623 let id2 = db.create_node(&["Animal"]);
624 db.set_node_property(id2, "name", "Fido".into());
625
626 let nodes: Vec<_> = db.iter_nodes().collect();
627 assert_eq!(nodes.len(), 2);
628
629 let names: Vec<_> = nodes
630 .iter()
631 .filter_map(|n| n.properties.iter().find(|(k, _)| k.as_str() == "name"))
632 .map(|(_, v)| v.clone())
633 .collect();
634 assert!(names.contains(&Value::String("Alix".into())));
635 assert!(names.contains(&Value::String("Fido".into())));
636 }
637
638 #[test]
639 fn test_iter_edges_empty() {
640 let db = GrafeoDB::new_in_memory();
641 assert_eq!(db.iter_edges().count(), 0);
642 }
643
644 #[test]
645 fn test_iter_edges_returns_all() {
646 let db = GrafeoDB::new_in_memory();
647 let a = db.create_node(&["A"]);
648 let b = db.create_node(&["B"]);
649 let c = db.create_node(&["C"]);
650 db.create_edge(a, b, "R1");
651 db.create_edge(b, c, "R2");
652
653 let edges: Vec<_> = db.iter_edges().collect();
654 assert_eq!(edges.len(), 2);
655
656 let types: Vec<_> = edges.iter().map(|e| e.edge_type.as_ref()).collect();
657 assert!(types.contains(&"R1"));
658 assert!(types.contains(&"R2"));
659 }
660
661 fn encode_snapshot(snap: &Snapshot) -> Vec<u8> {
664 bincode::serde::encode_to_vec(snap, bincode::config::standard()).unwrap()
665 }
666
667 #[test]
668 fn test_restore_rejects_unsupported_version() {
669 let db = GrafeoDB::new_in_memory();
670 let session = db.session();
671 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
672
673 let snap = Snapshot {
674 version: 99,
675 nodes: vec![],
676 edges: vec![],
677 };
678 let bytes = encode_snapshot(&snap);
679
680 let result = db.restore_snapshot(&bytes);
681 assert!(result.is_err());
682 let err = result.unwrap_err().to_string();
683 assert!(err.contains("unsupported snapshot version"), "got: {err}");
684
685 assert_eq!(db.store.node_count(), 1);
687 }
688
689 #[test]
690 fn test_restore_rejects_duplicate_node_ids() {
691 let db = GrafeoDB::new_in_memory();
692 let session = db.session();
693 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
694
695 let snap = Snapshot {
696 version: 1,
697 nodes: vec![
698 SnapshotNode {
699 id: NodeId::new(0),
700 labels: vec!["A".into()],
701 properties: vec![],
702 },
703 SnapshotNode {
704 id: NodeId::new(0),
705 labels: vec!["B".into()],
706 properties: vec![],
707 },
708 ],
709 edges: vec![],
710 };
711 let bytes = encode_snapshot(&snap);
712
713 let result = db.restore_snapshot(&bytes);
714 assert!(result.is_err());
715 let err = result.unwrap_err().to_string();
716 assert!(err.contains("duplicate node ID"), "got: {err}");
717 assert_eq!(db.store.node_count(), 1);
718 }
719
720 #[test]
721 fn test_restore_rejects_duplicate_edge_ids() {
722 let db = GrafeoDB::new_in_memory();
723
724 let snap = Snapshot {
725 version: 1,
726 nodes: vec![
727 SnapshotNode {
728 id: NodeId::new(0),
729 labels: vec![],
730 properties: vec![],
731 },
732 SnapshotNode {
733 id: NodeId::new(1),
734 labels: vec![],
735 properties: vec![],
736 },
737 ],
738 edges: vec![
739 SnapshotEdge {
740 id: EdgeId::new(0),
741 src: NodeId::new(0),
742 dst: NodeId::new(1),
743 edge_type: "REL".into(),
744 properties: vec![],
745 },
746 SnapshotEdge {
747 id: EdgeId::new(0),
748 src: NodeId::new(0),
749 dst: NodeId::new(1),
750 edge_type: "REL".into(),
751 properties: vec![],
752 },
753 ],
754 };
755 let bytes = encode_snapshot(&snap);
756
757 let result = db.restore_snapshot(&bytes);
758 assert!(result.is_err());
759 let err = result.unwrap_err().to_string();
760 assert!(err.contains("duplicate edge ID"), "got: {err}");
761 }
762
763 #[test]
764 fn test_restore_rejects_dangling_source() {
765 let db = GrafeoDB::new_in_memory();
766
767 let snap = Snapshot {
768 version: 1,
769 nodes: vec![SnapshotNode {
770 id: NodeId::new(0),
771 labels: vec![],
772 properties: vec![],
773 }],
774 edges: vec![SnapshotEdge {
775 id: EdgeId::new(0),
776 src: NodeId::new(999),
777 dst: NodeId::new(0),
778 edge_type: "REL".into(),
779 properties: vec![],
780 }],
781 };
782 let bytes = encode_snapshot(&snap);
783
784 let result = db.restore_snapshot(&bytes);
785 assert!(result.is_err());
786 let err = result.unwrap_err().to_string();
787 assert!(err.contains("non-existent source node"), "got: {err}");
788 }
789
790 #[test]
791 fn test_restore_rejects_dangling_destination() {
792 let db = GrafeoDB::new_in_memory();
793
794 let snap = Snapshot {
795 version: 1,
796 nodes: vec![SnapshotNode {
797 id: NodeId::new(0),
798 labels: vec![],
799 properties: vec![],
800 }],
801 edges: vec![SnapshotEdge {
802 id: EdgeId::new(0),
803 src: NodeId::new(0),
804 dst: NodeId::new(999),
805 edge_type: "REL".into(),
806 properties: vec![],
807 }],
808 };
809 let bytes = encode_snapshot(&snap);
810
811 let result = db.restore_snapshot(&bytes);
812 assert!(result.is_err());
813 let err = result.unwrap_err().to_string();
814 assert!(err.contains("non-existent destination node"), "got: {err}");
815 }
816}