1#[cfg(any(feature = "wal", feature = "grafeo-file"))]
4use std::path::Path;
5
6#[cfg(any(feature = "vector-index", feature = "text-index"))]
7use grafeo_common::grafeo_warn;
8use grafeo_common::types::{EdgeId, EpochId, NodeId, Value};
9use grafeo_common::utils::error::{Error, Result};
10use hashbrown::HashSet;
11
12use crate::config::Config;
13
14#[cfg(feature = "wal")]
15use grafeo_adapters::storage::wal::WalRecord;
16
17use crate::catalog::{
18 EdgeTypeDefinition, GraphTypeDefinition, NodeTypeDefinition, ProcedureDefinition,
19};
20
/// Format version stamped into every snapshot written by `export_snapshot`.
///
/// `Snapshot` is encoded with bincode, whose encoding depends on field order and types,
/// so any change to the layout of `Snapshot` or its nested types needs a new version.
const SNAPSHOT_VERSION: u8 = 4;

/// Top-level serialized form of a whole database.
///
/// Produced by `export_snapshot` (bincode standard configuration) and consumed by
/// `import_snapshot`, `restore_snapshot` and `load_snapshot_into_store`. Field order is
/// part of the wire format.
#[derive(serde::Serialize, serde::Deserialize)]
struct Snapshot {
    // Must stay the first field: `import_snapshot`/`restore_snapshot` read the version
    // from the first byte of the encoded data before decoding the rest.
    version: u8,
    // Nodes of the default graph.
    nodes: Vec<SnapshotNode>,
    // Edges of the default graph.
    edges: Vec<SnapshotEdge>,
    // Named LPG graphs, each with its own nodes and edges.
    named_graphs: Vec<NamedGraphSnapshot>,
    // Triples of the default RDF graph (empty when the `rdf` feature is disabled).
    rdf_triples: Vec<SnapshotTriple>,
    // Named RDF graphs (empty when the `rdf` feature is disabled).
    rdf_named_graphs: Vec<RdfNamedGraphSnapshot>,
    // Catalog definitions captured by `collect_schema`.
    schema: SnapshotSchema,
    // Index definitions captured by `collect_index_metadata`.
    indexes: SnapshotIndexes,
    // Transaction-manager epoch when exported with `temporal`; 0 otherwise.
    epoch: u64,
}
39
/// Catalog definitions stored in a snapshot.
///
/// Filled by `collect_schema` and re-registered by `restore_schema_from_snapshot`.
#[derive(serde::Serialize, serde::Deserialize, Default)]
struct SnapshotSchema {
    // Node type definitions.
    node_types: Vec<NodeTypeDefinition>,
    // Edge type definitions.
    edge_types: Vec<EdgeTypeDefinition>,
    // Graph type definitions.
    graph_types: Vec<GraphTypeDefinition>,
    // Stored procedure definitions.
    procedures: Vec<ProcedureDefinition>,
    // Schema namespace names.
    schemas: Vec<String>,
    // (graph name, graph type name) pairs passed to `bind_graph_type` on restore.
    graph_type_bindings: Vec<(String, String)>,
}
50
/// Index definitions stored in a snapshot (definitions only, not index contents).
///
/// Filled by `collect_index_metadata` and re-created by `restore_indexes_from_snapshot`.
#[derive(serde::Serialize, serde::Deserialize, Default)]
struct SnapshotIndexes {
    // Keys returned by `LpgStore::property_index_keys`.
    property_indexes: Vec<String>,
    // Vector index definitions (empty without the `vector-index` feature).
    vector_indexes: Vec<SnapshotVectorIndex>,
    // Text index definitions (empty without the `text-index` feature).
    text_indexes: Vec<SnapshotTextIndex>,
}
58
/// Definition of one vector index, enough to call `create_vector_index` again.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotVectorIndex {
    // Label part of the index key (text before the first ':').
    label: String,
    // Property part of the index key (text after the first ':').
    property: String,
    // Vector dimensionality from the index config.
    dimensions: usize,
    // Distance metric from the index config.
    metric: grafeo_core::index::vector::DistanceMetric,
    // `m` value from the index config (presumably the HNSW degree — verify).
    m: usize,
    // `ef_construction` value from the index config.
    ef_construction: usize,
}
69
/// Definition of one text index, enough to call `create_text_index` again.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotTextIndex {
    // Label part of the index key (text before the first ':').
    label: String,
    // Property part of the index key (text after the first ':').
    property: String,
}
76
/// Contents of one named LPG graph.
#[derive(serde::Serialize, serde::Deserialize)]
struct NamedGraphSnapshot {
    // Name passed to `create_graph` when restoring.
    name: String,
    // Nodes of this graph.
    nodes: Vec<SnapshotNode>,
    // Edges of this graph.
    edges: Vec<SnapshotEdge>,
}
84
/// One RDF triple with each term stored as text.
///
/// Terms are written with their `to_string()` form and parsed back with
/// `Term::from_ntriples` (presumably N-Triples term syntax — confirm against `Term`).
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotTriple {
    subject: String,
    predicate: String,
    object: String,
}
92
/// Contents of one named RDF graph.
#[derive(serde::Serialize, serde::Deserialize)]
struct RdfNamedGraphSnapshot {
    // Name of the RDF graph.
    name: String,
    // Triples belonging to this graph.
    triples: Vec<SnapshotTriple>,
}
99
/// Serialized form of one node.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotNode {
    // Node ID, re-used verbatim by `create_node_with_id` on restore.
    id: NodeId,
    // Label names.
    labels: Vec<String>,
    // Per property key, a list of (epoch, value) versions. With `temporal` this is the
    // full history; otherwise a single entry tagged with epoch 0.
    properties: Vec<(String, Vec<(EpochId, Value)>)>,
}
107
/// Serialized form of one edge.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotEdge {
    // Edge ID, re-used verbatim by `create_edge_with_id` on restore.
    id: EdgeId,
    // Source node ID; must exist in the same graph's nodes (see `validate_snapshot_data`).
    src: NodeId,
    // Destination node ID; must exist in the same graph's nodes.
    dst: NodeId,
    // Edge type name.
    edge_type: String,
    // Per property key, a list of (epoch, value) versions, as for `SnapshotNode`.
    properties: Vec<(String, Vec<(EpochId, Value)>)>,
}
117
118fn collect_snapshot_nodes(store: &grafeo_core::graph::lpg::LpgStore) -> Vec<SnapshotNode> {
123 store
124 .all_nodes()
125 .map(|n| {
126 #[cfg(feature = "temporal")]
127 let properties = store
128 .node_property_history(n.id)
129 .into_iter()
130 .map(|(k, entries)| (k.to_string(), entries))
131 .collect();
132
133 #[cfg(not(feature = "temporal"))]
134 let properties = n
135 .properties
136 .into_iter()
137 .map(|(k, v)| (k.to_string(), vec![(EpochId::new(0), v)]))
138 .collect();
139
140 SnapshotNode {
141 id: n.id,
142 labels: n.labels.iter().map(|l| l.to_string()).collect(),
143 properties,
144 }
145 })
146 .collect()
147}
148
149fn collect_snapshot_edges(store: &grafeo_core::graph::lpg::LpgStore) -> Vec<SnapshotEdge> {
154 store
155 .all_edges()
156 .map(|e| {
157 #[cfg(feature = "temporal")]
158 let properties = store
159 .edge_property_history(e.id)
160 .into_iter()
161 .map(|(k, entries)| (k.to_string(), entries))
162 .collect();
163
164 #[cfg(not(feature = "temporal"))]
165 let properties = e
166 .properties
167 .into_iter()
168 .map(|(k, v)| (k.to_string(), vec![(EpochId::new(0), v)]))
169 .collect();
170
171 SnapshotEdge {
172 id: e.id,
173 src: e.src,
174 dst: e.dst,
175 edge_type: e.edge_type.to_string(),
176 properties,
177 }
178 })
179 .collect()
180}
181
182fn populate_store_from_snapshot(
187 store: &grafeo_core::graph::lpg::LpgStore,
188 nodes: Vec<SnapshotNode>,
189 edges: Vec<SnapshotEdge>,
190) -> Result<()> {
191 for node in nodes {
192 let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
193 store.create_node_with_id(node.id, &label_refs)?;
194 for (key, entries) in node.properties {
195 #[cfg(feature = "temporal")]
196 for (epoch, value) in entries {
197 store.set_node_property_at_epoch(node.id, &key, value, epoch);
198 }
199 #[cfg(not(feature = "temporal"))]
200 if let Some((_, value)) = entries.into_iter().last() {
201 store.set_node_property(node.id, &key, value);
202 }
203 }
204 }
205 for edge in edges {
206 store.create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
207 for (key, entries) in edge.properties {
208 #[cfg(feature = "temporal")]
209 for (epoch, value) in entries {
210 store.set_edge_property_at_epoch(edge.id, &key, value, epoch);
211 }
212 #[cfg(not(feature = "temporal"))]
213 if let Some((_, value)) = entries.into_iter().last() {
214 store.set_edge_property(edge.id, &key, value);
215 }
216 }
217 }
218 Ok(())
219}
220
221fn validate_snapshot_data(nodes: &[SnapshotNode], edges: &[SnapshotEdge]) -> Result<()> {
223 let mut node_ids = HashSet::with_capacity(nodes.len());
224 for node in nodes {
225 if !node_ids.insert(node.id) {
226 return Err(Error::Internal(format!(
227 "snapshot contains duplicate node ID {}",
228 node.id
229 )));
230 }
231 }
232 let mut edge_ids = HashSet::with_capacity(edges.len());
233 for edge in edges {
234 if !edge_ids.insert(edge.id) {
235 return Err(Error::Internal(format!(
236 "snapshot contains duplicate edge ID {}",
237 edge.id
238 )));
239 }
240 if !node_ids.contains(&edge.src) {
241 return Err(Error::Internal(format!(
242 "snapshot edge {} references non-existent source node {}",
243 edge.id, edge.src
244 )));
245 }
246 if !node_ids.contains(&edge.dst) {
247 return Err(Error::Internal(format!(
248 "snapshot edge {} references non-existent destination node {}",
249 edge.id, edge.dst
250 )));
251 }
252 }
253 Ok(())
254}
255
#[cfg(feature = "rdf")]
/// Converts every triple in `store` into a [`SnapshotTriple`], rendering each term with
/// its `to_string()` form.
fn collect_rdf_triples(store: &grafeo_core::graph::rdf::RdfStore) -> Vec<SnapshotTriple> {
    let mut collected = Vec::new();
    for triple in store.triples() {
        collected.push(SnapshotTriple {
            subject: triple.subject().to_string(),
            predicate: triple.predicate().to_string(),
            object: triple.object().to_string(),
        });
    }
    collected
}
269
#[cfg(feature = "rdf")]
/// Parses each snapshot triple with `Term::from_ntriples` and inserts it into `store`.
///
/// A triple is skipped without any error or log if any of its three terms fails to parse.
fn populate_rdf_store(store: &grafeo_core::graph::rdf::RdfStore, triples: &[SnapshotTriple]) {
    use grafeo_core::graph::rdf::{Term, Triple};

    let parsed = triples.iter().filter_map(|triple| {
        let subject = Term::from_ntriples(&triple.subject)?;
        let predicate = Term::from_ntriples(&triple.predicate)?;
        let object = Term::from_ntriples(&triple.object)?;
        Some(Triple::new(subject, predicate, object))
    });

    for triple in parsed {
        store.insert(triple);
    }
}
284
#[cfg(feature = "grafeo-file")]
/// Populates the given stores from a snapshot payload read from a `.grafeo` file.
///
/// `data` is decoded as a [`Snapshot`] using bincode's standard configuration. Before
/// anything is written, the snapshot's version must equal [`SNAPSHOT_VERSION`], and the
/// default graph and every named graph must pass [`validate_snapshot_data`]. These are the
/// same checks `import_snapshot` and `restore_snapshot` apply.
///
/// The following are then restored:
/// - the default graph;
/// - named graphs (with `temporal`, each store is synced to the snapshot epoch);
/// - catalog schema definitions;
/// - with `rdf`, the default and named RDF graphs.
///
/// Index definitions are not restored here.
///
/// # Errors
///
/// - [`Error::Serialization`] if decoding fails.
/// - [`Error::Internal`] on a version mismatch, a validation failure, or a failure to
///   create a named graph.
/// - Errors from node/edge creation are propagated.
pub(super) fn load_snapshot_into_store(
    store: &std::sync::Arc<grafeo_core::graph::lpg::LpgStore>,
    catalog: &std::sync::Arc<crate::catalog::Catalog>,
    #[cfg(feature = "rdf")] rdf_store: &std::sync::Arc<grafeo_core::graph::rdf::RdfStore>,
    data: &[u8],
) -> grafeo_common::utils::error::Result<()> {
    let config = bincode::config::standard();
    let (snapshot, _) =
        bincode::serde::decode_from_slice::<Snapshot, _>(data, config).map_err(|e| {
            Error::Serialization(format!("failed to decode snapshot from .grafeo file: {e}"))
        })?;

    // Reject incompatible or structurally broken snapshots before mutating any store, so
    // a corrupted file cannot leave the stores partially populated.
    if snapshot.version != SNAPSHOT_VERSION {
        return Err(Error::Internal(format!(
            "unsupported snapshot version: {} (expected {SNAPSHOT_VERSION})",
            snapshot.version
        )));
    }
    validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
    for graph in &snapshot.named_graphs {
        validate_snapshot_data(&graph.nodes, &graph.edges)?;
    }

    populate_store_from_snapshot_ref(store, &snapshot.nodes, &snapshot.edges)?;

    #[cfg(feature = "temporal")]
    let snapshot_epoch = {
        let epoch = EpochId::new(snapshot.epoch);
        store.sync_epoch(epoch);
        epoch
    };

    for graph in &snapshot.named_graphs {
        store
            .create_graph(&graph.name)
            .map_err(|e| Error::Internal(e.to_string()))?;
        if let Some(graph_store) = store.graph(&graph.name) {
            populate_store_from_snapshot_ref(&graph_store, &graph.nodes, &graph.edges)?;
            #[cfg(feature = "temporal")]
            graph_store.sync_epoch(snapshot_epoch);
        }
    }
    restore_schema_from_snapshot(catalog, &snapshot.schema);

    #[cfg(feature = "rdf")]
    {
        populate_rdf_store(rdf_store, &snapshot.rdf_triples);
        for rdf_graph in &snapshot.rdf_named_graphs {
            rdf_store.create_graph(&rdf_graph.name);
            if let Some(graph_store) = rdf_store.graph(&rdf_graph.name) {
                populate_rdf_store(&graph_store, &rdf_graph.triples);
            }
        }
    }

    Ok(())
}
338
#[cfg(feature = "grafeo-file")]
/// Borrowing variant of `populate_store_from_snapshot`: inserts snapshot nodes and edges
/// into `store`, cloning property values.
///
/// All nodes are created before any edge. With `temporal` every (epoch, value) version
/// is written; without it only the last version of each property is written.
///
/// # Errors
///
/// Propagates the first error from `create_node_with_id` or `create_edge_with_id`.
fn populate_store_from_snapshot_ref(
    store: &grafeo_core::graph::lpg::LpgStore,
    nodes: &[SnapshotNode],
    edges: &[SnapshotEdge],
) -> grafeo_common::utils::error::Result<()> {
    nodes
        .iter()
        .try_for_each(|node| -> grafeo_common::utils::error::Result<()> {
            let label_slices: Vec<&str> = node.labels.iter().map(String::as_str).collect();
            store.create_node_with_id(node.id, &label_slices)?;
            for (key, history) in node.properties.iter() {
                #[cfg(feature = "temporal")]
                for (epoch, value) in history.iter() {
                    store.set_node_property_at_epoch(node.id, key, value.clone(), *epoch);
                }
                #[cfg(not(feature = "temporal"))]
                if let Some((_, latest)) = history.last() {
                    store.set_node_property(node.id, key, latest.clone());
                }
            }
            Ok(())
        })?;

    edges
        .iter()
        .try_for_each(|edge| -> grafeo_common::utils::error::Result<()> {
            store.create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
            for (key, history) in edge.properties.iter() {
                #[cfg(feature = "temporal")]
                for (epoch, value) in history.iter() {
                    store.set_edge_property_at_epoch(edge.id, key, value.clone(), *epoch);
                }
                #[cfg(not(feature = "temporal"))]
                if let Some((_, latest)) = history.last() {
                    store.set_edge_property(edge.id, key, latest.clone());
                }
            }
            Ok(())
        })
}
375
376fn restore_schema_from_snapshot(
378 catalog: &std::sync::Arc<crate::catalog::Catalog>,
379 schema: &SnapshotSchema,
380) {
381 for def in &schema.node_types {
382 catalog.register_or_replace_node_type(def.clone());
383 }
384 for def in &schema.edge_types {
385 catalog.register_or_replace_edge_type_def(def.clone());
386 }
387 for def in &schema.graph_types {
388 let _ = catalog.register_graph_type(def.clone());
389 }
390 for def in &schema.procedures {
391 catalog.replace_procedure(def.clone()).ok();
392 }
393 for name in &schema.schemas {
394 let _ = catalog.register_schema_namespace(name.clone());
395 }
396 for (graph_name, type_name) in &schema.graph_type_bindings {
397 let _ = catalog.bind_graph_type(graph_name, type_name.clone());
398 }
399}
400
401fn collect_schema(catalog: &std::sync::Arc<crate::catalog::Catalog>) -> SnapshotSchema {
403 SnapshotSchema {
404 node_types: catalog.all_node_type_defs(),
405 edge_types: catalog.all_edge_type_defs(),
406 graph_types: catalog.all_graph_type_defs(),
407 procedures: catalog.all_procedure_defs(),
408 schemas: catalog.schema_names(),
409 graph_type_bindings: catalog.all_graph_type_bindings(),
410 }
411}
412
413fn restore_indexes_from_snapshot(db: &super::GrafeoDB, indexes: &SnapshotIndexes) {
418 for name in &indexes.property_indexes {
419 db.lpg_store().create_property_index(name);
420 }
421
422 #[cfg(feature = "vector-index")]
423 for vi in &indexes.vector_indexes {
424 if let Err(err) = db.create_vector_index(
425 &vi.label,
426 &vi.property,
427 Some(vi.dimensions),
428 Some(vi.metric.name()),
429 Some(vi.m),
430 Some(vi.ef_construction),
431 ) {
432 grafeo_warn!(
433 "Failed to restore vector index :{label}({property}): {err}",
434 label = vi.label,
435 property = vi.property,
436 );
437 }
438 }
439
440 #[cfg(feature = "text-index")]
441 for ti in &indexes.text_indexes {
442 if let Err(err) = db.create_text_index(&ti.label, &ti.property) {
443 grafeo_warn!(
444 "Failed to restore text index :{label}({property}): {err}",
445 label = ti.label,
446 property = ti.property,
447 );
448 }
449 }
450}
451
452fn collect_index_metadata(store: &grafeo_core::graph::lpg::LpgStore) -> SnapshotIndexes {
454 let property_indexes = store.property_index_keys();
455
456 #[cfg(feature = "vector-index")]
457 let vector_indexes: Vec<SnapshotVectorIndex> = store
458 .vector_index_entries()
459 .into_iter()
460 .filter_map(|(key, index)| {
461 let (label, property) = key.split_once(':')?;
462 let config = index.config();
463 Some(SnapshotVectorIndex {
464 label: label.to_string(),
465 property: property.to_string(),
466 dimensions: config.dimensions,
467 metric: config.metric,
468 m: config.m,
469 ef_construction: config.ef_construction,
470 })
471 })
472 .collect();
473 #[cfg(not(feature = "vector-index"))]
474 let vector_indexes = Vec::new();
475
476 #[cfg(feature = "text-index")]
477 let text_indexes: Vec<SnapshotTextIndex> = store
478 .text_index_entries()
479 .into_iter()
480 .filter_map(|(key, _)| {
481 let (label, property) = key.split_once(':')?;
482 Some(SnapshotTextIndex {
483 label: label.to_string(),
484 property: property.to_string(),
485 })
486 })
487 .collect();
488 #[cfg(not(feature = "text-index"))]
489 let text_indexes = Vec::new();
490
491 SnapshotIndexes {
492 property_indexes,
493 vector_indexes,
494 text_indexes,
495 }
496}
497
498impl super::GrafeoDB {
499 #[cfg(feature = "wal")]
518 pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
519 let path = path.as_ref();
520
521 #[cfg(feature = "grafeo-file")]
523 if path.extension().is_some_and(|ext| ext == "grafeo") {
524 return self.save_as_grafeo_file(path);
525 }
526
527 let target_config = Config::persistent(path);
529 let target = Self::with_config(target_config)?;
530
531 for node in self.lpg_store().all_nodes() {
533 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
534 target
535 .lpg_store()
536 .create_node_with_id(node.id, &label_refs)?;
537
538 target.log_wal(&WalRecord::CreateNode {
540 id: node.id,
541 labels: node.labels.iter().map(|s| s.to_string()).collect(),
542 })?;
543
544 for (key, value) in node.properties {
546 target
547 .lpg_store()
548 .set_node_property(node.id, key.as_str(), value.clone());
549 target.log_wal(&WalRecord::SetNodeProperty {
550 id: node.id,
551 key: key.to_string(),
552 value,
553 })?;
554 }
555 }
556
557 for edge in self.lpg_store().all_edges() {
559 target
560 .lpg_store()
561 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
562
563 target.log_wal(&WalRecord::CreateEdge {
565 id: edge.id,
566 src: edge.src,
567 dst: edge.dst,
568 edge_type: edge.edge_type.to_string(),
569 })?;
570
571 for (key, value) in edge.properties {
573 target
574 .lpg_store()
575 .set_edge_property(edge.id, key.as_str(), value.clone());
576 target.log_wal(&WalRecord::SetEdgeProperty {
577 id: edge.id,
578 key: key.to_string(),
579 value,
580 })?;
581 }
582 }
583
584 for graph_name in self.lpg_store().graph_names() {
586 if let Some(src_graph) = self.lpg_store().graph(&graph_name) {
587 target.log_wal(&WalRecord::CreateNamedGraph {
588 name: graph_name.clone(),
589 })?;
590 target
591 .lpg_store()
592 .create_graph(&graph_name)
593 .map_err(|e| Error::Internal(e.to_string()))?;
594
595 if let Some(dst_graph) = target.lpg_store().graph(&graph_name) {
596 target.log_wal(&WalRecord::SwitchGraph {
598 name: Some(graph_name.clone()),
599 })?;
600
601 for node in src_graph.all_nodes() {
602 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
603 dst_graph.create_node_with_id(node.id, &label_refs)?;
604 target.log_wal(&WalRecord::CreateNode {
605 id: node.id,
606 labels: node.labels.iter().map(|s| s.to_string()).collect(),
607 })?;
608 for (key, value) in node.properties {
609 dst_graph.set_node_property(node.id, key.as_str(), value.clone());
610 target.log_wal(&WalRecord::SetNodeProperty {
611 id: node.id,
612 key: key.to_string(),
613 value,
614 })?;
615 }
616 }
617 for edge in src_graph.all_edges() {
618 dst_graph.create_edge_with_id(
619 edge.id,
620 edge.src,
621 edge.dst,
622 &edge.edge_type,
623 )?;
624 target.log_wal(&WalRecord::CreateEdge {
625 id: edge.id,
626 src: edge.src,
627 dst: edge.dst,
628 edge_type: edge.edge_type.to_string(),
629 })?;
630 for (key, value) in edge.properties {
631 dst_graph.set_edge_property(edge.id, key.as_str(), value.clone());
632 target.log_wal(&WalRecord::SetEdgeProperty {
633 id: edge.id,
634 key: key.to_string(),
635 value,
636 })?;
637 }
638 }
639 }
640 }
641 }
642
643 if !self.lpg_store().graph_names().is_empty() {
645 target.log_wal(&WalRecord::SwitchGraph { name: None })?;
646 }
647
648 #[cfg(feature = "rdf")]
650 {
651 for triple in self.rdf_store.triples() {
652 let record = WalRecord::InsertRdfTriple {
653 subject: triple.subject().to_string(),
654 predicate: triple.predicate().to_string(),
655 object: triple.object().to_string(),
656 graph: None,
657 };
658 target.rdf_store.insert((*triple).clone());
659 target.log_wal(&record)?;
660 }
661 for name in self.rdf_store.graph_names() {
662 target.log_wal(&WalRecord::CreateRdfGraph { name: name.clone() })?;
663 if let Some(src_graph) = self.rdf_store.graph(&name) {
664 let dst_graph = target.rdf_store.graph_or_create(&name);
665 for triple in src_graph.triples() {
666 let record = WalRecord::InsertRdfTriple {
667 subject: triple.subject().to_string(),
668 predicate: triple.predicate().to_string(),
669 object: triple.object().to_string(),
670 graph: Some(name.clone()),
671 };
672 dst_graph.insert((*triple).clone());
673 target.log_wal(&record)?;
674 }
675 }
676 }
677 }
678
679 target.close()?;
681
682 Ok(())
683 }
684
685 #[cfg(feature = "grafeo-file")]
692 fn save_as_grafeo_file(&self, path: &Path) -> Result<()> {
693 use grafeo_adapters::storage::file::GrafeoFileManager;
694
695 let snapshot_data = self.export_snapshot()?;
696 let epoch = self.lpg_store().current_epoch();
697 let transaction_id = self
698 .transaction_manager
699 .last_assigned_transaction_id()
700 .map_or(0, |t| t.0);
701 let node_count = self.lpg_store().node_count() as u64;
702 let edge_count = self.lpg_store().edge_count() as u64;
703
704 let fm = GrafeoFileManager::create(path)?;
705 fm.write_snapshot(
706 &snapshot_data,
707 epoch.0,
708 transaction_id,
709 node_count,
710 edge_count,
711 )?;
712 Ok(())
713 }
714
715 pub fn to_memory(&self) -> Result<Self> {
722 let config = Config::in_memory();
723 let target = Self::with_config(config)?;
724
725 for node in self.lpg_store().all_nodes() {
727 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
728 target
729 .lpg_store()
730 .create_node_with_id(node.id, &label_refs)?;
731 for (key, value) in node.properties {
732 target
733 .lpg_store()
734 .set_node_property(node.id, key.as_str(), value);
735 }
736 }
737
738 for edge in self.lpg_store().all_edges() {
740 target
741 .lpg_store()
742 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
743 for (key, value) in edge.properties {
744 target
745 .lpg_store()
746 .set_edge_property(edge.id, key.as_str(), value);
747 }
748 }
749
750 for graph_name in self.lpg_store().graph_names() {
752 if let Some(src_graph) = self.lpg_store().graph(&graph_name) {
753 target
754 .lpg_store()
755 .create_graph(&graph_name)
756 .map_err(|e| Error::Internal(e.to_string()))?;
757 if let Some(dst_graph) = target.lpg_store().graph(&graph_name) {
758 for node in src_graph.all_nodes() {
759 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
760 dst_graph.create_node_with_id(node.id, &label_refs)?;
761 for (key, value) in node.properties {
762 dst_graph.set_node_property(node.id, key.as_str(), value);
763 }
764 }
765 for edge in src_graph.all_edges() {
766 dst_graph.create_edge_with_id(
767 edge.id,
768 edge.src,
769 edge.dst,
770 &edge.edge_type,
771 )?;
772 for (key, value) in edge.properties {
773 dst_graph.set_edge_property(edge.id, key.as_str(), value);
774 }
775 }
776 }
777 }
778 }
779
780 #[cfg(feature = "rdf")]
782 {
783 for triple in self.rdf_store.triples() {
784 target.rdf_store.insert((*triple).clone());
785 }
786 for name in self.rdf_store.graph_names() {
787 if let Some(src_graph) = self.rdf_store.graph(&name) {
788 let dst_graph = target.rdf_store.graph_or_create(&name);
789 for triple in src_graph.triples() {
790 dst_graph.insert((*triple).clone());
791 }
792 }
793 }
794 }
795
796 Ok(target)
797 }
798
799 #[cfg(feature = "wal")]
808 pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
809 let source = Self::open(path)?;
811
812 let target = source.to_memory()?;
814
815 source.close()?;
817
818 Ok(target)
819 }
820
821 pub fn export_snapshot(&self) -> Result<Vec<u8>> {
839 let nodes = collect_snapshot_nodes(self.lpg_store());
840 let edges = collect_snapshot_edges(self.lpg_store());
841
842 let named_graphs: Vec<NamedGraphSnapshot> = self
844 .lpg_store()
845 .graph_names()
846 .into_iter()
847 .filter_map(|name| {
848 self.lpg_store()
849 .graph(&name)
850 .map(|graph_store| NamedGraphSnapshot {
851 name,
852 nodes: collect_snapshot_nodes(&graph_store),
853 edges: collect_snapshot_edges(&graph_store),
854 })
855 })
856 .collect();
857
858 #[cfg(feature = "rdf")]
860 let rdf_triples = collect_rdf_triples(&self.rdf_store);
861 #[cfg(not(feature = "rdf"))]
862 let rdf_triples = Vec::new();
863
864 #[cfg(feature = "rdf")]
865 let rdf_named_graphs: Vec<RdfNamedGraphSnapshot> = self
866 .rdf_store
867 .graph_names()
868 .into_iter()
869 .filter_map(|name| {
870 self.rdf_store
871 .graph(&name)
872 .map(|graph| RdfNamedGraphSnapshot {
873 name,
874 triples: collect_rdf_triples(&graph),
875 })
876 })
877 .collect();
878 #[cfg(not(feature = "rdf"))]
879 let rdf_named_graphs = Vec::new();
880
881 let schema = collect_schema(&self.catalog);
882 let indexes = collect_index_metadata(self.lpg_store());
883
884 let snapshot = Snapshot {
885 version: SNAPSHOT_VERSION,
886 nodes,
887 edges,
888 named_graphs,
889 rdf_triples,
890 rdf_named_graphs,
891 schema,
892 indexes,
893 #[cfg(feature = "temporal")]
894 epoch: self.transaction_manager.current_epoch().as_u64(),
895 #[cfg(not(feature = "temporal"))]
896 epoch: 0,
897 };
898
899 let config = bincode::config::standard();
900 bincode::serde::encode_to_vec(&snapshot, config)
901 .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
902 }
903
904 pub fn import_snapshot(data: &[u8]) -> Result<Self> {
918 if data.is_empty() {
919 return Err(Error::Internal("empty snapshot data".to_string()));
920 }
921
922 let version = data[0];
923 if version != 4 {
924 return Err(Error::Internal(format!(
925 "unsupported snapshot version: {version} (expected 4)"
926 )));
927 }
928
929 let config = bincode::config::standard();
930 let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
931 .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
932
933 validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
935
936 for ng in &snapshot.named_graphs {
938 validate_snapshot_data(&ng.nodes, &ng.edges)?;
939 }
940
941 let db = Self::new_in_memory();
942 populate_store_from_snapshot(db.lpg_store(), snapshot.nodes, snapshot.edges)?;
943
944 #[cfg(feature = "temporal")]
946 {
947 let epoch = EpochId::new(snapshot.epoch);
948 db.lpg_store().sync_epoch(epoch);
949 db.transaction_manager.sync_epoch(epoch);
950 }
951
952 #[cfg(feature = "temporal")]
954 let snapshot_epoch = EpochId::new(snapshot.epoch);
955
956 for ng in snapshot.named_graphs {
958 db.lpg_store()
959 .create_graph(&ng.name)
960 .map_err(|e| Error::Internal(e.to_string()))?;
961 if let Some(graph_store) = db.lpg_store().graph(&ng.name) {
962 populate_store_from_snapshot(&graph_store, ng.nodes, ng.edges)?;
963 #[cfg(feature = "temporal")]
966 graph_store.sync_epoch(snapshot_epoch);
967 }
968 }
969
970 #[cfg(feature = "rdf")]
972 {
973 populate_rdf_store(&db.rdf_store, &snapshot.rdf_triples);
974 for rng in &snapshot.rdf_named_graphs {
975 let graph = db.rdf_store.graph_or_create(&rng.name);
976 populate_rdf_store(&graph, &rng.triples);
977 }
978 }
979
980 restore_schema_from_snapshot(&db.catalog, &snapshot.schema);
982
983 restore_indexes_from_snapshot(&db, &snapshot.indexes);
985
986 Ok(db)
987 }
988
989 pub fn restore_snapshot(&self, data: &[u8]) -> Result<()> {
1005 if data.is_empty() {
1006 return Err(Error::Internal("empty snapshot data".to_string()));
1007 }
1008
1009 let version = data[0];
1010 if version != 4 {
1011 return Err(Error::Internal(format!(
1012 "unsupported snapshot version: {version} (expected 4)"
1013 )));
1014 }
1015
1016 let config = bincode::config::standard();
1017 let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
1018 .map_err(|e| Error::Internal(format!("snapshot restore failed: {e}")))?;
1019
1020 validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
1022 for ng in &snapshot.named_graphs {
1023 validate_snapshot_data(&ng.nodes, &ng.edges)?;
1024 }
1025
1026 for name in self.lpg_store().graph_names() {
1028 self.lpg_store().drop_graph(&name);
1029 }
1030 self.lpg_store().clear();
1031
1032 populate_store_from_snapshot(self.lpg_store(), snapshot.nodes, snapshot.edges)?;
1033
1034 #[cfg(feature = "temporal")]
1036 let snapshot_epoch = {
1037 let epoch = EpochId::new(snapshot.epoch);
1038 self.lpg_store().sync_epoch(epoch);
1039 self.transaction_manager.sync_epoch(epoch);
1040 epoch
1041 };
1042
1043 for ng in snapshot.named_graphs {
1045 self.lpg_store()
1046 .create_graph(&ng.name)
1047 .map_err(|e| Error::Internal(e.to_string()))?;
1048 if let Some(graph_store) = self.lpg_store().graph(&ng.name) {
1049 populate_store_from_snapshot(&graph_store, ng.nodes, ng.edges)?;
1050 #[cfg(feature = "temporal")]
1051 graph_store.sync_epoch(snapshot_epoch);
1052 }
1053 }
1054
1055 #[cfg(feature = "rdf")]
1057 {
1058 self.rdf_store.clear();
1060 for name in self.rdf_store.graph_names() {
1061 self.rdf_store.drop_graph(&name);
1062 }
1063 populate_rdf_store(&self.rdf_store, &snapshot.rdf_triples);
1064 for rng in &snapshot.rdf_named_graphs {
1065 let graph = self.rdf_store.graph_or_create(&rng.name);
1066 populate_rdf_store(&graph, &rng.triples);
1067 }
1068 }
1069
1070 restore_schema_from_snapshot(&self.catalog, &snapshot.schema);
1072
1073 restore_indexes_from_snapshot(self, &snapshot.indexes);
1075
1076 Ok(())
1077 }
1078
1079 pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
1087 self.lpg_store().all_nodes()
1088 }
1089
1090 pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
1094 self.lpg_store().all_edges()
1095 }
1096}
1097
1098#[cfg(test)]
1099mod tests {
1100 use grafeo_common::types::{EdgeId, NodeId, Value};
1101
1102 use super::super::GrafeoDB;
1103 use super::{
1104 SNAPSHOT_VERSION, Snapshot, SnapshotEdge, SnapshotIndexes, SnapshotNode, SnapshotSchema,
1105 };
1106
1107 #[test]
1108 fn test_restore_snapshot_basic() {
1109 let db = GrafeoDB::new_in_memory();
1110 let session = db.session();
1111
1112 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1114 session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
1115
1116 let snapshot = db.export_snapshot().unwrap();
1117
1118 session
1120 .execute("INSERT (:Person {name: 'Vincent'})")
1121 .unwrap();
1122 assert_eq!(db.lpg_store().node_count(), 3);
1123
1124 db.restore_snapshot(&snapshot).unwrap();
1126
1127 assert_eq!(db.lpg_store().node_count(), 2);
1128 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
1129 assert_eq!(result.rows.len(), 2);
1130 }
1131
1132 #[test]
1133 fn test_restore_snapshot_validation_failure() {
1134 let db = GrafeoDB::new_in_memory();
1135 let session = db.session();
1136
1137 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1138
1139 let result = db.restore_snapshot(b"garbage");
1141 assert!(result.is_err());
1142
1143 assert_eq!(db.lpg_store().node_count(), 1);
1145 }
1146
1147 #[test]
1148 fn test_restore_snapshot_empty_db() {
1149 let db = GrafeoDB::new_in_memory();
1150
1151 let empty_snapshot = db.export_snapshot().unwrap();
1153
1154 let session = db.session();
1155 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1156 assert_eq!(db.lpg_store().node_count(), 1);
1157
1158 db.restore_snapshot(&empty_snapshot).unwrap();
1159 assert_eq!(db.lpg_store().node_count(), 0);
1160 }
1161
1162 #[test]
1163 fn test_restore_snapshot_with_edges() {
1164 let db = GrafeoDB::new_in_memory();
1165 let session = db.session();
1166
1167 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1168 session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
1169 session
1170 .execute(
1171 "MATCH (a:Person {name: 'Alix'}), (b:Person {name: 'Gus'}) INSERT (a)-[:KNOWS]->(b)",
1172 )
1173 .unwrap();
1174
1175 let snapshot = db.export_snapshot().unwrap();
1176 assert_eq!(db.lpg_store().edge_count(), 1);
1177
1178 session
1180 .execute("INSERT (:Person {name: 'Vincent'})")
1181 .unwrap();
1182
1183 db.restore_snapshot(&snapshot).unwrap();
1185 assert_eq!(db.lpg_store().node_count(), 2);
1186 assert_eq!(db.lpg_store().edge_count(), 1);
1187 }
1188
1189 #[test]
1190 fn test_restore_snapshot_preserves_sessions() {
1191 let db = GrafeoDB::new_in_memory();
1192 let session = db.session();
1193
1194 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1195 let snapshot = db.export_snapshot().unwrap();
1196
1197 session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
1199
1200 db.restore_snapshot(&snapshot).unwrap();
1202
1203 let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
1205 assert_eq!(result.rows.len(), 1);
1206 }
1207
1208 #[test]
1209 fn test_export_import_roundtrip() {
1210 let db = GrafeoDB::new_in_memory();
1211 let session = db.session();
1212
1213 session
1214 .execute("INSERT (:Person {name: 'Alix', age: 30})")
1215 .unwrap();
1216
1217 let snapshot = db.export_snapshot().unwrap();
1218 let db2 = GrafeoDB::import_snapshot(&snapshot).unwrap();
1219 let session2 = db2.session();
1220
1221 let result = session2.execute("MATCH (n:Person) RETURN n.name").unwrap();
1222 assert_eq!(result.rows.len(), 1);
1223 }
1224
1225 #[test]
1228 fn test_to_memory_empty() {
1229 let db = GrafeoDB::new_in_memory();
1230 let copy = db.to_memory().unwrap();
1231 assert_eq!(copy.lpg_store().node_count(), 0);
1232 assert_eq!(copy.lpg_store().edge_count(), 0);
1233 }
1234
1235 #[test]
1236 fn test_to_memory_copies_nodes_and_properties() {
1237 let db = GrafeoDB::new_in_memory();
1238 let session = db.session();
1239 session
1240 .execute("INSERT (:Person {name: 'Alix', age: 30})")
1241 .unwrap();
1242 session
1243 .execute("INSERT (:Person {name: 'Gus', age: 25})")
1244 .unwrap();
1245
1246 let copy = db.to_memory().unwrap();
1247 assert_eq!(copy.lpg_store().node_count(), 2);
1248
1249 let s2 = copy.session();
1250 let result = s2
1251 .execute("MATCH (p:Person) RETURN p.name ORDER BY p.name")
1252 .unwrap();
1253 assert_eq!(result.rows.len(), 2);
1254 assert_eq!(result.rows[0][0], Value::String("Alix".into()));
1255 assert_eq!(result.rows[1][0], Value::String("Gus".into()));
1256 }
1257
1258 #[test]
1259 fn test_to_memory_copies_edges_and_properties() {
1260 let db = GrafeoDB::new_in_memory();
1261 let a = db.create_node(&["Person"]);
1262 db.set_node_property(a, "name", "Alix".into());
1263 let b = db.create_node(&["Person"]);
1264 db.set_node_property(b, "name", "Gus".into());
1265 let edge = db.create_edge(a, b, "KNOWS");
1266 db.set_edge_property(edge, "since", Value::Int64(2020));
1267
1268 let copy = db.to_memory().unwrap();
1269 assert_eq!(copy.lpg_store().node_count(), 2);
1270 assert_eq!(copy.lpg_store().edge_count(), 1);
1271
1272 let s2 = copy.session();
1273 let result = s2.execute("MATCH ()-[e:KNOWS]->() RETURN e.since").unwrap();
1274 assert_eq!(result.rows[0][0], Value::Int64(2020));
1275 }
1276
1277 #[test]
1278 fn test_to_memory_is_independent() {
1279 let db = GrafeoDB::new_in_memory();
1280 let session = db.session();
1281 session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
1282
1283 let copy = db.to_memory().unwrap();
1284
1285 session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
1287 assert_eq!(db.lpg_store().node_count(), 2);
1288 assert_eq!(copy.lpg_store().node_count(), 1);
1289 }
1290
#[test]
fn test_iter_nodes_empty() {
    // A freshly created database has no nodes to yield.
    let fresh = GrafeoDB::new_in_memory();
    let seen = fresh.iter_nodes().count();
    assert_eq!(seen, 0);
}
1298
#[test]
fn test_iter_nodes_returns_all() {
    let db = GrafeoDB::new_in_memory();
    let person = db.create_node(&["Person"]);
    db.set_node_property(person, "name", "Alix".into());
    let animal = db.create_node(&["Animal"]);
    db.set_node_property(animal, "name", "Fido".into());

    let listed: Vec<_> = db.iter_nodes().collect();
    assert_eq!(listed.len(), 2);

    // Gather the first "name" property of every yielded node (if present).
    let mut names = Vec::new();
    for node in &listed {
        let name = node
            .properties
            .iter()
            .find_map(|(key, value)| (key.as_str() == "name").then(|| value.clone()));
        names.extend(name);
    }

    for expected in ["Alix", "Fido"] {
        assert!(names.contains(&Value::String(expected.into())));
    }
}
1318
#[test]
fn test_iter_edges_empty() {
    // No edges have been created, so iteration yields nothing.
    let fresh = GrafeoDB::new_in_memory();
    let seen = fresh.iter_edges().count();
    assert_eq!(seen, 0);
}
1324
#[test]
fn test_iter_edges_returns_all() {
    let db = GrafeoDB::new_in_memory();
    // Build a small chain A -R1-> B -R2-> C.
    let a = db.create_node(&["A"]);
    let b = db.create_node(&["B"]);
    let c = db.create_node(&["C"]);
    for (from, to, rel) in [(a, b, "R1"), (b, c, "R2")] {
        db.create_edge(from, to, rel);
    }

    let listed: Vec<_> = db.iter_edges().collect();
    assert_eq!(listed.len(), 2);

    // Every created relationship type must appear among the yielded edges.
    let rel_types: Vec<&str> = listed.iter().map(|e| e.edge_type.as_ref()).collect();
    for expected in ["R1", "R2"] {
        assert!(rel_types.contains(&expected));
    }
}
1341
1342 fn make_snapshot(version: u8, nodes: Vec<SnapshotNode>, edges: Vec<SnapshotEdge>) -> Vec<u8> {
1345 let snap = Snapshot {
1346 version,
1347 nodes,
1348 edges,
1349 named_graphs: vec![],
1350 rdf_triples: vec![],
1351 rdf_named_graphs: vec![],
1352 schema: SnapshotSchema::default(),
1353 indexes: SnapshotIndexes::default(),
1354 epoch: 0,
1355 };
1356 bincode::serde::encode_to_vec(&snap, bincode::config::standard()).unwrap()
1357 }
1358
#[test]
fn test_restore_rejects_unsupported_version() {
    // Seed pre-existing data so we can check the failed restore leaves it alone.
    let db = GrafeoDB::new_in_memory();
    let writer = db.session();
    writer.execute("INSERT (:Person {name: 'Alix'})").unwrap();

    // An otherwise empty snapshot that claims a version we cannot read.
    let payload = make_snapshot(99, Vec::new(), Vec::new());

    let outcome = db.restore_snapshot(&payload);
    assert!(outcome.is_err());
    let err = outcome.unwrap_err().to_string();
    assert!(err.contains("unsupported snapshot version"), "got: {err}");

    // The rejected restore must not have touched the existing graph.
    assert_eq!(db.lpg_store().node_count(), 1);
}
1375
#[test]
fn test_restore_rejects_duplicate_node_ids() {
    let db = GrafeoDB::new_in_memory();
    let writer = db.session();
    writer.execute("INSERT (:Person {name: 'Alix'})").unwrap();

    // Two snapshot nodes that differ only by label but share the same ID.
    let clashing_id = NodeId::new(0);
    let node_labelled = |label: &str| SnapshotNode {
        id: clashing_id,
        labels: vec![label.into()],
        properties: vec![],
    };
    let payload = make_snapshot(
        SNAPSHOT_VERSION,
        vec![node_labelled("A"), node_labelled("B")],
        vec![],
    );

    let outcome = db.restore_snapshot(&payload);
    assert!(outcome.is_err());
    let err = outcome.unwrap_err().to_string();
    assert!(err.contains("duplicate node ID"), "got: {err}");
    // Validation failure must leave the existing graph intact.
    assert_eq!(db.lpg_store().node_count(), 1);
}
1405
#[test]
fn test_restore_rejects_duplicate_edge_ids() {
    let db = GrafeoDB::new_in_memory();
    // Seed pre-existing data so we can verify the rejected restore is atomic,
    // matching the duplicate-node and unsupported-version tests.
    let session = db.session();
    session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

    let bytes = make_snapshot(
        SNAPSHOT_VERSION,
        vec![
            SnapshotNode {
                id: NodeId::new(0),
                labels: vec![],
                properties: vec![],
            },
            SnapshotNode {
                id: NodeId::new(1),
                labels: vec![],
                properties: vec![],
            },
        ],
        vec![
            SnapshotEdge {
                id: EdgeId::new(0),
                src: NodeId::new(0),
                dst: NodeId::new(1),
                edge_type: "REL".into(),
                properties: vec![],
            },
            SnapshotEdge {
                id: EdgeId::new(0),
                src: NodeId::new(0),
                dst: NodeId::new(1),
                edge_type: "REL".into(),
                properties: vec![],
            },
        ],
    );

    let result = db.restore_snapshot(&bytes);
    assert!(result.is_err());
    let err = result.unwrap_err().to_string();
    assert!(err.contains("duplicate edge ID"), "got: {err}");
    // Validation must run before any mutation: the seeded node survives and
    // none of the snapshot's edges were applied.
    assert_eq!(db.lpg_store().node_count(), 1);
    assert_eq!(db.lpg_store().edge_count(), 0);
}
1447
#[test]
fn test_restore_rejects_dangling_source() {
    let db = GrafeoDB::new_in_memory();
    // Seed pre-existing data so we can verify the rejected restore is atomic,
    // matching the duplicate-node and unsupported-version tests.
    let session = db.session();
    session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

    // The edge's source (999) is not among the snapshot's nodes.
    let bytes = make_snapshot(
        SNAPSHOT_VERSION,
        vec![SnapshotNode {
            id: NodeId::new(0),
            labels: vec![],
            properties: vec![],
        }],
        vec![SnapshotEdge {
            id: EdgeId::new(0),
            src: NodeId::new(999),
            dst: NodeId::new(0),
            edge_type: "REL".into(),
            properties: vec![],
        }],
    );

    let result = db.restore_snapshot(&bytes);
    assert!(result.is_err());
    let err = result.unwrap_err().to_string();
    assert!(err.contains("non-existent source node"), "got: {err}");
    // Validation must run before any mutation of the existing graph.
    assert_eq!(db.lpg_store().node_count(), 1);
    assert_eq!(db.lpg_store().edge_count(), 0);
}
1473
#[test]
fn test_restore_rejects_dangling_destination() {
    let db = GrafeoDB::new_in_memory();
    // Seed pre-existing data so we can verify the rejected restore is atomic,
    // matching the duplicate-node and unsupported-version tests.
    let session = db.session();
    session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

    // The edge's destination (999) is not among the snapshot's nodes.
    let bytes = make_snapshot(
        SNAPSHOT_VERSION,
        vec![SnapshotNode {
            id: NodeId::new(0),
            labels: vec![],
            properties: vec![],
        }],
        vec![SnapshotEdge {
            id: EdgeId::new(0),
            src: NodeId::new(0),
            dst: NodeId::new(999),
            edge_type: "REL".into(),
            properties: vec![],
        }],
    );

    let result = db.restore_snapshot(&bytes);
    assert!(result.is_err());
    let err = result.unwrap_err().to_string();
    assert!(err.contains("non-existent destination node"), "got: {err}");
    // Validation must run before any mutation of the existing graph.
    assert_eq!(db.lpg_store().node_count(), 1);
    assert_eq!(db.lpg_store().edge_count(), 0);
}
1499
#[test]
fn test_snapshot_roundtrip_property_index() {
    let source = GrafeoDB::new_in_memory();
    let writer = source.session();
    writer
        .execute("INSERT (:Person {name: 'Alix', email: 'alix@example.com'})")
        .unwrap();

    source.create_property_index("email");
    assert!(source.has_property_index("email"));

    let bytes = source.export_snapshot().unwrap();
    let imported = GrafeoDB::import_snapshot(&bytes).unwrap();

    // Both the index definition and its lookup ability must survive the trip.
    assert!(imported.has_property_index("email"));
    let needle = Value::String("alix@example.com".into());
    let hits = imported.find_nodes_by_property("email", &needle);
    assert_eq!(hits.len(), 1);
}
1522
#[cfg(feature = "vector-index")]
#[test]
fn test_snapshot_roundtrip_vector_index() {
    use std::sync::Arc;

    let source = GrafeoDB::new_in_memory();

    // Two orthogonal unit vectors; the first one matches the query exactly.
    let embeddings: [[f32; 3]; 2] = [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0]];
    let doc_ids: Vec<_> = embeddings
        .iter()
        .map(|vector| {
            let id = source.create_node(&["Doc"]);
            source.set_node_property(id, "embedding", Value::Vector(Arc::from(*vector)));
            id
        })
        .collect();

    source
        .create_vector_index("Doc", "embedding", None, Some("cosine"), Some(4), Some(32))
        .unwrap();

    let bytes = source.export_snapshot().unwrap();
    let imported = GrafeoDB::import_snapshot(&bytes).unwrap();

    let hits = imported
        .vector_search("Doc", "embedding", &[1.0, 0.0, 0.0], 2, None, None)
        .unwrap();
    assert_eq!(hits.len(), 2);
    assert_eq!(hits[0].0, doc_ids[0]);
}
1557
#[cfg(feature = "text-index")]
#[test]
fn test_snapshot_roundtrip_text_index() {
    let source = GrafeoDB::new_in_memory();

    // Only the first article mentions "graph database".
    let matching = source.create_node(&["Article"]);
    source.set_node_property(matching, "body", Value::String("rust graph database".into()));
    let unrelated = source.create_node(&["Article"]);
    source.set_node_property(unrelated, "body", Value::String("python web framework".into()));

    source.create_text_index("Article", "body").unwrap();

    let bytes = source.export_snapshot().unwrap();
    let imported = GrafeoDB::import_snapshot(&bytes).unwrap();

    let hits = imported
        .text_search("Article", "body", "graph database", 10)
        .unwrap();
    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0].0, matching);
}
1580
#[test]
fn test_snapshot_roundtrip_property_index_via_restore() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    session
        .execute("INSERT (:Person {name: 'Alix', email: 'alix@example.com'})")
        .unwrap();
    db.create_property_index("email");

    let snapshot = db.export_snapshot().unwrap();

    // Diverge from the snapshot: add a node and drop the index.
    session
        .execute("INSERT (:Person {name: 'Gus', email: 'gus@example.com'})")
        .unwrap();
    db.drop_property_index("email");
    assert!(!db.has_property_index("email"));

    db.restore_snapshot(&snapshot).unwrap();

    // The index definition must come back...
    assert!(db.has_property_index("email"));
    // ...the post-snapshot data must be rolled back...
    assert_eq!(db.lpg_store().node_count(), 1);
    // ...and the restored index must answer lookups against the restored data.
    let alix = db.find_nodes_by_property("email", &Value::String("alix@example.com".into()));
    assert_eq!(alix.len(), 1);
    let gus = db.find_nodes_by_property("email", &Value::String("gus@example.com".into()));
    assert_eq!(gus.len(), 0);
}
1604}