1#[cfg(feature = "wal")]
4use std::path::Path;
5
6use grafeo_common::types::{EdgeId, NodeId, Value};
7use grafeo_common::utils::error::{Error, Result};
8use hashbrown::HashSet;
9
10use crate::config::Config;
11
12#[cfg(feature = "wal")]
13use grafeo_adapters::storage::wal::WalRecord;
14
15use crate::catalog::{
16 EdgeTypeDefinition, GraphTypeDefinition, NodeTypeDefinition, ProcedureDefinition,
17};
18
19const SNAPSHOT_VERSION: u8 = 3;
21
22#[derive(serde::Serialize, serde::Deserialize)]
24struct Snapshot {
25 version: u8,
26 nodes: Vec<SnapshotNode>,
27 edges: Vec<SnapshotEdge>,
28 named_graphs: Vec<NamedGraphSnapshot>,
29 rdf_triples: Vec<SnapshotTriple>,
30 rdf_named_graphs: Vec<RdfNamedGraphSnapshot>,
31 schema: SnapshotSchema,
32}
33
34#[derive(serde::Serialize, serde::Deserialize, Default)]
36struct SnapshotSchema {
37 node_types: Vec<NodeTypeDefinition>,
38 edge_types: Vec<EdgeTypeDefinition>,
39 graph_types: Vec<GraphTypeDefinition>,
40 procedures: Vec<ProcedureDefinition>,
41 schemas: Vec<String>,
42 graph_type_bindings: Vec<(String, String)>,
43}
44
45#[derive(serde::Serialize, serde::Deserialize)]
47struct NamedGraphSnapshot {
48 name: String,
49 nodes: Vec<SnapshotNode>,
50 edges: Vec<SnapshotEdge>,
51}
52
53#[derive(serde::Serialize, serde::Deserialize)]
55struct SnapshotTriple {
56 subject: String,
57 predicate: String,
58 object: String,
59}
60
61#[derive(serde::Serialize, serde::Deserialize)]
63struct RdfNamedGraphSnapshot {
64 name: String,
65 triples: Vec<SnapshotTriple>,
66}
67
68#[derive(serde::Serialize, serde::Deserialize)]
69struct SnapshotNode {
70 id: NodeId,
71 labels: Vec<String>,
72 properties: Vec<(String, Value)>,
73}
74
75#[derive(serde::Serialize, serde::Deserialize)]
76struct SnapshotEdge {
77 id: EdgeId,
78 src: NodeId,
79 dst: NodeId,
80 edge_type: String,
81 properties: Vec<(String, Value)>,
82}
83
84fn collect_snapshot_nodes(store: &grafeo_core::graph::lpg::LpgStore) -> Vec<SnapshotNode> {
86 store
87 .all_nodes()
88 .map(|n| SnapshotNode {
89 id: n.id,
90 labels: n.labels.iter().map(|l| l.to_string()).collect(),
91 properties: n
92 .properties
93 .into_iter()
94 .map(|(k, v)| (k.to_string(), v))
95 .collect(),
96 })
97 .collect()
98}
99
100fn collect_snapshot_edges(store: &grafeo_core::graph::lpg::LpgStore) -> Vec<SnapshotEdge> {
102 store
103 .all_edges()
104 .map(|e| SnapshotEdge {
105 id: e.id,
106 src: e.src,
107 dst: e.dst,
108 edge_type: e.edge_type.to_string(),
109 properties: e
110 .properties
111 .into_iter()
112 .map(|(k, v)| (k.to_string(), v))
113 .collect(),
114 })
115 .collect()
116}
117
118fn populate_store_from_snapshot(
120 store: &grafeo_core::graph::lpg::LpgStore,
121 nodes: Vec<SnapshotNode>,
122 edges: Vec<SnapshotEdge>,
123) -> Result<()> {
124 for node in nodes {
125 let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
126 store.create_node_with_id(node.id, &label_refs)?;
127 for (key, value) in node.properties {
128 store.set_node_property(node.id, &key, value);
129 }
130 }
131 for edge in edges {
132 store.create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
133 for (key, value) in edge.properties {
134 store.set_edge_property(edge.id, &key, value);
135 }
136 }
137 Ok(())
138}
139
140fn validate_snapshot_data(nodes: &[SnapshotNode], edges: &[SnapshotEdge]) -> Result<()> {
142 let mut node_ids = HashSet::with_capacity(nodes.len());
143 for node in nodes {
144 if !node_ids.insert(node.id) {
145 return Err(Error::Internal(format!(
146 "snapshot contains duplicate node ID {}",
147 node.id
148 )));
149 }
150 }
151 let mut edge_ids = HashSet::with_capacity(edges.len());
152 for edge in edges {
153 if !edge_ids.insert(edge.id) {
154 return Err(Error::Internal(format!(
155 "snapshot contains duplicate edge ID {}",
156 edge.id
157 )));
158 }
159 if !node_ids.contains(&edge.src) {
160 return Err(Error::Internal(format!(
161 "snapshot edge {} references non-existent source node {}",
162 edge.id, edge.src
163 )));
164 }
165 if !node_ids.contains(&edge.dst) {
166 return Err(Error::Internal(format!(
167 "snapshot edge {} references non-existent destination node {}",
168 edge.id, edge.dst
169 )));
170 }
171 }
172 Ok(())
173}
174
175#[cfg(feature = "rdf")]
177fn collect_rdf_triples(store: &grafeo_core::graph::rdf::RdfStore) -> Vec<SnapshotTriple> {
178 store
179 .triples()
180 .into_iter()
181 .map(|t| SnapshotTriple {
182 subject: t.subject().to_string(),
183 predicate: t.predicate().to_string(),
184 object: t.object().to_string(),
185 })
186 .collect()
187}
188
189#[cfg(feature = "rdf")]
191fn populate_rdf_store(store: &grafeo_core::graph::rdf::RdfStore, triples: &[SnapshotTriple]) {
192 use grafeo_core::graph::rdf::{Term, Triple};
193 for triple in triples {
194 if let (Some(s), Some(p), Some(o)) = (
195 Term::from_ntriples(&triple.subject),
196 Term::from_ntriples(&triple.predicate),
197 Term::from_ntriples(&triple.object),
198 ) {
199 store.insert(Triple::new(s, p, o));
200 }
201 }
202}
203
204#[cfg(feature = "grafeo-file")]
210pub(super) fn load_snapshot_into_store(
211 store: &std::sync::Arc<grafeo_core::graph::lpg::LpgStore>,
212 catalog: &std::sync::Arc<crate::catalog::Catalog>,
213 #[cfg(feature = "rdf")] rdf_store: &std::sync::Arc<grafeo_core::graph::rdf::RdfStore>,
214 data: &[u8],
215) -> grafeo_common::utils::error::Result<()> {
216 use grafeo_common::utils::error::Error;
217
218 let config = bincode::config::standard();
219 let (snapshot, _) =
220 bincode::serde::decode_from_slice::<Snapshot, _>(data, config).map_err(|e| {
221 Error::Serialization(format!("failed to decode snapshot from .grafeo file: {e}"))
222 })?;
223
224 populate_store_from_snapshot_ref(store, &snapshot.nodes, &snapshot.edges)?;
225 for graph in &snapshot.named_graphs {
226 store
227 .create_graph(&graph.name)
228 .map_err(|e| Error::Internal(e.to_string()))?;
229 if let Some(graph_store) = store.graph(&graph.name) {
230 populate_store_from_snapshot_ref(&graph_store, &graph.nodes, &graph.edges)?;
231 }
232 }
233 restore_schema_from_snapshot(catalog, &snapshot.schema);
234
235 #[cfg(feature = "rdf")]
237 {
238 populate_rdf_store(rdf_store, &snapshot.rdf_triples);
239 for rdf_graph in &snapshot.rdf_named_graphs {
240 rdf_store.create_graph(&rdf_graph.name);
241 if let Some(graph_store) = rdf_store.graph(&rdf_graph.name) {
242 populate_rdf_store(&graph_store, &rdf_graph.triples);
243 }
244 }
245 }
246
247 Ok(())
248}
249
250#[cfg(feature = "grafeo-file")]
252fn populate_store_from_snapshot_ref(
253 store: &grafeo_core::graph::lpg::LpgStore,
254 nodes: &[SnapshotNode],
255 edges: &[SnapshotEdge],
256) -> grafeo_common::utils::error::Result<()> {
257 for node in nodes {
258 let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
259 store.create_node_with_id(node.id, &label_refs)?;
260 for (key, value) in &node.properties {
261 store.set_node_property(node.id, key, value.clone());
262 }
263 }
264 for edge in edges {
265 store.create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
266 for (key, value) in &edge.properties {
267 store.set_edge_property(edge.id, key, value.clone());
268 }
269 }
270 Ok(())
271}
272
273fn restore_schema_from_snapshot(
275 catalog: &std::sync::Arc<crate::catalog::Catalog>,
276 schema: &SnapshotSchema,
277) {
278 for def in &schema.node_types {
279 catalog.register_or_replace_node_type(def.clone());
280 }
281 for def in &schema.edge_types {
282 catalog.register_or_replace_edge_type_def(def.clone());
283 }
284 for def in &schema.graph_types {
285 let _ = catalog.register_graph_type(def.clone());
286 }
287 for def in &schema.procedures {
288 catalog.replace_procedure(def.clone()).ok();
289 }
290 for name in &schema.schemas {
291 let _ = catalog.register_schema_namespace(name.clone());
292 }
293 for (graph_name, type_name) in &schema.graph_type_bindings {
294 let _ = catalog.bind_graph_type(graph_name, type_name.clone());
295 }
296}
297
298fn collect_schema(catalog: &std::sync::Arc<crate::catalog::Catalog>) -> SnapshotSchema {
300 SnapshotSchema {
301 node_types: catalog.all_node_type_defs(),
302 edge_types: catalog.all_edge_type_defs(),
303 graph_types: catalog.all_graph_type_defs(),
304 procedures: catalog.all_procedure_defs(),
305 schemas: catalog.schema_names(),
306 graph_type_bindings: catalog.all_graph_type_bindings(),
307 }
308}
309
310impl super::GrafeoDB {
311 #[cfg(feature = "wal")]
330 pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
331 let path = path.as_ref();
332
333 #[cfg(feature = "grafeo-file")]
335 if path.extension().is_some_and(|ext| ext == "grafeo") {
336 return self.save_as_grafeo_file(path);
337 }
338
339 let target_config = Config::persistent(path);
341 let target = Self::with_config(target_config)?;
342
343 for node in self.store.all_nodes() {
345 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
346 target.store.create_node_with_id(node.id, &label_refs)?;
347
348 target.log_wal(&WalRecord::CreateNode {
350 id: node.id,
351 labels: node.labels.iter().map(|s| s.to_string()).collect(),
352 })?;
353
354 for (key, value) in node.properties {
356 target
357 .store
358 .set_node_property(node.id, key.as_str(), value.clone());
359 target.log_wal(&WalRecord::SetNodeProperty {
360 id: node.id,
361 key: key.to_string(),
362 value,
363 })?;
364 }
365 }
366
367 for edge in self.store.all_edges() {
369 target
370 .store
371 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
372
373 target.log_wal(&WalRecord::CreateEdge {
375 id: edge.id,
376 src: edge.src,
377 dst: edge.dst,
378 edge_type: edge.edge_type.to_string(),
379 })?;
380
381 for (key, value) in edge.properties {
383 target
384 .store
385 .set_edge_property(edge.id, key.as_str(), value.clone());
386 target.log_wal(&WalRecord::SetEdgeProperty {
387 id: edge.id,
388 key: key.to_string(),
389 value,
390 })?;
391 }
392 }
393
394 for graph_name in self.store.graph_names() {
396 if let Some(src_graph) = self.store.graph(&graph_name) {
397 target.log_wal(&WalRecord::CreateNamedGraph {
398 name: graph_name.clone(),
399 })?;
400 target
401 .store
402 .create_graph(&graph_name)
403 .map_err(|e| Error::Internal(e.to_string()))?;
404
405 if let Some(dst_graph) = target.store.graph(&graph_name) {
406 target.log_wal(&WalRecord::SwitchGraph {
408 name: Some(graph_name.clone()),
409 })?;
410
411 for node in src_graph.all_nodes() {
412 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
413 dst_graph.create_node_with_id(node.id, &label_refs)?;
414 target.log_wal(&WalRecord::CreateNode {
415 id: node.id,
416 labels: node.labels.iter().map(|s| s.to_string()).collect(),
417 })?;
418 for (key, value) in node.properties {
419 dst_graph.set_node_property(node.id, key.as_str(), value.clone());
420 target.log_wal(&WalRecord::SetNodeProperty {
421 id: node.id,
422 key: key.to_string(),
423 value,
424 })?;
425 }
426 }
427 for edge in src_graph.all_edges() {
428 dst_graph.create_edge_with_id(
429 edge.id,
430 edge.src,
431 edge.dst,
432 &edge.edge_type,
433 )?;
434 target.log_wal(&WalRecord::CreateEdge {
435 id: edge.id,
436 src: edge.src,
437 dst: edge.dst,
438 edge_type: edge.edge_type.to_string(),
439 })?;
440 for (key, value) in edge.properties {
441 dst_graph.set_edge_property(edge.id, key.as_str(), value.clone());
442 target.log_wal(&WalRecord::SetEdgeProperty {
443 id: edge.id,
444 key: key.to_string(),
445 value,
446 })?;
447 }
448 }
449 }
450 }
451 }
452
453 if !self.store.graph_names().is_empty() {
455 target.log_wal(&WalRecord::SwitchGraph { name: None })?;
456 }
457
458 #[cfg(feature = "rdf")]
460 {
461 for triple in self.rdf_store.triples() {
462 let record = WalRecord::InsertRdfTriple {
463 subject: triple.subject().to_string(),
464 predicate: triple.predicate().to_string(),
465 object: triple.object().to_string(),
466 graph: None,
467 };
468 target.rdf_store.insert((*triple).clone());
469 target.log_wal(&record)?;
470 }
471 for name in self.rdf_store.graph_names() {
472 target.log_wal(&WalRecord::CreateRdfGraph { name: name.clone() })?;
473 if let Some(src_graph) = self.rdf_store.graph(&name) {
474 let dst_graph = target.rdf_store.graph_or_create(&name);
475 for triple in src_graph.triples() {
476 let record = WalRecord::InsertRdfTriple {
477 subject: triple.subject().to_string(),
478 predicate: triple.predicate().to_string(),
479 object: triple.object().to_string(),
480 graph: Some(name.clone()),
481 };
482 dst_graph.insert((*triple).clone());
483 target.log_wal(&record)?;
484 }
485 }
486 }
487 }
488
489 target.close()?;
491
492 Ok(())
493 }
494
495 #[cfg(feature = "grafeo-file")]
502 fn save_as_grafeo_file(&self, path: &Path) -> Result<()> {
503 use grafeo_adapters::storage::file::GrafeoFileManager;
504
505 let snapshot_data = self.export_snapshot()?;
506 let epoch = self.store.current_epoch();
507 let transaction_id = self
508 .transaction_manager
509 .last_assigned_transaction_id()
510 .map_or(0, |t| t.0);
511 let node_count = self.store.node_count() as u64;
512 let edge_count = self.store.edge_count() as u64;
513
514 let fm = GrafeoFileManager::create(path)?;
515 fm.write_snapshot(
516 &snapshot_data,
517 epoch.0,
518 transaction_id,
519 node_count,
520 edge_count,
521 )?;
522 Ok(())
523 }
524
525 pub fn to_memory(&self) -> Result<Self> {
532 let config = Config::in_memory();
533 let target = Self::with_config(config)?;
534
535 for node in self.store.all_nodes() {
537 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
538 target.store.create_node_with_id(node.id, &label_refs)?;
539 for (key, value) in node.properties {
540 target.store.set_node_property(node.id, key.as_str(), value);
541 }
542 }
543
544 for edge in self.store.all_edges() {
546 target
547 .store
548 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
549 for (key, value) in edge.properties {
550 target.store.set_edge_property(edge.id, key.as_str(), value);
551 }
552 }
553
554 for graph_name in self.store.graph_names() {
556 if let Some(src_graph) = self.store.graph(&graph_name) {
557 target
558 .store
559 .create_graph(&graph_name)
560 .map_err(|e| Error::Internal(e.to_string()))?;
561 if let Some(dst_graph) = target.store.graph(&graph_name) {
562 for node in src_graph.all_nodes() {
563 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
564 dst_graph.create_node_with_id(node.id, &label_refs)?;
565 for (key, value) in node.properties {
566 dst_graph.set_node_property(node.id, key.as_str(), value);
567 }
568 }
569 for edge in src_graph.all_edges() {
570 dst_graph.create_edge_with_id(
571 edge.id,
572 edge.src,
573 edge.dst,
574 &edge.edge_type,
575 )?;
576 for (key, value) in edge.properties {
577 dst_graph.set_edge_property(edge.id, key.as_str(), value);
578 }
579 }
580 }
581 }
582 }
583
584 #[cfg(feature = "rdf")]
586 {
587 for triple in self.rdf_store.triples() {
588 target.rdf_store.insert((*triple).clone());
589 }
590 for name in self.rdf_store.graph_names() {
591 if let Some(src_graph) = self.rdf_store.graph(&name) {
592 let dst_graph = target.rdf_store.graph_or_create(&name);
593 for triple in src_graph.triples() {
594 dst_graph.insert((*triple).clone());
595 }
596 }
597 }
598 }
599
600 Ok(target)
601 }
602
603 #[cfg(feature = "wal")]
612 pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
613 let source = Self::open(path)?;
615
616 let target = source.to_memory()?;
618
619 source.close()?;
621
622 Ok(target)
623 }
624
625 pub fn export_snapshot(&self) -> Result<Vec<u8>> {
639 let nodes = collect_snapshot_nodes(&self.store);
640 let edges = collect_snapshot_edges(&self.store);
641
642 let named_graphs: Vec<NamedGraphSnapshot> = self
644 .store
645 .graph_names()
646 .into_iter()
647 .filter_map(|name| {
648 self.store
649 .graph(&name)
650 .map(|graph_store| NamedGraphSnapshot {
651 name,
652 nodes: collect_snapshot_nodes(&graph_store),
653 edges: collect_snapshot_edges(&graph_store),
654 })
655 })
656 .collect();
657
658 #[cfg(feature = "rdf")]
660 let rdf_triples = collect_rdf_triples(&self.rdf_store);
661 #[cfg(not(feature = "rdf"))]
662 let rdf_triples = Vec::new();
663
664 #[cfg(feature = "rdf")]
665 let rdf_named_graphs: Vec<RdfNamedGraphSnapshot> = self
666 .rdf_store
667 .graph_names()
668 .into_iter()
669 .filter_map(|name| {
670 self.rdf_store
671 .graph(&name)
672 .map(|graph| RdfNamedGraphSnapshot {
673 name,
674 triples: collect_rdf_triples(&graph),
675 })
676 })
677 .collect();
678 #[cfg(not(feature = "rdf"))]
679 let rdf_named_graphs = Vec::new();
680
681 let schema = collect_schema(&self.catalog);
682
683 let snapshot = Snapshot {
684 version: SNAPSHOT_VERSION,
685 nodes,
686 edges,
687 named_graphs,
688 rdf_triples,
689 rdf_named_graphs,
690 schema,
691 };
692
693 let config = bincode::config::standard();
694 bincode::serde::encode_to_vec(&snapshot, config)
695 .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
696 }
697
698 pub fn import_snapshot(data: &[u8]) -> Result<Self> {
713 if data.is_empty() {
714 return Err(Error::Internal("empty snapshot data".to_string()));
715 }
716
717 if data[0] != SNAPSHOT_VERSION {
719 return Err(Error::Internal(format!(
720 "unsupported snapshot version: {} (expected {SNAPSHOT_VERSION})",
721 data[0]
722 )));
723 }
724
725 let config = bincode::config::standard();
726 let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
727 .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
728
729 validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
731
732 for ng in &snapshot.named_graphs {
734 validate_snapshot_data(&ng.nodes, &ng.edges)?;
735 }
736
737 let db = Self::new_in_memory();
738 populate_store_from_snapshot(&db.store, snapshot.nodes, snapshot.edges)?;
739
740 for ng in snapshot.named_graphs {
742 db.store
743 .create_graph(&ng.name)
744 .map_err(|e| Error::Internal(e.to_string()))?;
745 if let Some(graph_store) = db.store.graph(&ng.name) {
746 populate_store_from_snapshot(&graph_store, ng.nodes, ng.edges)?;
747 }
748 }
749
750 #[cfg(feature = "rdf")]
752 {
753 populate_rdf_store(&db.rdf_store, &snapshot.rdf_triples);
754 for rng in &snapshot.rdf_named_graphs {
755 let graph = db.rdf_store.graph_or_create(&rng.name);
756 populate_rdf_store(&graph, &rng.triples);
757 }
758 }
759
760 restore_schema_from_snapshot(&db.catalog, &snapshot.schema);
762
763 Ok(db)
764 }
765
766 pub fn restore_snapshot(&self, data: &[u8]) -> Result<()> {
782 if data.is_empty() {
783 return Err(Error::Internal("empty snapshot data".to_string()));
784 }
785
786 if data[0] != SNAPSHOT_VERSION {
787 return Err(Error::Internal(format!(
788 "unsupported snapshot version: {} (expected {SNAPSHOT_VERSION})",
789 data[0]
790 )));
791 }
792
793 let config = bincode::config::standard();
794 let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
795 .map_err(|e| Error::Internal(format!("snapshot restore failed: {e}")))?;
796
797 validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
799 for ng in &snapshot.named_graphs {
800 validate_snapshot_data(&ng.nodes, &ng.edges)?;
801 }
802
803 for name in self.store.graph_names() {
805 self.store.drop_graph(&name);
806 }
807 self.store.clear();
808
809 populate_store_from_snapshot(&self.store, snapshot.nodes, snapshot.edges)?;
810
811 for ng in snapshot.named_graphs {
813 self.store
814 .create_graph(&ng.name)
815 .map_err(|e| Error::Internal(e.to_string()))?;
816 if let Some(graph_store) = self.store.graph(&ng.name) {
817 populate_store_from_snapshot(&graph_store, ng.nodes, ng.edges)?;
818 }
819 }
820
821 #[cfg(feature = "rdf")]
823 {
824 self.rdf_store.clear();
826 for name in self.rdf_store.graph_names() {
827 self.rdf_store.drop_graph(&name);
828 }
829 populate_rdf_store(&self.rdf_store, &snapshot.rdf_triples);
830 for rng in &snapshot.rdf_named_graphs {
831 let graph = self.rdf_store.graph_or_create(&rng.name);
832 populate_rdf_store(&graph, &rng.triples);
833 }
834 }
835
836 restore_schema_from_snapshot(&self.catalog, &snapshot.schema);
838
839 Ok(())
840 }
841
842 pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
850 self.store.all_nodes()
851 }
852
853 pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
857 self.store.all_edges()
858 }
859}
860
861#[cfg(test)]
862mod tests {
863 use grafeo_common::types::{EdgeId, NodeId, Value};
864
865 use super::super::GrafeoDB;
866 use super::{SNAPSHOT_VERSION, Snapshot, SnapshotEdge, SnapshotNode, SnapshotSchema};
867
868 #[test]
869 fn test_restore_snapshot_basic() {
870 let db = GrafeoDB::new_in_memory();
871 let session = db.session();
872
873 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
875 session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
876
877 let snapshot = db.export_snapshot().unwrap();
878
879 session
881 .execute("INSERT (:Person {name: 'Vincent'})")
882 .unwrap();
883 assert_eq!(db.store.node_count(), 3);
884
885 db.restore_snapshot(&snapshot).unwrap();
887
888 assert_eq!(db.store.node_count(), 2);
889 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
890 assert_eq!(result.rows.len(), 2);
891 }
892
893 #[test]
894 fn test_restore_snapshot_validation_failure() {
895 let db = GrafeoDB::new_in_memory();
896 let session = db.session();
897
898 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
899
900 let result = db.restore_snapshot(b"garbage");
902 assert!(result.is_err());
903
904 assert_eq!(db.store.node_count(), 1);
906 }
907
908 #[test]
909 fn test_restore_snapshot_empty_db() {
910 let db = GrafeoDB::new_in_memory();
911
912 let empty_snapshot = db.export_snapshot().unwrap();
914
915 let session = db.session();
916 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
917 assert_eq!(db.store.node_count(), 1);
918
919 db.restore_snapshot(&empty_snapshot).unwrap();
920 assert_eq!(db.store.node_count(), 0);
921 }
922
923 #[test]
924 fn test_restore_snapshot_with_edges() {
925 let db = GrafeoDB::new_in_memory();
926 let session = db.session();
927
928 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
929 session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
930 session
931 .execute(
932 "MATCH (a:Person {name: 'Alix'}), (b:Person {name: 'Gus'}) INSERT (a)-[:KNOWS]->(b)",
933 )
934 .unwrap();
935
936 let snapshot = db.export_snapshot().unwrap();
937 assert_eq!(db.store.edge_count(), 1);
938
939 session
941 .execute("INSERT (:Person {name: 'Vincent'})")
942 .unwrap();
943
944 db.restore_snapshot(&snapshot).unwrap();
946 assert_eq!(db.store.node_count(), 2);
947 assert_eq!(db.store.edge_count(), 1);
948 }
949
950 #[test]
951 fn test_restore_snapshot_preserves_sessions() {
952 let db = GrafeoDB::new_in_memory();
953 let session = db.session();
954
955 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
956 let snapshot = db.export_snapshot().unwrap();
957
958 session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
960
961 db.restore_snapshot(&snapshot).unwrap();
963
964 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
966 assert_eq!(result.rows.len(), 1);
967 }
968
969 #[test]
970 fn test_export_import_roundtrip() {
971 let db = GrafeoDB::new_in_memory();
972 let session = db.session();
973
974 session
975 .execute("INSERT (:Person {name: 'Alix', age: 30})")
976 .unwrap();
977
978 let snapshot = db.export_snapshot().unwrap();
979 let db2 = GrafeoDB::import_snapshot(&snapshot).unwrap();
980 let session2 = db2.session();
981
982 let result = session2.execute("MATCH (n:Person) RETURN n.name").unwrap();
983 assert_eq!(result.rows.len(), 1);
984 }
985
986 #[test]
989 fn test_to_memory_empty() {
990 let db = GrafeoDB::new_in_memory();
991 let copy = db.to_memory().unwrap();
992 assert_eq!(copy.store.node_count(), 0);
993 assert_eq!(copy.store.edge_count(), 0);
994 }
995
996 #[test]
997 fn test_to_memory_copies_nodes_and_properties() {
998 let db = GrafeoDB::new_in_memory();
999 let session = db.session();
1000 session
1001 .execute("INSERT (:Person {name: 'Alix', age: 30})")
1002 .unwrap();
1003 session
1004 .execute("INSERT (:Person {name: 'Gus', age: 25})")
1005 .unwrap();
1006
1007 let copy = db.to_memory().unwrap();
1008 assert_eq!(copy.store.node_count(), 2);
1009
1010 let s2 = copy.session();
1011 let result = s2
1012 .execute("MATCH (p:Person) RETURN p.name ORDER BY p.name")
1013 .unwrap();
1014 assert_eq!(result.rows.len(), 2);
1015 assert_eq!(result.rows[0][0], Value::String("Alix".into()));
1016 assert_eq!(result.rows[1][0], Value::String("Gus".into()));
1017 }
1018
1019 #[test]
1020 fn test_to_memory_copies_edges_and_properties() {
1021 let db = GrafeoDB::new_in_memory();
1022 let a = db.create_node(&["Person"]);
1023 db.set_node_property(a, "name", "Alix".into());
1024 let b = db.create_node(&["Person"]);
1025 db.set_node_property(b, "name", "Gus".into());
1026 let edge = db.create_edge(a, b, "KNOWS");
1027 db.set_edge_property(edge, "since", Value::Int64(2020));
1028
1029 let copy = db.to_memory().unwrap();
1030 assert_eq!(copy.store.node_count(), 2);
1031 assert_eq!(copy.store.edge_count(), 1);
1032
1033 let s2 = copy.session();
1034 let result = s2.execute("MATCH ()-[e:KNOWS]->() RETURN e.since").unwrap();
1035 assert_eq!(result.rows[0][0], Value::Int64(2020));
1036 }
1037
1038 #[test]
1039 fn test_to_memory_is_independent() {
1040 let db = GrafeoDB::new_in_memory();
1041 let session = db.session();
1042 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1043
1044 let copy = db.to_memory().unwrap();
1045
1046 session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
1048 assert_eq!(db.store.node_count(), 2);
1049 assert_eq!(copy.store.node_count(), 1);
1050 }
1051
1052 #[test]
1055 fn test_iter_nodes_empty() {
1056 let db = GrafeoDB::new_in_memory();
1057 assert_eq!(db.iter_nodes().count(), 0);
1058 }
1059
1060 #[test]
1061 fn test_iter_nodes_returns_all() {
1062 let db = GrafeoDB::new_in_memory();
1063 let id1 = db.create_node(&["Person"]);
1064 db.set_node_property(id1, "name", "Alix".into());
1065 let id2 = db.create_node(&["Animal"]);
1066 db.set_node_property(id2, "name", "Fido".into());
1067
1068 let nodes: Vec<_> = db.iter_nodes().collect();
1069 assert_eq!(nodes.len(), 2);
1070
1071 let names: Vec<_> = nodes
1072 .iter()
1073 .filter_map(|n| n.properties.iter().find(|(k, _)| k.as_str() == "name"))
1074 .map(|(_, v)| v.clone())
1075 .collect();
1076 assert!(names.contains(&Value::String("Alix".into())));
1077 assert!(names.contains(&Value::String("Fido".into())));
1078 }
1079
1080 #[test]
1081 fn test_iter_edges_empty() {
1082 let db = GrafeoDB::new_in_memory();
1083 assert_eq!(db.iter_edges().count(), 0);
1084 }
1085
1086 #[test]
1087 fn test_iter_edges_returns_all() {
1088 let db = GrafeoDB::new_in_memory();
1089 let a = db.create_node(&["A"]);
1090 let b = db.create_node(&["B"]);
1091 let c = db.create_node(&["C"]);
1092 db.create_edge(a, b, "R1");
1093 db.create_edge(b, c, "R2");
1094
1095 let edges: Vec<_> = db.iter_edges().collect();
1096 assert_eq!(edges.len(), 2);
1097
1098 let types: Vec<_> = edges.iter().map(|e| e.edge_type.as_ref()).collect();
1099 assert!(types.contains(&"R1"));
1100 assert!(types.contains(&"R2"));
1101 }
1102
1103 fn make_snapshot(version: u8, nodes: Vec<SnapshotNode>, edges: Vec<SnapshotEdge>) -> Vec<u8> {
1106 let snap = Snapshot {
1107 version,
1108 nodes,
1109 edges,
1110 named_graphs: vec![],
1111 rdf_triples: vec![],
1112 rdf_named_graphs: vec![],
1113 schema: SnapshotSchema::default(),
1114 };
1115 bincode::serde::encode_to_vec(&snap, bincode::config::standard()).unwrap()
1116 }
1117
1118 #[test]
1119 fn test_restore_rejects_unsupported_version() {
1120 let db = GrafeoDB::new_in_memory();
1121 let session = db.session();
1122 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1123
1124 let bytes = make_snapshot(99, vec![], vec![]);
1125
1126 let result = db.restore_snapshot(&bytes);
1127 assert!(result.is_err());
1128 let err = result.unwrap_err().to_string();
1129 assert!(err.contains("unsupported snapshot version"), "got: {err}");
1130
1131 assert_eq!(db.store.node_count(), 1);
1133 }
1134
1135 #[test]
1136 fn test_restore_rejects_duplicate_node_ids() {
1137 let db = GrafeoDB::new_in_memory();
1138 let session = db.session();
1139 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1140
1141 let bytes = make_snapshot(
1142 SNAPSHOT_VERSION,
1143 vec![
1144 SnapshotNode {
1145 id: NodeId::new(0),
1146 labels: vec!["A".into()],
1147 properties: vec![],
1148 },
1149 SnapshotNode {
1150 id: NodeId::new(0),
1151 labels: vec!["B".into()],
1152 properties: vec![],
1153 },
1154 ],
1155 vec![],
1156 );
1157
1158 let result = db.restore_snapshot(&bytes);
1159 assert!(result.is_err());
1160 let err = result.unwrap_err().to_string();
1161 assert!(err.contains("duplicate node ID"), "got: {err}");
1162 assert_eq!(db.store.node_count(), 1);
1163 }
1164
1165 #[test]
1166 fn test_restore_rejects_duplicate_edge_ids() {
1167 let db = GrafeoDB::new_in_memory();
1168
1169 let bytes = make_snapshot(
1170 SNAPSHOT_VERSION,
1171 vec![
1172 SnapshotNode {
1173 id: NodeId::new(0),
1174 labels: vec![],
1175 properties: vec![],
1176 },
1177 SnapshotNode {
1178 id: NodeId::new(1),
1179 labels: vec![],
1180 properties: vec![],
1181 },
1182 ],
1183 vec![
1184 SnapshotEdge {
1185 id: EdgeId::new(0),
1186 src: NodeId::new(0),
1187 dst: NodeId::new(1),
1188 edge_type: "REL".into(),
1189 properties: vec![],
1190 },
1191 SnapshotEdge {
1192 id: EdgeId::new(0),
1193 src: NodeId::new(0),
1194 dst: NodeId::new(1),
1195 edge_type: "REL".into(),
1196 properties: vec![],
1197 },
1198 ],
1199 );
1200
1201 let result = db.restore_snapshot(&bytes);
1202 assert!(result.is_err());
1203 let err = result.unwrap_err().to_string();
1204 assert!(err.contains("duplicate edge ID"), "got: {err}");
1205 }
1206
1207 #[test]
1208 fn test_restore_rejects_dangling_source() {
1209 let db = GrafeoDB::new_in_memory();
1210
1211 let bytes = make_snapshot(
1212 SNAPSHOT_VERSION,
1213 vec![SnapshotNode {
1214 id: NodeId::new(0),
1215 labels: vec![],
1216 properties: vec![],
1217 }],
1218 vec![SnapshotEdge {
1219 id: EdgeId::new(0),
1220 src: NodeId::new(999),
1221 dst: NodeId::new(0),
1222 edge_type: "REL".into(),
1223 properties: vec![],
1224 }],
1225 );
1226
1227 let result = db.restore_snapshot(&bytes);
1228 assert!(result.is_err());
1229 let err = result.unwrap_err().to_string();
1230 assert!(err.contains("non-existent source node"), "got: {err}");
1231 }
1232
1233 #[test]
1234 fn test_restore_rejects_dangling_destination() {
1235 let db = GrafeoDB::new_in_memory();
1236
1237 let bytes = make_snapshot(
1238 SNAPSHOT_VERSION,
1239 vec![SnapshotNode {
1240 id: NodeId::new(0),
1241 labels: vec![],
1242 properties: vec![],
1243 }],
1244 vec![SnapshotEdge {
1245 id: EdgeId::new(0),
1246 src: NodeId::new(0),
1247 dst: NodeId::new(999),
1248 edge_type: "REL".into(),
1249 properties: vec![],
1250 }],
1251 );
1252
1253 let result = db.restore_snapshot(&bytes);
1254 assert!(result.is_err());
1255 let err = result.unwrap_err().to_string();
1256 assert!(err.contains("non-existent destination node"), "got: {err}");
1257 }
1258}