1#[cfg(feature = "wal")]
4use std::path::Path;
5
6use grafeo_common::types::{EdgeId, EpochId, NodeId, Value};
7use grafeo_common::utils::error::{Error, Result};
8use hashbrown::HashSet;
9
10use crate::config::Config;
11
12#[cfg(feature = "wal")]
13use grafeo_adapters::storage::wal::WalRecord;
14
15use crate::catalog::{
16 EdgeTypeDefinition, GraphTypeDefinition, NodeTypeDefinition, ProcedureDefinition,
17};
18
19const SNAPSHOT_VERSION: u8 = 4;
21
22#[derive(serde::Serialize, serde::Deserialize)]
25struct Snapshot {
26 version: u8,
27 nodes: Vec<SnapshotNode>,
28 edges: Vec<SnapshotEdge>,
29 named_graphs: Vec<NamedGraphSnapshot>,
30 rdf_triples: Vec<SnapshotTriple>,
31 rdf_named_graphs: Vec<RdfNamedGraphSnapshot>,
32 schema: SnapshotSchema,
33 indexes: SnapshotIndexes,
34 epoch: u64,
36}
37
38#[derive(serde::Serialize, serde::Deserialize, Default)]
40struct SnapshotSchema {
41 node_types: Vec<NodeTypeDefinition>,
42 edge_types: Vec<EdgeTypeDefinition>,
43 graph_types: Vec<GraphTypeDefinition>,
44 procedures: Vec<ProcedureDefinition>,
45 schemas: Vec<String>,
46 graph_type_bindings: Vec<(String, String)>,
47}
48
49#[derive(serde::Serialize, serde::Deserialize, Default)]
51struct SnapshotIndexes {
52 property_indexes: Vec<String>,
53 vector_indexes: Vec<SnapshotVectorIndex>,
54 text_indexes: Vec<SnapshotTextIndex>,
55}
56
57#[derive(serde::Serialize, serde::Deserialize)]
59struct SnapshotVectorIndex {
60 label: String,
61 property: String,
62 dimensions: usize,
63 metric: grafeo_core::index::vector::DistanceMetric,
64 m: usize,
65 ef_construction: usize,
66}
67
68#[derive(serde::Serialize, serde::Deserialize)]
70struct SnapshotTextIndex {
71 label: String,
72 property: String,
73}
74
75#[derive(serde::Serialize, serde::Deserialize)]
77struct NamedGraphSnapshot {
78 name: String,
79 nodes: Vec<SnapshotNode>,
80 edges: Vec<SnapshotEdge>,
81}
82
83#[derive(serde::Serialize, serde::Deserialize)]
85struct SnapshotTriple {
86 subject: String,
87 predicate: String,
88 object: String,
89}
90
91#[derive(serde::Serialize, serde::Deserialize)]
93struct RdfNamedGraphSnapshot {
94 name: String,
95 triples: Vec<SnapshotTriple>,
96}
97
98#[derive(serde::Serialize, serde::Deserialize)]
99struct SnapshotNode {
100 id: NodeId,
101 labels: Vec<String>,
102 properties: Vec<(String, Vec<(EpochId, Value)>)>,
104}
105
106#[derive(serde::Serialize, serde::Deserialize)]
107struct SnapshotEdge {
108 id: EdgeId,
109 src: NodeId,
110 dst: NodeId,
111 edge_type: String,
112 properties: Vec<(String, Vec<(EpochId, Value)>)>,
114}
115
116fn collect_snapshot_nodes(store: &grafeo_core::graph::lpg::LpgStore) -> Vec<SnapshotNode> {
121 store
122 .all_nodes()
123 .map(|n| {
124 #[cfg(feature = "temporal")]
125 let properties = store
126 .node_property_history(n.id)
127 .into_iter()
128 .map(|(k, entries)| (k.to_string(), entries))
129 .collect();
130
131 #[cfg(not(feature = "temporal"))]
132 let properties = n
133 .properties
134 .into_iter()
135 .map(|(k, v)| (k.to_string(), vec![(EpochId::new(0), v)]))
136 .collect();
137
138 SnapshotNode {
139 id: n.id,
140 labels: n.labels.iter().map(|l| l.to_string()).collect(),
141 properties,
142 }
143 })
144 .collect()
145}
146
147fn collect_snapshot_edges(store: &grafeo_core::graph::lpg::LpgStore) -> Vec<SnapshotEdge> {
152 store
153 .all_edges()
154 .map(|e| {
155 #[cfg(feature = "temporal")]
156 let properties = store
157 .edge_property_history(e.id)
158 .into_iter()
159 .map(|(k, entries)| (k.to_string(), entries))
160 .collect();
161
162 #[cfg(not(feature = "temporal"))]
163 let properties = e
164 .properties
165 .into_iter()
166 .map(|(k, v)| (k.to_string(), vec![(EpochId::new(0), v)]))
167 .collect();
168
169 SnapshotEdge {
170 id: e.id,
171 src: e.src,
172 dst: e.dst,
173 edge_type: e.edge_type.to_string(),
174 properties,
175 }
176 })
177 .collect()
178}
179
180fn populate_store_from_snapshot(
185 store: &grafeo_core::graph::lpg::LpgStore,
186 nodes: Vec<SnapshotNode>,
187 edges: Vec<SnapshotEdge>,
188) -> Result<()> {
189 for node in nodes {
190 let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
191 store.create_node_with_id(node.id, &label_refs)?;
192 for (key, entries) in node.properties {
193 #[cfg(feature = "temporal")]
194 for (epoch, value) in entries {
195 store.set_node_property_at_epoch(node.id, &key, value, epoch);
196 }
197 #[cfg(not(feature = "temporal"))]
198 if let Some((_, value)) = entries.into_iter().last() {
199 store.set_node_property(node.id, &key, value);
200 }
201 }
202 }
203 for edge in edges {
204 store.create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
205 for (key, entries) in edge.properties {
206 #[cfg(feature = "temporal")]
207 for (epoch, value) in entries {
208 store.set_edge_property_at_epoch(edge.id, &key, value, epoch);
209 }
210 #[cfg(not(feature = "temporal"))]
211 if let Some((_, value)) = entries.into_iter().last() {
212 store.set_edge_property(edge.id, &key, value);
213 }
214 }
215 }
216 Ok(())
217}
218
219fn validate_snapshot_data(nodes: &[SnapshotNode], edges: &[SnapshotEdge]) -> Result<()> {
221 let mut node_ids = HashSet::with_capacity(nodes.len());
222 for node in nodes {
223 if !node_ids.insert(node.id) {
224 return Err(Error::Internal(format!(
225 "snapshot contains duplicate node ID {}",
226 node.id
227 )));
228 }
229 }
230 let mut edge_ids = HashSet::with_capacity(edges.len());
231 for edge in edges {
232 if !edge_ids.insert(edge.id) {
233 return Err(Error::Internal(format!(
234 "snapshot contains duplicate edge ID {}",
235 edge.id
236 )));
237 }
238 if !node_ids.contains(&edge.src) {
239 return Err(Error::Internal(format!(
240 "snapshot edge {} references non-existent source node {}",
241 edge.id, edge.src
242 )));
243 }
244 if !node_ids.contains(&edge.dst) {
245 return Err(Error::Internal(format!(
246 "snapshot edge {} references non-existent destination node {}",
247 edge.id, edge.dst
248 )));
249 }
250 }
251 Ok(())
252}
253
/// Converts every triple of `store` into its textual snapshot form.
#[cfg(feature = "rdf")]
fn collect_rdf_triples(store: &grafeo_core::graph::rdf::RdfStore) -> Vec<SnapshotTriple> {
    let mut collected = Vec::new();
    for triple in store.triples() {
        collected.push(SnapshotTriple {
            subject: triple.subject().to_string(),
            predicate: triple.predicate().to_string(),
            object: triple.object().to_string(),
        });
    }
    collected
}
267
/// Inserts the given snapshot triples into `store`.
///
/// Each term is parsed with `Term::from_ntriples`; a triple with any term
/// that fails to parse is skipped without reporting an error.
#[cfg(feature = "rdf")]
fn populate_rdf_store(store: &grafeo_core::graph::rdf::RdfStore, triples: &[SnapshotTriple]) {
    use grafeo_core::graph::rdf::{Term, Triple};

    for stored in triples {
        let parsed = (
            Term::from_ntriples(&stored.subject),
            Term::from_ntriples(&stored.predicate),
            Term::from_ntriples(&stored.object),
        );
        let (Some(subject), Some(predicate), Some(object)) = parsed else {
            continue;
        };
        store.insert(Triple::new(subject, predicate, object));
    }
}
282
/// Decodes a snapshot read from a `.grafeo` file and loads it into the given
/// stores and catalog.
///
/// Populates the top-level store, every named graph, the catalog schema and,
/// with the `rdf` feature, the RDF store and its named graphs. With the
/// `temporal` feature each populated store is synced to the snapshot's epoch.
///
/// # Errors
///
/// Returns `Error::Serialization` if `data` cannot be decoded,
/// `Error::Internal` if a named graph cannot be created, and propagates
/// errors from populating the stores.
#[cfg(feature = "grafeo-file")]
pub(super) fn load_snapshot_into_store(
    store: &std::sync::Arc<grafeo_core::graph::lpg::LpgStore>,
    catalog: &std::sync::Arc<crate::catalog::Catalog>,
    #[cfg(feature = "rdf")] rdf_store: &std::sync::Arc<grafeo_core::graph::rdf::RdfStore>,
    data: &[u8],
) -> grafeo_common::utils::error::Result<()> {
    use grafeo_common::utils::error::Error;

    let decoded: (Snapshot, usize) =
        bincode::serde::decode_from_slice(data, bincode::config::standard()).map_err(|e| {
            Error::Serialization(format!("failed to decode snapshot from .grafeo file: {e}"))
        })?;
    let (snapshot, _) = decoded;

    // Top-level graph first, then its epoch.
    populate_store_from_snapshot_ref(store, &snapshot.nodes, &snapshot.edges)?;

    #[cfg(feature = "temporal")]
    store.sync_epoch(EpochId::new(snapshot.epoch));

    for named in &snapshot.named_graphs {
        store
            .create_graph(&named.name)
            .map_err(|e| Error::Internal(e.to_string()))?;

        let Some(graph_store) = store.graph(&named.name) else {
            continue;
        };
        populate_store_from_snapshot_ref(&graph_store, &named.nodes, &named.edges)?;
        #[cfg(feature = "temporal")]
        graph_store.sync_epoch(EpochId::new(snapshot.epoch));
    }

    restore_schema_from_snapshot(catalog, &snapshot.schema);

    #[cfg(feature = "rdf")]
    {
        populate_rdf_store(rdf_store, &snapshot.rdf_triples);
        for named in &snapshot.rdf_named_graphs {
            rdf_store.create_graph(&named.name);
            if let Some(graph_store) = rdf_store.graph(&named.name) {
                populate_rdf_store(&graph_store, &named.triples);
            }
        }
    }

    Ok(())
}
336
/// Borrowing variant of `populate_store_from_snapshot`: re-creates the given
/// nodes and edges inside `store`, cloning each property value.
///
/// With the `temporal` feature every `(epoch, value)` entry is replayed;
/// otherwise only the last entry of each property is applied.
///
/// # Errors
///
/// Propagates any error returned by `create_node_with_id` or
/// `create_edge_with_id`.
#[cfg(feature = "grafeo-file")]
fn populate_store_from_snapshot_ref(
    store: &grafeo_core::graph::lpg::LpgStore,
    nodes: &[SnapshotNode],
    edges: &[SnapshotEdge],
) -> grafeo_common::utils::error::Result<()> {
    for snapshot_node in nodes {
        let label_refs: Vec<&str> = snapshot_node.labels.iter().map(String::as_str).collect();
        store.create_node_with_id(snapshot_node.id, &label_refs)?;

        for (key, entries) in &snapshot_node.properties {
            #[cfg(feature = "temporal")]
            for (epoch, value) in entries {
                store.set_node_property_at_epoch(snapshot_node.id, key, value.clone(), *epoch);
            }
            #[cfg(not(feature = "temporal"))]
            if let Some((_, value)) = entries.last() {
                store.set_node_property(snapshot_node.id, key, value.clone());
            }
        }
    }

    for snapshot_edge in edges {
        store.create_edge_with_id(
            snapshot_edge.id,
            snapshot_edge.src,
            snapshot_edge.dst,
            &snapshot_edge.edge_type,
        )?;

        for (key, entries) in &snapshot_edge.properties {
            #[cfg(feature = "temporal")]
            for (epoch, value) in entries {
                store.set_edge_property_at_epoch(snapshot_edge.id, key, value.clone(), *epoch);
            }
            #[cfg(not(feature = "temporal"))]
            if let Some((_, value)) = entries.last() {
                store.set_edge_property(snapshot_edge.id, key, value.clone());
            }
        }
    }

    Ok(())
}
373
374fn restore_schema_from_snapshot(
376 catalog: &std::sync::Arc<crate::catalog::Catalog>,
377 schema: &SnapshotSchema,
378) {
379 for def in &schema.node_types {
380 catalog.register_or_replace_node_type(def.clone());
381 }
382 for def in &schema.edge_types {
383 catalog.register_or_replace_edge_type_def(def.clone());
384 }
385 for def in &schema.graph_types {
386 let _ = catalog.register_graph_type(def.clone());
387 }
388 for def in &schema.procedures {
389 catalog.replace_procedure(def.clone()).ok();
390 }
391 for name in &schema.schemas {
392 let _ = catalog.register_schema_namespace(name.clone());
393 }
394 for (graph_name, type_name) in &schema.graph_type_bindings {
395 let _ = catalog.bind_graph_type(graph_name, type_name.clone());
396 }
397}
398
399fn collect_schema(catalog: &std::sync::Arc<crate::catalog::Catalog>) -> SnapshotSchema {
401 SnapshotSchema {
402 node_types: catalog.all_node_type_defs(),
403 edge_types: catalog.all_edge_type_defs(),
404 graph_types: catalog.all_graph_type_defs(),
405 procedures: catalog.all_procedure_defs(),
406 schemas: catalog.schema_names(),
407 graph_type_bindings: catalog.all_graph_type_bindings(),
408 }
409}
410
411fn restore_indexes_from_snapshot(db: &super::GrafeoDB, indexes: &SnapshotIndexes) {
416 for name in &indexes.property_indexes {
417 db.store.create_property_index(name);
418 }
419
420 #[cfg(feature = "vector-index")]
421 for vi in &indexes.vector_indexes {
422 if let Err(err) = db.create_vector_index(
423 &vi.label,
424 &vi.property,
425 Some(vi.dimensions),
426 Some(vi.metric.name()),
427 Some(vi.m),
428 Some(vi.ef_construction),
429 ) {
430 tracing::warn!(
431 "Failed to restore vector index :{label}({property}): {err}",
432 label = vi.label,
433 property = vi.property,
434 );
435 }
436 }
437
438 #[cfg(feature = "text-index")]
439 for ti in &indexes.text_indexes {
440 if let Err(err) = db.create_text_index(&ti.label, &ti.property) {
441 tracing::warn!(
442 "Failed to restore text index :{label}({property}): {err}",
443 label = ti.label,
444 property = ti.property,
445 );
446 }
447 }
448}
449
450fn collect_index_metadata(store: &grafeo_core::graph::lpg::LpgStore) -> SnapshotIndexes {
452 let property_indexes = store.property_index_keys();
453
454 #[cfg(feature = "vector-index")]
455 let vector_indexes: Vec<SnapshotVectorIndex> = store
456 .vector_index_entries()
457 .into_iter()
458 .filter_map(|(key, index)| {
459 let (label, property) = key.split_once(':')?;
460 let config = index.config();
461 Some(SnapshotVectorIndex {
462 label: label.to_string(),
463 property: property.to_string(),
464 dimensions: config.dimensions,
465 metric: config.metric,
466 m: config.m,
467 ef_construction: config.ef_construction,
468 })
469 })
470 .collect();
471 #[cfg(not(feature = "vector-index"))]
472 let vector_indexes = Vec::new();
473
474 #[cfg(feature = "text-index")]
475 let text_indexes: Vec<SnapshotTextIndex> = store
476 .text_index_entries()
477 .into_iter()
478 .filter_map(|(key, _)| {
479 let (label, property) = key.split_once(':')?;
480 Some(SnapshotTextIndex {
481 label: label.to_string(),
482 property: property.to_string(),
483 })
484 })
485 .collect();
486 #[cfg(not(feature = "text-index"))]
487 let text_indexes = Vec::new();
488
489 SnapshotIndexes {
490 property_indexes,
491 vector_indexes,
492 text_indexes,
493 }
494}
495
496impl super::GrafeoDB {
/// Writes a copy of this database to `path`.
///
/// With the `grafeo-file` feature, a path ending in `.grafeo` is written as a
/// single snapshot file. Otherwise a new persistent database is opened at
/// `path` and every node, edge, named graph and (with the `rdf` feature) RDF
/// triple is copied into it, with a matching WAL record logged for each
/// mutation. The target database is closed before returning.
///
/// # Errors
///
/// Propagates errors from opening the target, from creating nodes, edges or
/// graphs in it, from WAL logging, and from closing it.
#[cfg(feature = "wal")]
pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
    let destination = path.as_ref();

    #[cfg(feature = "grafeo-file")]
    if destination.extension().is_some_and(|ext| ext == "grafeo") {
        return self.save_as_grafeo_file(destination);
    }

    let target = Self::with_config(Config::persistent(destination))?;

    // Top-level nodes: apply to the target store, then log the same change.
    for node in self.store.all_nodes() {
        let labels: Vec<&str> = node.labels.iter().map(|label| &**label).collect();
        target.store.create_node_with_id(node.id, &labels)?;

        let created = WalRecord::CreateNode {
            id: node.id,
            labels: node.labels.iter().map(|label| label.to_string()).collect(),
        };
        target.log_wal(&created)?;

        for (key, value) in node.properties {
            target
                .store
                .set_node_property(node.id, key.as_str(), value.clone());
            let assigned = WalRecord::SetNodeProperty {
                id: node.id,
                key: key.to_string(),
                value,
            };
            target.log_wal(&assigned)?;
        }
    }

    // Top-level edges.
    for edge in self.store.all_edges() {
        target
            .store
            .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;

        let created = WalRecord::CreateEdge {
            id: edge.id,
            src: edge.src,
            dst: edge.dst,
            edge_type: edge.edge_type.to_string(),
        };
        target.log_wal(&created)?;

        for (key, value) in edge.properties {
            target
                .store
                .set_edge_property(edge.id, key.as_str(), value.clone());
            let assigned = WalRecord::SetEdgeProperty {
                id: edge.id,
                key: key.to_string(),
                value,
            };
            target.log_wal(&assigned)?;
        }
    }

    // Named graphs: the creation record is logged before the graph is created,
    // and a SwitchGraph record precedes the graph's own node/edge records.
    for graph_name in self.store.graph_names() {
        let Some(src_graph) = self.store.graph(&graph_name) else {
            continue;
        };

        target.log_wal(&WalRecord::CreateNamedGraph {
            name: graph_name.clone(),
        })?;
        target
            .store
            .create_graph(&graph_name)
            .map_err(|e| Error::Internal(e.to_string()))?;

        let Some(dst_graph) = target.store.graph(&graph_name) else {
            continue;
        };

        target.log_wal(&WalRecord::SwitchGraph {
            name: Some(graph_name.clone()),
        })?;

        for node in src_graph.all_nodes() {
            let labels: Vec<&str> = node.labels.iter().map(|label| &**label).collect();
            dst_graph.create_node_with_id(node.id, &labels)?;

            let created = WalRecord::CreateNode {
                id: node.id,
                labels: node.labels.iter().map(|label| label.to_string()).collect(),
            };
            target.log_wal(&created)?;

            for (key, value) in node.properties {
                dst_graph.set_node_property(node.id, key.as_str(), value.clone());
                let assigned = WalRecord::SetNodeProperty {
                    id: node.id,
                    key: key.to_string(),
                    value,
                };
                target.log_wal(&assigned)?;
            }
        }

        for edge in src_graph.all_edges() {
            dst_graph.create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;

            let created = WalRecord::CreateEdge {
                id: edge.id,
                src: edge.src,
                dst: edge.dst,
                edge_type: edge.edge_type.to_string(),
            };
            target.log_wal(&created)?;

            for (key, value) in edge.properties {
                dst_graph.set_edge_property(edge.id, key.as_str(), value.clone());
                let assigned = WalRecord::SetEdgeProperty {
                    id: edge.id,
                    key: key.to_string(),
                    value,
                };
                target.log_wal(&assigned)?;
            }
        }
    }

    // If any named graph exists, log a switch back to the unnamed graph.
    if !self.store.graph_names().is_empty() {
        target.log_wal(&WalRecord::SwitchGraph { name: None })?;
    }

    #[cfg(feature = "rdf")]
    {
        for triple in self.rdf_store.triples() {
            let inserted = WalRecord::InsertRdfTriple {
                subject: triple.subject().to_string(),
                predicate: triple.predicate().to_string(),
                object: triple.object().to_string(),
                graph: None,
            };
            target.rdf_store.insert((*triple).clone());
            target.log_wal(&inserted)?;
        }

        for name in self.rdf_store.graph_names() {
            target.log_wal(&WalRecord::CreateRdfGraph { name: name.clone() })?;

            let Some(src_graph) = self.rdf_store.graph(&name) else {
                continue;
            };
            let dst_graph = target.rdf_store.graph_or_create(&name);
            for triple in src_graph.triples() {
                let inserted = WalRecord::InsertRdfTriple {
                    subject: triple.subject().to_string(),
                    predicate: triple.predicate().to_string(),
                    object: triple.object().to_string(),
                    graph: Some(name.clone()),
                };
                dst_graph.insert((*triple).clone());
                target.log_wal(&inserted)?;
            }
        }
    }

    target.close()?;

    Ok(())
}
680
/// Writes this database to `path` as a single `.grafeo` snapshot file.
///
/// The snapshot bytes come from `export_snapshot`; the store's current epoch,
/// the last assigned transaction ID (0 if none) and the node and edge counts
/// are passed to the file manager alongside them.
///
/// # Errors
///
/// Propagates errors from exporting the snapshot, creating the file, and
/// writing the snapshot.
#[cfg(feature = "grafeo-file")]
fn save_as_grafeo_file(&self, path: &Path) -> Result<()> {
    use grafeo_adapters::storage::file::GrafeoFileManager;

    let payload = self.export_snapshot()?;
    let current_epoch = self.store.current_epoch();
    let last_transaction = self
        .transaction_manager
        .last_assigned_transaction_id()
        .map(|id| id.0)
        .unwrap_or(0);
    let nodes = self.store.node_count() as u64;
    let edges = self.store.edge_count() as u64;

    let file_manager = GrafeoFileManager::create(path)?;
    file_manager.write_snapshot(&payload, current_epoch.0, last_transaction, nodes, edges)?;
    Ok(())
}
710
711 pub fn to_memory(&self) -> Result<Self> {
718 let config = Config::in_memory();
719 let target = Self::with_config(config)?;
720
721 for node in self.store.all_nodes() {
723 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
724 target.store.create_node_with_id(node.id, &label_refs)?;
725 for (key, value) in node.properties {
726 target.store.set_node_property(node.id, key.as_str(), value);
727 }
728 }
729
730 for edge in self.store.all_edges() {
732 target
733 .store
734 .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
735 for (key, value) in edge.properties {
736 target.store.set_edge_property(edge.id, key.as_str(), value);
737 }
738 }
739
740 for graph_name in self.store.graph_names() {
742 if let Some(src_graph) = self.store.graph(&graph_name) {
743 target
744 .store
745 .create_graph(&graph_name)
746 .map_err(|e| Error::Internal(e.to_string()))?;
747 if let Some(dst_graph) = target.store.graph(&graph_name) {
748 for node in src_graph.all_nodes() {
749 let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
750 dst_graph.create_node_with_id(node.id, &label_refs)?;
751 for (key, value) in node.properties {
752 dst_graph.set_node_property(node.id, key.as_str(), value);
753 }
754 }
755 for edge in src_graph.all_edges() {
756 dst_graph.create_edge_with_id(
757 edge.id,
758 edge.src,
759 edge.dst,
760 &edge.edge_type,
761 )?;
762 for (key, value) in edge.properties {
763 dst_graph.set_edge_property(edge.id, key.as_str(), value);
764 }
765 }
766 }
767 }
768 }
769
770 #[cfg(feature = "rdf")]
772 {
773 for triple in self.rdf_store.triples() {
774 target.rdf_store.insert((*triple).clone());
775 }
776 for name in self.rdf_store.graph_names() {
777 if let Some(src_graph) = self.rdf_store.graph(&name) {
778 let dst_graph = target.rdf_store.graph_or_create(&name);
779 for triple in src_graph.triples() {
780 dst_graph.insert((*triple).clone());
781 }
782 }
783 }
784 }
785
786 Ok(target)
787 }
788
/// Opens the database at `path`, copies it into memory, closes the on-disk
/// database and returns the in-memory copy.
///
/// # Errors
///
/// Propagates errors from opening the database, copying it, and closing it.
#[cfg(feature = "wal")]
pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
    let on_disk = Self::open(path)?;
    let in_memory = on_disk.to_memory()?;
    on_disk.close()?;
    Ok(in_memory)
}
810
811 pub fn export_snapshot(&self) -> Result<Vec<u8>> {
829 let nodes = collect_snapshot_nodes(&self.store);
830 let edges = collect_snapshot_edges(&self.store);
831
832 let named_graphs: Vec<NamedGraphSnapshot> = self
834 .store
835 .graph_names()
836 .into_iter()
837 .filter_map(|name| {
838 self.store
839 .graph(&name)
840 .map(|graph_store| NamedGraphSnapshot {
841 name,
842 nodes: collect_snapshot_nodes(&graph_store),
843 edges: collect_snapshot_edges(&graph_store),
844 })
845 })
846 .collect();
847
848 #[cfg(feature = "rdf")]
850 let rdf_triples = collect_rdf_triples(&self.rdf_store);
851 #[cfg(not(feature = "rdf"))]
852 let rdf_triples = Vec::new();
853
854 #[cfg(feature = "rdf")]
855 let rdf_named_graphs: Vec<RdfNamedGraphSnapshot> = self
856 .rdf_store
857 .graph_names()
858 .into_iter()
859 .filter_map(|name| {
860 self.rdf_store
861 .graph(&name)
862 .map(|graph| RdfNamedGraphSnapshot {
863 name,
864 triples: collect_rdf_triples(&graph),
865 })
866 })
867 .collect();
868 #[cfg(not(feature = "rdf"))]
869 let rdf_named_graphs = Vec::new();
870
871 let schema = collect_schema(&self.catalog);
872 let indexes = collect_index_metadata(&self.store);
873
874 let snapshot = Snapshot {
875 version: SNAPSHOT_VERSION,
876 nodes,
877 edges,
878 named_graphs,
879 rdf_triples,
880 rdf_named_graphs,
881 schema,
882 indexes,
883 #[cfg(feature = "temporal")]
884 epoch: self.transaction_manager.current_epoch().as_u64(),
885 #[cfg(not(feature = "temporal"))]
886 epoch: 0,
887 };
888
889 let config = bincode::config::standard();
890 bincode::serde::encode_to_vec(&snapshot, config)
891 .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
892 }
893
894 pub fn import_snapshot(data: &[u8]) -> Result<Self> {
908 if data.is_empty() {
909 return Err(Error::Internal("empty snapshot data".to_string()));
910 }
911
912 let version = data[0];
913 if version != 4 {
914 return Err(Error::Internal(format!(
915 "unsupported snapshot version: {version} (expected 4)"
916 )));
917 }
918
919 let config = bincode::config::standard();
920 let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
921 .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
922
923 validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
925
926 for ng in &snapshot.named_graphs {
928 validate_snapshot_data(&ng.nodes, &ng.edges)?;
929 }
930
931 let db = Self::new_in_memory();
932 populate_store_from_snapshot(&db.store, snapshot.nodes, snapshot.edges)?;
933
934 #[cfg(feature = "temporal")]
936 {
937 let epoch = EpochId::new(snapshot.epoch);
938 db.store.sync_epoch(epoch);
939 db.transaction_manager.sync_epoch(epoch);
940 }
941
942 #[cfg(feature = "temporal")]
944 let snapshot_epoch = EpochId::new(snapshot.epoch);
945
946 for ng in snapshot.named_graphs {
948 db.store
949 .create_graph(&ng.name)
950 .map_err(|e| Error::Internal(e.to_string()))?;
951 if let Some(graph_store) = db.store.graph(&ng.name) {
952 populate_store_from_snapshot(&graph_store, ng.nodes, ng.edges)?;
953 #[cfg(feature = "temporal")]
956 graph_store.sync_epoch(snapshot_epoch);
957 }
958 }
959
960 #[cfg(feature = "rdf")]
962 {
963 populate_rdf_store(&db.rdf_store, &snapshot.rdf_triples);
964 for rng in &snapshot.rdf_named_graphs {
965 let graph = db.rdf_store.graph_or_create(&rng.name);
966 populate_rdf_store(&graph, &rng.triples);
967 }
968 }
969
970 restore_schema_from_snapshot(&db.catalog, &snapshot.schema);
972
973 restore_indexes_from_snapshot(&db, &snapshot.indexes);
975
976 Ok(db)
977 }
978
979 pub fn restore_snapshot(&self, data: &[u8]) -> Result<()> {
995 if data.is_empty() {
996 return Err(Error::Internal("empty snapshot data".to_string()));
997 }
998
999 let version = data[0];
1000 if version != 4 {
1001 return Err(Error::Internal(format!(
1002 "unsupported snapshot version: {version} (expected 4)"
1003 )));
1004 }
1005
1006 let config = bincode::config::standard();
1007 let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
1008 .map_err(|e| Error::Internal(format!("snapshot restore failed: {e}")))?;
1009
1010 validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
1012 for ng in &snapshot.named_graphs {
1013 validate_snapshot_data(&ng.nodes, &ng.edges)?;
1014 }
1015
1016 for name in self.store.graph_names() {
1018 self.store.drop_graph(&name);
1019 }
1020 self.store.clear();
1021
1022 populate_store_from_snapshot(&self.store, snapshot.nodes, snapshot.edges)?;
1023
1024 #[cfg(feature = "temporal")]
1026 let snapshot_epoch = {
1027 let epoch = EpochId::new(snapshot.epoch);
1028 self.store.sync_epoch(epoch);
1029 self.transaction_manager.sync_epoch(epoch);
1030 epoch
1031 };
1032
1033 for ng in snapshot.named_graphs {
1035 self.store
1036 .create_graph(&ng.name)
1037 .map_err(|e| Error::Internal(e.to_string()))?;
1038 if let Some(graph_store) = self.store.graph(&ng.name) {
1039 populate_store_from_snapshot(&graph_store, ng.nodes, ng.edges)?;
1040 #[cfg(feature = "temporal")]
1041 graph_store.sync_epoch(snapshot_epoch);
1042 }
1043 }
1044
1045 #[cfg(feature = "rdf")]
1047 {
1048 self.rdf_store.clear();
1050 for name in self.rdf_store.graph_names() {
1051 self.rdf_store.drop_graph(&name);
1052 }
1053 populate_rdf_store(&self.rdf_store, &snapshot.rdf_triples);
1054 for rng in &snapshot.rdf_named_graphs {
1055 let graph = self.rdf_store.graph_or_create(&rng.name);
1056 populate_rdf_store(&graph, &rng.triples);
1057 }
1058 }
1059
1060 restore_schema_from_snapshot(&self.catalog, &snapshot.schema);
1062
1063 restore_indexes_from_snapshot(self, &snapshot.indexes);
1065
1066 Ok(())
1067 }
1068
1069 pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
1077 self.store.all_nodes()
1078 }
1079
1080 pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
1084 self.store.all_edges()
1085 }
1086}
1087
1088#[cfg(test)]
1089mod tests {
1090 use grafeo_common::types::{EdgeId, NodeId, Value};
1091
1092 use super::super::GrafeoDB;
1093 use super::{
1094 SNAPSHOT_VERSION, Snapshot, SnapshotEdge, SnapshotIndexes, SnapshotNode, SnapshotSchema,
1095 };
1096
/// Restoring a snapshot rolls the database back to the snapshotted node set.
#[test]
fn test_restore_snapshot_basic() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // State captured by the snapshot: two people.
    for statement in [
        "INSERT (:Person {name: 'Alix'})",
        "INSERT (:Person {name: 'Gus'})",
    ] {
        session.execute(statement).unwrap();
    }
    let baseline = db.export_snapshot().unwrap();

    // Diverge from the snapshot with a third person.
    session
        .execute("INSERT (:Person {name: 'Vincent'})")
        .unwrap();
    assert_eq!(db.store.node_count(), 3);

    db.restore_snapshot(&baseline).unwrap();

    assert_eq!(db.store.node_count(), 2);
    let people = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert_eq!(people.rows.len(), 2);
}
1121
/// A rejected snapshot must leave the existing data untouched.
#[test]
fn test_restore_snapshot_validation_failure() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();
    session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

    // Bytes that are not a valid snapshot.
    let outcome = db.restore_snapshot(b"garbage");
    assert!(outcome.is_err());

    // The pre-existing node is still there.
    assert_eq!(db.store.node_count(), 1);
}
1136
/// Restoring a snapshot of an empty database removes all current nodes.
#[test]
fn test_restore_snapshot_empty_db() {
    let db = GrafeoDB::new_in_memory();

    // Snapshot taken before anything is inserted.
    let blank = db.export_snapshot().unwrap();

    db.session()
        .execute("INSERT (:Person {name: 'Alix'})")
        .unwrap();
    assert_eq!(db.store.node_count(), 1);

    db.restore_snapshot(&blank).unwrap();
    assert_eq!(db.store.node_count(), 0);
}
1151
/// Edges survive an export/restore cycle along with their endpoint nodes.
#[test]
fn test_restore_snapshot_with_edges() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    // Two people connected by one KNOWS edge.
    for statement in [
        "INSERT (:Person {name: 'Alix'})",
        "INSERT (:Person {name: 'Gus'})",
        "MATCH (a:Person {name: 'Alix'}), (b:Person {name: 'Gus'}) INSERT (a)-[:KNOWS]->(b)",
    ] {
        session.execute(statement).unwrap();
    }

    let baseline = db.export_snapshot().unwrap();
    assert_eq!(db.store.edge_count(), 1);

    // Diverge from the snapshot.
    session
        .execute("INSERT (:Person {name: 'Vincent'})")
        .unwrap();

    db.restore_snapshot(&baseline).unwrap();
    assert_eq!(db.store.node_count(), 2);
    assert_eq!(db.store.edge_count(), 1);
}
1178
/// A session opened before a restore keeps working and sees the restored data.
#[test]
fn test_restore_snapshot_preserves_sessions() {
    let db = GrafeoDB::new_in_memory();
    let session = db.session();

    session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
    let baseline = db.export_snapshot().unwrap();

    // Added after the snapshot; must be gone after the restore.
    session.execute("INSERT (:Person {name: 'Gus'})").unwrap();

    db.restore_snapshot(&baseline).unwrap();

    // Same session object as before the restore.
    let people = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
    assert_eq!(people.rows.len(), 1);
}
1197
/// Data exported from one database is queryable after import into a new one.
#[test]
fn test_export_import_roundtrip() {
    let original = GrafeoDB::new_in_memory();
    original
        .session()
        .execute("INSERT (:Person {name: 'Alix', age: 30})")
        .unwrap();

    let bytes = original.export_snapshot().unwrap();
    let imported = GrafeoDB::import_snapshot(&bytes).unwrap();

    let people = imported
        .session()
        .execute("MATCH (n:Person) RETURN n.name")
        .unwrap();
    assert_eq!(people.rows.len(), 1);
}
1214
/// Copying an empty database yields an empty database.
#[test]
fn test_to_memory_empty() {
    let source = GrafeoDB::new_in_memory();
    let clone = source.to_memory().unwrap();

    let counts = (clone.store.node_count(), clone.store.edge_count());
    assert_eq!(counts.0, 0);
    assert_eq!(counts.1, 0);
}
1224
/// `to_memory` carries over nodes together with their property values.
#[test]
fn test_to_memory_copies_nodes_and_properties() {
    let source = GrafeoDB::new_in_memory();
    let writer = source.session();
    for statement in [
        "INSERT (:Person {name: 'Alix', age: 30})",
        "INSERT (:Person {name: 'Gus', age: 25})",
    ] {
        writer.execute(statement).unwrap();
    }

    let clone = source.to_memory().unwrap();
    assert_eq!(clone.store.node_count(), 2);

    // Names come back in sorted order from the copy.
    let names = clone
        .session()
        .execute("MATCH (p:Person) RETURN p.name ORDER BY p.name")
        .unwrap();
    assert_eq!(names.rows.len(), 2);
    assert_eq!(names.rows[0][0], Value::String("Alix".into()));
    assert_eq!(names.rows[1][0], Value::String("Gus".into()));
}
1247
/// `to_memory` carries over edges together with their property values.
#[test]
fn test_to_memory_copies_edges_and_properties() {
    let source = GrafeoDB::new_in_memory();

    let alix = source.create_node(&["Person"]);
    source.set_node_property(alix, "name", "Alix".into());
    let gus = source.create_node(&["Person"]);
    source.set_node_property(gus, "name", "Gus".into());

    let knows = source.create_edge(alix, gus, "KNOWS");
    source.set_edge_property(knows, "since", Value::Int64(2020));

    let clone = source.to_memory().unwrap();
    assert_eq!(clone.store.node_count(), 2);
    assert_eq!(clone.store.edge_count(), 1);

    let since = clone
        .session()
        .execute("MATCH ()-[e:KNOWS]->() RETURN e.since")
        .unwrap();
    assert_eq!(since.rows[0][0], Value::Int64(2020));
}
1266
/// Writes to the source after copying must not show up in the copy.
#[test]
fn test_to_memory_is_independent() {
    let source = GrafeoDB::new_in_memory();
    let writer = source.session();
    writer.execute("INSERT (:Person {name: 'Alix'})").unwrap();

    let clone = source.to_memory().unwrap();

    // Mutate only the source.
    writer.execute("INSERT (:Person {name: 'Gus'})").unwrap();

    assert_eq!(source.store.node_count(), 2);
    assert_eq!(clone.store.node_count(), 1);
}
1280
/// An empty database yields no nodes.
#[test]
fn test_iter_nodes_empty() {
    let db = GrafeoDB::new_in_memory();
    let yielded = db.iter_nodes().count();
    assert_eq!(yielded, 0);
}
1288
/// `iter_nodes` yields every node with its properties.
#[test]
fn test_iter_nodes_returns_all() {
    let db = GrafeoDB::new_in_memory();
    let person = db.create_node(&["Person"]);
    db.set_node_property(person, "name", "Alix".into());
    let animal = db.create_node(&["Animal"]);
    db.set_node_property(animal, "name", "Fido".into());

    let all_nodes: Vec<_> = db.iter_nodes().collect();
    assert_eq!(all_nodes.len(), 2);

    // Pull the `name` property of each node, skipping nodes without one.
    let names: Vec<_> = all_nodes
        .iter()
        .filter_map(|node| {
            node.properties
                .iter()
                .find(|(key, _)| key.as_str() == "name")
                .map(|(_, value)| value.clone())
        })
        .collect();
    assert!(names.contains(&Value::String("Alix".into())));
    assert!(names.contains(&Value::String("Fido".into())));
}
1308
/// `iter_edges()` on a freshly created in-memory database yields no items.
#[test]
fn test_iter_edges_empty() {
    let empty_db = GrafeoDB::new_in_memory();
    let edge_total = empty_db.iter_edges().count();
    assert_eq!(edge_total, 0);
}
1314
/// `iter_edges()` yields every created edge with its edge type.
#[test]
fn test_iter_edges_returns_all() {
    let db = GrafeoDB::new_in_memory();
    // Three nodes chained as A -[R1]-> B -[R2]-> C.
    let first = db.create_node(&["A"]);
    let middle = db.create_node(&["B"]);
    let last = db.create_node(&["C"]);
    db.create_edge(first, middle, "R1");
    db.create_edge(middle, last, "R2");

    let all_edges: Vec<_> = db.iter_edges().collect();
    assert_eq!(all_edges.len(), 2);

    // Collect the type of each edge as a string slice for membership checks.
    let mut seen_types: Vec<&str> = Vec::with_capacity(all_edges.len());
    for edge in all_edges.iter() {
        seen_types.push(edge.edge_type.as_ref());
    }
    assert!(seen_types.contains(&"R1"));
    assert!(seen_types.contains(&"R2"));
}
1331
1332 fn make_snapshot(version: u8, nodes: Vec<SnapshotNode>, edges: Vec<SnapshotEdge>) -> Vec<u8> {
1335 let snap = Snapshot {
1336 version,
1337 nodes,
1338 edges,
1339 named_graphs: vec![],
1340 rdf_triples: vec![],
1341 rdf_named_graphs: vec![],
1342 schema: SnapshotSchema::default(),
1343 indexes: SnapshotIndexes::default(),
1344 epoch: 0,
1345 };
1346 bincode::serde::encode_to_vec(&snap, bincode::config::standard()).unwrap()
1347 }
1348
/// Restoring a snapshot whose version is 99 must fail with an
/// "unsupported snapshot version" error and leave the existing node in place.
#[test]
fn test_restore_rejects_unsupported_version() {
    let db = GrafeoDB::new_in_memory();
    let writer = db.session();
    writer.execute("INSERT (:Person {name: 'Alix'})").unwrap();

    // An otherwise empty snapshot tagged with version 99.
    let bad_version_bytes = make_snapshot(99, Vec::new(), Vec::new());

    let outcome = db.restore_snapshot(&bad_version_bytes);
    assert!(outcome.is_err());
    let err = outcome.unwrap_err().to_string();
    assert!(err.contains("unsupported snapshot version"), "got: {err}");

    // The node inserted before the failed restore must still be counted.
    assert_eq!(db.store.node_count(), 1);
}
1365
/// A snapshot listing the same node ID twice must be rejected with a
/// "duplicate node ID" error, and the store must keep its single existing node.
#[test]
fn test_restore_rejects_duplicate_node_ids() {
    let db = GrafeoDB::new_in_memory();
    let writer = db.session();
    writer.execute("INSERT (:Person {name: 'Alix'})").unwrap();

    // Two entries that differ by label but share node ID 0.
    let first = SnapshotNode {
        id: NodeId::new(0),
        labels: vec!["A".into()],
        properties: Vec::new(),
    };
    let second = SnapshotNode {
        id: NodeId::new(0),
        labels: vec!["B".into()],
        properties: Vec::new(),
    };
    let bytes = make_snapshot(SNAPSHOT_VERSION, vec![first, second], Vec::new());

    let outcome = db.restore_snapshot(&bytes);
    assert!(outcome.is_err());
    let err = outcome.unwrap_err().to_string();
    assert!(err.contains("duplicate node ID"), "got: {err}");
    assert_eq!(db.store.node_count(), 1);
}
1395
/// A snapshot listing the same edge ID twice must be rejected with a
/// "duplicate edge ID" error.
#[test]
fn test_restore_rejects_duplicate_edge_ids() {
    let db = GrafeoDB::new_in_memory();

    // Builds a node with the given raw ID and no labels or properties.
    let bare_node = |raw| SnapshotNode {
        id: NodeId::new(raw),
        labels: Vec::new(),
        properties: Vec::new(),
    };
    // Each call builds the same edge: ID 0, from node 0 to node 1, type "REL".
    let rel_edge = || SnapshotEdge {
        id: EdgeId::new(0),
        src: NodeId::new(0),
        dst: NodeId::new(1),
        edge_type: "REL".into(),
        properties: Vec::new(),
    };

    let bytes = make_snapshot(
        SNAPSHOT_VERSION,
        vec![bare_node(0), bare_node(1)],
        vec![rel_edge(), rel_edge()],
    );

    let outcome = db.restore_snapshot(&bytes);
    assert!(outcome.is_err());
    let err = outcome.unwrap_err().to_string();
    assert!(err.contains("duplicate edge ID"), "got: {err}");
}
1437
/// An edge whose source node ID is absent from the snapshot must be rejected
/// with a "non-existent source node" error.
#[test]
fn test_restore_rejects_dangling_source() {
    let db = GrafeoDB::new_in_memory();

    // The only node in the snapshot has ID 0.
    let only_node = SnapshotNode {
        id: NodeId::new(0),
        labels: Vec::new(),
        properties: Vec::new(),
    };
    // The edge starts at node 999, which the snapshot does not contain.
    let orphan_edge = SnapshotEdge {
        id: EdgeId::new(0),
        src: NodeId::new(999),
        dst: NodeId::new(0),
        edge_type: "REL".into(),
        properties: Vec::new(),
    };
    let bytes = make_snapshot(SNAPSHOT_VERSION, vec![only_node], vec![orphan_edge]);

    let outcome = db.restore_snapshot(&bytes);
    assert!(outcome.is_err());
    let err = outcome.unwrap_err().to_string();
    assert!(err.contains("non-existent source node"), "got: {err}");
}
1463
/// An edge whose destination node ID is absent from the snapshot must be
/// rejected with a "non-existent destination node" error.
#[test]
fn test_restore_rejects_dangling_destination() {
    let db = GrafeoDB::new_in_memory();

    // The only node in the snapshot has ID 0.
    let only_node = SnapshotNode {
        id: NodeId::new(0),
        labels: Vec::new(),
        properties: Vec::new(),
    };
    // The edge ends at node 999, which the snapshot does not contain.
    let orphan_edge = SnapshotEdge {
        id: EdgeId::new(0),
        src: NodeId::new(0),
        dst: NodeId::new(999),
        edge_type: "REL".into(),
        properties: Vec::new(),
    };
    let bytes = make_snapshot(SNAPSHOT_VERSION, vec![only_node], vec![orphan_edge]);

    let outcome = db.restore_snapshot(&bytes);
    assert!(outcome.is_err());
    let err = outcome.unwrap_err().to_string();
    assert!(err.contains("non-existent destination node"), "got: {err}");
}
1489
/// A property index created before `export_snapshot()` must exist, and answer
/// lookups, in a database built from that snapshot via `import_snapshot()`.
#[test]
fn test_snapshot_roundtrip_property_index() {
    let db = GrafeoDB::new_in_memory();
    let writer = db.session();

    writer
        .execute("INSERT (:Person {name: 'Alix', email: 'alix@example.com'})")
        .unwrap();
    db.create_property_index("email");
    assert!(db.has_property_index("email"));

    let bytes = db.export_snapshot().unwrap();
    let restored = GrafeoDB::import_snapshot(&bytes).unwrap();

    assert!(restored.has_property_index("email"));

    // Look the inserted node up by its email value in the imported database.
    let needle = Value::String("alix@example.com".into());
    let matches = restored.find_nodes_by_property("email", &needle);
    assert_eq!(matches.len(), 1);
}
1512
/// After an export/import round trip, a vector search on the imported database
/// must return both documents, with the one matching the query vector first.
#[cfg(feature = "vector-index")]
#[test]
fn test_snapshot_roundtrip_vector_index() {
    use std::sync::Arc;

    // Two unit vectors; the first doubles as the search query below.
    let x_axis = [1.0_f32, 0.0, 0.0];
    let y_axis = [0.0_f32, 1.0, 0.0];

    let db = GrafeoDB::new_in_memory();

    let n1 = db.create_node(&["Doc"]);
    db.set_node_property(n1, "embedding", Value::Vector(Arc::from(x_axis)));
    let n2 = db.create_node(&["Doc"]);
    db.set_node_property(n2, "embedding", Value::Vector(Arc::from(y_axis)));

    db.create_vector_index("Doc", "embedding", None, Some("cosine"), Some(4), Some(32))
        .unwrap();

    let bytes = db.export_snapshot().unwrap();
    let restored = GrafeoDB::import_snapshot(&bytes).unwrap();

    let hits = restored
        .vector_search("Doc", "embedding", &x_axis, 2, None, None)
        .unwrap();
    assert_eq!(hits.len(), 2);
    assert_eq!(hits[0].0, n1);
}
1547
/// After an export/import round trip, a text search on the imported database
/// must return only the article whose body matches the query.
#[cfg(feature = "text-index")]
#[test]
fn test_snapshot_roundtrip_text_index() {
    let db = GrafeoDB::new_in_memory();

    let n1 = db.create_node(&["Article"]);
    let matching_body = Value::String("rust graph database".into());
    db.set_node_property(n1, "body", matching_body);
    let n2 = db.create_node(&["Article"]);
    let other_body = Value::String("python web framework".into());
    db.set_node_property(n2, "body", other_body);

    db.create_text_index("Article", "body").unwrap();

    let bytes = db.export_snapshot().unwrap();
    let restored = GrafeoDB::import_snapshot(&bytes).unwrap();

    let hits = restored
        .text_search("Article", "body", "graph database", 10)
        .unwrap();
    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0].0, n1);
}
1570
/// `restore_snapshot()` on the same database must bring back a property index
/// that was dropped after the snapshot was exported.
#[test]
fn test_snapshot_roundtrip_property_index_via_restore() {
    let db = GrafeoDB::new_in_memory();
    let writer = db.session();

    writer
        .execute("INSERT (:Person {name: 'Alix', email: 'alix@example.com'})")
        .unwrap();
    db.create_property_index("email");

    let checkpoint = db.export_snapshot().unwrap();

    // Diverge from the exported state: add another node and drop the index.
    writer
        .execute("INSERT (:Person {name: 'Gus', email: 'gus@example.com'})")
        .unwrap();
    db.drop_property_index("email");
    assert!(!db.has_property_index("email"));

    db.restore_snapshot(&checkpoint).unwrap();
    assert!(db.has_property_index("email"));
}
1594}