Skip to main content

grafeo_engine/database/
persistence.rs

1//! Persistence, snapshots, and data export for GrafeoDB.
2
3#[cfg(feature = "wal")]
4use std::path::Path;
5
6#[cfg(any(feature = "vector-index", feature = "text-index"))]
7use grafeo_common::grafeo_warn;
8use grafeo_common::types::{EdgeId, EpochId, NodeId, Value};
9use grafeo_common::utils::error::{Error, Result};
10use hashbrown::HashSet;
11
12use crate::config::Config;
13
14#[cfg(feature = "wal")]
15use grafeo_adapters::storage::wal::WalRecord;
16
17use crate::catalog::{
18    EdgeTypeDefinition, GraphTypeDefinition, NodeTypeDefinition, ProcedureDefinition,
19};
20
/// Current snapshot version.
///
/// Written into the `version` field of every [`Snapshot`] produced by
/// `export_snapshot`.
const SNAPSHOT_VERSION: u8 = 4;
23
/// Binary snapshot format (v4: graph data, named graphs, RDF, schema, index metadata,
/// and property version history for temporal support).
///
/// NOTE(review): this struct is decoded with bincode in `load_snapshot_into_store`;
/// field order presumably is part of the on-disk format — confirm before reordering
/// or inserting fields.
#[derive(serde::Serialize, serde::Deserialize)]
struct Snapshot {
    /// Format version tag (`SNAPSHOT_VERSION` at export time).
    version: u8,
    /// Nodes of the default graph.
    nodes: Vec<SnapshotNode>,
    /// Edges of the default graph.
    edges: Vec<SnapshotEdge>,
    /// One entry per named LPG graph, each with its own nodes and edges.
    named_graphs: Vec<NamedGraphSnapshot>,
    /// Triples of the default RDF store (empty when the `rdf` feature is off).
    rdf_triples: Vec<SnapshotTriple>,
    /// One entry per named RDF graph (empty when the `rdf` feature is off).
    rdf_named_graphs: Vec<RdfNamedGraphSnapshot>,
    /// Catalog schema definitions.
    schema: SnapshotSchema,
    /// Index definitions (metadata only; indexes are rebuilt on restore).
    indexes: SnapshotIndexes,
    /// Current store epoch at snapshot time (0 when temporal is disabled).
    epoch: u64,
}
39
/// Schema metadata within a snapshot.
///
/// Filled from the catalog by `collect_schema` and replayed into a catalog by
/// `restore_schema_from_snapshot`.
#[derive(serde::Serialize, serde::Deserialize, Default)]
struct SnapshotSchema {
    /// Node type definitions.
    node_types: Vec<NodeTypeDefinition>,
    /// Edge type definitions.
    edge_types: Vec<EdgeTypeDefinition>,
    /// Graph type definitions.
    graph_types: Vec<GraphTypeDefinition>,
    /// Procedure definitions.
    procedures: Vec<ProcedureDefinition>,
    /// Schema namespace names.
    schemas: Vec<String>,
    /// `(graph_name, type_name)` pairs binding a graph to a graph type.
    graph_type_bindings: Vec<(String, String)>,
}
50
/// Index metadata within a snapshot (definitions only, not index data).
#[derive(serde::Serialize, serde::Deserialize, Default)]
struct SnapshotIndexes {
    /// Property index keys, as reported by the store's `property_index_keys()`.
    property_indexes: Vec<String>,
    /// Vector index definitions (empty when the `vector-index` feature is off).
    vector_indexes: Vec<SnapshotVectorIndex>,
    /// Text index definitions (empty when the `text-index` feature is off).
    text_indexes: Vec<SnapshotTextIndex>,
}
58
/// Vector index definition for snapshot persistence.
///
/// Holds the arguments needed to re-create the index through
/// `GrafeoDB::create_vector_index` on restore.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotVectorIndex {
    /// Node label the index applies to.
    label: String,
    /// Property holding the indexed vectors.
    property: String,
    /// Vector dimensionality taken from the index config.
    dimensions: usize,
    /// Distance metric taken from the index config.
    metric: grafeo_core::index::vector::DistanceMetric,
    /// Index config value `m` (presumably the HNSW connectivity parameter — TODO confirm).
    m: usize,
    /// Index config value `ef_construction` (presumably the HNSW build-time
    /// search width — TODO confirm).
    ef_construction: usize,
}
69
/// Text index definition for snapshot persistence.
///
/// Holds the arguments needed to re-create the index through
/// `GrafeoDB::create_text_index` on restore.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotTextIndex {
    /// Node label the index applies to.
    label: String,
    /// Property holding the indexed text.
    property: String,
}
76
/// A named graph partition within a snapshot.
///
/// NOTE(review): the previous comment tied this to a "v2 snapshot" — presumably
/// the version that introduced named graphs; the struct is part of the current
/// [`Snapshot`] layout.
#[derive(serde::Serialize, serde::Deserialize)]
struct NamedGraphSnapshot {
    /// Name of the graph.
    name: String,
    /// Nodes stored in this graph.
    nodes: Vec<SnapshotNode>,
    /// Edges stored in this graph.
    edges: Vec<SnapshotEdge>,
}
84
/// An RDF triple in snapshot format (N-Triples encoded terms).
///
/// Each field is the `to_string()` form of the term and is parsed back with
/// `Term::from_ntriples` on restore.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotTriple {
    /// Encoded subject term.
    subject: String,
    /// Encoded predicate term.
    predicate: String,
    /// Encoded object term.
    object: String,
}
92
/// An RDF named graph in snapshot format.
#[derive(serde::Serialize, serde::Deserialize)]
struct RdfNamedGraphSnapshot {
    /// Name of the RDF graph.
    name: String,
    /// Triples contained in this graph.
    triples: Vec<SnapshotTriple>,
}
99
/// A node in snapshot format: its ID, labels, and versioned properties.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotNode {
    /// Node ID, preserved on restore via `create_node_with_id`.
    id: NodeId,
    /// Label names attached to the node.
    labels: Vec<String>,
    /// Each property has a list of `(epoch, value)` entries (ascending epoch order).
    properties: Vec<(String, Vec<(EpochId, Value)>)>,
}
107
/// An edge in snapshot format: its ID, endpoints, type, and versioned properties.
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotEdge {
    /// Edge ID, preserved on restore via `create_edge_with_id`.
    id: EdgeId,
    /// Source node ID.
    src: NodeId,
    /// Destination node ID.
    dst: NodeId,
    /// Edge type name.
    edge_type: String,
    /// Each property has a list of `(epoch, value)` entries (ascending epoch order).
    properties: Vec<(String, Vec<(EpochId, Value)>)>,
}
117
118/// Collects all nodes from a store into snapshot format.
119///
120/// With `temporal`: stores full property version history.
121/// Without: wraps each current value as a single-entry version list at epoch 0.
122fn collect_snapshot_nodes(store: &grafeo_core::graph::lpg::LpgStore) -> Vec<SnapshotNode> {
123    store
124        .all_nodes()
125        .map(|n| {
126            #[cfg(feature = "temporal")]
127            let properties = store
128                .node_property_history(n.id)
129                .into_iter()
130                .map(|(k, entries)| (k.to_string(), entries))
131                .collect();
132
133            #[cfg(not(feature = "temporal"))]
134            let properties = n
135                .properties
136                .into_iter()
137                .map(|(k, v)| (k.to_string(), vec![(EpochId::new(0), v)]))
138                .collect();
139
140            SnapshotNode {
141                id: n.id,
142                labels: n.labels.iter().map(|l| l.to_string()).collect(),
143                properties,
144            }
145        })
146        .collect()
147}
148
149/// Collects all edges from a store into snapshot format.
150///
151/// With `temporal`: stores full property version history.
152/// Without: wraps each current value as a single-entry version list at epoch 0.
153fn collect_snapshot_edges(store: &grafeo_core::graph::lpg::LpgStore) -> Vec<SnapshotEdge> {
154    store
155        .all_edges()
156        .map(|e| {
157            #[cfg(feature = "temporal")]
158            let properties = store
159                .edge_property_history(e.id)
160                .into_iter()
161                .map(|(k, entries)| (k.to_string(), entries))
162                .collect();
163
164            #[cfg(not(feature = "temporal"))]
165            let properties = e
166                .properties
167                .into_iter()
168                .map(|(k, v)| (k.to_string(), vec![(EpochId::new(0), v)]))
169                .collect();
170
171            SnapshotEdge {
172                id: e.id,
173                src: e.src,
174                dst: e.dst,
175                edge_type: e.edge_type.to_string(),
176                properties,
177            }
178        })
179        .collect()
180}
181
182/// Populates a store from snapshot node/edge data.
183///
184/// With `temporal`: replays all `(epoch, value)` entries into version logs.
185/// Without: reads the latest value from each property's version list.
186fn populate_store_from_snapshot(
187    store: &grafeo_core::graph::lpg::LpgStore,
188    nodes: Vec<SnapshotNode>,
189    edges: Vec<SnapshotEdge>,
190) -> Result<()> {
191    for node in nodes {
192        let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
193        store.create_node_with_id(node.id, &label_refs)?;
194        for (key, entries) in node.properties {
195            #[cfg(feature = "temporal")]
196            for (epoch, value) in entries {
197                store.set_node_property_at_epoch(node.id, &key, value, epoch);
198            }
199            #[cfg(not(feature = "temporal"))]
200            if let Some((_, value)) = entries.into_iter().last() {
201                store.set_node_property(node.id, &key, value);
202            }
203        }
204    }
205    for edge in edges {
206        store.create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
207        for (key, entries) in edge.properties {
208            #[cfg(feature = "temporal")]
209            for (epoch, value) in entries {
210                store.set_edge_property_at_epoch(edge.id, &key, value, epoch);
211            }
212            #[cfg(not(feature = "temporal"))]
213            if let Some((_, value)) = entries.into_iter().last() {
214                store.set_edge_property(edge.id, &key, value);
215            }
216        }
217    }
218    Ok(())
219}
220
221/// Validates snapshot nodes/edges for duplicates and dangling references.
222fn validate_snapshot_data(nodes: &[SnapshotNode], edges: &[SnapshotEdge]) -> Result<()> {
223    let mut node_ids = HashSet::with_capacity(nodes.len());
224    for node in nodes {
225        if !node_ids.insert(node.id) {
226            return Err(Error::Internal(format!(
227                "snapshot contains duplicate node ID {}",
228                node.id
229            )));
230        }
231    }
232    let mut edge_ids = HashSet::with_capacity(edges.len());
233    for edge in edges {
234        if !edge_ids.insert(edge.id) {
235            return Err(Error::Internal(format!(
236                "snapshot contains duplicate edge ID {}",
237                edge.id
238            )));
239        }
240        if !node_ids.contains(&edge.src) {
241            return Err(Error::Internal(format!(
242                "snapshot edge {} references non-existent source node {}",
243                edge.id, edge.src
244            )));
245        }
246        if !node_ids.contains(&edge.dst) {
247            return Err(Error::Internal(format!(
248                "snapshot edge {} references non-existent destination node {}",
249                edge.id, edge.dst
250            )));
251        }
252    }
253    Ok(())
254}
255
/// Encodes every triple of an RDF store as a [`SnapshotTriple`].
///
/// Each term is stored as its `to_string()` representation.
#[cfg(feature = "rdf")]
fn collect_rdf_triples(store: &grafeo_core::graph::rdf::RdfStore) -> Vec<SnapshotTriple> {
    let mut encoded = Vec::new();
    for triple in store.triples() {
        encoded.push(SnapshotTriple {
            subject: triple.subject().to_string(),
            predicate: triple.predicate().to_string(),
            object: triple.object().to_string(),
        });
    }
    encoded
}
269
/// Inserts snapshot triples into an RDF store.
///
/// Each term is parsed with `Term::from_ntriples`; a triple is inserted only
/// when all three terms parse. Triples with an unparseable term are skipped
/// without reporting an error.
#[cfg(feature = "rdf")]
fn populate_rdf_store(store: &grafeo_core::graph::rdf::RdfStore, triples: &[SnapshotTriple]) {
    use grafeo_core::graph::rdf::{Term, Triple};

    for encoded in triples {
        let subject = Term::from_ntriples(&encoded.subject);
        let predicate = Term::from_ntriples(&encoded.predicate);
        let object = Term::from_ntriples(&encoded.object);

        let (Some(s), Some(p), Some(o)) = (subject, predicate, object) else {
            continue;
        };
        store.insert(Triple::new(s, p, o));
    }
}
284
285// =========================================================================
286// Snapshot deserialization helpers (used by single-file format)
287// =========================================================================
288
/// Decodes a bincode-encoded [`Snapshot`] and loads it into the given stores.
///
/// Steps, in order:
/// 1. decode `data` with the standard bincode config,
/// 2. populate the default graph (and sync its epoch when `temporal` is on),
/// 3. create and populate every named graph,
/// 4. restore schema definitions into `catalog`,
/// 5. with `rdf`: restore default and named RDF graphs.
///
/// NOTE(review): this path does not inspect `snapshot.version`, does not call
/// `validate_snapshot_data`, and does not restore index definitions — confirm
/// the caller covers these.
///
/// # Errors
///
/// Returns `Error::Serialization` when decoding fails, `Error::Internal` when a
/// named graph cannot be created, and propagates node/edge creation errors.
#[cfg(feature = "grafeo-file")]
pub(super) fn load_snapshot_into_store(
    store: &std::sync::Arc<grafeo_core::graph::lpg::LpgStore>,
    catalog: &std::sync::Arc<crate::catalog::Catalog>,
    #[cfg(feature = "rdf")] rdf_store: &std::sync::Arc<grafeo_core::graph::rdf::RdfStore>,
    data: &[u8],
) -> grafeo_common::utils::error::Result<()> {
    use grafeo_common::utils::error::Error;

    let (snapshot, _consumed): (Snapshot, usize) =
        bincode::serde::decode_from_slice(data, bincode::config::standard()).map_err(|e| {
            Error::Serialization(format!("failed to decode snapshot from .grafeo file: {e}"))
        })?;

    // Default graph.
    populate_store_from_snapshot_ref(store, &snapshot.nodes, &snapshot.edges)?;

    // Only the store-level epoch is restored here; TransactionManager sync is
    // handled in with_config() after all recovery completes.
    #[cfg(feature = "temporal")]
    store.sync_epoch(EpochId::new(snapshot.epoch));

    // Named graphs.
    for named in &snapshot.named_graphs {
        store
            .create_graph(&named.name)
            .map_err(|e| Error::Internal(e.to_string()))?;
        let Some(partition) = store.graph(&named.name) else {
            continue;
        };
        populate_store_from_snapshot_ref(&partition, &named.nodes, &named.edges)?;
        #[cfg(feature = "temporal")]
        partition.sync_epoch(EpochId::new(snapshot.epoch));
    }

    restore_schema_from_snapshot(catalog, &snapshot.schema);

    // RDF triples: default graph first, then each named graph.
    #[cfg(feature = "rdf")]
    {
        populate_rdf_store(rdf_store, &snapshot.rdf_triples);
        for named in &snapshot.rdf_named_graphs {
            rdf_store.create_graph(&named.name);
            let Some(partition) = rdf_store.graph(&named.name) else {
                continue;
            };
            populate_rdf_store(&partition, &named.triples);
        }
    }

    Ok(())
}
338
/// Rebuilds `store` contents from borrowed snapshot nodes and edges
/// (used when loading the single-file format).
///
/// Nodes are created first, then edges, each with its original ID. Property
/// values are cloned out of the borrowed snapshot.
///
/// - `temporal` enabled: every `(epoch, value)` entry is written at its epoch.
/// - `temporal` disabled: only the last entry of each property's list is applied.
///
/// # Errors
///
/// Propagates any error from `create_node_with_id` / `create_edge_with_id`.
#[cfg(feature = "grafeo-file")]
fn populate_store_from_snapshot_ref(
    store: &grafeo_core::graph::lpg::LpgStore,
    nodes: &[SnapshotNode],
    edges: &[SnapshotEdge],
) -> grafeo_common::utils::error::Result<()> {
    for node in nodes {
        let label_refs: Vec<&str> = node.labels.iter().map(String::as_str).collect();
        store.create_node_with_id(node.id, &label_refs)?;
        for (key, versions) in &node.properties {
            #[cfg(feature = "temporal")]
            for (epoch, value) in versions.iter() {
                store.set_node_property_at_epoch(node.id, key, value.clone(), *epoch);
            }
            #[cfg(not(feature = "temporal"))]
            if let Some((_, latest)) = versions.last() {
                store.set_node_property(node.id, key, latest.clone());
            }
        }
    }

    for edge in edges {
        store.create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
        for (key, versions) in &edge.properties {
            #[cfg(feature = "temporal")]
            for (epoch, value) in versions.iter() {
                store.set_edge_property_at_epoch(edge.id, key, value.clone(), *epoch);
            }
            #[cfg(not(feature = "temporal"))]
            if let Some((_, latest)) = versions.last() {
                store.set_edge_property(edge.id, key, latest.clone());
            }
        }
    }

    Ok(())
}
375
376/// Restores schema definitions from a snapshot into the catalog.
377fn restore_schema_from_snapshot(
378    catalog: &std::sync::Arc<crate::catalog::Catalog>,
379    schema: &SnapshotSchema,
380) {
381    for def in &schema.node_types {
382        catalog.register_or_replace_node_type(def.clone());
383    }
384    for def in &schema.edge_types {
385        catalog.register_or_replace_edge_type_def(def.clone());
386    }
387    for def in &schema.graph_types {
388        let _ = catalog.register_graph_type(def.clone());
389    }
390    for def in &schema.procedures {
391        catalog.replace_procedure(def.clone()).ok();
392    }
393    for name in &schema.schemas {
394        let _ = catalog.register_schema_namespace(name.clone());
395    }
396    for (graph_name, type_name) in &schema.graph_type_bindings {
397        let _ = catalog.bind_graph_type(graph_name, type_name.clone());
398    }
399}
400
401/// Collects schema definitions from the catalog into snapshot format.
402fn collect_schema(catalog: &std::sync::Arc<crate::catalog::Catalog>) -> SnapshotSchema {
403    SnapshotSchema {
404        node_types: catalog.all_node_type_defs(),
405        edge_types: catalog.all_edge_type_defs(),
406        graph_types: catalog.all_graph_type_defs(),
407        procedures: catalog.all_procedure_defs(),
408        schemas: catalog.schema_names(),
409        graph_type_bindings: catalog.all_graph_type_bindings(),
410    }
411}
412
413/// Restores indexes from snapshot metadata by rebuilding them from existing data.
414///
415/// Must be called after all nodes/edges have been populated, since index
416/// creation scans existing data.
417fn restore_indexes_from_snapshot(db: &super::GrafeoDB, indexes: &SnapshotIndexes) {
418    for name in &indexes.property_indexes {
419        db.store.create_property_index(name);
420    }
421
422    #[cfg(feature = "vector-index")]
423    for vi in &indexes.vector_indexes {
424        if let Err(err) = db.create_vector_index(
425            &vi.label,
426            &vi.property,
427            Some(vi.dimensions),
428            Some(vi.metric.name()),
429            Some(vi.m),
430            Some(vi.ef_construction),
431        ) {
432            grafeo_warn!(
433                "Failed to restore vector index :{label}({property}): {err}",
434                label = vi.label,
435                property = vi.property,
436            );
437        }
438    }
439
440    #[cfg(feature = "text-index")]
441    for ti in &indexes.text_indexes {
442        if let Err(err) = db.create_text_index(&ti.label, &ti.property) {
443            grafeo_warn!(
444                "Failed to restore text index :{label}({property}): {err}",
445                label = ti.label,
446                property = ti.property,
447            );
448        }
449    }
450}
451
452/// Collects index metadata from a store into snapshot format.
453fn collect_index_metadata(store: &grafeo_core::graph::lpg::LpgStore) -> SnapshotIndexes {
454    let property_indexes = store.property_index_keys();
455
456    #[cfg(feature = "vector-index")]
457    let vector_indexes: Vec<SnapshotVectorIndex> = store
458        .vector_index_entries()
459        .into_iter()
460        .filter_map(|(key, index)| {
461            let (label, property) = key.split_once(':')?;
462            let config = index.config();
463            Some(SnapshotVectorIndex {
464                label: label.to_string(),
465                property: property.to_string(),
466                dimensions: config.dimensions,
467                metric: config.metric,
468                m: config.m,
469                ef_construction: config.ef_construction,
470            })
471        })
472        .collect();
473    #[cfg(not(feature = "vector-index"))]
474    let vector_indexes = Vec::new();
475
476    #[cfg(feature = "text-index")]
477    let text_indexes: Vec<SnapshotTextIndex> = store
478        .text_index_entries()
479        .into_iter()
480        .filter_map(|(key, _)| {
481            let (label, property) = key.split_once(':')?;
482            Some(SnapshotTextIndex {
483                label: label.to_string(),
484                property: property.to_string(),
485            })
486        })
487        .collect();
488    #[cfg(not(feature = "text-index"))]
489    let text_indexes = Vec::new();
490
491    SnapshotIndexes {
492        property_indexes,
493        vector_indexes,
494        text_indexes,
495    }
496}
497
498impl super::GrafeoDB {
499    // =========================================================================
500    // ADMIN API: Persistence Control
501    // =========================================================================
502
503    /// Saves the database to a file path.
504    ///
505    /// - If the path ends in `.grafeo`: creates a single-file database
506    /// - Otherwise: creates a WAL directory-backed database at the path
507    /// - If in-memory: creates a new persistent database at path
508    /// - If file-backed: creates a copy at the new path
509    ///
510    /// The original database remains unchanged.
511    ///
512    /// # Errors
513    ///
514    /// Returns an error if the save operation fails.
515    ///
516    /// Requires the `wal` feature for persistence support.
517    #[cfg(feature = "wal")]
518    pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
519        let path = path.as_ref();
520
521        // Single-file format: export snapshot directly to a .grafeo file
522        #[cfg(feature = "grafeo-file")]
523        if path.extension().is_some_and(|ext| ext == "grafeo") {
524            return self.save_as_grafeo_file(path);
525        }
526
527        // Create target database with WAL enabled
528        let target_config = Config::persistent(path);
529        let target = Self::with_config(target_config)?;
530
531        // Copy all nodes using WAL-enabled methods
532        for node in self.store.all_nodes() {
533            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
534            target.store.create_node_with_id(node.id, &label_refs)?;
535
536            // Log to WAL
537            target.log_wal(&WalRecord::CreateNode {
538                id: node.id,
539                labels: node.labels.iter().map(|s| s.to_string()).collect(),
540            })?;
541
542            // Copy properties
543            for (key, value) in node.properties {
544                target
545                    .store
546                    .set_node_property(node.id, key.as_str(), value.clone());
547                target.log_wal(&WalRecord::SetNodeProperty {
548                    id: node.id,
549                    key: key.to_string(),
550                    value,
551                })?;
552            }
553        }
554
555        // Copy all edges using WAL-enabled methods
556        for edge in self.store.all_edges() {
557            target
558                .store
559                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
560
561            // Log to WAL
562            target.log_wal(&WalRecord::CreateEdge {
563                id: edge.id,
564                src: edge.src,
565                dst: edge.dst,
566                edge_type: edge.edge_type.to_string(),
567            })?;
568
569            // Copy properties
570            for (key, value) in edge.properties {
571                target
572                    .store
573                    .set_edge_property(edge.id, key.as_str(), value.clone());
574                target.log_wal(&WalRecord::SetEdgeProperty {
575                    id: edge.id,
576                    key: key.to_string(),
577                    value,
578                })?;
579            }
580        }
581
582        // Copy named graphs
583        for graph_name in self.store.graph_names() {
584            if let Some(src_graph) = self.store.graph(&graph_name) {
585                target.log_wal(&WalRecord::CreateNamedGraph {
586                    name: graph_name.clone(),
587                })?;
588                target
589                    .store
590                    .create_graph(&graph_name)
591                    .map_err(|e| Error::Internal(e.to_string()))?;
592
593                if let Some(dst_graph) = target.store.graph(&graph_name) {
594                    // Switch WAL context to this named graph
595                    target.log_wal(&WalRecord::SwitchGraph {
596                        name: Some(graph_name.clone()),
597                    })?;
598
599                    for node in src_graph.all_nodes() {
600                        let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
601                        dst_graph.create_node_with_id(node.id, &label_refs)?;
602                        target.log_wal(&WalRecord::CreateNode {
603                            id: node.id,
604                            labels: node.labels.iter().map(|s| s.to_string()).collect(),
605                        })?;
606                        for (key, value) in node.properties {
607                            dst_graph.set_node_property(node.id, key.as_str(), value.clone());
608                            target.log_wal(&WalRecord::SetNodeProperty {
609                                id: node.id,
610                                key: key.to_string(),
611                                value,
612                            })?;
613                        }
614                    }
615                    for edge in src_graph.all_edges() {
616                        dst_graph.create_edge_with_id(
617                            edge.id,
618                            edge.src,
619                            edge.dst,
620                            &edge.edge_type,
621                        )?;
622                        target.log_wal(&WalRecord::CreateEdge {
623                            id: edge.id,
624                            src: edge.src,
625                            dst: edge.dst,
626                            edge_type: edge.edge_type.to_string(),
627                        })?;
628                        for (key, value) in edge.properties {
629                            dst_graph.set_edge_property(edge.id, key.as_str(), value.clone());
630                            target.log_wal(&WalRecord::SetEdgeProperty {
631                                id: edge.id,
632                                key: key.to_string(),
633                                value,
634                            })?;
635                        }
636                    }
637                }
638            }
639        }
640
641        // Switch WAL context back to default graph
642        if !self.store.graph_names().is_empty() {
643            target.log_wal(&WalRecord::SwitchGraph { name: None })?;
644        }
645
646        // Copy RDF data with WAL logging
647        #[cfg(feature = "rdf")]
648        {
649            for triple in self.rdf_store.triples() {
650                let record = WalRecord::InsertRdfTriple {
651                    subject: triple.subject().to_string(),
652                    predicate: triple.predicate().to_string(),
653                    object: triple.object().to_string(),
654                    graph: None,
655                };
656                target.rdf_store.insert((*triple).clone());
657                target.log_wal(&record)?;
658            }
659            for name in self.rdf_store.graph_names() {
660                target.log_wal(&WalRecord::CreateRdfGraph { name: name.clone() })?;
661                if let Some(src_graph) = self.rdf_store.graph(&name) {
662                    let dst_graph = target.rdf_store.graph_or_create(&name);
663                    for triple in src_graph.triples() {
664                        let record = WalRecord::InsertRdfTriple {
665                            subject: triple.subject().to_string(),
666                            predicate: triple.predicate().to_string(),
667                            object: triple.object().to_string(),
668                            graph: Some(name.clone()),
669                        };
670                        dst_graph.insert((*triple).clone());
671                        target.log_wal(&record)?;
672                    }
673                }
674            }
675        }
676
677        // Checkpoint and close the target database
678        target.close()?;
679
680        Ok(())
681    }
682
683    /// Creates an in-memory copy of this database.
684    ///
685    /// Returns a new database that is completely independent, including
686    /// all named graph data.
687    /// Useful for:
688    /// Saves the database to a single `.grafeo` file.
689    #[cfg(feature = "grafeo-file")]
690    fn save_as_grafeo_file(&self, path: &Path) -> Result<()> {
691        use grafeo_adapters::storage::file::GrafeoFileManager;
692
693        let snapshot_data = self.export_snapshot()?;
694        let epoch = self.store.current_epoch();
695        let transaction_id = self
696            .transaction_manager
697            .last_assigned_transaction_id()
698            .map_or(0, |t| t.0);
699        let node_count = self.store.node_count() as u64;
700        let edge_count = self.store.edge_count() as u64;
701
702        let fm = GrafeoFileManager::create(path)?;
703        fm.write_snapshot(
704            &snapshot_data,
705            epoch.0,
706            transaction_id,
707            node_count,
708            edge_count,
709        )?;
710        Ok(())
711    }
712
713    /// - Testing modifications without affecting the original
714    /// - Faster operations when persistence isn't needed
715    ///
716    /// # Errors
717    ///
718    /// Returns an error if the copy operation fails.
719    pub fn to_memory(&self) -> Result<Self> {
720        let config = Config::in_memory();
721        let target = Self::with_config(config)?;
722
723        // Copy default graph nodes
724        for node in self.store.all_nodes() {
725            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
726            target.store.create_node_with_id(node.id, &label_refs)?;
727            for (key, value) in node.properties {
728                target.store.set_node_property(node.id, key.as_str(), value);
729            }
730        }
731
732        // Copy default graph edges
733        for edge in self.store.all_edges() {
734            target
735                .store
736                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
737            for (key, value) in edge.properties {
738                target.store.set_edge_property(edge.id, key.as_str(), value);
739            }
740        }
741
742        // Copy named graphs
743        for graph_name in self.store.graph_names() {
744            if let Some(src_graph) = self.store.graph(&graph_name) {
745                target
746                    .store
747                    .create_graph(&graph_name)
748                    .map_err(|e| Error::Internal(e.to_string()))?;
749                if let Some(dst_graph) = target.store.graph(&graph_name) {
750                    for node in src_graph.all_nodes() {
751                        let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
752                        dst_graph.create_node_with_id(node.id, &label_refs)?;
753                        for (key, value) in node.properties {
754                            dst_graph.set_node_property(node.id, key.as_str(), value);
755                        }
756                    }
757                    for edge in src_graph.all_edges() {
758                        dst_graph.create_edge_with_id(
759                            edge.id,
760                            edge.src,
761                            edge.dst,
762                            &edge.edge_type,
763                        )?;
764                        for (key, value) in edge.properties {
765                            dst_graph.set_edge_property(edge.id, key.as_str(), value);
766                        }
767                    }
768                }
769            }
770        }
771
772        // Copy RDF data
773        #[cfg(feature = "rdf")]
774        {
775            for triple in self.rdf_store.triples() {
776                target.rdf_store.insert((*triple).clone());
777            }
778            for name in self.rdf_store.graph_names() {
779                if let Some(src_graph) = self.rdf_store.graph(&name) {
780                    let dst_graph = target.rdf_store.graph_or_create(&name);
781                    for triple in src_graph.triples() {
782                        dst_graph.insert((*triple).clone());
783                    }
784                }
785            }
786        }
787
788        Ok(target)
789    }
790
791    /// Opens a database file and loads it entirely into memory.
792    ///
793    /// The returned database has no connection to the original file.
794    /// Changes will NOT be written back to the file.
795    ///
796    /// # Errors
797    ///
798    /// Returns an error if the file can't be opened or loaded.
799    #[cfg(feature = "wal")]
800    pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
801        // Open the source database (triggers WAL recovery)
802        let source = Self::open(path)?;
803
804        // Create in-memory copy
805        let target = source.to_memory()?;
806
807        // Close the source (releases file handles)
808        source.close()?;
809
810        Ok(target)
811    }
812
813    // =========================================================================
814    // ADMIN API: Snapshot Export/Import
815    // =========================================================================
816
817    /// Exports the entire database to a binary snapshot.
818    ///
819    /// The returned bytes can be stored (e.g. in IndexedDB) and later
820    /// restored with [`import_snapshot()`](Self::import_snapshot).
821    /// Includes all named graph data.
822    ///
823    /// Properties are stored as version-history lists. When `temporal` is
824    /// enabled, the full history is captured. Otherwise, each property is
825    /// wrapped as a single-entry list at epoch 0.
826    ///
827    /// # Errors
828    ///
829    /// Returns an error if serialization fails.
830    pub fn export_snapshot(&self) -> Result<Vec<u8>> {
831        let nodes = collect_snapshot_nodes(&self.store);
832        let edges = collect_snapshot_edges(&self.store);
833
834        // Collect named graphs
835        let named_graphs: Vec<NamedGraphSnapshot> = self
836            .store
837            .graph_names()
838            .into_iter()
839            .filter_map(|name| {
840                self.store
841                    .graph(&name)
842                    .map(|graph_store| NamedGraphSnapshot {
843                        name,
844                        nodes: collect_snapshot_nodes(&graph_store),
845                        edges: collect_snapshot_edges(&graph_store),
846                    })
847            })
848            .collect();
849
850        // Collect RDF triples
851        #[cfg(feature = "rdf")]
852        let rdf_triples = collect_rdf_triples(&self.rdf_store);
853        #[cfg(not(feature = "rdf"))]
854        let rdf_triples = Vec::new();
855
856        #[cfg(feature = "rdf")]
857        let rdf_named_graphs: Vec<RdfNamedGraphSnapshot> = self
858            .rdf_store
859            .graph_names()
860            .into_iter()
861            .filter_map(|name| {
862                self.rdf_store
863                    .graph(&name)
864                    .map(|graph| RdfNamedGraphSnapshot {
865                        name,
866                        triples: collect_rdf_triples(&graph),
867                    })
868            })
869            .collect();
870        #[cfg(not(feature = "rdf"))]
871        let rdf_named_graphs = Vec::new();
872
873        let schema = collect_schema(&self.catalog);
874        let indexes = collect_index_metadata(&self.store);
875
876        let snapshot = Snapshot {
877            version: SNAPSHOT_VERSION,
878            nodes,
879            edges,
880            named_graphs,
881            rdf_triples,
882            rdf_named_graphs,
883            schema,
884            indexes,
885            #[cfg(feature = "temporal")]
886            epoch: self.transaction_manager.current_epoch().as_u64(),
887            #[cfg(not(feature = "temporal"))]
888            epoch: 0,
889        };
890
891        let config = bincode::config::standard();
892        bincode::serde::encode_to_vec(&snapshot, config)
893            .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
894    }
895
896    /// Creates a new in-memory database from a binary snapshot.
897    ///
898    /// The `data` must have been produced by [`export_snapshot()`](Self::export_snapshot).
899    ///
900    /// All edge references are validated before any data is inserted: every
901    /// edge's source and destination must reference a node present in the
902    /// snapshot, and duplicate node/edge IDs are rejected. If validation
903    /// fails, no database is created.
904    ///
905    /// # Errors
906    ///
907    /// Returns an error if the snapshot is invalid, contains dangling edge
908    /// references, has duplicate IDs, or deserialization fails.
909    pub fn import_snapshot(data: &[u8]) -> Result<Self> {
910        if data.is_empty() {
911            return Err(Error::Internal("empty snapshot data".to_string()));
912        }
913
914        let version = data[0];
915        if version != 4 {
916            return Err(Error::Internal(format!(
917                "unsupported snapshot version: {version} (expected 4)"
918            )));
919        }
920
921        let config = bincode::config::standard();
922        let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
923            .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
924
925        // Validate default graph data
926        validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
927
928        // Validate each named graph
929        for ng in &snapshot.named_graphs {
930            validate_snapshot_data(&ng.nodes, &ng.edges)?;
931        }
932
933        let db = Self::new_in_memory();
934        populate_store_from_snapshot(&db.store, snapshot.nodes, snapshot.edges)?;
935
936        // Restore epoch from snapshot
937        #[cfg(feature = "temporal")]
938        {
939            let epoch = EpochId::new(snapshot.epoch);
940            db.store.sync_epoch(epoch);
941            db.transaction_manager.sync_epoch(epoch);
942        }
943
944        // Capture epoch before moving snapshot fields
945        #[cfg(feature = "temporal")]
946        let snapshot_epoch = EpochId::new(snapshot.epoch);
947
948        // Restore named graphs
949        for ng in snapshot.named_graphs {
950            db.store
951                .create_graph(&ng.name)
952                .map_err(|e| Error::Internal(e.to_string()))?;
953            if let Some(graph_store) = db.store.graph(&ng.name) {
954                populate_store_from_snapshot(&graph_store, ng.nodes, ng.edges)?;
955                // Named graph stores need the same epoch so temporal property
956                // lookups via current_epoch() return the correct values.
957                #[cfg(feature = "temporal")]
958                graph_store.sync_epoch(snapshot_epoch);
959            }
960        }
961
962        // Restore RDF triples
963        #[cfg(feature = "rdf")]
964        {
965            populate_rdf_store(&db.rdf_store, &snapshot.rdf_triples);
966            for rng in &snapshot.rdf_named_graphs {
967                let graph = db.rdf_store.graph_or_create(&rng.name);
968                populate_rdf_store(&graph, &rng.triples);
969            }
970        }
971
972        // Restore schema
973        restore_schema_from_snapshot(&db.catalog, &snapshot.schema);
974
975        // Restore indexes (must come after data population)
976        restore_indexes_from_snapshot(&db, &snapshot.indexes);
977
978        Ok(db)
979    }
980
981    /// Replaces the current database contents with data from a binary snapshot.
982    ///
983    /// The `data` must have been produced by
984    /// [`export_snapshot()`](Self::export_snapshot).
985    ///
986    /// All validation (duplicate IDs, dangling edge references) is performed
987    /// before any data is modified. If validation fails, the current database
988    /// is left unchanged. If validation passes, the store is cleared and
989    /// rebuilt from the snapshot atomically (from the perspective of
990    /// subsequent queries).
991    ///
992    /// # Errors
993    ///
994    /// Returns an error if the snapshot is invalid, contains dangling edge
995    /// references, has duplicate IDs, or deserialization fails.
996    pub fn restore_snapshot(&self, data: &[u8]) -> Result<()> {
997        if data.is_empty() {
998            return Err(Error::Internal("empty snapshot data".to_string()));
999        }
1000
1001        let version = data[0];
1002        if version != 4 {
1003            return Err(Error::Internal(format!(
1004                "unsupported snapshot version: {version} (expected 4)"
1005            )));
1006        }
1007
1008        let config = bincode::config::standard();
1009        let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
1010            .map_err(|e| Error::Internal(format!("snapshot restore failed: {e}")))?;
1011
1012        // Validate all data before making any changes
1013        validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
1014        for ng in &snapshot.named_graphs {
1015            validate_snapshot_data(&ng.nodes, &ng.edges)?;
1016        }
1017
1018        // Drop all existing named graphs, then clear default store
1019        for name in self.store.graph_names() {
1020            self.store.drop_graph(&name);
1021        }
1022        self.store.clear();
1023
1024        populate_store_from_snapshot(&self.store, snapshot.nodes, snapshot.edges)?;
1025
1026        // Restore epoch from temporal snapshot
1027        #[cfg(feature = "temporal")]
1028        let snapshot_epoch = {
1029            let epoch = EpochId::new(snapshot.epoch);
1030            self.store.sync_epoch(epoch);
1031            self.transaction_manager.sync_epoch(epoch);
1032            epoch
1033        };
1034
1035        // Restore named graphs
1036        for ng in snapshot.named_graphs {
1037            self.store
1038                .create_graph(&ng.name)
1039                .map_err(|e| Error::Internal(e.to_string()))?;
1040            if let Some(graph_store) = self.store.graph(&ng.name) {
1041                populate_store_from_snapshot(&graph_store, ng.nodes, ng.edges)?;
1042                #[cfg(feature = "temporal")]
1043                graph_store.sync_epoch(snapshot_epoch);
1044            }
1045        }
1046
1047        // Restore RDF data
1048        #[cfg(feature = "rdf")]
1049        {
1050            // Clear existing RDF data
1051            self.rdf_store.clear();
1052            for name in self.rdf_store.graph_names() {
1053                self.rdf_store.drop_graph(&name);
1054            }
1055            populate_rdf_store(&self.rdf_store, &snapshot.rdf_triples);
1056            for rng in &snapshot.rdf_named_graphs {
1057                let graph = self.rdf_store.graph_or_create(&rng.name);
1058                populate_rdf_store(&graph, &rng.triples);
1059            }
1060        }
1061
1062        // Restore schema
1063        restore_schema_from_snapshot(&self.catalog, &snapshot.schema);
1064
1065        // Restore indexes (must come after data population)
1066        restore_indexes_from_snapshot(self, &snapshot.indexes);
1067
1068        Ok(())
1069    }
1070
1071    // =========================================================================
1072    // ADMIN API: Iteration
1073    // =========================================================================
1074
    /// Returns an iterator over all nodes in the database.
    ///
    /// Useful for dump/export operations. Items are yielded as owned
    /// [`Node`](grafeo_core::graph::lpg::Node) values, and the iterator
    /// borrows the database for as long as it is alive.
    ///
    /// NOTE(review): this delegates to the default store's `all_nodes()`;
    /// nodes in named graphs are presumably not included. Confirm.
    pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
        self.store.all_nodes()
    }
