Skip to main content

grafeo_engine/database/
persistence.rs

1//! Persistence, snapshots, and data export for GrafeoDB.
2
3#[cfg(any(feature = "wal", feature = "grafeo-file"))]
4use std::path::Path;
5
6#[cfg(any(feature = "vector-index", feature = "text-index"))]
7use grafeo_common::grafeo_warn;
8use grafeo_common::types::{EdgeId, EpochId, NodeId, Value};
9use grafeo_common::utils::error::{Error, Result};
10use hashbrown::HashSet;
11
12use crate::config::Config;
13
14#[cfg(feature = "wal")]
15use grafeo_adapters::storage::wal::WalRecord;
16
17use crate::catalog::{
18    EdgeTypeDefinition, GraphTypeDefinition, NodeTypeDefinition, ProcedureDefinition,
19};
20
21/// Current snapshot version.
22const SNAPSHOT_VERSION: u8 = 4;
23
24/// Binary snapshot format (v4: graph data, named graphs, RDF, schema, index metadata,
25/// and property version history for temporal support).
26#[derive(serde::Serialize, serde::Deserialize)]
27struct Snapshot {
28    version: u8,
29    nodes: Vec<SnapshotNode>,
30    edges: Vec<SnapshotEdge>,
31    named_graphs: Vec<NamedGraphSnapshot>,
32    rdf_triples: Vec<SnapshotTriple>,
33    rdf_named_graphs: Vec<RdfNamedGraphSnapshot>,
34    schema: SnapshotSchema,
35    indexes: SnapshotIndexes,
36    /// Current store epoch at snapshot time (0 when temporal is disabled).
37    epoch: u64,
38}
39
40/// Schema metadata within a snapshot.
41#[derive(serde::Serialize, serde::Deserialize, Default)]
42struct SnapshotSchema {
43    node_types: Vec<NodeTypeDefinition>,
44    edge_types: Vec<EdgeTypeDefinition>,
45    graph_types: Vec<GraphTypeDefinition>,
46    procedures: Vec<ProcedureDefinition>,
47    schemas: Vec<String>,
48    graph_type_bindings: Vec<(String, String)>,
49}
50
51/// Index metadata within a snapshot (definitions only, not index data).
52#[derive(serde::Serialize, serde::Deserialize, Default)]
53struct SnapshotIndexes {
54    property_indexes: Vec<String>,
55    vector_indexes: Vec<SnapshotVectorIndex>,
56    text_indexes: Vec<SnapshotTextIndex>,
57}
58
59/// Vector index definition for snapshot persistence.
60#[derive(serde::Serialize, serde::Deserialize)]
61struct SnapshotVectorIndex {
62    label: String,
63    property: String,
64    dimensions: usize,
65    metric: grafeo_core::index::vector::DistanceMetric,
66    m: usize,
67    ef_construction: usize,
68}
69
70/// Text index definition for snapshot persistence.
71#[derive(serde::Serialize, serde::Deserialize)]
72struct SnapshotTextIndex {
73    label: String,
74    property: String,
75}
76
77/// A named graph partition within a v2 snapshot.
78#[derive(serde::Serialize, serde::Deserialize)]
79struct NamedGraphSnapshot {
80    name: String,
81    nodes: Vec<SnapshotNode>,
82    edges: Vec<SnapshotEdge>,
83}
84
85/// An RDF triple in snapshot format (N-Triples encoded terms).
86#[derive(serde::Serialize, serde::Deserialize)]
87struct SnapshotTriple {
88    subject: String,
89    predicate: String,
90    object: String,
91}
92
93/// An RDF named graph in snapshot format.
94#[derive(serde::Serialize, serde::Deserialize)]
95struct RdfNamedGraphSnapshot {
96    name: String,
97    triples: Vec<SnapshotTriple>,
98}
99
100#[derive(serde::Serialize, serde::Deserialize)]
101struct SnapshotNode {
102    id: NodeId,
103    labels: Vec<String>,
104    /// Each property has a list of `(epoch, value)` entries (ascending epoch order).
105    properties: Vec<(String, Vec<(EpochId, Value)>)>,
106}
107
108#[derive(serde::Serialize, serde::Deserialize)]
109struct SnapshotEdge {
110    id: EdgeId,
111    src: NodeId,
112    dst: NodeId,
113    edge_type: String,
114    /// Each property has a list of `(epoch, value)` entries (ascending epoch order).
115    properties: Vec<(String, Vec<(EpochId, Value)>)>,
116}
117
118/// Collects all nodes from a store into snapshot format.
119///
120/// With `temporal`: stores full property version history.
121/// Without: wraps each current value as a single-entry version list at epoch 0.
122fn collect_snapshot_nodes(store: &grafeo_core::graph::lpg::LpgStore) -> Vec<SnapshotNode> {
123    store
124        .all_nodes()
125        .map(|n| {
126            #[cfg(feature = "temporal")]
127            let properties = store
128                .node_property_history(n.id)
129                .into_iter()
130                .map(|(k, entries)| (k.to_string(), entries))
131                .collect();
132
133            #[cfg(not(feature = "temporal"))]
134            let properties = n
135                .properties
136                .into_iter()
137                .map(|(k, v)| (k.to_string(), vec![(EpochId::new(0), v)]))
138                .collect();
139
140            SnapshotNode {
141                id: n.id,
142                labels: n.labels.iter().map(|l| l.to_string()).collect(),
143                properties,
144            }
145        })
146        .collect()
147}
148
149/// Collects all edges from a store into snapshot format.
150///
151/// With `temporal`: stores full property version history.
152/// Without: wraps each current value as a single-entry version list at epoch 0.
153fn collect_snapshot_edges(store: &grafeo_core::graph::lpg::LpgStore) -> Vec<SnapshotEdge> {
154    store
155        .all_edges()
156        .map(|e| {
157            #[cfg(feature = "temporal")]
158            let properties = store
159                .edge_property_history(e.id)
160                .into_iter()
161                .map(|(k, entries)| (k.to_string(), entries))
162                .collect();
163
164            #[cfg(not(feature = "temporal"))]
165            let properties = e
166                .properties
167                .into_iter()
168                .map(|(k, v)| (k.to_string(), vec![(EpochId::new(0), v)]))
169                .collect();
170
171            SnapshotEdge {
172                id: e.id,
173                src: e.src,
174                dst: e.dst,
175                edge_type: e.edge_type.to_string(),
176                properties,
177            }
178        })
179        .collect()
180}
181
182/// Populates a store from snapshot node/edge data.
183///
184/// With `temporal`: replays all `(epoch, value)` entries into version logs.
185/// Without: reads the latest value from each property's version list.
186fn populate_store_from_snapshot(
187    store: &grafeo_core::graph::lpg::LpgStore,
188    nodes: Vec<SnapshotNode>,
189    edges: Vec<SnapshotEdge>,
190) -> Result<()> {
191    for node in nodes {
192        let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
193        store.create_node_with_id(node.id, &label_refs)?;
194        for (key, entries) in node.properties {
195            #[cfg(feature = "temporal")]
196            for (epoch, value) in entries {
197                store.set_node_property_at_epoch(node.id, &key, value, epoch);
198            }
199            #[cfg(not(feature = "temporal"))]
200            if let Some((_, value)) = entries.into_iter().last() {
201                store.set_node_property(node.id, &key, value);
202            }
203        }
204    }
205    for edge in edges {
206        store.create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
207        for (key, entries) in edge.properties {
208            #[cfg(feature = "temporal")]
209            for (epoch, value) in entries {
210                store.set_edge_property_at_epoch(edge.id, &key, value, epoch);
211            }
212            #[cfg(not(feature = "temporal"))]
213            if let Some((_, value)) = entries.into_iter().last() {
214                store.set_edge_property(edge.id, &key, value);
215            }
216        }
217    }
218    Ok(())
219}
220
221/// Validates snapshot nodes/edges for duplicates and dangling references.
222fn validate_snapshot_data(nodes: &[SnapshotNode], edges: &[SnapshotEdge]) -> Result<()> {
223    let mut node_ids = HashSet::with_capacity(nodes.len());
224    for node in nodes {
225        if !node_ids.insert(node.id) {
226            return Err(Error::Internal(format!(
227                "snapshot contains duplicate node ID {}",
228                node.id
229            )));
230        }
231    }
232    let mut edge_ids = HashSet::with_capacity(edges.len());
233    for edge in edges {
234        if !edge_ids.insert(edge.id) {
235            return Err(Error::Internal(format!(
236                "snapshot contains duplicate edge ID {}",
237                edge.id
238            )));
239        }
240        if !node_ids.contains(&edge.src) {
241            return Err(Error::Internal(format!(
242                "snapshot edge {} references non-existent source node {}",
243                edge.id, edge.src
244            )));
245        }
246        if !node_ids.contains(&edge.dst) {
247            return Err(Error::Internal(format!(
248                "snapshot edge {} references non-existent destination node {}",
249                edge.id, edge.dst
250            )));
251        }
252    }
253    Ok(())
254}
255
/// Renders every triple returned by `store.triples()` as a [`SnapshotTriple`],
/// using each term's `to_string()` form.
#[cfg(feature = "rdf")]
fn collect_rdf_triples(store: &grafeo_core::graph::rdf::RdfStore) -> Vec<SnapshotTriple> {
    let mut rendered = Vec::new();
    for triple in store.triples() {
        rendered.push(SnapshotTriple {
            subject: triple.subject().to_string(),
            predicate: triple.predicate().to_string(),
            object: triple.object().to_string(),
        });
    }
    rendered
}
269
/// Inserts snapshot triples into an RDF store.
///
/// Each term is parsed with `Term::from_ntriples`. Restoration is
/// best-effort: a triple whose subject, predicate or object fails to parse is
/// skipped. A warning is logged for each skipped triple so the data loss is
/// visible rather than silent.
#[cfg(feature = "rdf")]
fn populate_rdf_store(store: &grafeo_core::graph::rdf::RdfStore, triples: &[SnapshotTriple]) {
    use grafeo_core::graph::rdf::{Term, Triple};
    for triple in triples {
        let (Some(subject), Some(predicate), Some(object)) = (
            Term::from_ntriples(&triple.subject),
            Term::from_ntriples(&triple.predicate),
            Term::from_ntriples(&triple.object),
        ) else {
            // Full path: the file-level `grafeo_warn` import is gated on the
            // index features, which may be disabled when only `rdf` is on.
            grafeo_common::grafeo_warn!(
                "Skipping unparseable RDF triple in snapshot: {subject} {predicate} {object}",
                subject = triple.subject,
                predicate = triple.predicate,
                object = triple.object,
            );
            continue;
        };
        store.insert(Triple::new(subject, predicate, object));
    }
}
284
285// =========================================================================
286// Snapshot deserialization helpers (used by single-file format)
287// =========================================================================
288
/// Decodes a bincode-encoded [`Snapshot`] (the payload of a `.grafeo` file)
/// and loads it into `store`, `catalog` and, with `rdf`, `rdf_store`.
///
/// The snapshot is checked before anything is written. Its version must equal
/// [`SNAPSHOT_VERSION`], and every graph partition must be free of duplicate
/// IDs and dangling edge endpoints (see [`validate_snapshot_data`]). A corrupt
/// file is therefore rejected instead of leaving a partially populated store.
///
/// Index definitions carried by the snapshot are not restored here.
///
/// With `temporal`, the default store and every named graph are synced to the
/// snapshot epoch. Syncing the `TransactionManager` is handled in
/// `with_config()` after all recovery completes.
///
/// # Errors
///
/// * [`Error::Serialization`] if decoding fails or the version is unsupported.
/// * [`Error::Internal`] if validation fails or a named graph cannot be created.
/// * Any error raised while recreating nodes or edges.
#[cfg(feature = "grafeo-file")]
pub(super) fn load_snapshot_into_store(
    store: &std::sync::Arc<grafeo_core::graph::lpg::LpgStore>,
    catalog: &std::sync::Arc<crate::catalog::Catalog>,
    #[cfg(feature = "rdf")] rdf_store: &std::sync::Arc<grafeo_core::graph::rdf::RdfStore>,
    data: &[u8],
) -> grafeo_common::utils::error::Result<()> {
    let config = bincode::config::standard();
    let (snapshot, _) =
        bincode::serde::decode_from_slice::<Snapshot, _>(data, config).map_err(|e| {
            Error::Serialization(format!("failed to decode snapshot from .grafeo file: {e}"))
        })?;

    if snapshot.version != SNAPSHOT_VERSION {
        return Err(Error::Serialization(format!(
            "unsupported snapshot version {} in .grafeo file (expected {SNAPSHOT_VERSION})",
            snapshot.version
        )));
    }

    // Validate every partition up front so that a corrupt file is rejected
    // before any store or catalog mutation happens.
    validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
    for graph in &snapshot.named_graphs {
        validate_snapshot_data(&graph.nodes, &graph.edges)?;
    }

    populate_store_from_snapshot_ref(store, &snapshot.nodes, &snapshot.edges)?;

    // Restore epoch from snapshot (store-level only; TransactionManager
    // sync is handled in with_config() after all recovery completes).
    #[cfg(feature = "temporal")]
    store.sync_epoch(EpochId::new(snapshot.epoch));

    for graph in &snapshot.named_graphs {
        store
            .create_graph(&graph.name)
            .map_err(|e| Error::Internal(e.to_string()))?;
        if let Some(graph_store) = store.graph(&graph.name) {
            populate_store_from_snapshot_ref(&graph_store, &graph.nodes, &graph.edges)?;
            #[cfg(feature = "temporal")]
            graph_store.sync_epoch(EpochId::new(snapshot.epoch));
        }
    }
    restore_schema_from_snapshot(catalog, &snapshot.schema);

    // Restore RDF triples
    #[cfg(feature = "rdf")]
    {
        populate_rdf_store(rdf_store, &snapshot.rdf_triples);
        for rdf_graph in &snapshot.rdf_named_graphs {
            rdf_store.create_graph(&rdf_graph.name);
            if let Some(graph_store) = rdf_store.graph(&rdf_graph.name) {
                populate_rdf_store(&graph_store, &rdf_graph.triples);
            }
        }
    }

    Ok(())
}
338
/// Loads borrowed snapshot nodes and edges into `store`, preserving their IDs.
///
/// This is the borrowing counterpart used by single-file loading; property
/// values are cloned out of the snapshot. All nodes are created before any
/// edge.
///
/// * With `temporal`: every `(epoch, value)` entry is replayed into the
///   store's version log.
/// * Without: only the last (latest) entry of each history is applied.
///
/// # Errors
///
/// Propagates any error from `create_node_with_id` / `create_edge_with_id`.
#[cfg(feature = "grafeo-file")]
fn populate_store_from_snapshot_ref(
    store: &grafeo_core::graph::lpg::LpgStore,
    nodes: &[SnapshotNode],
    edges: &[SnapshotEdge],
) -> grafeo_common::utils::error::Result<()> {
    for SnapshotNode {
        id,
        labels,
        properties,
    } in nodes
    {
        let label_refs: Vec<&str> = labels.iter().map(String::as_str).collect();
        store.create_node_with_id(*id, &label_refs)?;
        for (key, history) in properties {
            #[cfg(feature = "temporal")]
            for (epoch, value) in history {
                store.set_node_property_at_epoch(*id, key, value.clone(), *epoch);
            }
            #[cfg(not(feature = "temporal"))]
            if let Some((_, latest)) = history.last() {
                store.set_node_property(*id, key, latest.clone());
            }
        }
    }

    for SnapshotEdge {
        id,
        src,
        dst,
        edge_type,
        properties,
    } in edges
    {
        store.create_edge_with_id(*id, *src, *dst, edge_type)?;
        for (key, history) in properties {
            #[cfg(feature = "temporal")]
            for (epoch, value) in history {
                store.set_edge_property_at_epoch(*id, key, value.clone(), *epoch);
            }
            #[cfg(not(feature = "temporal"))]
            if let Some((_, latest)) = history.last() {
                store.set_edge_property(*id, key, latest.clone());
            }
        }
    }

    Ok(())
}
375
376/// Restores schema definitions from a snapshot into the catalog.
377fn restore_schema_from_snapshot(
378    catalog: &std::sync::Arc<crate::catalog::Catalog>,
379    schema: &SnapshotSchema,
380) {
381    for def in &schema.node_types {
382        catalog.register_or_replace_node_type(def.clone());
383    }
384    for def in &schema.edge_types {
385        catalog.register_or_replace_edge_type_def(def.clone());
386    }
387    for def in &schema.graph_types {
388        let _ = catalog.register_graph_type(def.clone());
389    }
390    for def in &schema.procedures {
391        catalog.replace_procedure(def.clone()).ok();
392    }
393    for name in &schema.schemas {
394        let _ = catalog.register_schema_namespace(name.clone());
395    }
396    for (graph_name, type_name) in &schema.graph_type_bindings {
397        let _ = catalog.bind_graph_type(graph_name, type_name.clone());
398    }
399}
400
401/// Collects schema definitions from the catalog into snapshot format.
402fn collect_schema(catalog: &std::sync::Arc<crate::catalog::Catalog>) -> SnapshotSchema {
403    SnapshotSchema {
404        node_types: catalog.all_node_type_defs(),
405        edge_types: catalog.all_edge_type_defs(),
406        graph_types: catalog.all_graph_type_defs(),
407        procedures: catalog.all_procedure_defs(),
408        schemas: catalog.schema_names(),
409        graph_type_bindings: catalog.all_graph_type_bindings(),
410    }
411}
412
413/// Restores indexes from snapshot metadata by rebuilding them from existing data.
414///
415/// Must be called after all nodes/edges have been populated, since index
416/// creation scans existing data.
417fn restore_indexes_from_snapshot(db: &super::GrafeoDB, indexes: &SnapshotIndexes) {
418    for name in &indexes.property_indexes {
419        db.lpg_store().create_property_index(name);
420    }
421
422    #[cfg(feature = "vector-index")]
423    for vi in &indexes.vector_indexes {
424        if let Err(err) = db.create_vector_index(
425            &vi.label,
426            &vi.property,
427            Some(vi.dimensions),
428            Some(vi.metric.name()),
429            Some(vi.m),
430            Some(vi.ef_construction),
431        ) {
432            grafeo_warn!(
433                "Failed to restore vector index :{label}({property}): {err}",
434                label = vi.label,
435                property = vi.property,
436            );
437        }
438    }
439
440    #[cfg(feature = "text-index")]
441    for ti in &indexes.text_indexes {
442        if let Err(err) = db.create_text_index(&ti.label, &ti.property) {
443            grafeo_warn!(
444                "Failed to restore text index :{label}({property}): {err}",
445                label = ti.label,
446                property = ti.property,
447            );
448        }
449    }
450}
451
452/// Collects index metadata from a store into snapshot format.
453fn collect_index_metadata(store: &grafeo_core::graph::lpg::LpgStore) -> SnapshotIndexes {
454    let property_indexes = store.property_index_keys();
455
456    #[cfg(feature = "vector-index")]
457    let vector_indexes: Vec<SnapshotVectorIndex> = store
458        .vector_index_entries()
459        .into_iter()
460        .filter_map(|(key, index)| {
461            let (label, property) = key.split_once(':')?;
462            let config = index.config();
463            Some(SnapshotVectorIndex {
464                label: label.to_string(),
465                property: property.to_string(),
466                dimensions: config.dimensions,
467                metric: config.metric,
468                m: config.m,
469                ef_construction: config.ef_construction,
470            })
471        })
472        .collect();
473    #[cfg(not(feature = "vector-index"))]
474    let vector_indexes = Vec::new();
475
476    #[cfg(feature = "text-index")]
477    let text_indexes: Vec<SnapshotTextIndex> = store
478        .text_index_entries()
479        .into_iter()
480        .filter_map(|(key, _)| {
481            let (label, property) = key.split_once(':')?;
482            Some(SnapshotTextIndex {
483                label: label.to_string(),
484                property: property.to_string(),
485            })
486        })
487        .collect();
488    #[cfg(not(feature = "text-index"))]
489    let text_indexes = Vec::new();
490
491    SnapshotIndexes {
492        property_indexes,
493        vector_indexes,
494        text_indexes,
495    }
496}
497
498impl super::GrafeoDB {
499    // =========================================================================
500    // ADMIN API: Persistence Control
501    // =========================================================================
502
/// Saves the database to a file path.
///
/// - If the path ends in `.grafeo` (with the `grafeo-file` feature): writes a
///   single-file database.
/// - Otherwise: creates a WAL directory-backed database at the path. The
///   default graph, every named graph and (with `rdf`) all RDF data are copied
///   into it, and every write is logged to the target's WAL.
/// - If in-memory: creates a new persistent database at path.
/// - If file-backed: creates a copy at the new path.
///
/// The original database remains unchanged. On the WAL path, only current
/// property values are copied; this method does not copy schema or index
/// definitions.
///
/// # Errors
///
/// Returns an error if the target cannot be created, a node, edge or graph
/// cannot be recreated, a WAL record cannot be written, or the target fails
/// to close.
///
/// Requires the `wal` feature for persistence support.
#[cfg(feature = "wal")]
pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
    let path = path.as_ref();

    // Single-file format: export snapshot directly to a .grafeo file
    #[cfg(feature = "grafeo-file")]
    if path.extension().is_some_and(|ext| ext == "grafeo") {
        return self.save_as_grafeo_file(path);
    }

    // Create target database with WAL enabled
    let target = Self::with_config(Config::persistent(path))?;

    // Default graph (the target WAL starts in the default-graph context).
    target.copy_lpg_store_with_wal(self.lpg_store(), target.lpg_store())?;

    // Named graphs. Remember whether the WAL context was actually switched
    // away from the default graph, so the switch-back below is emitted
    // exactly when it is needed. Re-querying graph_names() is not reliable
    // for this.
    let mut switched_graph = false;
    for graph_name in self.lpg_store().graph_names() {
        let Some(src_graph) = self.lpg_store().graph(&graph_name) else {
            continue;
        };
        target.log_wal(&WalRecord::CreateNamedGraph {
            name: graph_name.clone(),
        })?;
        target
            .lpg_store()
            .create_graph(&graph_name)
            .map_err(|e| Error::Internal(e.to_string()))?;

        if let Some(dst_graph) = target.lpg_store().graph(&graph_name) {
            // Switch WAL context to this named graph
            target.log_wal(&WalRecord::SwitchGraph {
                name: Some(graph_name.clone()),
            })?;
            switched_graph = true;
            target.copy_lpg_store_with_wal(&src_graph, &dst_graph)?;
        }
    }

    // Switch WAL context back to default graph
    if switched_graph {
        target.log_wal(&WalRecord::SwitchGraph { name: None })?;
    }

    // Copy RDF data with WAL logging (records carry their graph explicitly).
    #[cfg(feature = "rdf")]
    {
        for triple in self.rdf_store.triples() {
            let record = WalRecord::InsertRdfTriple {
                subject: triple.subject().to_string(),
                predicate: triple.predicate().to_string(),
                object: triple.object().to_string(),
                graph: None,
            };
            target.rdf_store.insert((*triple).clone());
            target.log_wal(&record)?;
        }
        for name in self.rdf_store.graph_names() {
            target.log_wal(&WalRecord::CreateRdfGraph { name: name.clone() })?;
            if let Some(src_graph) = self.rdf_store.graph(&name) {
                let dst_graph = target.rdf_store.graph_or_create(&name);
                for triple in src_graph.triples() {
                    let record = WalRecord::InsertRdfTriple {
                        subject: triple.subject().to_string(),
                        predicate: triple.predicate().to_string(),
                        object: triple.object().to_string(),
                        graph: Some(name.clone()),
                    };
                    dst_graph.insert((*triple).clone());
                    target.log_wal(&record)?;
                }
            }
        }
    }

    // Checkpoint and close the target database
    target.close()?;

    Ok(())
}

