1#[cfg(feature = "wal")]
4use std::path::Path;
5
6use grafeo_common::types::{EdgeId, NodeId, Value};
7use grafeo_common::utils::error::{Error, Result};
8use hashbrown::HashSet;
9
10use crate::config::Config;
11
12#[cfg(feature = "wal")]
13use grafeo_adapters::storage::wal::WalRecord;
14
/// Version-1 snapshot layout: the nodes and edges of a single graph.
///
/// Decoded by the `1 =>` arms of `import_snapshot` / `restore_snapshot`.
/// bincode encodes struct fields positionally, so the declaration order
/// below is part of the on-disk format.
#[derive(serde::Serialize, serde::Deserialize)]
struct Snapshot {
    /// Format version tag; the importers dispatch on the first encoded byte.
    version: u8,
    /// All nodes contained in the snapshot.
    nodes: Vec<SnapshotNode>,
    /// All edges contained in the snapshot.
    edges: Vec<SnapshotEdge>,
}
22
/// Version-2 snapshot layout: the default graph plus named LPG graphs and RDF data.
///
/// This is the layout `export_snapshot` writes (with `version: 2`). bincode
/// encodes struct fields positionally, so the declaration order below is part
/// of the on-disk format.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotV2 {
    /// Format version tag; the importers dispatch on the first encoded byte.
    version: u8,
    /// Nodes of the default graph.
    nodes: Vec<SnapshotNode>,
    /// Edges of the default graph.
    edges: Vec<SnapshotEdge>,
    /// One entry per named LPG graph.
    named_graphs: Vec<NamedGraphSnapshot>,
    /// Triples of the top-level RDF store; `export_snapshot` leaves this empty
    /// when the `rdf` feature is disabled.
    #[serde(default)]
    rdf_triples: Vec<SnapshotTriple>,
    /// One entry per named RDF graph; `export_snapshot` leaves this empty when
    /// the `rdf` feature is disabled.
    #[serde(default)]
    rdf_named_graphs: Vec<RdfNamedGraphSnapshot>,
}
37
/// Contents of one named LPG graph inside a [`SnapshotV2`].
#[derive(serde::Serialize, serde::Deserialize)]
struct NamedGraphSnapshot {
    /// Name the graph is registered under in the store.
    name: String,
    /// Nodes belonging to this named graph.
    nodes: Vec<SnapshotNode>,
    /// Edges belonging to this named graph.
    edges: Vec<SnapshotEdge>,
}
45
/// One RDF triple stored as three strings.
///
/// Each component is produced with `to_string()` on the corresponding term and
/// is parsed back with `Term::from_ntriples` when a snapshot is loaded.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotTriple {
    /// Textual form of the subject term.
    subject: String,
    /// Textual form of the predicate term.
    predicate: String,
    /// Textual form of the object term.
    object: String,
}
53
/// Contents of one named RDF graph inside a [`SnapshotV2`].
#[derive(serde::Serialize, serde::Deserialize)]
struct RdfNamedGraphSnapshot {
    /// Name the RDF graph is registered under.
    name: String,
    /// Triples belonging to this named RDF graph.
    triples: Vec<SnapshotTriple>,
}
60
/// Serializable form of a single LPG node.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotNode {
    /// Identifier the node is recreated with on import.
    id: NodeId,
    /// Node labels as owned strings.
    labels: Vec<String>,
    /// Property key/value pairs attached to the node.
    properties: Vec<(String, Value)>,
}
67
/// Serializable form of a single LPG edge.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotEdge {
    /// Identifier the edge is recreated with on import.
    id: EdgeId,
    /// Source node of the edge.
    src: NodeId,
    /// Destination node of the edge.
    dst: NodeId,
    /// Edge type (relationship name) as an owned string.
    edge_type: String,
    /// Property key/value pairs attached to the edge.
    properties: Vec<(String, Value)>,
}
76
77fn collect_snapshot_nodes(store: &grafeo_core::graph::lpg::LpgStore) -> Vec<SnapshotNode> {
79 store
80 .all_nodes()
81 .map(|n| SnapshotNode {
82 id: n.id,
83 labels: n.labels.iter().map(|l| l.to_string()).collect(),
84 properties: n
85 .properties
86 .into_iter()
87 .map(|(k, v)| (k.to_string(), v))
88 .collect(),
89 })
90 .collect()
91}
92
93fn collect_snapshot_edges(store: &grafeo_core::graph::lpg::LpgStore) -> Vec<SnapshotEdge> {
95 store
96 .all_edges()
97 .map(|e| SnapshotEdge {
98 id: e.id,
99 src: e.src,
100 dst: e.dst,
101 edge_type: e.edge_type.to_string(),
102 properties: e
103 .properties
104 .into_iter()
105 .map(|(k, v)| (k.to_string(), v))
106 .collect(),
107 })
108 .collect()
109}
110
111fn populate_store_from_snapshot(
113 store: &grafeo_core::graph::lpg::LpgStore,
114 nodes: Vec<SnapshotNode>,
115 edges: Vec<SnapshotEdge>,
116) -> Result<()> {
117 for node in nodes {
118 let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
119 store.create_node_with_id(node.id, &label_refs)?;
120 for (key, value) in node.properties {
121 store.set_node_property(node.id, &key, value);
122 }
123 }
124 for edge in edges {
125 store.create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
126 for (key, value) in edge.properties {
127 store.set_edge_property(edge.id, &key, value);
128 }
129 }
130 Ok(())
131}
132
133fn validate_snapshot_data(nodes: &[SnapshotNode], edges: &[SnapshotEdge]) -> Result<()> {
135 let mut node_ids = HashSet::with_capacity(nodes.len());
136 for node in nodes {
137 if !node_ids.insert(node.id) {
138 return Err(Error::Internal(format!(
139 "snapshot contains duplicate node ID {}",
140 node.id
141 )));
142 }
143 }
144 let mut edge_ids = HashSet::with_capacity(edges.len());
145 for edge in edges {
146 if !edge_ids.insert(edge.id) {
147 return Err(Error::Internal(format!(
148 "snapshot contains duplicate edge ID {}",
149 edge.id
150 )));
151 }
152 if !node_ids.contains(&edge.src) {
153 return Err(Error::Internal(format!(
154 "snapshot edge {} references non-existent source node {}",
155 edge.id, edge.src
156 )));
157 }
158 if !node_ids.contains(&edge.dst) {
159 return Err(Error::Internal(format!(
160 "snapshot edge {} references non-existent destination node {}",
161 edge.id, edge.dst
162 )));
163 }
164 }
165 Ok(())
166}
167
/// Converts every triple in `store` into a [`SnapshotTriple`] by calling
/// `to_string()` on its subject, predicate and object.
#[cfg(feature = "rdf")]
fn collect_rdf_triples(store: &grafeo_core::graph::rdf::RdfStore) -> Vec<SnapshotTriple> {
    let mut snapshot_triples = Vec::new();
    for triple in store.triples() {
        snapshot_triples.push(SnapshotTriple {
            subject: triple.subject().to_string(),
            predicate: triple.predicate().to_string(),
            object: triple.object().to_string(),
        });
    }
    snapshot_triples
}
181
/// Parses each snapshot triple with `Term::from_ntriples` and inserts it into `store`.
///
/// A triple is inserted only when all three components parse; otherwise it is
/// skipped without reporting an error.
#[cfg(feature = "rdf")]
fn populate_rdf_store(store: &grafeo_core::graph::rdf::RdfStore, triples: &[SnapshotTriple]) {
    use grafeo_core::graph::rdf::{Term, Triple};
    for SnapshotTriple {
        subject,
        predicate,
        object,
    } in triples
    {
        let parsed = (
            Term::from_ntriples(subject),
            Term::from_ntriples(predicate),
            Term::from_ntriples(object),
        );
        match parsed {
            (Some(s), Some(p), Some(o)) => {
                store.insert(Triple::new(s, p, o));
            }
            // At least one component failed to parse: drop the triple.
            _ => {}
        }
    }
}
196
197impl super::GrafeoDB {
198 #[cfg(feature = "wal")]
215 pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
216 let path = path.as_ref();
217
218 let target_config = Config::persistent(path);
220 let target = Self::with_config(target_config)?;
221
222 for node in self.store.all_nodes() {
224 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
225 target.store.create_node_with_id(node.id, &label_refs)?;
226
227 target.log_wal(&WalRecord::CreateNode {
229 id: node.id,
230 labels: node.labels.iter().map(|s| s.to_string()).collect(),
231 })?;
232
233 for (key, value) in node.properties {
235 target
236 .store
237 .set_node_property(node.id, key.as_str(), value.clone());
238 target.log_wal(&WalRecord::SetNodeProperty {
239 id: node.id,
240 key: key.to_string(),
241 value,
242 })?;
243 }
244 }
245
246 for edge in self.store.all_edges() {
248 target
249 .store
250 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
251
252 target.log_wal(&WalRecord::CreateEdge {
254 id: edge.id,
255 src: edge.src,
256 dst: edge.dst,
257 edge_type: edge.edge_type.to_string(),
258 })?;
259
260 for (key, value) in edge.properties {
262 target
263 .store
264 .set_edge_property(edge.id, key.as_str(), value.clone());
265 target.log_wal(&WalRecord::SetEdgeProperty {
266 id: edge.id,
267 key: key.to_string(),
268 value,
269 })?;
270 }
271 }
272
273 for graph_name in self.store.graph_names() {
275 if let Some(src_graph) = self.store.graph(&graph_name) {
276 target.log_wal(&WalRecord::CreateNamedGraph {
277 name: graph_name.clone(),
278 })?;
279 target
280 .store
281 .create_graph(&graph_name)
282 .map_err(|e| Error::Internal(e.to_string()))?;
283
284 if let Some(dst_graph) = target.store.graph(&graph_name) {
285 target.log_wal(&WalRecord::SwitchGraph {
287 name: Some(graph_name.clone()),
288 })?;
289
290 for node in src_graph.all_nodes() {
291 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
292 dst_graph.create_node_with_id(node.id, &label_refs)?;
293 target.log_wal(&WalRecord::CreateNode {
294 id: node.id,
295 labels: node.labels.iter().map(|s| s.to_string()).collect(),
296 })?;
297 for (key, value) in node.properties {
298 dst_graph.set_node_property(node.id, key.as_str(), value.clone());
299 target.log_wal(&WalRecord::SetNodeProperty {
300 id: node.id,
301 key: key.to_string(),
302 value,
303 })?;
304 }
305 }
306 for edge in src_graph.all_edges() {
307 dst_graph.create_edge_with_id(
308 edge.id,
309 edge.src,
310 edge.dst,
311 &edge.edge_type,
312 )?;
313 target.log_wal(&WalRecord::CreateEdge {
314 id: edge.id,
315 src: edge.src,
316 dst: edge.dst,
317 edge_type: edge.edge_type.to_string(),
318 })?;
319 for (key, value) in edge.properties {
320 dst_graph.set_edge_property(edge.id, key.as_str(), value.clone());
321 target.log_wal(&WalRecord::SetEdgeProperty {
322 id: edge.id,
323 key: key.to_string(),
324 value,
325 })?;
326 }
327 }
328 }
329 }
330 }
331
332 if !self.store.graph_names().is_empty() {
334 target.log_wal(&WalRecord::SwitchGraph { name: None })?;
335 }
336
337 #[cfg(feature = "rdf")]
339 {
340 for triple in self.rdf_store.triples() {
341 let record = WalRecord::InsertRdfTriple {
342 subject: triple.subject().to_string(),
343 predicate: triple.predicate().to_string(),
344 object: triple.object().to_string(),
345 graph: None,
346 };
347 target.rdf_store.insert((*triple).clone());
348 target.log_wal(&record)?;
349 }
350 for name in self.rdf_store.graph_names() {
351 target.log_wal(&WalRecord::CreateRdfGraph { name: name.clone() })?;
352 if let Some(src_graph) = self.rdf_store.graph(&name) {
353 let dst_graph = target.rdf_store.graph_or_create(&name);
354 for triple in src_graph.triples() {
355 let record = WalRecord::InsertRdfTriple {
356 subject: triple.subject().to_string(),
357 predicate: triple.predicate().to_string(),
358 object: triple.object().to_string(),
359 graph: Some(name.clone()),
360 };
361 dst_graph.insert((*triple).clone());
362 target.log_wal(&record)?;
363 }
364 }
365 }
366 }
367
368 target.close()?;
370
371 Ok(())
372 }
373
374 pub fn to_memory(&self) -> Result<Self> {
386 let config = Config::in_memory();
387 let target = Self::with_config(config)?;
388
389 for node in self.store.all_nodes() {
391 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
392 target.store.create_node_with_id(node.id, &label_refs)?;
393 for (key, value) in node.properties {
394 target.store.set_node_property(node.id, key.as_str(), value);
395 }
396 }
397
398 for edge in self.store.all_edges() {
400 target
401 .store
402 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
403 for (key, value) in edge.properties {
404 target.store.set_edge_property(edge.id, key.as_str(), value);
405 }
406 }
407
408 for graph_name in self.store.graph_names() {
410 if let Some(src_graph) = self.store.graph(&graph_name) {
411 target
412 .store
413 .create_graph(&graph_name)
414 .map_err(|e| Error::Internal(e.to_string()))?;
415 if let Some(dst_graph) = target.store.graph(&graph_name) {
416 for node in src_graph.all_nodes() {
417 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
418 dst_graph.create_node_with_id(node.id, &label_refs)?;
419 for (key, value) in node.properties {
420 dst_graph.set_node_property(node.id, key.as_str(), value);
421 }
422 }
423 for edge in src_graph.all_edges() {
424 dst_graph.create_edge_with_id(
425 edge.id,
426 edge.src,
427 edge.dst,
428 &edge.edge_type,
429 )?;
430 for (key, value) in edge.properties {
431 dst_graph.set_edge_property(edge.id, key.as_str(), value);
432 }
433 }
434 }
435 }
436 }
437
438 #[cfg(feature = "rdf")]
440 {
441 for triple in self.rdf_store.triples() {
442 target.rdf_store.insert((*triple).clone());
443 }
444 for name in self.rdf_store.graph_names() {
445 if let Some(src_graph) = self.rdf_store.graph(&name) {
446 let dst_graph = target.rdf_store.graph_or_create(&name);
447 for triple in src_graph.triples() {
448 dst_graph.insert((*triple).clone());
449 }
450 }
451 }
452 }
453
454 Ok(target)
455 }
456
457 #[cfg(feature = "wal")]
466 pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
467 let source = Self::open(path)?;
469
470 let target = source.to_memory()?;
472
473 source.close()?;
475
476 Ok(target)
477 }
478
479 pub fn export_snapshot(&self) -> Result<Vec<u8>> {
493 let nodes = collect_snapshot_nodes(&self.store);
494 let edges = collect_snapshot_edges(&self.store);
495
496 let named_graphs: Vec<NamedGraphSnapshot> = self
498 .store
499 .graph_names()
500 .into_iter()
501 .filter_map(|name| {
502 self.store
503 .graph(&name)
504 .map(|graph_store| NamedGraphSnapshot {
505 name,
506 nodes: collect_snapshot_nodes(&graph_store),
507 edges: collect_snapshot_edges(&graph_store),
508 })
509 })
510 .collect();
511
512 #[cfg(feature = "rdf")]
514 let rdf_triples = collect_rdf_triples(&self.rdf_store);
515 #[cfg(not(feature = "rdf"))]
516 let rdf_triples = Vec::new();
517
518 #[cfg(feature = "rdf")]
519 let rdf_named_graphs: Vec<RdfNamedGraphSnapshot> = self
520 .rdf_store
521 .graph_names()
522 .into_iter()
523 .filter_map(|name| {
524 self.rdf_store
525 .graph(&name)
526 .map(|graph| RdfNamedGraphSnapshot {
527 name,
528 triples: collect_rdf_triples(&graph),
529 })
530 })
531 .collect();
532 #[cfg(not(feature = "rdf"))]
533 let rdf_named_graphs = Vec::new();
534
535 let snapshot = SnapshotV2 {
536 version: 2,
537 nodes,
538 edges,
539 named_graphs,
540 rdf_triples,
541 rdf_named_graphs,
542 };
543
544 let config = bincode::config::standard();
545 bincode::serde::encode_to_vec(&snapshot, config)
546 .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
547 }
548
549 pub fn import_snapshot(data: &[u8]) -> Result<Self> {
564 if data.is_empty() {
565 return Err(Error::Internal("empty snapshot data".to_string()));
566 }
567
568 let config = bincode::config::standard();
569
570 match data[0] {
572 1 => Self::import_snapshot_v1(data, config),
573 2 => Self::import_snapshot_v2(data, config),
574 v => Err(Error::Internal(format!(
575 "unsupported snapshot version: {v}"
576 ))),
577 }
578 }
579
580 fn import_snapshot_v1(data: &[u8], config: bincode::config::Configuration) -> Result<Self> {
581 let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
582 .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
583
584 validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
585
586 let db = Self::new_in_memory();
587 populate_store_from_snapshot(&db.store, snapshot.nodes, snapshot.edges)?;
588 Ok(db)
589 }
590
591 fn import_snapshot_v2(data: &[u8], config: bincode::config::Configuration) -> Result<Self> {
592 let (snapshot, _): (SnapshotV2, _) = bincode::serde::decode_from_slice(data, config)
593 .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
594
595 validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
597
598 for ng in &snapshot.named_graphs {
600 validate_snapshot_data(&ng.nodes, &ng.edges)?;
601 }
602
603 let db = Self::new_in_memory();
604 populate_store_from_snapshot(&db.store, snapshot.nodes, snapshot.edges)?;
605
606 for ng in snapshot.named_graphs {
608 db.store
609 .create_graph(&ng.name)
610 .map_err(|e| Error::Internal(e.to_string()))?;
611 if let Some(graph_store) = db.store.graph(&ng.name) {
612 populate_store_from_snapshot(&graph_store, ng.nodes, ng.edges)?;
613 }
614 }
615
616 #[cfg(feature = "rdf")]
618 {
619 populate_rdf_store(&db.rdf_store, &snapshot.rdf_triples);
620 for rng in &snapshot.rdf_named_graphs {
621 let graph = db.rdf_store.graph_or_create(&rng.name);
622 populate_rdf_store(&graph, &rng.triples);
623 }
624 }
625
626 Ok(db)
627 }
628
629 pub fn restore_snapshot(&self, data: &[u8]) -> Result<()> {
645 if data.is_empty() {
646 return Err(Error::Internal("empty snapshot data".to_string()));
647 }
648
649 let config = bincode::config::standard();
650
651 match data[0] {
652 1 => self.restore_snapshot_v1(data, config),
653 2 => self.restore_snapshot_v2(data, config),
654 v => Err(Error::Internal(format!(
655 "unsupported snapshot version: {v}"
656 ))),
657 }
658 }
659
660 fn restore_snapshot_v1(
661 &self,
662 data: &[u8],
663 config: bincode::config::Configuration,
664 ) -> Result<()> {
665 let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
666 .map_err(|e| Error::Internal(format!("snapshot restore failed: {e}")))?;
667
668 validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
669
670 for name in self.store.graph_names() {
672 self.store.drop_graph(&name);
673 }
674 self.store.clear();
675 populate_store_from_snapshot(&self.store, snapshot.nodes, snapshot.edges)
676 }
677
678 fn restore_snapshot_v2(
679 &self,
680 data: &[u8],
681 config: bincode::config::Configuration,
682 ) -> Result<()> {
683 let (snapshot, _): (SnapshotV2, _) = bincode::serde::decode_from_slice(data, config)
684 .map_err(|e| Error::Internal(format!("snapshot restore failed: {e}")))?;
685
686 validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
688 for ng in &snapshot.named_graphs {
689 validate_snapshot_data(&ng.nodes, &ng.edges)?;
690 }
691
692 for name in self.store.graph_names() {
694 self.store.drop_graph(&name);
695 }
696 self.store.clear();
697
698 populate_store_from_snapshot(&self.store, snapshot.nodes, snapshot.edges)?;
699
700 for ng in snapshot.named_graphs {
702 self.store
703 .create_graph(&ng.name)
704 .map_err(|e| Error::Internal(e.to_string()))?;
705 if let Some(graph_store) = self.store.graph(&ng.name) {
706 populate_store_from_snapshot(&graph_store, ng.nodes, ng.edges)?;
707 }
708 }
709
710 #[cfg(feature = "rdf")]
712 {
713 self.rdf_store.clear();
715 for name in self.rdf_store.graph_names() {
716 self.rdf_store.drop_graph(&name);
717 }
718 populate_rdf_store(&self.rdf_store, &snapshot.rdf_triples);
719 for rng in &snapshot.rdf_named_graphs {
720 let graph = self.rdf_store.graph_or_create(&rng.name);
721 populate_rdf_store(&graph, &rng.triples);
722 }
723 }
724
725 Ok(())
726 }
727
    /// Returns an iterator over the nodes of the database's main store.
    ///
    /// This forwards to `self.store.all_nodes()`.
    /// NOTE(review): nodes in named graphs appear not to be included, since
    /// those are reached through `store.graph(name)` elsewhere — confirm.
    pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
        self.store.all_nodes()
    }