1081
    /// Returns an iterator over all edges in the database.
    ///
    /// Useful for dump/export operations. Items are yielded as owned
    /// [`Edge`](grafeo_core::graph::lpg::Edge) values, and the iterator
    /// borrows the database for as long as it is alive.
    ///
    /// NOTE(review): this delegates to the default store's `all_edges()`;
    /// edges in named graphs are presumably not included. Confirm.
    pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
        self.store.all_edges()
    }
1088}
1089
#[cfg(test)]
mod tests {
    use grafeo_common::types::{EdgeId, NodeId, Value};

    use super::super::GrafeoDB;
    use super::{
        SNAPSHOT_VERSION, Snapshot, SnapshotEdge, SnapshotIndexes, SnapshotNode, SnapshotSchema,
    };

    // --- shared fixtures ---

    /// Encodes a snapshot that contains only default-graph `nodes` and `edges`.
    ///
    /// `version` is written verbatim so tests can forge unsupported versions.
    fn make_snapshot(version: u8, nodes: Vec<SnapshotNode>, edges: Vec<SnapshotEdge>) -> Vec<u8> {
        let snap = Snapshot {
            version,
            nodes,
            edges,
            named_graphs: vec![],
            rdf_triples: vec![],
            rdf_named_graphs: vec![],
            schema: SnapshotSchema::default(),
            indexes: SnapshotIndexes::default(),
            epoch: 0,
        };
        bincode::serde::encode_to_vec(&snap, bincode::config::standard()).unwrap()
    }