/// Copies every node and edge, with current property values, from `src` into
/// `dst`. IDs are preserved, and each write is logged to this database's WAL.
///
/// Before calling, the caller must put the WAL in the graph context that
/// matches `dst`: the default graph, or after a `SwitchGraph` record.
#[cfg(feature = "wal")]
fn copy_lpg_store_with_wal(
    &self,
    src: &grafeo_core::graph::lpg::LpgStore,
    dst: &grafeo_core::graph::lpg::LpgStore,
) -> Result<()> {
    for node in src.all_nodes() {
        let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
        dst.create_node_with_id(node.id, &label_refs)?;
        self.log_wal(&WalRecord::CreateNode {
            id: node.id,
            labels: node.labels.iter().map(|s| s.to_string()).collect(),
        })?;
        for (key, value) in node.properties {
            dst.set_node_property(node.id, key.as_str(), value.clone());
            self.log_wal(&WalRecord::SetNodeProperty {
                id: node.id,
                key: key.to_string(),
                value,
            })?;
        }
    }

    for edge in src.all_edges() {
        dst.create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
        self.log_wal(&WalRecord::CreateEdge {
            id: edge.id,
            src: edge.src,
            dst: edge.dst,
            edge_type: edge.edge_type.to_string(),
        })?;
        for (key, value) in edge.properties {
            dst.set_edge_property(edge.id, key.as_str(), value.clone());
            self.log_wal(&WalRecord::SetEdgeProperty {
                id: edge.id,
                key: key.to_string(),
                value,
            })?;
        }
    }

    Ok(())
}
684
685    /// Creates an in-memory copy of this database.
686    ///
687    /// Returns a new database that is completely independent, including
688    /// all named graph data.
689    /// Useful for:
690    /// Saves the database to a single `.grafeo` file.
691    #[cfg(feature = "grafeo-file")]
692    fn save_as_grafeo_file(&self, path: &Path) -> Result<()> {
693        use grafeo_adapters::storage::file::GrafeoFileManager;
694
695        let snapshot_data = self.export_snapshot()?;
696        let epoch = self.lpg_store().current_epoch();
697        let transaction_id = self
698            .transaction_manager
699            .last_assigned_transaction_id()
700            .map_or(0, |t| t.0);
701        let node_count = self.lpg_store().node_count() as u64;
702        let edge_count = self.lpg_store().edge_count() as u64;
703
704        let fm = GrafeoFileManager::create(path)?;
705        fm.write_snapshot(
706            &snapshot_data,
707            epoch.0,
708            transaction_id,
709            node_count,
710            edge_count,
711        )?;
712        Ok(())
713    }
714
715    /// - Testing modifications without affecting the original
716    /// - Faster operations when persistence isn't needed
717    ///
718    /// # Errors
719    ///
720    /// Returns an error if the copy operation fails.
721    pub fn to_memory(&self) -> Result<Self> {
722        let config = Config::in_memory();
723        let target = Self::with_config(config)?;
724
725        // Copy default graph nodes
726        for node in self.lpg_store().all_nodes() {
727            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
728            target
729                .lpg_store()
730                .create_node_with_id(node.id, &label_refs)?;
731            for (key, value) in node.properties {
732                target
733                    .lpg_store()
734                    .set_node_property(node.id, key.as_str(), value);
735            }
736        }
737
738        // Copy default graph edges
739        for edge in self.lpg_store().all_edges() {
740            target
741                .lpg_store()
742                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
743            for (key, value) in edge.properties {
744                target
745                    .lpg_store()
746                    .set_edge_property(edge.id, key.as_str(), value);
747            }
748        }
749
750        // Copy named graphs
751        for graph_name in self.lpg_store().graph_names() {
752            if let Some(src_graph) = self.lpg_store().graph(&graph_name) {
753                target
754                    .lpg_store()
755                    .create_graph(&graph_name)
756                    .map_err(|e| Error::Internal(e.to_string()))?;
757                if let Some(dst_graph) = target.lpg_store().graph(&graph_name) {
758                    for node in src_graph.all_nodes() {
759                        let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
760                        dst_graph.create_node_with_id(node.id, &label_refs)?;
761                        for (key, value) in node.properties {
762                            dst_graph.set_node_property(node.id, key.as_str(), value);
763                        }
764                    }
765                    for edge in src_graph.all_edges() {
766                        dst_graph.create_edge_with_id(
767                            edge.id,
768                            edge.src,
769                            edge.dst,
770                            &edge.edge_type,
771                        )?;
772                        for (key, value) in edge.properties {
773                            dst_graph.set_edge_property(edge.id, key.as_str(), value);
774                        }
775                    }
776                }
777            }
778        }
779
780        // Copy RDF data
781        #[cfg(feature = "rdf")]
782        {
783            for triple in self.rdf_store.triples() {
784                target.rdf_store.insert((*triple).clone());
785            }
786            for name in self.rdf_store.graph_names() {
787                if let Some(src_graph) = self.rdf_store.graph(&name) {
788                    let dst_graph = target.rdf_store.graph_or_create(&name);
789                    for triple in src_graph.triples() {
790                        dst_graph.insert((*triple).clone());
791                    }
792                }
793            }
794        }
795
796        Ok(target)
797    }
798
/// Loads the database at `path` entirely into memory, detached from disk.
///
/// The source is opened normally (which triggers WAL recovery), copied with
/// [`to_memory`](Self::to_memory), and then closed. The returned database has
/// no connection to the original file, so changes made to it are NOT written
/// back.
///
/// # Errors
///
/// Returns an error if the file can't be opened or loaded, or if closing the
/// source fails.
#[cfg(feature = "wal")]
pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
    let on_disk = Self::open(path)?;
    let detached = on_disk.to_memory()?;

    // Release the source's file handles before handing back the copy.
    on_disk.close()?;

    Ok(detached)
}
820
821    // =========================================================================
822    // ADMIN API: Snapshot Export/Import
823    // =========================================================================
824
825    /// Exports the entire database to a binary snapshot.
826    ///
827    /// The returned bytes can be stored (e.g. in IndexedDB) and later
828    /// restored with [`import_snapshot()`](Self::import_snapshot).
829    /// Includes all named graph data.
830    ///
831    /// Properties are stored as version-history lists. When `temporal` is
832    /// enabled, the full history is captured. Otherwise, each property is
833    /// wrapped as a single-entry list at epoch 0.
834    ///
835    /// # Errors
836    ///
837    /// Returns an error if serialization fails.
838    pub fn export_snapshot(&self) -> Result<Vec<u8>> {
839        let nodes = collect_snapshot_nodes(self.lpg_store());
840        let edges = collect_snapshot_edges(self.lpg_store());
841
842        // Collect named graphs
843        let named_graphs: Vec<NamedGraphSnapshot> = self
844            .lpg_store()
845            .graph_names()
846            .into_iter()
847            .filter_map(|name| {
848                self.lpg_store()
849                    .graph(&name)
850                    .map(|graph_store| NamedGraphSnapshot {
851                        name,
852                        nodes: collect_snapshot_nodes(&graph_store),
853                        edges: collect_snapshot_edges(&graph_store),
854                    })
855            })
856            .collect();
857
858        // Collect RDF triples
859        #[cfg(feature = "rdf")]
860        let rdf_triples = collect_rdf_triples(&self.rdf_store);
861        #[cfg(not(feature = "rdf"))]
862        let rdf_triples = Vec::new();
863
864        #[cfg(feature = "rdf")]
865        let rdf_named_graphs: Vec<RdfNamedGraphSnapshot> = self
866            .rdf_store
867            .graph_names()
868            .into_iter()
869            .filter_map(|name| {
870                self.rdf_store
871                    .graph(&name)
872                    .map(|graph| RdfNamedGraphSnapshot {
873                        name,
874                        triples: collect_rdf_triples(&graph),
875                    })
876            })
877            .collect();
878        #[cfg(not(feature = "rdf"))]
879        let rdf_named_graphs = Vec::new();
880
881        let schema = collect_schema(&self.catalog);
882        let indexes = collect_index_metadata(self.lpg_store());
883
884        let snapshot = Snapshot {
885            version: SNAPSHOT_VERSION,
886            nodes,
887            edges,
888            named_graphs,
889            rdf_triples,
890            rdf_named_graphs,
891            schema,
892            indexes,
893            #[cfg(feature = "temporal")]
894            epoch: self.transaction_manager.current_epoch().as_u64(),
895            #[cfg(not(feature = "temporal"))]
896            epoch: 0,
897        };
898
899        let config = bincode::config::standard();
900        bincode::serde::encode_to_vec(&snapshot, config)
901            .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
902    }
903
904    /// Creates a new in-memory database from a binary snapshot.
905    ///
906    /// The `data` must have been produced by [`export_snapshot()`](Self::export_snapshot).
907    ///
908    /// All edge references are validated before any data is inserted: every
909    /// edge's source and destination must reference a node present in the
910    /// snapshot, and duplicate node/edge IDs are rejected. If validation
911    /// fails, no database is created.
912    ///
913    /// # Errors
914    ///
915    /// Returns an error if the snapshot is invalid, contains dangling edge
916    /// references, has duplicate IDs, or deserialization fails.
917    pub fn import_snapshot(data: &[u8]) -> Result<Self> {
918        if data.is_empty() {
919            return Err(Error::Internal("empty snapshot data".to_string()));
920        }
921
922        let version = data[0];
923        if version != 4 {
924            return Err(Error::Internal(format!(
925                "unsupported snapshot version: {version} (expected 4)"
926            )));
927        }
928
929        let config = bincode::config::standard();
930        let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
931            .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
932
933        // Validate default graph data
934        validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
935
936        // Validate each named graph
937        for ng in &snapshot.named_graphs {
938            validate_snapshot_data(&ng.nodes, &ng.edges)?;
939        }
940
941        let db = Self::new_in_memory();
942        populate_store_from_snapshot(db.lpg_store(), snapshot.nodes, snapshot.edges)?;
943
944        // Restore epoch from snapshot
945        #[cfg(feature = "temporal")]
946        {
947            let epoch = EpochId::new(snapshot.epoch);
948            db.lpg_store().sync_epoch(epoch);
949            db.transaction_manager.sync_epoch(epoch);
950        }
951
952        // Capture epoch before moving snapshot fields
953        #[cfg(feature = "temporal")]
954        let snapshot_epoch = EpochId::new(snapshot.epoch);
955
956        // Restore named graphs
957        for ng in snapshot.named_graphs {
958            db.lpg_store()
959                .create_graph(&ng.name)
960                .map_err(|e| Error::Internal(e.to_string()))?;
961            if let Some(graph_store) = db.lpg_store().graph(&ng.name) {
962                populate_store_from_snapshot(&graph_store, ng.nodes, ng.edges)?;
963                // Named graph stores need the same epoch so temporal property
964                // lookups via current_epoch() return the correct values.
965                #[cfg(feature = "temporal")]
966                graph_store.sync_epoch(snapshot_epoch);
967            }
968        }
969
970        // Restore RDF triples
971        #[cfg(feature = "rdf")]
972        {
973            populate_rdf_store(&db.rdf_store, &snapshot.rdf_triples);
974            for rng in &snapshot.rdf_named_graphs {
975                let graph = db.rdf_store.graph_or_create(&rng.name);
976                populate_rdf_store(&graph, &rng.triples);
977            }
978        }
979
980        // Restore schema
981        restore_schema_from_snapshot(&db.catalog, &snapshot.schema);
982
983        // Restore indexes (must come after data population)
984        restore_indexes_from_snapshot(&db, &snapshot.indexes);
985
986        Ok(db)
987    }
988
989    /// Replaces the current database contents with data from a binary snapshot.
990    ///
991    /// The `data` must have been produced by
992    /// [`export_snapshot()`](Self::export_snapshot).
993    ///
994    /// All validation (duplicate IDs, dangling edge references) is performed
995    /// before any data is modified. If validation fails, the current database
996    /// is left unchanged. If validation passes, the store is cleared and
997    /// rebuilt from the snapshot atomically (from the perspective of
998    /// subsequent queries).
999    ///
1000    /// # Errors
1001    ///
1002    /// Returns an error if the snapshot is invalid, contains dangling edge
1003    /// references, has duplicate IDs, or deserialization fails.
1004    pub fn restore_snapshot(&self, data: &[u8]) -> Result<()> {
1005        if data.is_empty() {
1006            return Err(Error::Internal("empty snapshot data".to_string()));
1007        }
1008
1009        let version = data[0];
1010        if version != 4 {
1011            return Err(Error::Internal(format!(
1012                "unsupported snapshot version: {version} (expected 4)"
1013            )));
1014        }
1015
1016        let config = bincode::config::standard();
1017        let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
1018            .map_err(|e| Error::Internal(format!("snapshot restore failed: {e}")))?;
1019
1020        // Validate all data before making any changes
1021        validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
1022        for ng in &snapshot.named_graphs {
1023            validate_snapshot_data(&ng.nodes, &ng.edges)?;
1024        }
1025
1026        // Drop all existing named graphs, then clear default store
1027        for name in self.lpg_store().graph_names() {
1028            self.lpg_store().drop_graph(&name);
1029        }
1030        self.lpg_store().clear();
1031
1032        populate_store_from_snapshot(self.lpg_store(), snapshot.nodes, snapshot.edges)?;
1033
1034        // Restore epoch from temporal snapshot
1035        #[cfg(feature = "temporal")]
1036        let snapshot_epoch = {
1037            let epoch = EpochId::new(snapshot.epoch);
1038            self.lpg_store().sync_epoch(epoch);
1039            self.transaction_manager.sync_epoch(epoch);
1040            epoch
1041        };
1042
1043        // Restore named graphs
1044        for ng in snapshot.named_graphs {
1045            self.lpg_store()
1046                .create_graph(&ng.name)
1047                .map_err(|e| Error::Internal(e.to_string()))?;
1048            if let Some(graph_store) = self.lpg_store().graph(&ng.name) {
1049                populate_store_from_snapshot(&graph_store, ng.nodes, ng.edges)?;
1050                #[cfg(feature = "temporal")]
1051                graph_store.sync_epoch(snapshot_epoch);
1052            }
1053        }
1054
1055        // Restore RDF data
1056        #[cfg(feature = "rdf")]
1057        {
1058            // Clear existing RDF data
1059            self.rdf_store.clear();
1060            for name in self.rdf_store.graph_names() {
1061                self.rdf_store.drop_graph(&name);
1062            }
1063            populate_rdf_store(&self.rdf_store, &snapshot.rdf_triples);
1064            for rng in &snapshot.rdf_named_graphs {
1065                let graph = self.rdf_store.graph_or_create(&rng.name);
1066                populate_rdf_store(&graph, &rng.triples);
1067            }
1068        }
1069
1070        // Restore schema
1071        restore_schema_from_snapshot(&self.catalog, &snapshot.schema);
1072
1073        // Restore indexes (must come after data population)
1074        restore_indexes_from_snapshot(self, &snapshot.indexes);
1075
1076        Ok(())
1077    }
1078
1079    // =========================================================================
1080    // ADMIN API: Iteration
1081    // =========================================================================
1082
1083    /// Returns an iterator over all nodes in the database.
1084    ///
1085    /// Useful for dump/export operations.
1086    pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
1087        self.lpg_store().all_nodes()
1088    }
1089
1090    /// Returns an iterator over all edges in the database.
1091    ///
1092    /// Useful for dump/export operations.
1093    pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
1094        self.lpg_store().all_edges()
1095    }
1096}
1097
#[cfg(test)]
mod tests {
    use grafeo_common::types::{EdgeId, NodeId, Value};

