1use std::path::Path;
4
5use grafeo_common::types::{EdgeId, NodeId, Value};
6use grafeo_common::utils::error::{Error, Result};
7use hashbrown::HashSet;
8
9use crate::config::Config;
10
11#[cfg(feature = "wal")]
12use grafeo_adapters::storage::wal::WalRecord;
13
14#[derive(serde::Serialize, serde::Deserialize)]
16struct Snapshot {
17 version: u8,
18 nodes: Vec<SnapshotNode>,
19 edges: Vec<SnapshotEdge>,
20}
21
22#[derive(serde::Serialize, serde::Deserialize)]
23struct SnapshotNode {
24 id: NodeId,
25 labels: Vec<String>,
26 properties: Vec<(String, Value)>,
27}
28
29#[derive(serde::Serialize, serde::Deserialize)]
30struct SnapshotEdge {
31 id: EdgeId,
32 src: NodeId,
33 dst: NodeId,
34 edge_type: String,
35 properties: Vec<(String, Value)>,
36}
37
38impl super::GrafeoDB {
39 #[cfg(feature = "wal")]
56 pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
57 let path = path.as_ref();
58
59 let target_config = Config::persistent(path);
61 let target = Self::with_config(target_config)?;
62
63 for node in self.store.all_nodes() {
65 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
66 target.store.create_node_with_id(node.id, &label_refs);
67
68 target.log_wal(&WalRecord::CreateNode {
70 id: node.id,
71 labels: node.labels.iter().map(|s| s.to_string()).collect(),
72 })?;
73
74 for (key, value) in node.properties {
76 target
77 .store
78 .set_node_property(node.id, key.as_str(), value.clone());
79 target.log_wal(&WalRecord::SetNodeProperty {
80 id: node.id,
81 key: key.to_string(),
82 value,
83 })?;
84 }
85 }
86
87 for edge in self.store.all_edges() {
89 target
90 .store
91 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
92
93 target.log_wal(&WalRecord::CreateEdge {
95 id: edge.id,
96 src: edge.src,
97 dst: edge.dst,
98 edge_type: edge.edge_type.to_string(),
99 })?;
100
101 for (key, value) in edge.properties {
103 target
104 .store
105 .set_edge_property(edge.id, key.as_str(), value.clone());
106 target.log_wal(&WalRecord::SetEdgeProperty {
107 id: edge.id,
108 key: key.to_string(),
109 value,
110 })?;
111 }
112 }
113
114 target.close()?;
116
117 Ok(())
118 }
119
120 pub fn to_memory(&self) -> Result<Self> {
131 let config = Config::in_memory();
132 let target = Self::with_config(config)?;
133
134 for node in self.store.all_nodes() {
136 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
137 target.store.create_node_with_id(node.id, &label_refs);
138
139 for (key, value) in node.properties {
141 target.store.set_node_property(node.id, key.as_str(), value);
142 }
143 }
144
145 for edge in self.store.all_edges() {
147 target
148 .store
149 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
150
151 for (key, value) in edge.properties {
153 target.store.set_edge_property(edge.id, key.as_str(), value);
154 }
155 }
156
157 Ok(target)
158 }
159
160 #[cfg(feature = "wal")]
169 pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
170 let source = Self::open(path)?;
172
173 let target = source.to_memory()?;
175
176 source.close()?;
178
179 Ok(target)
180 }
181
182 pub fn export_snapshot(&self) -> Result<Vec<u8>> {
195 let nodes: Vec<SnapshotNode> = self
196 .store
197 .all_nodes()
198 .map(|n| SnapshotNode {
199 id: n.id,
200 labels: n.labels.iter().map(|l| l.to_string()).collect(),
201 properties: n
202 .properties
203 .into_iter()
204 .map(|(k, v)| (k.to_string(), v))
205 .collect(),
206 })
207 .collect();
208
209 let edges: Vec<SnapshotEdge> = self
210 .store
211 .all_edges()
212 .map(|e| SnapshotEdge {
213 id: e.id,
214 src: e.src,
215 dst: e.dst,
216 edge_type: e.edge_type.to_string(),
217 properties: e
218 .properties
219 .into_iter()
220 .map(|(k, v)| (k.to_string(), v))
221 .collect(),
222 })
223 .collect();
224
225 let snapshot = Snapshot {
226 version: 1,
227 nodes,
228 edges,
229 };
230
231 let config = bincode::config::standard();
232 bincode::serde::encode_to_vec(&snapshot, config)
233 .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
234 }
235
236 pub fn import_snapshot(data: &[u8]) -> Result<Self> {
250 let config = bincode::config::standard();
251 let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
252 .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
253
254 if snapshot.version != 1 {
255 return Err(Error::Internal(format!(
256 "unsupported snapshot version: {}",
257 snapshot.version
258 )));
259 }
260
261 let mut node_ids = HashSet::with_capacity(snapshot.nodes.len());
263 for node in &snapshot.nodes {
264 if !node_ids.insert(node.id) {
265 return Err(Error::Internal(format!(
266 "snapshot contains duplicate node ID {}",
267 node.id
268 )));
269 }
270 }
271
272 let mut edge_ids = HashSet::with_capacity(snapshot.edges.len());
274 for edge in &snapshot.edges {
275 if !edge_ids.insert(edge.id) {
276 return Err(Error::Internal(format!(
277 "snapshot contains duplicate edge ID {}",
278 edge.id
279 )));
280 }
281 if !node_ids.contains(&edge.src) {
282 return Err(Error::Internal(format!(
283 "snapshot edge {} references non-existent source node {}",
284 edge.id, edge.src
285 )));
286 }
287 if !node_ids.contains(&edge.dst) {
288 return Err(Error::Internal(format!(
289 "snapshot edge {} references non-existent destination node {}",
290 edge.id, edge.dst
291 )));
292 }
293 }
294
295 let db = Self::new_in_memory();
297
298 for node in snapshot.nodes {
299 let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
300 db.store.create_node_with_id(node.id, &label_refs);
301 for (key, value) in node.properties {
302 db.store.set_node_property(node.id, &key, value);
303 }
304 }
305
306 for edge in snapshot.edges {
307 db.store
308 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
309 for (key, value) in edge.properties {
310 db.store.set_edge_property(edge.id, &key, value);
311 }
312 }
313
314 Ok(db)
315 }
316
317 pub fn restore_snapshot(&self, data: &[u8]) -> Result<()> {
332 let config = bincode::config::standard();
333 let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
334 .map_err(|e| Error::Internal(format!("snapshot restore failed: {e}")))?;
335
336 if snapshot.version != 1 {
337 return Err(Error::Internal(format!(
338 "unsupported snapshot version: {}",
339 snapshot.version
340 )));
341 }
342
343 let mut node_ids = HashSet::with_capacity(snapshot.nodes.len());
345 for node in &snapshot.nodes {
346 if !node_ids.insert(node.id) {
347 return Err(Error::Internal(format!(
348 "snapshot contains duplicate node ID {}",
349 node.id
350 )));
351 }
352 }
353
354 let mut edge_ids = HashSet::with_capacity(snapshot.edges.len());
356 for edge in &snapshot.edges {
357 if !edge_ids.insert(edge.id) {
358 return Err(Error::Internal(format!(
359 "snapshot contains duplicate edge ID {}",
360 edge.id
361 )));
362 }
363 if !node_ids.contains(&edge.src) {
364 return Err(Error::Internal(format!(
365 "snapshot edge {} references non-existent source node {}",
366 edge.id, edge.src
367 )));
368 }
369 if !node_ids.contains(&edge.dst) {
370 return Err(Error::Internal(format!(
371 "snapshot edge {} references non-existent destination node {}",
372 edge.id, edge.dst
373 )));
374 }
375 }
376
377 self.store.clear();
379
380 for node in snapshot.nodes {
381 let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
382 self.store.create_node_with_id(node.id, &label_refs);
383 for (key, value) in node.properties {
384 self.store.set_node_property(node.id, &key, value);
385 }
386 }
387
388 for edge in snapshot.edges {
389 self.store
390 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
391 for (key, value) in edge.properties {
392 self.store.set_edge_property(edge.id, &key, value);
393 }
394 }
395
396 Ok(())
397 }
398
399 pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
407 self.store.all_nodes()
408 }
409
410 pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
414 self.store.all_edges()
415 }
416}
417
418#[cfg(test)]
419mod tests {
420 use grafeo_common::types::{EdgeId, NodeId, Value};
421
422 use super::super::GrafeoDB;
423 use super::{Snapshot, SnapshotEdge, SnapshotNode};
424
425 #[test]
426 fn test_restore_snapshot_basic() {
427 let db = GrafeoDB::new_in_memory();
428 let session = db.session();
429
430 session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
432 session.execute("INSERT (:Person {name: 'Bob'})").unwrap();
433
434 let snapshot = db.export_snapshot().unwrap();
435
436 session
438 .execute("INSERT (:Person {name: 'Charlie'})")
439 .unwrap();
440 assert_eq!(db.store.node_count(), 3);
441
442 db.restore_snapshot(&snapshot).unwrap();
444
445 assert_eq!(db.store.node_count(), 2);
446 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
447 assert_eq!(result.rows.len(), 2);
448 }
449
450 #[test]
451 fn test_restore_snapshot_validation_failure() {
452 let db = GrafeoDB::new_in_memory();
453 let session = db.session();
454
455 session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
456
457 let result = db.restore_snapshot(b"garbage");
459 assert!(result.is_err());
460
461 assert_eq!(db.store.node_count(), 1);
463 }
464
465 #[test]
466 fn test_restore_snapshot_empty_db() {
467 let db = GrafeoDB::new_in_memory();
468
469 let empty_snapshot = db.export_snapshot().unwrap();
471
472 let session = db.session();
473 session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
474 assert_eq!(db.store.node_count(), 1);
475
476 db.restore_snapshot(&empty_snapshot).unwrap();
477 assert_eq!(db.store.node_count(), 0);
478 }
479
480 #[test]
481 fn test_restore_snapshot_with_edges() {
482 let db = GrafeoDB::new_in_memory();
483 let session = db.session();
484
485 session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
486 session.execute("INSERT (:Person {name: 'Bob'})").unwrap();
487 session
488 .execute(
489 "MATCH (a:Person {name: 'Alice'}), (b:Person {name: 'Bob'}) INSERT (a)-[:KNOWS]->(b)",
490 )
491 .unwrap();
492
493 let snapshot = db.export_snapshot().unwrap();
494 assert_eq!(db.store.edge_count(), 1);
495
496 session
498 .execute("INSERT (:Person {name: 'Charlie'})")
499 .unwrap();
500
501 db.restore_snapshot(&snapshot).unwrap();
503 assert_eq!(db.store.node_count(), 2);
504 assert_eq!(db.store.edge_count(), 1);
505 }
506
507 #[test]
508 fn test_restore_snapshot_preserves_sessions() {
509 let db = GrafeoDB::new_in_memory();
510 let session = db.session();
511
512 session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
513 let snapshot = db.export_snapshot().unwrap();
514
515 session.execute("INSERT (:Person {name: 'Bob'})").unwrap();
517
518 db.restore_snapshot(&snapshot).unwrap();
520
521 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
523 assert_eq!(result.rows.len(), 1);
524 }
525
526 #[test]
527 fn test_export_import_roundtrip() {
528 let db = GrafeoDB::new_in_memory();
529 let session = db.session();
530
531 session
532 .execute("INSERT (:Person {name: 'Alice', age: 30})")
533 .unwrap();
534
535 let snapshot = db.export_snapshot().unwrap();
536 let db2 = GrafeoDB::import_snapshot(&snapshot).unwrap();
537 let session2 = db2.session();
538
539 let result = session2.execute("MATCH (n:Person) RETURN n.name").unwrap();
540 assert_eq!(result.rows.len(), 1);
541 }
542
543 #[test]
546 fn test_to_memory_empty() {
547 let db = GrafeoDB::new_in_memory();
548 let copy = db.to_memory().unwrap();
549 assert_eq!(copy.store.node_count(), 0);
550 assert_eq!(copy.store.edge_count(), 0);
551 }
552
553 #[test]
554 fn test_to_memory_copies_nodes_and_properties() {
555 let db = GrafeoDB::new_in_memory();
556 let session = db.session();
557 session
558 .execute("INSERT (:Person {name: 'Alice', age: 30})")
559 .unwrap();
560 session
561 .execute("INSERT (:Person {name: 'Bob', age: 25})")
562 .unwrap();
563
564 let copy = db.to_memory().unwrap();
565 assert_eq!(copy.store.node_count(), 2);
566
567 let s2 = copy.session();
568 let result = s2
569 .execute("MATCH (p:Person) RETURN p.name ORDER BY p.name")
570 .unwrap();
571 assert_eq!(result.rows.len(), 2);
572 assert_eq!(result.rows[0][0], Value::String("Alice".into()));
573 assert_eq!(result.rows[1][0], Value::String("Bob".into()));
574 }
575
576 #[test]
577 fn test_to_memory_copies_edges_and_properties() {
578 let db = GrafeoDB::new_in_memory();
579 let a = db.create_node(&["Person"]);
580 db.set_node_property(a, "name", "Alice".into());
581 let b = db.create_node(&["Person"]);
582 db.set_node_property(b, "name", "Bob".into());
583 let edge = db.create_edge(a, b, "KNOWS");
584 db.set_edge_property(edge, "since", Value::Int64(2020));
585
586 let copy = db.to_memory().unwrap();
587 assert_eq!(copy.store.node_count(), 2);
588 assert_eq!(copy.store.edge_count(), 1);
589
590 let s2 = copy.session();
591 let result = s2.execute("MATCH ()-[e:KNOWS]->() RETURN e.since").unwrap();
592 assert_eq!(result.rows[0][0], Value::Int64(2020));
593 }
594
595 #[test]
596 fn test_to_memory_is_independent() {
597 let db = GrafeoDB::new_in_memory();
598 let session = db.session();
599 session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
600
601 let copy = db.to_memory().unwrap();
602
603 session.execute("INSERT (:Person {name: 'Bob'})").unwrap();
605 assert_eq!(db.store.node_count(), 2);
606 assert_eq!(copy.store.node_count(), 1);
607 }
608
609 #[test]
612 fn test_iter_nodes_empty() {
613 let db = GrafeoDB::new_in_memory();
614 assert_eq!(db.iter_nodes().count(), 0);
615 }
616
617 #[test]
618 fn test_iter_nodes_returns_all() {
619 let db = GrafeoDB::new_in_memory();
620 let id1 = db.create_node(&["Person"]);
621 db.set_node_property(id1, "name", "Alice".into());
622 let id2 = db.create_node(&["Animal"]);
623 db.set_node_property(id2, "name", "Fido".into());
624
625 let nodes: Vec<_> = db.iter_nodes().collect();
626 assert_eq!(nodes.len(), 2);
627
628 let names: Vec<_> = nodes
629 .iter()
630 .filter_map(|n| n.properties.iter().find(|(k, _)| k.as_str() == "name"))
631 .map(|(_, v)| v.clone())
632 .collect();
633 assert!(names.contains(&Value::String("Alice".into())));
634 assert!(names.contains(&Value::String("Fido".into())));
635 }
636
637 #[test]
638 fn test_iter_edges_empty() {
639 let db = GrafeoDB::new_in_memory();
640 assert_eq!(db.iter_edges().count(), 0);
641 }
642
643 #[test]
644 fn test_iter_edges_returns_all() {
645 let db = GrafeoDB::new_in_memory();
646 let a = db.create_node(&["A"]);
647 let b = db.create_node(&["B"]);
648 let c = db.create_node(&["C"]);
649 db.create_edge(a, b, "R1");
650 db.create_edge(b, c, "R2");
651
652 let edges: Vec<_> = db.iter_edges().collect();
653 assert_eq!(edges.len(), 2);
654
655 let types: Vec<_> = edges.iter().map(|e| e.edge_type.as_ref()).collect();
656 assert!(types.contains(&"R1"));
657 assert!(types.contains(&"R2"));
658 }
659
660 fn encode_snapshot(snap: &Snapshot) -> Vec<u8> {
663 bincode::serde::encode_to_vec(snap, bincode::config::standard()).unwrap()
664 }
665
666 #[test]
667 fn test_restore_rejects_unsupported_version() {
668 let db = GrafeoDB::new_in_memory();
669 let session = db.session();
670 session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
671
672 let snap = Snapshot {
673 version: 99,
674 nodes: vec![],
675 edges: vec![],
676 };
677 let bytes = encode_snapshot(&snap);
678
679 let result = db.restore_snapshot(&bytes);
680 assert!(result.is_err());
681 let err = result.unwrap_err().to_string();
682 assert!(err.contains("unsupported snapshot version"), "got: {err}");
683
684 assert_eq!(db.store.node_count(), 1);
686 }
687
688 #[test]
689 fn test_restore_rejects_duplicate_node_ids() {
690 let db = GrafeoDB::new_in_memory();
691 let session = db.session();
692 session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
693
694 let snap = Snapshot {
695 version: 1,
696 nodes: vec![
697 SnapshotNode {
698 id: NodeId::new(0),
699 labels: vec!["A".into()],
700 properties: vec![],
701 },
702 SnapshotNode {
703 id: NodeId::new(0),
704 labels: vec!["B".into()],
705 properties: vec![],
706 },
707 ],
708 edges: vec![],
709 };
710 let bytes = encode_snapshot(&snap);
711
712 let result = db.restore_snapshot(&bytes);
713 assert!(result.is_err());
714 let err = result.unwrap_err().to_string();
715 assert!(err.contains("duplicate node ID"), "got: {err}");
716 assert_eq!(db.store.node_count(), 1);
717 }
718
719 #[test]
720 fn test_restore_rejects_duplicate_edge_ids() {
721 let db = GrafeoDB::new_in_memory();
722
723 let snap = Snapshot {
724 version: 1,
725 nodes: vec![
726 SnapshotNode {
727 id: NodeId::new(0),
728 labels: vec![],
729 properties: vec![],
730 },
731 SnapshotNode {
732 id: NodeId::new(1),
733 labels: vec![],
734 properties: vec![],
735 },
736 ],
737 edges: vec![
738 SnapshotEdge {
739 id: EdgeId::new(0),
740 src: NodeId::new(0),
741 dst: NodeId::new(1),
742 edge_type: "REL".into(),
743 properties: vec![],
744 },
745 SnapshotEdge {
746 id: EdgeId::new(0),
747 src: NodeId::new(0),
748 dst: NodeId::new(1),
749 edge_type: "REL".into(),
750 properties: vec![],
751 },
752 ],
753 };
754 let bytes = encode_snapshot(&snap);
755
756 let result = db.restore_snapshot(&bytes);
757 assert!(result.is_err());
758 let err = result.unwrap_err().to_string();
759 assert!(err.contains("duplicate edge ID"), "got: {err}");
760 }
761
762 #[test]
763 fn test_restore_rejects_dangling_source() {
764 let db = GrafeoDB::new_in_memory();
765
766 let snap = Snapshot {
767 version: 1,
768 nodes: vec![SnapshotNode {
769 id: NodeId::new(0),
770 labels: vec![],
771 properties: vec![],
772 }],
773 edges: vec![SnapshotEdge {
774 id: EdgeId::new(0),
775 src: NodeId::new(999),
776 dst: NodeId::new(0),
777 edge_type: "REL".into(),
778 properties: vec![],
779 }],
780 };
781 let bytes = encode_snapshot(&snap);
782
783 let result = db.restore_snapshot(&bytes);
784 assert!(result.is_err());
785 let err = result.unwrap_err().to_string();
786 assert!(err.contains("non-existent source node"), "got: {err}");
787 }
788
789 #[test]
790 fn test_restore_rejects_dangling_destination() {
791 let db = GrafeoDB::new_in_memory();
792
793 let snap = Snapshot {
794 version: 1,
795 nodes: vec![SnapshotNode {
796 id: NodeId::new(0),
797 labels: vec![],
798 properties: vec![],
799 }],
800 edges: vec![SnapshotEdge {
801 id: EdgeId::new(0),
802 src: NodeId::new(0),
803 dst: NodeId::new(999),
804 edge_type: "REL".into(),
805 properties: vec![],
806 }],
807 };
808 let bytes = encode_snapshot(&snap);
809
810 let result = db.restore_snapshot(&bytes);
811 assert!(result.is_err());
812 let err = result.unwrap_err().to_string();
813 assert!(err.contains("non-existent destination node"), "got: {err}");
814 }
815}