    /// Builds a snapshot node without labels or properties.
    fn bare_node(id: NodeId) -> SnapshotNode {
        SnapshotNode {
            id,
            labels: vec![],
            properties: vec![],
        }
    }

    /// Builds a property-less `REL` snapshot edge from `src` to `dst`.
    fn rel_edge(id: EdgeId, src: NodeId, dst: NodeId) -> SnapshotEdge {
        SnapshotEdge {
            id,
            src,
            dst,
            edge_type: "REL".into(),
            properties: vec![],
        }
    }

    /// Asserts that restoring `bytes` into `db` fails with a message containing `needle`.
    fn assert_restore_fails(db: &GrafeoDB, bytes: &[u8], needle: &str) {
        let err = db
            .restore_snapshot(bytes)
            .expect_err("restore should have been rejected")
            .to_string();
        assert!(err.contains(needle), "got: {err}");
    }

    /// Returns the error message produced by importing `bytes`.
    ///
    /// Only the error side is inspected, so `GrafeoDB` does not need `Debug`.
    fn import_error(bytes: &[u8]) -> String {
        GrafeoDB::import_snapshot(bytes)
            .err()
            .expect("import should have been rejected")
            .to_string()
    }

    // --- restore_snapshot() / import_snapshot() happy paths ---

    #[test]
    fn test_restore_snapshot_basic() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Populate
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
        session.execute("INSERT (:Person {name: 'Gus'})").unwrap();