738
    /// Returns an iterator over the edges of the database's main store.
    ///
    /// This forwards to `self.store.all_edges()`.
    /// NOTE(review): edges in named graphs appear not to be included, since
    /// those are reached through `store.graph(name)` elsewhere — confirm.
    pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
        self.store.all_edges()
    }
745}
746
747#[cfg(test)]
748mod tests {
749 use grafeo_common::types::{EdgeId, NodeId, Value};
750
751 use super::super::GrafeoDB;
752 use super::{Snapshot, SnapshotEdge, SnapshotNode};
753
754 #[test]
755 fn test_restore_snapshot_basic() {
756 let db = GrafeoDB::new_in_memory();
757 let session = db.session();
758
759 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
761 session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
762
763 let snapshot = db.export_snapshot().unwrap();
764
765 session
767 .execute("INSERT (:Person {name: 'Vincent'})")
768 .unwrap();
769 assert_eq!(db.store.node_count(), 3);
770
771 db.restore_snapshot(&snapshot).unwrap();
773
774 assert_eq!(db.store.node_count(), 2);
775 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
776 assert_eq!(result.rows.len(), 2);
777 }
778
779 #[test]
780 fn test_restore_snapshot_validation_failure() {
781 let db = GrafeoDB::new_in_memory();
782 let session = db.session();
783
784 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
785
786 let result = db.restore_snapshot(b"garbage");
788 assert!(result.is_err());
789
790 assert_eq!(db.store.node_count(), 1);
792 }
793
794 #[test]
795 fn test_restore_snapshot_empty_db() {
796 let db = GrafeoDB::new_in_memory();
797
798 let empty_snapshot = db.export_snapshot().unwrap();
800
801 let session = db.session();
802 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
803 assert_eq!(db.store.node_count(), 1);
804
805 db.restore_snapshot(&empty_snapshot).unwrap();
806 assert_eq!(db.store.node_count(), 0);
807 }
808
809 #[test]
810 fn test_restore_snapshot_with_edges() {
811 let db = GrafeoDB::new_in_memory();
812 let session = db.session();
813
814 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
815 session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
816 session
817 .execute(
818 "MATCH (a:Person {name: 'Alix'}), (b:Person {name: 'Gus'}) INSERT (a)-[:KNOWS]->(b)",
819 )
820 .unwrap();
821
822 let snapshot = db.export_snapshot().unwrap();
823 assert_eq!(db.store.edge_count(), 1);
824
825 session
827 .execute("INSERT (:Person {name: 'Vincent'})")
828 .unwrap();
829
830 db.restore_snapshot(&snapshot).unwrap();
832 assert_eq!(db.store.node_count(), 2);
833 assert_eq!(db.store.edge_count(), 1);
834 }
835
836 #[test]
837 fn test_restore_snapshot_preserves_sessions() {
838 let db = GrafeoDB::new_in_memory();
839 let session = db.session();
840
841 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
842 let snapshot = db.export_snapshot().unwrap();
843
844 session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
846
847 db.restore_snapshot(&snapshot).unwrap();
849
850 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
852 assert_eq!(result.rows.len(), 1);
853 }
854
855 #[test]
856 fn test_export_import_roundtrip() {
857 let db = GrafeoDB::new_in_memory();
858 let session = db.session();
859
860 session
861 .execute("INSERT (:Person {name: 'Alix', age: 30})")
862 .unwrap();
863
864 let snapshot = db.export_snapshot().unwrap();
865 let db2 = GrafeoDB::import_snapshot(&snapshot).unwrap();
866 let session2 = db2.session();
867
868 let result = session2.execute("MATCH (n:Person) RETURN n.name").unwrap();
869 assert_eq!(result.rows.len(), 1);
870 }
871
872 #[test]
875 fn test_to_memory_empty() {
876 let db = GrafeoDB::new_in_memory();
877 let copy = db.to_memory().unwrap();
878 assert_eq!(copy.store.node_count(), 0);
879 assert_eq!(copy.store.edge_count(), 0);
880 }
881
882 #[test]
883 fn test_to_memory_copies_nodes_and_properties() {
884 let db = GrafeoDB::new_in_memory();
885 let session = db.session();
886 session
887 .execute("INSERT (:Person {name: 'Alix', age: 30})")
888 .unwrap();
889 session
890 .execute("INSERT (:Person {name: 'Gus', age: 25})")
891 .unwrap();
892
893 let copy = db.to_memory().unwrap();
894 assert_eq!(copy.store.node_count(), 2);
895
896 let s2 = copy.session();
897 let result = s2
898 .execute("MATCH (p:Person) RETURN p.name ORDER BY p.name")
899 .unwrap();
900 assert_eq!(result.rows.len(), 2);
901 assert_eq!(result.rows[0][0], Value::String("Alix".into()));
902 assert_eq!(result.rows[1][0], Value::String("Gus".into()));
903 }
904
905 #[test]
906 fn test_to_memory_copies_edges_and_properties() {
907 let db = GrafeoDB::new_in_memory();
908 let a = db.create_node(&["Person"]);
909 db.set_node_property(a, "name", "Alix".into());
910 let b = db.create_node(&["Person"]);
911 db.set_node_property(b, "name", "Gus".into());
912 let edge = db.create_edge(a, b, "KNOWS");
913 db.set_edge_property(edge, "since", Value::Int64(2020));
914
915 let copy = db.to_memory().unwrap();
916 assert_eq!(copy.store.node_count(), 2);
917 assert_eq!(copy.store.edge_count(), 1);
918
919 let s2 = copy.session();
920 let result = s2.execute("MATCH ()-[e:KNOWS]->() RETURN e.since").unwrap();
921 assert_eq!(result.rows[0][0], Value::Int64(2020));
922 }
923
924 #[test]
925 fn test_to_memory_is_independent() {
926 let db = GrafeoDB::new_in_memory();
927 let session = db.session();
928 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
929
930 let copy = db.to_memory().unwrap();
931
932 session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
934 assert_eq!(db.store.node_count(), 2);
935 assert_eq!(copy.store.node_count(), 1);
936 }
937
938 #[test]
941 fn test_iter_nodes_empty() {
942 let db = GrafeoDB::new_in_memory();
943 assert_eq!(db.iter_nodes().count(), 0);
944 }
945
946 #[test]
947 fn test_iter_nodes_returns_all() {
948 let db = GrafeoDB::new_in_memory();
949 let id1 = db.create_node(&["Person"]);
950 db.set_node_property(id1, "name", "Alix".into());
951 let id2 = db.create_node(&["Animal"]);
952 db.set_node_property(id2, "name", "Fido".into());
953
954 let nodes: Vec<_> = db.iter_nodes().collect();
955 assert_eq!(nodes.len(), 2);
956
957 let names: Vec<_> = nodes
958 .iter()
959 .filter_map(|n| n.properties.iter().find(|(k, _)| k.as_str() == "name"))
960 .map(|(_, v)| v.clone())
961 .collect();
962 assert!(names.contains(&Value::String("Alix".into())));
963 assert!(names.contains(&Value::String("Fido".into())));
964 }
965
966 #[test]
967 fn test_iter_edges_empty() {
968 let db = GrafeoDB::new_in_memory();
969 assert_eq!(db.iter_edges().count(), 0);
970 }
971
972 #[test]
973 fn test_iter_edges_returns_all() {
974 let db = GrafeoDB::new_in_memory();
975 let a = db.create_node(&["A"]);
976 let b = db.create_node(&["B"]);
977 let c = db.create_node(&["C"]);
978 db.create_edge(a, b, "R1");
979 db.create_edge(b, c, "R2");
980
981 let edges: Vec<_> = db.iter_edges().collect();
982 assert_eq!(edges.len(), 2);
983
984 let types: Vec<_> = edges.iter().map(|e| e.edge_type.as_ref()).collect();
985 assert!(types.contains(&"R1"));
986 assert!(types.contains(&"R2"));
987 }
988
989 fn encode_snapshot(snap: &Snapshot) -> Vec<u8> {
992 bincode::serde::encode_to_vec(snap, bincode::config::standard()).unwrap()
993 }
994
995 #[test]
996 fn test_restore_rejects_unsupported_version() {
997 let db = GrafeoDB::new_in_memory();
998 let session = db.session();
999 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1000
1001 let snap = Snapshot {
1002 version: 99,
1003 nodes: vec![],
1004 edges: vec![],
1005 };
1006 let bytes = encode_snapshot(&snap);
1007
1008 let result = db.restore_snapshot(&bytes);
1009 assert!(result.is_err());
1010 let err = result.unwrap_err().to_string();
1011 assert!(err.contains("unsupported snapshot version"), "got: {err}");
1012
1013 assert_eq!(db.store.node_count(), 1);
1015 }
1016
1017 #[test]
1018 fn test_restore_rejects_duplicate_node_ids() {
1019 let db = GrafeoDB::new_in_memory();
1020 let session = db.session();
1021 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1022
1023 let snap = Snapshot {
1024 version: 1,
1025 nodes: vec![
1026 SnapshotNode {
1027 id: NodeId::new(0),
1028 labels: vec!["A".into()],
1029 properties: vec![],
1030 },
1031 SnapshotNode {
1032 id: NodeId::new(0),
1033 labels: vec!["B".into()],
1034 properties: vec![],
1035 },
1036 ],
1037 edges: vec![],
1038 };
1039 let bytes = encode_snapshot(&snap);
1040
1041 let result = db.restore_snapshot(&bytes);
1042 assert!(result.is_err());
1043 let err = result.unwrap_err().to_string();
1044 assert!(err.contains("duplicate node ID"), "got: {err}");
1045 assert_eq!(db.store.node_count(), 1);
1046 }
1047
1048 #[test]
1049 fn test_restore_rejects_duplicate_edge_ids() {
1050 let db = GrafeoDB::new_in_memory();
1051
1052 let snap = Snapshot {
1053 version: 1,
1054 nodes: vec![
1055 SnapshotNode {
1056 id: NodeId::new(0),
1057 labels: vec![],
1058 properties: vec![],
1059 },
1060 SnapshotNode {
1061 id: NodeId::new(1),
1062 labels: vec![],
1063 properties: vec![],
1064 },
1065 ],
1066 edges: vec![
1067 SnapshotEdge {
1068 id: EdgeId::new(0),
1069 src: NodeId::new(0),
1070 dst: NodeId::new(1),
1071 edge_type: "REL".into(),
1072 properties: vec![],
1073 },
1074 SnapshotEdge {
1075 id: EdgeId::new(0),
1076 src: NodeId::new(0),
1077 dst: NodeId::new(1),
1078 edge_type: "REL".into(),
1079 properties: vec![],
1080 },
1081 ],
1082 };
1083 let bytes = encode_snapshot(&snap);
1084
1085 let result = db.restore_snapshot(&bytes);
1086 assert!(result.is_err());
1087 let err = result.unwrap_err().to_string();
1088 assert!(err.contains("duplicate edge ID"), "got: {err}");
1089 }
1090
1091 #[test]
1092 fn test_restore_rejects_dangling_source() {
1093 let db = GrafeoDB::new_in_memory();
1094
1095 let snap = Snapshot {
1096 version: 1,
1097 nodes: vec![SnapshotNode {
1098 id: NodeId::new(0),
1099 labels: vec![],
1100 properties: vec![],
1101 }],
1102 edges: vec![SnapshotEdge {
1103 id: EdgeId::new(0),
1104 src: NodeId::new(999),
1105 dst: NodeId::new(0),
1106 edge_type: "REL".into(),
1107 properties: vec![],
1108 }],
1109 };
1110 let bytes = encode_snapshot(&snap);
1111
1112 let result = db.restore_snapshot(&bytes);
1113 assert!(result.is_err());
1114 let err = result.unwrap_err().to_string();
1115 assert!(err.contains("non-existent source node"), "got: {err}");
1116 }
1117
1118 #[test]
1119 fn test_restore_rejects_dangling_destination() {
1120 let db = GrafeoDB::new_in_memory();
1121
1122 let snap = Snapshot {
1123 version: 1,
1124 nodes: vec![SnapshotNode {
1125 id: NodeId::new(0),
1126 labels: vec![],
1127 properties: vec![],
1128 }],
1129 edges: vec![SnapshotEdge {
1130 id: EdgeId::new(0),
1131 src: NodeId::new(0),
1132 dst: NodeId::new(999),
1133 edge_type: "REL".into(),
1134 properties: vec![],
1135 }],
1136 };
1137 let bytes = encode_snapshot(&snap);
1138
1139 let result = db.restore_snapshot(&bytes);
1140 assert!(result.is_err());
1141 let err = result.unwrap_err().to_string();
1142 assert!(err.contains("non-existent destination node"), "got: {err}");
1143 }
1144}