    use super::super::GrafeoDB;
    use super::{
        SNAPSHOT_VERSION, Snapshot, SnapshotEdge, SnapshotIndexes, SnapshotNode, SnapshotSchema,
    };

    #[test]
    fn test_restore_snapshot_basic() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Populate
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
        session.execute("INSERT (:Person {name: 'Gus'})").unwrap();

        let snapshot = db.export_snapshot().unwrap();

        // Modify
        session
            .execute("INSERT (:Person {name: 'Vincent'})")
            .unwrap();
        assert_eq!(db.lpg_store().node_count(), 3);

        // Restore original
        db.restore_snapshot(&snapshot).unwrap();

        assert_eq!(db.lpg_store().node_count(), 2);
        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(result.rows.len(), 2);
    }

    #[test]
    fn test_restore_snapshot_validation_failure() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        // Garbage bytes: the leading byte b'g' (103) is not a supported
        // version, so this is rejected by the version check before decoding.
        let result = db.restore_snapshot(b"garbage");
        assert!(result.is_err());

        // DB should be unchanged
        assert_eq!(db.lpg_store().node_count(), 1);
    }

    #[test]
    fn test_restore_snapshot_empty_db() {
        let db = GrafeoDB::new_in_memory();

        // Export empty snapshot, then populate, then restore to empty
        let empty_snapshot = db.export_snapshot().unwrap();

        let session = db.session();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
        assert_eq!(db.lpg_store().node_count(), 1);

        db.restore_snapshot(&empty_snapshot).unwrap();
        assert_eq!(db.lpg_store().node_count(), 0);
    }

    #[test]
    fn test_restore_snapshot_with_edges() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
        session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
        session
            .execute(
                "MATCH (a:Person {name: 'Alix'}), (b:Person {name: 'Gus'}) INSERT (a)-[:KNOWS]->(b)",
            )
            .unwrap();

        let snapshot = db.export_snapshot().unwrap();
        assert_eq!(db.lpg_store().edge_count(), 1);

        // Modify: add more data
        session
            .execute("INSERT (:Person {name: 'Vincent'})")
            .unwrap();

        // Restore
        db.restore_snapshot(&snapshot).unwrap();
        assert_eq!(db.lpg_store().node_count(), 2);
        assert_eq!(db.lpg_store().edge_count(), 1);
    }

    #[test]
    fn test_restore_snapshot_preserves_sessions() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
        let snapshot = db.export_snapshot().unwrap();

        // Modify
        session.execute("INSERT (:Person {name: 'Gus'})").unwrap();

        // Restore
        db.restore_snapshot(&snapshot).unwrap();

        // Session should still work and see restored data
        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(result.rows.len(), 1);
    }

    #[test]
    fn test_export_import_roundtrip() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session
            .execute("INSERT (:Person {name: 'Alix', age: 30})")
            .unwrap();

        let snapshot = db.export_snapshot().unwrap();
        let db2 = GrafeoDB::import_snapshot(&snapshot).unwrap();
        let session2 = db2.session();

        let result = session2.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(result.rows.len(), 1);
    }

    // --- to_memory() ---

    #[test]
    fn test_to_memory_empty() {
        let db = GrafeoDB::new_in_memory();
        let copy = db.to_memory().unwrap();
        assert_eq!(copy.lpg_store().node_count(), 0);
        assert_eq!(copy.lpg_store().edge_count(), 0);
    }

    #[test]
    fn test_to_memory_copies_nodes_and_properties() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();
        session
            .execute("INSERT (:Person {name: 'Alix', age: 30})")
            .unwrap();
        session
            .execute("INSERT (:Person {name: 'Gus', age: 25})")
            .unwrap();

        let copy = db.to_memory().unwrap();
        assert_eq!(copy.lpg_store().node_count(), 2);

        let s2 = copy.session();
        let result = s2
            .execute("MATCH (p:Person) RETURN p.name ORDER BY p.name")
            .unwrap();
        assert_eq!(result.rows.len(), 2);
        assert_eq!(result.rows[0][0], Value::String("Alix".into()));
        assert_eq!(result.rows[1][0], Value::String("Gus".into()));
    }

    #[test]
    fn test_to_memory_copies_edges_and_properties() {
        let db = GrafeoDB::new_in_memory();
        let a = db.create_node(&["Person"]);
        db.set_node_property(a, "name", "Alix".into());
        let b = db.create_node(&["Person"]);
        db.set_node_property(b, "name", "Gus".into());
        let edge = db.create_edge(a, b, "KNOWS");
        db.set_edge_property(edge, "since", Value::Int64(2020));

        let copy = db.to_memory().unwrap();
        assert_eq!(copy.lpg_store().node_count(), 2);
        assert_eq!(copy.lpg_store().edge_count(), 1);

        let s2 = copy.session();
        let result = s2.execute("MATCH ()-[e:KNOWS]->() RETURN e.since").unwrap();
        assert_eq!(result.rows[0][0], Value::Int64(2020));
    }

    #[test]
    fn test_to_memory_is_independent() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        let copy = db.to_memory().unwrap();

        // Mutating original should not affect copy
        session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
        assert_eq!(db.lpg_store().node_count(), 2);
        assert_eq!(copy.lpg_store().node_count(), 1);
    }

    // --- iter_nodes() / iter_edges() ---

    #[test]
    fn test_iter_nodes_empty() {
        let db = GrafeoDB::new_in_memory();
        assert_eq!(db.iter_nodes().count(), 0);
    }

    #[test]
    fn test_iter_nodes_returns_all() {
        let db = GrafeoDB::new_in_memory();
        let id1 = db.create_node(&["Person"]);
        db.set_node_property(id1, "name", "Alix".into());
        let id2 = db.create_node(&["Animal"]);
        db.set_node_property(id2, "name", "Fido".into());

        let nodes: Vec<_> = db.iter_nodes().collect();
        assert_eq!(nodes.len(), 2);

        let names: Vec<_> = nodes
            .iter()
            .filter_map(|n| n.properties.iter().find(|(k, _)| k.as_str() == "name"))
            .map(|(_, v)| v.clone())
            .collect();
        assert!(names.contains(&Value::String("Alix".into())));
        assert!(names.contains(&Value::String("Fido".into())));
    }

    #[test]
    fn test_iter_edges_empty() {
        let db = GrafeoDB::new_in_memory();
        assert_eq!(db.iter_edges().count(), 0);
    }

    #[test]
    fn test_iter_edges_returns_all() {
        let db = GrafeoDB::new_in_memory();
        let a = db.create_node(&["A"]);
        let b = db.create_node(&["B"]);
        let c = db.create_node(&["C"]);
        db.create_edge(a, b, "R1");
        db.create_edge(b, c, "R2");

        let edges: Vec<_> = db.iter_edges().collect();
        assert_eq!(edges.len(), 2);

        let types: Vec<_> = edges.iter().map(|e| e.edge_type.as_ref()).collect();
        assert!(types.contains(&"R1"));
        assert!(types.contains(&"R2"));
    }

    // --- restore_snapshot() validation ---

    /// Encodes a hand-built snapshot with the given version byte, default-graph
    /// nodes and edges, and empty everything else.
    fn make_snapshot(version: u8, nodes: Vec<SnapshotNode>, edges: Vec<SnapshotEdge>) -> Vec<u8> {
        let snap = Snapshot {
            version,
            nodes,
            edges,
            named_graphs: vec![],
            rdf_triples: vec![],
            rdf_named_graphs: vec![],
            schema: SnapshotSchema::default(),
            indexes: SnapshotIndexes::default(),
            epoch: 0,
        };
        bincode::serde::encode_to_vec(&snap, bincode::config::standard()).unwrap()
    }

    #[test]
    fn test_restore_rejects_unsupported_version() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        let bytes = make_snapshot(99, vec![], vec![]);

        let result = db.restore_snapshot(&bytes);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("unsupported snapshot version"), "got: {err}");

        // DB unchanged
        assert_eq!(db.lpg_store().node_count(), 1);
    }

    #[test]
    fn test_restore_rejects_duplicate_node_ids() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        let bytes = make_snapshot(
            SNAPSHOT_VERSION,
            vec![
                SnapshotNode {
                    id: NodeId::new(0),
                    labels: vec!["A".into()],
                    properties: vec![],
                },
                SnapshotNode {
                    id: NodeId::new(0),
                    labels: vec!["B".into()],
                    properties: vec![],
                },
            ],
            vec![],
        );

        let result = db.restore_snapshot(&bytes);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("duplicate node ID"), "got: {err}");
        assert_eq!(db.lpg_store().node_count(), 1);
    }

    #[test]
    fn test_restore_rejects_duplicate_edge_ids() {
        let db = GrafeoDB::new_in_memory();

        let bytes = make_snapshot(
            SNAPSHOT_VERSION,
            vec![
                SnapshotNode {
                    id: NodeId::new(0),
                    labels: vec![],
                    properties: vec![],
                },
                SnapshotNode {
                    id: NodeId::new(1),
                    labels: vec![],
                    properties: vec![],
                },
            ],
            vec![
                SnapshotEdge {
                    id: EdgeId::new(0),
                    src: NodeId::new(0),
                    dst: NodeId::new(1),
                    edge_type: "REL".into(),
                    properties: vec![],
                },
                SnapshotEdge {
                    id: EdgeId::new(0),
                    src: NodeId::new(0),
                    dst: NodeId::new(1),
                    edge_type: "REL".into(),
                    properties: vec![],
                },
            ],
        );

        let result = db.restore_snapshot(&bytes);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("duplicate edge ID"), "got: {err}");

        // DB unchanged
        assert_eq!(db.lpg_store().node_count(), 0);
    }

    #[test]
    fn test_restore_rejects_dangling_source() {
        let db = GrafeoDB::new_in_memory();

        let bytes = make_snapshot(
            SNAPSHOT_VERSION,
            vec![SnapshotNode {
                id: NodeId::new(0),
                labels: vec![],
                properties: vec![],
            }],
            vec![SnapshotEdge {
                id: EdgeId::new(0),
                src: NodeId::new(999),
                dst: NodeId::new(0),
                edge_type: "REL".into(),
                properties: vec![],
            }],
        );

        let result = db.restore_snapshot(&bytes);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("non-existent source node"), "got: {err}");

        // DB unchanged
        assert_eq!(db.lpg_store().node_count(), 0);
    }

    #[test]
    fn test_restore_rejects_dangling_destination() {
        let db = GrafeoDB::new_in_memory();

        let bytes = make_snapshot(
            SNAPSHOT_VERSION,
            vec![SnapshotNode {
                id: NodeId::new(0),
                labels: vec![],
                properties: vec![],
            }],
            vec![SnapshotEdge {
                id: EdgeId::new(0),
                src: NodeId::new(0),
                dst: NodeId::new(999),
                edge_type: "REL".into(),
                properties: vec![],
            }],
        );

        let result = db.restore_snapshot(&bytes);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("non-existent destination node"), "got: {err}");

        // DB unchanged
        assert_eq!(db.lpg_store().node_count(), 0);
    }

    // --- version byte and decode handling ---

    #[test]
    fn test_export_writes_version_as_leading_byte() {
        // import/restore read the version from the first byte before decoding,
        // so the encoded `version` field must come first.
        let db = GrafeoDB::new_in_memory();
        let bytes = db.export_snapshot().unwrap();
        assert_eq!(bytes.first(), Some(&SNAPSHOT_VERSION));
    }

    #[test]
    fn test_restore_rejects_truncated_snapshot() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        // Correct version byte but nothing after it: the version check passes
        // and deserialization must fail.
        let result = db.restore_snapshot(&[SNAPSHOT_VERSION]);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("snapshot restore failed"), "got: {err}");

        // DB unchanged
        assert_eq!(db.lpg_store().node_count(), 1);
    }

    #[test]
    fn test_import_rejects_empty_data() {
        let err = GrafeoDB::import_snapshot(&[])
            .err()
            .expect("empty data must be rejected")
            .to_string();
        assert!(err.contains("empty snapshot data"), "got: {err}");
    }

    #[test]
    fn test_import_rejects_unsupported_version() {
        let bytes = make_snapshot(99, vec![], vec![]);
        let err = GrafeoDB::import_snapshot(&bytes)
            .err()
            .expect("unsupported version must be rejected")
            .to_string();
        assert!(err.contains("unsupported snapshot version"), "got: {err}");
    }

    #[test]
    fn test_import_rejects_truncated_snapshot() {
        let err = GrafeoDB::import_snapshot(&[SNAPSHOT_VERSION])
            .err()
            .expect("truncated snapshot must be rejected")
            .to_string();
        assert!(err.contains("snapshot import failed"), "got: {err}");
    }

    // --- index metadata roundtrip ---

    #[test]
    fn test_snapshot_roundtrip_property_index() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session
            .execute("INSERT (:Person {name: 'Alix', email: 'alix@example.com'})")
            .unwrap();
        db.create_property_index("email");
        assert!(db.has_property_index("email"));

        let snapshot = db.export_snapshot().unwrap();
        let db2 = GrafeoDB::import_snapshot(&snapshot).unwrap();

        assert!(db2.has_property_index("email"));

        // Verify the index actually works for O(1) lookups
        let found = db2.find_nodes_by_property("email", &Value::String("alix@example.com".into()));
        assert_eq!(found.len(), 1);
    }

    #[cfg(feature = "vector-index")]
    #[test]
    fn test_snapshot_roundtrip_vector_index() {
        use std::sync::Arc;

        let db = GrafeoDB::new_in_memory();

        let n1 = db.create_node(&["Doc"]);
        db.set_node_property(
            n1,
            "embedding",
            Value::Vector(Arc::from([1.0_f32, 0.0, 0.0])),
        );
        let n2 = db.create_node(&["Doc"]);
        db.set_node_property(
            n2,
            "embedding",
            Value::Vector(Arc::from([0.0_f32, 1.0, 0.0])),
        );

        db.create_vector_index("Doc", "embedding", None, Some("cosine"), Some(4), Some(32))
            .unwrap();

        let snapshot = db.export_snapshot().unwrap();
        let db2 = GrafeoDB::import_snapshot(&snapshot).unwrap();

        // Vector search should work on the restored database
        let results = db2
            .vector_search("Doc", "embedding", &[1.0, 0.0, 0.0], 2, None, None)
            .unwrap();
        assert_eq!(results.len(), 2);
        // Closest to [1,0,0] should be n1
        assert_eq!(results[0].0, n1);
    }

    #[cfg(feature = "text-index")]
    #[test]
    fn test_snapshot_roundtrip_text_index() {
        let db = GrafeoDB::new_in_memory();

        let n1 = db.create_node(&["Article"]);
        db.set_node_property(n1, "body", Value::String("rust graph database".into()));
        let n2 = db.create_node(&["Article"]);
        db.set_node_property(n2, "body", Value::String("python web framework".into()));

        db.create_text_index("Article", "body").unwrap();

        let snapshot = db.export_snapshot().unwrap();
        let db2 = GrafeoDB::import_snapshot(&snapshot).unwrap();

        // Text search should work on the restored database
        let results = db2
            .text_search("Article", "body", "graph database", 10)
            .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, n1);
    }

    #[test]
    fn test_snapshot_roundtrip_property_index_via_restore() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session
            .execute("INSERT (:Person {name: 'Alix', email: 'alix@example.com'})")
            .unwrap();
        db.create_property_index("email");

        let snapshot = db.export_snapshot().unwrap();

        // Mutate the database
        session
            .execute("INSERT (:Person {name: 'Gus', email: 'gus@example.com'})")
            .unwrap();
        db.drop_property_index("email");
        assert!(!db.has_property_index("email"));

        // Restore should bring back the index
        db.restore_snapshot(&snapshot).unwrap();
        assert!(db.has_property_index("email"));
    }
}