        let snapshot = db.export_snapshot().unwrap();

        // Modify
        session
            .execute("INSERT (:Person {name: 'Vincent'})")
            .unwrap();
        assert_eq!(db.store.node_count(), 3);

        // Restore original
        db.restore_snapshot(&snapshot).unwrap();

        assert_eq!(db.store.node_count(), 2);
        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(result.rows.len(), 2);
    }

    #[test]
    fn test_restore_snapshot_validation_failure() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        // Garbage bytes: the first byte is not a supported version, so this is
        // rejected before any decoding or mutation takes place.
        let result = db.restore_snapshot(b"garbage");
        assert!(result.is_err());

        // DB should be unchanged
        assert_eq!(db.store.node_count(), 1);
    }

    #[test]
    fn test_restore_snapshot_empty_db() {
        let db = GrafeoDB::new_in_memory();

        // Export empty snapshot, then populate, then restore to empty
        let empty_snapshot = db.export_snapshot().unwrap();

        let session = db.session();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
        assert_eq!(db.store.node_count(), 1);

        db.restore_snapshot(&empty_snapshot).unwrap();
        assert_eq!(db.store.node_count(), 0);
    }

    #[test]
    fn test_restore_snapshot_with_edges() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
        session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
        session
            .execute(
                "MATCH (a:Person {name: 'Alix'}), (b:Person {name: 'Gus'}) INSERT (a)-[:KNOWS]->(b)",
            )
            .unwrap();

        let snapshot = db.export_snapshot().unwrap();
        assert_eq!(db.store.edge_count(), 1);

        // Modify: add more data
        session
            .execute("INSERT (:Person {name: 'Vincent'})")
            .unwrap();

        // Restore
        db.restore_snapshot(&snapshot).unwrap();
        assert_eq!(db.store.node_count(), 2);
        assert_eq!(db.store.edge_count(), 1);
    }

    #[test]
    fn test_restore_snapshot_preserves_sessions() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
        let snapshot = db.export_snapshot().unwrap();

        // Modify
        session.execute("INSERT (:Person {name: 'Gus'})").unwrap();

        // Restore
        db.restore_snapshot(&snapshot).unwrap();

        // Session should still work and see restored data
        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(result.rows.len(), 1);
    }

    #[test]
    fn test_export_import_roundtrip() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session
            .execute("INSERT (:Person {name: 'Alix', age: 30})")
            .unwrap();

        let snapshot = db.export_snapshot().unwrap();
        let db2 = GrafeoDB::import_snapshot(&snapshot).unwrap();
        let session2 = db2.session();

        let result = session2.execute("MATCH (n:Person) RETURN n.name").unwrap();
        assert_eq!(result.rows.len(), 1);
    }

    // --- to_memory() ---

    #[test]
    fn test_to_memory_empty() {
        let db = GrafeoDB::new_in_memory();
        let copy = db.to_memory().unwrap();
        assert_eq!(copy.store.node_count(), 0);
        assert_eq!(copy.store.edge_count(), 0);
    }

    #[test]
    fn test_to_memory_copies_nodes_and_properties() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();
        session
            .execute("INSERT (:Person {name: 'Alix', age: 30})")
            .unwrap();
        session
            .execute("INSERT (:Person {name: 'Gus', age: 25})")
            .unwrap();

        let copy = db.to_memory().unwrap();
        assert_eq!(copy.store.node_count(), 2);

        let s2 = copy.session();
        let result = s2
            .execute("MATCH (p:Person) RETURN p.name ORDER BY p.name")
            .unwrap();
        assert_eq!(result.rows.len(), 2);
        assert_eq!(result.rows[0][0], Value::String("Alix".into()));
        assert_eq!(result.rows[1][0], Value::String("Gus".into()));
    }

    #[test]
    fn test_to_memory_copies_edges_and_properties() {
        let db = GrafeoDB::new_in_memory();
        let a = db.create_node(&["Person"]);
        db.set_node_property(a, "name", "Alix".into());
        let b = db.create_node(&["Person"]);
        db.set_node_property(b, "name", "Gus".into());
        let edge = db.create_edge(a, b, "KNOWS");
        db.set_edge_property(edge, "since", Value::Int64(2020));

        let copy = db.to_memory().unwrap();
        assert_eq!(copy.store.node_count(), 2);
        assert_eq!(copy.store.edge_count(), 1);

        let s2 = copy.session();
        let result = s2.execute("MATCH ()-[e:KNOWS]->() RETURN e.since").unwrap();
        assert_eq!(result.rows[0][0], Value::Int64(2020));
    }

    #[test]
    fn test_to_memory_is_independent() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        let copy = db.to_memory().unwrap();

        // Mutating original should not affect copy
        session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
        assert_eq!(db.store.node_count(), 2);
        assert_eq!(copy.store.node_count(), 1);
    }

    // --- iter_nodes() / iter_edges() ---

    #[test]
    fn test_iter_nodes_empty() {
        let db = GrafeoDB::new_in_memory();
        assert_eq!(db.iter_nodes().count(), 0);
    }

    #[test]
    fn test_iter_nodes_returns_all() {
        let db = GrafeoDB::new_in_memory();
        let id1 = db.create_node(&["Person"]);
        db.set_node_property(id1, "name", "Alix".into());
        let id2 = db.create_node(&["Animal"]);
        db.set_node_property(id2, "name", "Fido".into());

        let nodes: Vec<_> = db.iter_nodes().collect();
        assert_eq!(nodes.len(), 2);

        let names: Vec<_> = nodes
            .iter()
            .filter_map(|n| n.properties.iter().find(|(k, _)| k.as_str() == "name"))
            .map(|(_, v)| v.clone())
            .collect();
        assert!(names.contains(&Value::String("Alix".into())));
        assert!(names.contains(&Value::String("Fido".into())));
    }

    #[test]
    fn test_iter_edges_empty() {
        let db = GrafeoDB::new_in_memory();
        assert_eq!(db.iter_edges().count(), 0);
    }

    #[test]
    fn test_iter_edges_returns_all() {
        let db = GrafeoDB::new_in_memory();
        let a = db.create_node(&["A"]);
        let b = db.create_node(&["B"]);
        let c = db.create_node(&["C"]);
        db.create_edge(a, b, "R1");
        db.create_edge(b, c, "R2");

        let edges: Vec<_> = db.iter_edges().collect();
        assert_eq!(edges.len(), 2);

        let types: Vec<_> = edges.iter().map(|e| e.edge_type.as_ref()).collect();
        assert!(types.contains(&"R1"));
        assert!(types.contains(&"R2"));
    }

    // --- restore_snapshot() / import_snapshot() rejection paths ---

    #[test]
    fn test_restore_rejects_empty_data() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        assert_restore_fails(&db, &[], "empty snapshot data");

        // DB unchanged
        assert_eq!(db.store.node_count(), 1);
    }

    #[test]
    fn test_restore_rejects_truncated_payload() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        // A valid version byte with nothing after it passes the version gate
        // but cannot be decoded into a full snapshot.
        assert_restore_fails(&db, &[SNAPSHOT_VERSION], "snapshot restore failed");

        // DB unchanged
        assert_eq!(db.store.node_count(), 1);
    }

    #[test]
    fn test_restore_rejects_unsupported_version() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        let bytes = make_snapshot(99, vec![], vec![]);
        assert_restore_fails(&db, &bytes, "unsupported snapshot version");

        // DB unchanged
        assert_eq!(db.store.node_count(), 1);
    }

    #[test]
    fn test_import_rejects_invalid_input() {
        assert!(import_error(&[]).contains("empty snapshot data"));

        let wrong_version = make_snapshot(99, vec![], vec![]);
        assert!(import_error(&wrong_version).contains("unsupported snapshot version"));

        assert!(import_error(&[SNAPSHOT_VERSION]).contains("snapshot import failed"));
    }

    #[test]
    fn test_restore_rejects_duplicate_node_ids() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        let bytes = make_snapshot(
            SNAPSHOT_VERSION,
            vec![
                SnapshotNode {
                    labels: vec!["A".into()],
                    ..bare_node(NodeId::new(0))
                },
                SnapshotNode {
                    labels: vec!["B".into()],
                    ..bare_node(NodeId::new(0))
                },
            ],
            vec![],
        );

        assert_restore_fails(&db, &bytes, "duplicate node ID");
        assert_eq!(db.store.node_count(), 1);
    }

    #[test]
    fn test_restore_rejects_duplicate_edge_ids() {
        let db = GrafeoDB::new_in_memory();

        let bytes = make_snapshot(
            SNAPSHOT_VERSION,
            vec![bare_node(NodeId::new(0)), bare_node(NodeId::new(1))],
            vec![
                rel_edge(EdgeId::new(0), NodeId::new(0), NodeId::new(1)),
                rel_edge(EdgeId::new(0), NodeId::new(0), NodeId::new(1)),
            ],
        );

        assert_restore_fails(&db, &bytes, "duplicate edge ID");
    }

    #[test]
    fn test_restore_rejects_dangling_source() {
        let db = GrafeoDB::new_in_memory();

        let bytes = make_snapshot(
            SNAPSHOT_VERSION,
            vec![bare_node(NodeId::new(0))],
            vec![rel_edge(EdgeId::new(0), NodeId::new(999), NodeId::new(0))],
        );

        assert_restore_fails(&db, &bytes, "non-existent source node");
    }

    #[test]
    fn test_restore_rejects_dangling_destination() {
        let db = GrafeoDB::new_in_memory();

        let bytes = make_snapshot(
            SNAPSHOT_VERSION,
            vec![bare_node(NodeId::new(0))],
            vec![rel_edge(EdgeId::new(0), NodeId::new(0), NodeId::new(999))],
        );

        assert_restore_fails(&db, &bytes, "non-existent destination node");
    }

    // --- index metadata roundtrip ---

    #[test]
    fn test_snapshot_roundtrip_property_index() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session
            .execute("INSERT (:Person {name: 'Alix', email: 'alix@example.com'})")
            .unwrap();
        db.create_property_index("email");
        assert!(db.has_property_index("email"));

        let snapshot = db.export_snapshot().unwrap();
        let db2 = GrafeoDB::import_snapshot(&snapshot).unwrap();

        assert!(db2.has_property_index("email"));

        // Verify the restored index is populated and answers lookups
        let found = db2.find_nodes_by_property("email", &Value::String("alix@example.com".into()));
        assert_eq!(found.len(), 1);
    }

    #[cfg(feature = "vector-index")]
    #[test]
    fn test_snapshot_roundtrip_vector_index() {
        use std::sync::Arc;

        let db = GrafeoDB::new_in_memory();

        let n1 = db.create_node(&["Doc"]);
        db.set_node_property(
            n1,
            "embedding",
            Value::Vector(Arc::from([1.0_f32, 0.0, 0.0])),
        );
        let n2 = db.create_node(&["Doc"]);
        db.set_node_property(
            n2,
            "embedding",
            Value::Vector(Arc::from([0.0_f32, 1.0, 0.0])),
        );

        db.create_vector_index("Doc", "embedding", None, Some("cosine"), Some(4), Some(32))
            .unwrap();

        let snapshot = db.export_snapshot().unwrap();
        let db2 = GrafeoDB::import_snapshot(&snapshot).unwrap();

        // Vector search should work on the restored database
        let results = db2
            .vector_search("Doc", "embedding", &[1.0, 0.0, 0.0], 2, None, None)
            .unwrap();
        assert_eq!(results.len(), 2);
        // Closest to [1,0,0] should be n1
        assert_eq!(results[0].0, n1);
    }

    #[cfg(feature = "text-index")]
    #[test]
    fn test_snapshot_roundtrip_text_index() {
        let db = GrafeoDB::new_in_memory();

        let n1 = db.create_node(&["Article"]);
        db.set_node_property(n1, "body", Value::String("rust graph database".into()));
        let n2 = db.create_node(&["Article"]);
        db.set_node_property(n2, "body", Value::String("python web framework".into()));

        db.create_text_index("Article", "body").unwrap();

        let snapshot = db.export_snapshot().unwrap();
        let db2 = GrafeoDB::import_snapshot(&snapshot).unwrap();

        // Text search should work on the restored database
        let results = db2
            .text_search("Article", "body", "graph database", 10)
            .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, n1);
    }

    #[test]
    fn test_snapshot_roundtrip_property_index_via_restore() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session
            .execute("INSERT (:Person {name: 'Alix', email: 'alix@example.com'})")
            .unwrap();
        db.create_property_index("email");

        let snapshot = db.export_snapshot().unwrap();

        // Mutate the database
        session
            .execute("INSERT (:Person {name: 'Gus', email: 'gus@example.com'})")
            .unwrap();
        db.drop_property_index("email");
        assert!(!db.has_property_index("email"));

        // Restore should bring back the index
        db.restore_snapshot(&snapshot).unwrap();
        assert!(db.has_property_index("email"));
    }
}