Skip to main content

grafeo_engine/database/
persistence.rs

1//! Persistence, snapshots, and data export for GrafeoDB.
2
3#[cfg(feature = "wal")]
4use std::path::Path;
5
6use grafeo_common::types::{EdgeId, EpochId, NodeId, Value};
7use grafeo_common::utils::error::{Error, Result};
8use hashbrown::HashSet;
9
10use crate::config::Config;
11
12#[cfg(feature = "wal")]
13use grafeo_adapters::storage::wal::WalRecord;
14
15use crate::catalog::{
16    EdgeTypeDefinition, GraphTypeDefinition, NodeTypeDefinition, ProcedureDefinition,
17};
18
19/// Current snapshot version.
20const SNAPSHOT_VERSION: u8 = 4;
21
22/// Binary snapshot format (v4: graph data, named graphs, RDF, schema, index metadata,
23/// and property version history for temporal support).
24#[derive(serde::Serialize, serde::Deserialize)]
25struct Snapshot {
26    version: u8,
27    nodes: Vec<SnapshotNode>,
28    edges: Vec<SnapshotEdge>,
29    named_graphs: Vec<NamedGraphSnapshot>,
30    rdf_triples: Vec<SnapshotTriple>,
31    rdf_named_graphs: Vec<RdfNamedGraphSnapshot>,
32    schema: SnapshotSchema,
33    indexes: SnapshotIndexes,
34    /// Current store epoch at snapshot time (0 when temporal is disabled).
35    epoch: u64,
36}
37
38/// Schema metadata within a snapshot.
39#[derive(serde::Serialize, serde::Deserialize, Default)]
40struct SnapshotSchema {
41    node_types: Vec<NodeTypeDefinition>,
42    edge_types: Vec<EdgeTypeDefinition>,
43    graph_types: Vec<GraphTypeDefinition>,
44    procedures: Vec<ProcedureDefinition>,
45    schemas: Vec<String>,
46    graph_type_bindings: Vec<(String, String)>,
47}
48
49/// Index metadata within a snapshot (definitions only, not index data).
50#[derive(serde::Serialize, serde::Deserialize, Default)]
51struct SnapshotIndexes {
52    property_indexes: Vec<String>,
53    vector_indexes: Vec<SnapshotVectorIndex>,
54    text_indexes: Vec<SnapshotTextIndex>,
55}
56
57/// Vector index definition for snapshot persistence.
58#[derive(serde::Serialize, serde::Deserialize)]
59struct SnapshotVectorIndex {
60    label: String,
61    property: String,
62    dimensions: usize,
63    metric: grafeo_core::index::vector::DistanceMetric,
64    m: usize,
65    ef_construction: usize,
66}
67
68/// Text index definition for snapshot persistence.
69#[derive(serde::Serialize, serde::Deserialize)]
70struct SnapshotTextIndex {
71    label: String,
72    property: String,
73}
74
75/// A named graph partition within a v2 snapshot.
76#[derive(serde::Serialize, serde::Deserialize)]
77struct NamedGraphSnapshot {
78    name: String,
79    nodes: Vec<SnapshotNode>,
80    edges: Vec<SnapshotEdge>,
81}
82
83/// An RDF triple in snapshot format (N-Triples encoded terms).
84#[derive(serde::Serialize, serde::Deserialize)]
85struct SnapshotTriple {
86    subject: String,
87    predicate: String,
88    object: String,
89}
90
91/// An RDF named graph in snapshot format.
92#[derive(serde::Serialize, serde::Deserialize)]
93struct RdfNamedGraphSnapshot {
94    name: String,
95    triples: Vec<SnapshotTriple>,
96}
97
98#[derive(serde::Serialize, serde::Deserialize)]
99struct SnapshotNode {
100    id: NodeId,
101    labels: Vec<String>,
102    /// Each property has a list of `(epoch, value)` entries (ascending epoch order).
103    properties: Vec<(String, Vec<(EpochId, Value)>)>,
104}
105
106#[derive(serde::Serialize, serde::Deserialize)]
107struct SnapshotEdge {
108    id: EdgeId,
109    src: NodeId,
110    dst: NodeId,
111    edge_type: String,
112    /// Each property has a list of `(epoch, value)` entries (ascending epoch order).
113    properties: Vec<(String, Vec<(EpochId, Value)>)>,
114}
115
116/// Collects all nodes from a store into snapshot format.
117///
118/// With `temporal`: stores full property version history.
119/// Without: wraps each current value as a single-entry version list at epoch 0.
120fn collect_snapshot_nodes(store: &grafeo_core::graph::lpg::LpgStore) -> Vec<SnapshotNode> {
121    store
122        .all_nodes()
123        .map(|n| {
124            #[cfg(feature = "temporal")]
125            let properties = store
126                .node_property_history(n.id)
127                .into_iter()
128                .map(|(k, entries)| (k.to_string(), entries))
129                .collect();
130
131            #[cfg(not(feature = "temporal"))]
132            let properties = n
133                .properties
134                .into_iter()
135                .map(|(k, v)| (k.to_string(), vec![(EpochId::new(0), v)]))
136                .collect();
137
138            SnapshotNode {
139                id: n.id,
140                labels: n.labels.iter().map(|l| l.to_string()).collect(),
141                properties,
142            }
143        })
144        .collect()
145}
146
147/// Collects all edges from a store into snapshot format.
148///
149/// With `temporal`: stores full property version history.
150/// Without: wraps each current value as a single-entry version list at epoch 0.
151fn collect_snapshot_edges(store: &grafeo_core::graph::lpg::LpgStore) -> Vec<SnapshotEdge> {
152    store
153        .all_edges()
154        .map(|e| {
155            #[cfg(feature = "temporal")]
156            let properties = store
157                .edge_property_history(e.id)
158                .into_iter()
159                .map(|(k, entries)| (k.to_string(), entries))
160                .collect();
161
162            #[cfg(not(feature = "temporal"))]
163            let properties = e
164                .properties
165                .into_iter()
166                .map(|(k, v)| (k.to_string(), vec![(EpochId::new(0), v)]))
167                .collect();
168
169            SnapshotEdge {
170                id: e.id,
171                src: e.src,
172                dst: e.dst,
173                edge_type: e.edge_type.to_string(),
174                properties,
175            }
176        })
177        .collect()
178}
179
180/// Populates a store from snapshot node/edge data.
181///
182/// With `temporal`: replays all `(epoch, value)` entries into version logs.
183/// Without: reads the latest value from each property's version list.
184fn populate_store_from_snapshot(
185    store: &grafeo_core::graph::lpg::LpgStore,
186    nodes: Vec<SnapshotNode>,
187    edges: Vec<SnapshotEdge>,
188) -> Result<()> {
189    for node in nodes {
190        let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
191        store.create_node_with_id(node.id, &label_refs)?;
192        for (key, entries) in node.properties {
193            #[cfg(feature = "temporal")]
194            for (epoch, value) in entries {
195                store.set_node_property_at_epoch(node.id, &key, value, epoch);
196            }
197            #[cfg(not(feature = "temporal"))]
198            if let Some((_, value)) = entries.into_iter().last() {
199                store.set_node_property(node.id, &key, value);
200            }
201        }
202    }
203    for edge in edges {
204        store.create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
205        for (key, entries) in edge.properties {
206            #[cfg(feature = "temporal")]
207            for (epoch, value) in entries {
208                store.set_edge_property_at_epoch(edge.id, &key, value, epoch);
209            }
210            #[cfg(not(feature = "temporal"))]
211            if let Some((_, value)) = entries.into_iter().last() {
212                store.set_edge_property(edge.id, &key, value);
213            }
214        }
215    }
216    Ok(())
217}
218
219/// Validates snapshot nodes/edges for duplicates and dangling references.
220fn validate_snapshot_data(nodes: &[SnapshotNode], edges: &[SnapshotEdge]) -> Result<()> {
221    let mut node_ids = HashSet::with_capacity(nodes.len());
222    for node in nodes {
223        if !node_ids.insert(node.id) {
224            return Err(Error::Internal(format!(
225                "snapshot contains duplicate node ID {}",
226                node.id
227            )));
228        }
229    }
230    let mut edge_ids = HashSet::with_capacity(edges.len());
231    for edge in edges {
232        if !edge_ids.insert(edge.id) {
233            return Err(Error::Internal(format!(
234                "snapshot contains duplicate edge ID {}",
235                edge.id
236            )));
237        }
238        if !node_ids.contains(&edge.src) {
239            return Err(Error::Internal(format!(
240                "snapshot edge {} references non-existent source node {}",
241                edge.id, edge.src
242            )));
243        }
244        if !node_ids.contains(&edge.dst) {
245            return Err(Error::Internal(format!(
246                "snapshot edge {} references non-existent destination node {}",
247                edge.id, edge.dst
248            )));
249        }
250    }
251    Ok(())
252}
253
/// Converts every triple in an RDF store into its snapshot representation.
///
/// Each term is stored as the string produced by its `to_string()`.
#[cfg(feature = "rdf")]
fn collect_rdf_triples(store: &grafeo_core::graph::rdf::RdfStore) -> Vec<SnapshotTriple> {
    let mut snapshot_triples = Vec::new();
    for triple in store.triples() {
        snapshot_triples.push(SnapshotTriple {
            subject: triple.subject().to_string(),
            predicate: triple.predicate().to_string(),
            object: triple.object().to_string(),
        });
    }
    snapshot_triples
}
267
/// Inserts snapshot triples into an RDF store.
///
/// Every term is parsed with `Term::from_ntriples`. A triple is inserted only
/// when all three terms parse; otherwise it is skipped without an error.
#[cfg(feature = "rdf")]
fn populate_rdf_store(store: &grafeo_core::graph::rdf::RdfStore, triples: &[SnapshotTriple]) {
    use grafeo_core::graph::rdf::{Term, Triple};

    for SnapshotTriple {
        subject,
        predicate,
        object,
    } in triples
    {
        // All three terms are parsed up front, matching the original
        // evaluation order, before deciding whether to insert.
        let parsed = (
            Term::from_ntriples(subject),
            Term::from_ntriples(predicate),
            Term::from_ntriples(object),
        );
        let (Some(s), Some(p), Some(o)) = parsed else {
            continue;
        };
        store.insert(Triple::new(s, p, o));
    }
}
282
283// =========================================================================
284// Snapshot deserialization helpers (used by single-file format)
285// =========================================================================
286
/// Decodes snapshot bytes and populates a store and catalog.
///
/// Steps, in order:
/// 1. Decode `data` as a bincode-encoded [`Snapshot`].
/// 2. Reject snapshots whose `version` differs from [`SNAPSHOT_VERSION`]
///    before any store is touched.
/// 3. Populate the default graph, then each named graph (created on demand).
/// 4. Restore schema definitions into `catalog`.
/// 5. With the `rdf` feature, restore the default and named RDF graphs.
///
/// Index definitions in the snapshot are not restored here.
///
/// # Errors
///
/// Returns `Error::Serialization` if the bytes cannot be decoded or carry an
/// unsupported snapshot version, `Error::Internal` if a named graph cannot be
/// created, and propagates any error from re-creating nodes and edges.
#[cfg(feature = "grafeo-file")]
pub(super) fn load_snapshot_into_store(
    store: &std::sync::Arc<grafeo_core::graph::lpg::LpgStore>,
    catalog: &std::sync::Arc<crate::catalog::Catalog>,
    #[cfg(feature = "rdf")] rdf_store: &std::sync::Arc<grafeo_core::graph::rdf::RdfStore>,
    data: &[u8],
) -> grafeo_common::utils::error::Result<()> {
    use grafeo_common::utils::error::Error;

    let config = bincode::config::standard();
    let (snapshot, _) =
        bincode::serde::decode_from_slice::<Snapshot, _>(data, config).map_err(|e| {
            Error::Serialization(format!("failed to decode snapshot from .grafeo file: {e}"))
        })?;

    // A payload that decodes but carries another format version must not be
    // loaded as if it were the current layout.
    if snapshot.version != SNAPSHOT_VERSION {
        return Err(Error::Serialization(format!(
            "unsupported snapshot version {} in .grafeo file (expected {SNAPSHOT_VERSION})",
            snapshot.version
        )));
    }

    populate_store_from_snapshot_ref(store, &snapshot.nodes, &snapshot.edges)?;

    // Restore epoch from snapshot (store-level only; TransactionManager
    // sync is handled in with_config() after all recovery completes).
    #[cfg(feature = "temporal")]
    store.sync_epoch(EpochId::new(snapshot.epoch));

    for graph in &snapshot.named_graphs {
        store
            .create_graph(&graph.name)
            .map_err(|e| Error::Internal(e.to_string()))?;
        if let Some(graph_store) = store.graph(&graph.name) {
            populate_store_from_snapshot_ref(&graph_store, &graph.nodes, &graph.edges)?;
            #[cfg(feature = "temporal")]
            graph_store.sync_epoch(EpochId::new(snapshot.epoch));
        }
    }
    restore_schema_from_snapshot(catalog, &snapshot.schema);

    // Restore RDF triples
    #[cfg(feature = "rdf")]
    {
        populate_rdf_store(rdf_store, &snapshot.rdf_triples);
        for rdf_graph in &snapshot.rdf_named_graphs {
            rdf_store.create_graph(&rdf_graph.name);
            if let Some(graph_store) = rdf_store.graph(&rdf_graph.name) {
                populate_rdf_store(&graph_store, &rdf_graph.triples);
            }
        }
    }

    Ok(())
}
336
/// Re-creates borrowed snapshot nodes and edges inside `store`.
///
/// Borrowing counterpart of `populate_store_from_snapshot`, used when loading
/// a single-file database: values are cloned out of the snapshot instead of
/// being moved. With `temporal`, every `(epoch, value)` entry is replayed;
/// without it, only the last entry of each property's list is applied.
///
/// # Errors
///
/// Propagates any error from `create_node_with_id` or `create_edge_with_id`.
#[cfg(feature = "grafeo-file")]
fn populate_store_from_snapshot_ref(
    store: &grafeo_core::graph::lpg::LpgStore,
    nodes: &[SnapshotNode],
    edges: &[SnapshotEdge],
) -> grafeo_common::utils::error::Result<()> {
    for SnapshotNode {
        id,
        labels,
        properties,
    } in nodes
    {
        let id = *id;
        let label_refs: Vec<&str> = labels.iter().map(String::as_str).collect();
        store.create_node_with_id(id, &label_refs)?;
        for (key, history) in properties {
            #[cfg(feature = "temporal")]
            for (epoch, value) in history {
                store.set_node_property_at_epoch(id, key, value.clone(), *epoch);
            }
            #[cfg(not(feature = "temporal"))]
            if let [.., (_, latest)] = history.as_slice() {
                store.set_node_property(id, key, latest.clone());
            }
        }
    }

    for SnapshotEdge {
        id,
        src,
        dst,
        edge_type,
        properties,
    } in edges
    {
        let (id, src, dst) = (*id, *src, *dst);
        store.create_edge_with_id(id, src, dst, edge_type)?;
        for (key, history) in properties {
            #[cfg(feature = "temporal")]
            for (epoch, value) in history {
                store.set_edge_property_at_epoch(id, key, value.clone(), *epoch);
            }
            #[cfg(not(feature = "temporal"))]
            if let [.., (_, latest)] = history.as_slice() {
                store.set_edge_property(id, key, latest.clone());
            }
        }
    }

    Ok(())
}
373
374/// Restores schema definitions from a snapshot into the catalog.
375fn restore_schema_from_snapshot(
376    catalog: &std::sync::Arc<crate::catalog::Catalog>,
377    schema: &SnapshotSchema,
378) {
379    for def in &schema.node_types {
380        catalog.register_or_replace_node_type(def.clone());
381    }
382    for def in &schema.edge_types {
383        catalog.register_or_replace_edge_type_def(def.clone());
384    }
385    for def in &schema.graph_types {
386        let _ = catalog.register_graph_type(def.clone());
387    }
388    for def in &schema.procedures {
389        catalog.replace_procedure(def.clone()).ok();
390    }
391    for name in &schema.schemas {
392        let _ = catalog.register_schema_namespace(name.clone());
393    }
394    for (graph_name, type_name) in &schema.graph_type_bindings {
395        let _ = catalog.bind_graph_type(graph_name, type_name.clone());
396    }
397}
398
399/// Collects schema definitions from the catalog into snapshot format.
400fn collect_schema(catalog: &std::sync::Arc<crate::catalog::Catalog>) -> SnapshotSchema {
401    SnapshotSchema {
402        node_types: catalog.all_node_type_defs(),
403        edge_types: catalog.all_edge_type_defs(),
404        graph_types: catalog.all_graph_type_defs(),
405        procedures: catalog.all_procedure_defs(),
406        schemas: catalog.schema_names(),
407        graph_type_bindings: catalog.all_graph_type_bindings(),
408    }
409}
410
411/// Restores indexes from snapshot metadata by rebuilding them from existing data.
412///
413/// Must be called after all nodes/edges have been populated, since index
414/// creation scans existing data.
415fn restore_indexes_from_snapshot(db: &super::GrafeoDB, indexes: &SnapshotIndexes) {
416    for name in &indexes.property_indexes {
417        db.store.create_property_index(name);
418    }
419
420    #[cfg(feature = "vector-index")]
421    for vi in &indexes.vector_indexes {
422        if let Err(err) = db.create_vector_index(
423            &vi.label,
424            &vi.property,
425            Some(vi.dimensions),
426            Some(vi.metric.name()),
427            Some(vi.m),
428            Some(vi.ef_construction),
429        ) {
430            tracing::warn!(
431                "Failed to restore vector index :{label}({property}): {err}",
432                label = vi.label,
433                property = vi.property,
434            );
435        }
436    }
437
438    #[cfg(feature = "text-index")]
439    for ti in &indexes.text_indexes {
440        if let Err(err) = db.create_text_index(&ti.label, &ti.property) {
441            tracing::warn!(
442                "Failed to restore text index :{label}({property}): {err}",
443                label = ti.label,
444                property = ti.property,
445            );
446        }
447    }
448}
449
450/// Collects index metadata from a store into snapshot format.
451fn collect_index_metadata(store: &grafeo_core::graph::lpg::LpgStore) -> SnapshotIndexes {
452    let property_indexes = store.property_index_keys();
453
454    #[cfg(feature = "vector-index")]
455    let vector_indexes: Vec<SnapshotVectorIndex> = store
456        .vector_index_entries()
457        .into_iter()
458        .filter_map(|(key, index)| {
459            let (label, property) = key.split_once(':')?;
460            let config = index.config();
461            Some(SnapshotVectorIndex {
462                label: label.to_string(),
463                property: property.to_string(),
464                dimensions: config.dimensions,
465                metric: config.metric,
466                m: config.m,
467                ef_construction: config.ef_construction,
468            })
469        })
470        .collect();
471    #[cfg(not(feature = "vector-index"))]
472    let vector_indexes = Vec::new();
473
474    #[cfg(feature = "text-index")]
475    let text_indexes: Vec<SnapshotTextIndex> = store
476        .text_index_entries()
477        .into_iter()
478        .filter_map(|(key, _)| {
479            let (label, property) = key.split_once(':')?;
480            Some(SnapshotTextIndex {
481                label: label.to_string(),
482                property: property.to_string(),
483            })
484        })
485        .collect();
486    #[cfg(not(feature = "text-index"))]
487    let text_indexes = Vec::new();
488
489    SnapshotIndexes {
490        property_indexes,
491        vector_indexes,
492        text_indexes,
493    }
494}
495
496impl super::GrafeoDB {
497    // =========================================================================
498    // ADMIN API: Persistence Control
499    // =========================================================================
500
501    /// Saves the database to a file path.
502    ///
503    /// - If the path ends in `.grafeo`: creates a single-file database
504    /// - Otherwise: creates a WAL directory-backed database at the path
505    /// - If in-memory: creates a new persistent database at path
506    /// - If file-backed: creates a copy at the new path
507    ///
508    /// The original database remains unchanged.
509    ///
510    /// # Errors
511    ///
512    /// Returns an error if the save operation fails.
513    ///
514    /// Requires the `wal` feature for persistence support.
515    #[cfg(feature = "wal")]
516    pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
517        let path = path.as_ref();
518
519        // Single-file format: export snapshot directly to a .grafeo file
520        #[cfg(feature = "grafeo-file")]
521        if path.extension().is_some_and(|ext| ext == "grafeo") {
522            return self.save_as_grafeo_file(path);
523        }
524
525        // Create target database with WAL enabled
526        let target_config = Config::persistent(path);
527        let target = Self::with_config(target_config)?;
528
529        // Copy all nodes using WAL-enabled methods
530        for node in self.store.all_nodes() {
531            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
532            target.store.create_node_with_id(node.id, &label_refs)?;
533
534            // Log to WAL
535            target.log_wal(&WalRecord::CreateNode {
536                id: node.id,
537                labels: node.labels.iter().map(|s| s.to_string()).collect(),
538            })?;
539
540            // Copy properties
541            for (key, value) in node.properties {
542                target
543                    .store
544                    .set_node_property(node.id, key.as_str(), value.clone());
545                target.log_wal(&WalRecord::SetNodeProperty {
546                    id: node.id,
547                    key: key.to_string(),
548                    value,
549                })?;
550            }
551        }
552
553        // Copy all edges using WAL-enabled methods
554        for edge in self.store.all_edges() {
555            target
556                .store
557                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
558
559            // Log to WAL
560            target.log_wal(&WalRecord::CreateEdge {
561                id: edge.id,
562                src: edge.src,
563                dst: edge.dst,
564                edge_type: edge.edge_type.to_string(),
565            })?;
566
567            // Copy properties
568            for (key, value) in edge.properties {
569                target
570                    .store
571                    .set_edge_property(edge.id, key.as_str(), value.clone());
572                target.log_wal(&WalRecord::SetEdgeProperty {
573                    id: edge.id,
574                    key: key.to_string(),
575                    value,
576                })?;
577            }
578        }
579
580        // Copy named graphs
581        for graph_name in self.store.graph_names() {
582            if let Some(src_graph) = self.store.graph(&graph_name) {
583                target.log_wal(&WalRecord::CreateNamedGraph {
584                    name: graph_name.clone(),
585                })?;
586                target
587                    .store
588                    .create_graph(&graph_name)
589                    .map_err(|e| Error::Internal(e.to_string()))?;
590
591                if let Some(dst_graph) = target.store.graph(&graph_name) {
592                    // Switch WAL context to this named graph
593                    target.log_wal(&WalRecord::SwitchGraph {
594                        name: Some(graph_name.clone()),
595                    })?;
596
597                    for node in src_graph.all_nodes() {
598                        let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
599                        dst_graph.create_node_with_id(node.id, &label_refs)?;
600                        target.log_wal(&WalRecord::CreateNode {
601                            id: node.id,
602                            labels: node.labels.iter().map(|s| s.to_string()).collect(),
603                        })?;
604                        for (key, value) in node.properties {
605                            dst_graph.set_node_property(node.id, key.as_str(), value.clone());
606                            target.log_wal(&WalRecord::SetNodeProperty {
607                                id: node.id,
608                                key: key.to_string(),
609                                value,
610                            })?;
611                        }
612                    }
613                    for edge in src_graph.all_edges() {
614                        dst_graph.create_edge_with_id(
615                            edge.id,
616                            edge.src,
617                            edge.dst,
618                            &edge.edge_type,
619                        )?;
620                        target.log_wal(&WalRecord::CreateEdge {
621                            id: edge.id,
622                            src: edge.src,
623                            dst: edge.dst,
624                            edge_type: edge.edge_type.to_string(),
625                        })?;
626                        for (key, value) in edge.properties {
627                            dst_graph.set_edge_property(edge.id, key.as_str(), value.clone());
628                            target.log_wal(&WalRecord::SetEdgeProperty {
629                                id: edge.id,
630                                key: key.to_string(),
631                                value,
632                            })?;
633                        }
634                    }
635                }
636            }
637        }
638
639        // Switch WAL context back to default graph
640        if !self.store.graph_names().is_empty() {
641            target.log_wal(&WalRecord::SwitchGraph { name: None })?;
642        }
643
644        // Copy RDF data with WAL logging
645        #[cfg(feature = "rdf")]
646        {
647            for triple in self.rdf_store.triples() {
648                let record = WalRecord::InsertRdfTriple {
649                    subject: triple.subject().to_string(),
650                    predicate: triple.predicate().to_string(),
651                    object: triple.object().to_string(),
652                    graph: None,
653                };
654                target.rdf_store.insert((*triple).clone());
655                target.log_wal(&record)?;
656            }
657            for name in self.rdf_store.graph_names() {
658                target.log_wal(&WalRecord::CreateRdfGraph { name: name.clone() })?;
659                if let Some(src_graph) = self.rdf_store.graph(&name) {
660                    let dst_graph = target.rdf_store.graph_or_create(&name);
661                    for triple in src_graph.triples() {
662                        let record = WalRecord::InsertRdfTriple {
663                            subject: triple.subject().to_string(),
664                            predicate: triple.predicate().to_string(),
665                            object: triple.object().to_string(),
666                            graph: Some(name.clone()),
667                        };
668                        dst_graph.insert((*triple).clone());
669                        target.log_wal(&record)?;
670                    }
671                }
672            }
673        }
674
675        // Checkpoint and close the target database
676        target.close()?;
677
678        Ok(())
679    }
680
681    /// Creates an in-memory copy of this database.
682    ///
683    /// Returns a new database that is completely independent, including
684    /// all named graph data.
685    /// Useful for:
686    /// Saves the database to a single `.grafeo` file.
687    #[cfg(feature = "grafeo-file")]
688    fn save_as_grafeo_file(&self, path: &Path) -> Result<()> {
689        use grafeo_adapters::storage::file::GrafeoFileManager;
690
691        let snapshot_data = self.export_snapshot()?;
692        let epoch = self.store.current_epoch();
693        let transaction_id = self
694            .transaction_manager
695            .last_assigned_transaction_id()
696            .map_or(0, |t| t.0);
697        let node_count = self.store.node_count() as u64;
698        let edge_count = self.store.edge_count() as u64;
699
700        let fm = GrafeoFileManager::create(path)?;
701        fm.write_snapshot(
702            &snapshot_data,
703            epoch.0,
704            transaction_id,
705            node_count,
706            edge_count,
707        )?;
708        Ok(())
709    }
710
711    /// - Testing modifications without affecting the original
712    /// - Faster operations when persistence isn't needed
713    ///
714    /// # Errors
715    ///
716    /// Returns an error if the copy operation fails.
717    pub fn to_memory(&self) -> Result<Self> {
718        let config = Config::in_memory();
719        let target = Self::with_config(config)?;
720
721        // Copy default graph nodes
722        for node in self.store.all_nodes() {
723            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
724            target.store.create_node_with_id(node.id, &label_refs)?;
725            for (key, value) in node.properties {
726                target.store.set_node_property(node.id, key.as_str(), value);
727            }
728        }
729
730        // Copy default graph edges
731        for edge in self.store.all_edges() {
732            target
733                .store
734                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type)?;
735            for (key, value) in edge.properties {
736                target.store.set_edge_property(edge.id, key.as_str(), value);
737            }
738        }
739
740        // Copy named graphs
741        for graph_name in self.store.graph_names() {
742            if let Some(src_graph) = self.store.graph(&graph_name) {
743                target
744                    .store
745                    .create_graph(&graph_name)
746                    .map_err(|e| Error::Internal(e.to_string()))?;
747                if let Some(dst_graph) = target.store.graph(&graph_name) {
748                    for node in src_graph.all_nodes() {
749                        let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
750                        dst_graph.create_node_with_id(node.id, &label_refs)?;
751                        for (key, value) in node.properties {
752                            dst_graph.set_node_property(node.id, key.as_str(), value);
753                        }
754                    }
755                    for edge in src_graph.all_edges() {
756                        dst_graph.create_edge_with_id(
757                            edge.id,
758                            edge.src,
759                            edge.dst,
760                            &edge.edge_type,
761                        )?;
762                        for (key, value) in edge.properties {
763                            dst_graph.set_edge_property(edge.id, key.as_str(), value);
764                        }
765                    }
766                }
767            }
768        }
769
770        // Copy RDF data
771        #[cfg(feature = "rdf")]
772        {
773            for triple in self.rdf_store.triples() {
774                target.rdf_store.insert((*triple).clone());
775            }
776            for name in self.rdf_store.graph_names() {
777                if let Some(src_graph) = self.rdf_store.graph(&name) {
778                    let dst_graph = target.rdf_store.graph_or_create(&name);
779                    for triple in src_graph.triples() {
780                        dst_graph.insert((*triple).clone());
781                    }
782                }
783            }
784        }
785
786        Ok(target)
787    }
788
789    /// Opens a database file and loads it entirely into memory.
790    ///
791    /// The returned database has no connection to the original file.
792    /// Changes will NOT be written back to the file.
793    ///
794    /// # Errors
795    ///
796    /// Returns an error if the file can't be opened or loaded.
797    #[cfg(feature = "wal")]
798    pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
799        // Open the source database (triggers WAL recovery)
800        let source = Self::open(path)?;
801
802        // Create in-memory copy
803        let target = source.to_memory()?;
804
805        // Close the source (releases file handles)
806        source.close()?;
807
808        Ok(target)
809    }
810
811    // =========================================================================
812    // ADMIN API: Snapshot Export/Import
813    // =========================================================================
814
815    /// Exports the entire database to a binary snapshot.
816    ///
817    /// The returned bytes can be stored (e.g. in IndexedDB) and later
818    /// restored with [`import_snapshot()`](Self::import_snapshot).
819    /// Includes all named graph data.
820    ///
821    /// Properties are stored as version-history lists. When `temporal` is
822    /// enabled, the full history is captured. Otherwise, each property is
823    /// wrapped as a single-entry list at epoch 0.
824    ///
825    /// # Errors
826    ///
827    /// Returns an error if serialization fails.
828    pub fn export_snapshot(&self) -> Result<Vec<u8>> {
829        let nodes = collect_snapshot_nodes(&self.store);
830        let edges = collect_snapshot_edges(&self.store);
831
832        // Collect named graphs
833        let named_graphs: Vec<NamedGraphSnapshot> = self
834            .store
835            .graph_names()
836            .into_iter()
837            .filter_map(|name| {
838                self.store
839                    .graph(&name)
840                    .map(|graph_store| NamedGraphSnapshot {
841                        name,
842                        nodes: collect_snapshot_nodes(&graph_store),
843                        edges: collect_snapshot_edges(&graph_store),
844                    })
845            })
846            .collect();
847
848        // Collect RDF triples
849        #[cfg(feature = "rdf")]
850        let rdf_triples = collect_rdf_triples(&self.rdf_store);
851        #[cfg(not(feature = "rdf"))]
852        let rdf_triples = Vec::new();
853
854        #[cfg(feature = "rdf")]
855        let rdf_named_graphs: Vec<RdfNamedGraphSnapshot> = self
856            .rdf_store
857            .graph_names()
858            .into_iter()
859            .filter_map(|name| {
860                self.rdf_store
861                    .graph(&name)
862                    .map(|graph| RdfNamedGraphSnapshot {
863                        name,
864                        triples: collect_rdf_triples(&graph),
865                    })
866            })
867            .collect();
868        #[cfg(not(feature = "rdf"))]
869        let rdf_named_graphs = Vec::new();
870
871        let schema = collect_schema(&self.catalog);
872        let indexes = collect_index_metadata(&self.store);
873
874        let snapshot = Snapshot {
875            version: SNAPSHOT_VERSION,
876            nodes,
877            edges,
878            named_graphs,
879            rdf_triples,
880            rdf_named_graphs,
881            schema,
882            indexes,
883            #[cfg(feature = "temporal")]
884            epoch: self.transaction_manager.current_epoch().as_u64(),
885            #[cfg(not(feature = "temporal"))]
886            epoch: 0,
887        };
888
889        let config = bincode::config::standard();
890        bincode::serde::encode_to_vec(&snapshot, config)
891            .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
892    }
893
894    /// Creates a new in-memory database from a binary snapshot.
895    ///
896    /// The `data` must have been produced by [`export_snapshot()`](Self::export_snapshot).
897    ///
898    /// All edge references are validated before any data is inserted: every
899    /// edge's source and destination must reference a node present in the
900    /// snapshot, and duplicate node/edge IDs are rejected. If validation
901    /// fails, no database is created.
902    ///
903    /// # Errors
904    ///
905    /// Returns an error if the snapshot is invalid, contains dangling edge
906    /// references, has duplicate IDs, or deserialization fails.
907    pub fn import_snapshot(data: &[u8]) -> Result<Self> {
908        if data.is_empty() {
909            return Err(Error::Internal("empty snapshot data".to_string()));
910        }
911
912        let version = data[0];
913        if version != 4 {
914            return Err(Error::Internal(format!(
915                "unsupported snapshot version: {version} (expected 4)"
916            )));
917        }
918
919        let config = bincode::config::standard();
920        let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
921            .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
922
923        // Validate default graph data
924        validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
925
926        // Validate each named graph
927        for ng in &snapshot.named_graphs {
928            validate_snapshot_data(&ng.nodes, &ng.edges)?;
929        }
930
931        let db = Self::new_in_memory();
932        populate_store_from_snapshot(&db.store, snapshot.nodes, snapshot.edges)?;
933
934        // Restore epoch from snapshot
935        #[cfg(feature = "temporal")]
936        {
937            let epoch = EpochId::new(snapshot.epoch);
938            db.store.sync_epoch(epoch);
939            db.transaction_manager.sync_epoch(epoch);
940        }
941
942        // Capture epoch before moving snapshot fields
943        #[cfg(feature = "temporal")]
944        let snapshot_epoch = EpochId::new(snapshot.epoch);
945
946        // Restore named graphs
947        for ng in snapshot.named_graphs {
948            db.store
949                .create_graph(&ng.name)
950                .map_err(|e| Error::Internal(e.to_string()))?;
951            if let Some(graph_store) = db.store.graph(&ng.name) {
952                populate_store_from_snapshot(&graph_store, ng.nodes, ng.edges)?;
953                // Named graph stores need the same epoch so temporal property
954                // lookups via current_epoch() return the correct values.
955                #[cfg(feature = "temporal")]
956                graph_store.sync_epoch(snapshot_epoch);
957            }
958        }
959
960        // Restore RDF triples
961        #[cfg(feature = "rdf")]
962        {
963            populate_rdf_store(&db.rdf_store, &snapshot.rdf_triples);
964            for rng in &snapshot.rdf_named_graphs {
965                let graph = db.rdf_store.graph_or_create(&rng.name);
966                populate_rdf_store(&graph, &rng.triples);
967            }
968        }
969
970        // Restore schema
971        restore_schema_from_snapshot(&db.catalog, &snapshot.schema);
972
973        // Restore indexes (must come after data population)
974        restore_indexes_from_snapshot(&db, &snapshot.indexes);
975
976        Ok(db)
977    }
978
979    /// Replaces the current database contents with data from a binary snapshot.
980    ///
981    /// The `data` must have been produced by
982    /// [`export_snapshot()`](Self::export_snapshot).
983    ///
984    /// All validation (duplicate IDs, dangling edge references) is performed
985    /// before any data is modified. If validation fails, the current database
986    /// is left unchanged. If validation passes, the store is cleared and
987    /// rebuilt from the snapshot atomically (from the perspective of
988    /// subsequent queries).
989    ///
990    /// # Errors
991    ///
992    /// Returns an error if the snapshot is invalid, contains dangling edge
993    /// references, has duplicate IDs, or deserialization fails.
994    pub fn restore_snapshot(&self, data: &[u8]) -> Result<()> {
995        if data.is_empty() {
996            return Err(Error::Internal("empty snapshot data".to_string()));
997        }
998
999        let version = data[0];
1000        if version != 4 {
1001            return Err(Error::Internal(format!(
1002                "unsupported snapshot version: {version} (expected 4)"
1003            )));
1004        }
1005
1006        let config = bincode::config::standard();
1007        let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
1008            .map_err(|e| Error::Internal(format!("snapshot restore failed: {e}")))?;
1009
1010        // Validate all data before making any changes
1011        validate_snapshot_data(&snapshot.nodes, &snapshot.edges)?;
1012        for ng in &snapshot.named_graphs {
1013            validate_snapshot_data(&ng.nodes, &ng.edges)?;
1014        }
1015
1016        // Drop all existing named graphs, then clear default store
1017        for name in self.store.graph_names() {
1018            self.store.drop_graph(&name);
1019        }
1020        self.store.clear();
1021
1022        populate_store_from_snapshot(&self.store, snapshot.nodes, snapshot.edges)?;
1023
1024        // Restore epoch from temporal snapshot
1025        #[cfg(feature = "temporal")]
1026        let snapshot_epoch = {
1027            let epoch = EpochId::new(snapshot.epoch);
1028            self.store.sync_epoch(epoch);
1029            self.transaction_manager.sync_epoch(epoch);
1030            epoch
1031        };
1032
1033        // Restore named graphs
1034        for ng in snapshot.named_graphs {
1035            self.store
1036                .create_graph(&ng.name)
1037                .map_err(|e| Error::Internal(e.to_string()))?;
1038            if let Some(graph_store) = self.store.graph(&ng.name) {
1039                populate_store_from_snapshot(&graph_store, ng.nodes, ng.edges)?;
1040                #[cfg(feature = "temporal")]
1041                graph_store.sync_epoch(snapshot_epoch);
1042            }
1043        }
1044
1045        // Restore RDF data
1046        #[cfg(feature = "rdf")]
1047        {
1048            // Clear existing RDF data
1049            self.rdf_store.clear();
1050            for name in self.rdf_store.graph_names() {
1051                self.rdf_store.drop_graph(&name);
1052            }
1053            populate_rdf_store(&self.rdf_store, &snapshot.rdf_triples);
1054            for rng in &snapshot.rdf_named_graphs {
1055                let graph = self.rdf_store.graph_or_create(&rng.name);
1056                populate_rdf_store(&graph, &rng.triples);
1057            }
1058        }
1059
1060        // Restore schema
1061        restore_schema_from_snapshot(&self.catalog, &snapshot.schema);
1062
1063        // Restore indexes (must come after data population)
1064        restore_indexes_from_snapshot(self, &snapshot.indexes);
1065
1066        Ok(())
1067    }
1068
1069    // =========================================================================
1070    // ADMIN API: Iteration
1071    // =========================================================================
1072
1073    /// Returns an iterator over all nodes in the database.
1074    ///
1075    /// Useful for dump/export operations.
1076    pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
1077        self.store.all_nodes()
1078    }
1079
1080    /// Returns an iterator over all edges in the database.
1081    ///
1082    /// Useful for dump/export operations.
1083    pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
1084        self.store.all_edges()
1085    }
1086}
1087
#[cfg(test)]
mod tests {
    use grafeo_common::types::{EdgeId, NodeId, Value};

    use super::super::GrafeoDB;
    use super::{
        SNAPSHOT_VERSION, Snapshot, SnapshotEdge, SnapshotIndexes, SnapshotNode, SnapshotSchema,
    };

    // --- shared fixtures ---

    /// Query returning one row per `Person` node.
    const MATCH_PEOPLE: &str = "MATCH (n:Person) RETURN n.name";

    /// Encodes a snapshot holding only default-graph nodes and edges; every
    /// other section is left empty and the epoch is 0.
    fn make_snapshot(version: u8, nodes: Vec<SnapshotNode>, edges: Vec<SnapshotEdge>) -> Vec<u8> {
        let snap = Snapshot {
            version,
            nodes,
            edges,
            named_graphs: vec![],
            rdf_triples: vec![],
            rdf_named_graphs: vec![],
            schema: SnapshotSchema::default(),
            indexes: SnapshotIndexes::default(),
            epoch: 0,
        };
        bincode::serde::encode_to_vec(&snap, bincode::config::standard()).unwrap()
    }

    /// Snapshot node without properties, carrying the given labels.
    fn labeled_node(id: NodeId, labels: &[&'static str]) -> SnapshotNode {
        SnapshotNode {
            id,
            labels: labels.iter().map(|&label| label.into()).collect(),
            properties: vec![],
        }
    }

    /// Snapshot edge of type `REL` without properties.
    fn rel_edge(id: EdgeId, src: NodeId, dst: NodeId) -> SnapshotEdge {
        SnapshotEdge {
            id,
            src,
            dst,
            edge_type: "REL".into(),
            properties: vec![],
        }
    }

    /// Runs `restore_snapshot`, requires it to fail, and returns the error text.
    fn restore_error(db: &GrafeoDB, bytes: &[u8]) -> String {
        db.restore_snapshot(bytes)
            .expect_err("restore_snapshot should have rejected the snapshot")
            .to_string()
    }

    // --- restore_snapshot() / import_snapshot() ---

    #[test]
    fn test_restore_snapshot_basic() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        // Two people make up the state captured by the snapshot
        for insert in [
            "INSERT (:Person {name: 'Alix'})",
            "INSERT (:Person {name: 'Gus'})",
        ] {
            session.execute(insert).unwrap();
        }
        let saved = db.export_snapshot().unwrap();

        // A third person is added afterwards
        session
            .execute("INSERT (:Person {name: 'Vincent'})")
            .unwrap();
        assert_eq!(db.store.node_count(), 3);

        // Restoring rolls back to the two original people
        db.restore_snapshot(&saved).unwrap();

        assert_eq!(db.store.node_count(), 2);
        let people = session.execute(MATCH_PEOPLE).unwrap();
        assert_eq!(people.rows.len(), 2);
    }

    #[test]
    fn test_restore_snapshot_validation_failure() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        // Garbage bytes must be rejected
        let outcome = db.restore_snapshot(b"garbage");
        assert!(outcome.is_err());

        // The existing data must still be there
        assert_eq!(db.store.node_count(), 1);
    }

    #[test]
    fn test_restore_snapshot_empty_db() {
        let db = GrafeoDB::new_in_memory();

        // Capture the empty state first
        let empty_snapshot = db.export_snapshot().unwrap();

        // Then add data
        let session = db.session();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
        assert_eq!(db.store.node_count(), 1);

        // Restoring the empty snapshot wipes it again
        db.restore_snapshot(&empty_snapshot).unwrap();
        assert_eq!(db.store.node_count(), 0);
    }

    #[test]
    fn test_restore_snapshot_with_edges() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        for statement in [
            "INSERT (:Person {name: 'Alix'})",
            "INSERT (:Person {name: 'Gus'})",
            "MATCH (a:Person {name: 'Alix'}), (b:Person {name: 'Gus'}) INSERT (a)-[:KNOWS]->(b)",
        ] {
            session.execute(statement).unwrap();
        }

        let saved = db.export_snapshot().unwrap();
        assert_eq!(db.store.edge_count(), 1);

        // Extra data that the restore must discard
        session
            .execute("INSERT (:Person {name: 'Vincent'})")
            .unwrap();

        db.restore_snapshot(&saved).unwrap();
        assert_eq!(db.store.node_count(), 2);
        assert_eq!(db.store.edge_count(), 1);
    }

    #[test]
    fn test_restore_snapshot_preserves_sessions() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
        let saved = db.export_snapshot().unwrap();

        // Change the data after the snapshot was taken
        session.execute("INSERT (:Person {name: 'Gus'})").unwrap();

        db.restore_snapshot(&saved).unwrap();

        // The session created before the restore keeps working and sees the
        // restored data
        let people = session.execute(MATCH_PEOPLE).unwrap();
        assert_eq!(people.rows.len(), 1);
    }

    #[test]
    fn test_export_import_roundtrip() {
        let original = GrafeoDB::new_in_memory();
        original
            .session()
            .execute("INSERT (:Person {name: 'Alix', age: 30})")
            .unwrap();

        let bytes = original.export_snapshot().unwrap();
        let imported = GrafeoDB::import_snapshot(&bytes).unwrap();

        let people = imported.session().execute(MATCH_PEOPLE).unwrap();
        assert_eq!(people.rows.len(), 1);
    }

    // --- to_memory() ---

    #[test]
    fn test_to_memory_empty() {
        let copy = GrafeoDB::new_in_memory().to_memory().unwrap();
        assert_eq!(copy.store.node_count(), 0);
        assert_eq!(copy.store.edge_count(), 0);
    }

    #[test]
    fn test_to_memory_copies_nodes_and_properties() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();
        for insert in [
            "INSERT (:Person {name: 'Alix', age: 30})",
            "INSERT (:Person {name: 'Gus', age: 25})",
        ] {
            session.execute(insert).unwrap();
        }

        let copy = db.to_memory().unwrap();
        assert_eq!(copy.store.node_count(), 2);

        let sorted_names = copy
            .session()
            .execute("MATCH (p:Person) RETURN p.name ORDER BY p.name")
            .unwrap();
        assert_eq!(sorted_names.rows.len(), 2);
        assert_eq!(sorted_names.rows[0][0], Value::String("Alix".into()));
        assert_eq!(sorted_names.rows[1][0], Value::String("Gus".into()));
    }

    #[test]
    fn test_to_memory_copies_edges_and_properties() {
        let db = GrafeoDB::new_in_memory();
        let alix = db.create_node(&["Person"]);
        db.set_node_property(alix, "name", "Alix".into());
        let gus = db.create_node(&["Person"]);
        db.set_node_property(gus, "name", "Gus".into());
        let knows = db.create_edge(alix, gus, "KNOWS");
        db.set_edge_property(knows, "since", Value::Int64(2020));

        let copy = db.to_memory().unwrap();
        assert_eq!(copy.store.node_count(), 2);
        assert_eq!(copy.store.edge_count(), 1);

        let since = copy
            .session()
            .execute("MATCH ()-[e:KNOWS]->() RETURN e.since")
            .unwrap();
        assert_eq!(since.rows[0][0], Value::Int64(2020));
    }

    #[test]
    fn test_to_memory_is_independent() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();
        session.execute("INSERT (:Person {name: 'Alix'})").unwrap();

        let copy = db.to_memory().unwrap();

        // Writes to the original after the copy must not leak into the copy
        session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
        assert_eq!(db.store.node_count(), 2);
        assert_eq!(copy.store.node_count(), 1);
    }

    // --- iter_nodes() / iter_edges() ---

    #[test]
    fn test_iter_nodes_empty() {
        assert_eq!(GrafeoDB::new_in_memory().iter_nodes().count(), 0);
    }

    #[test]
    fn test_iter_nodes_returns_all() {
        let db = GrafeoDB::new_in_memory();
        let person = db.create_node(&["Person"]);
        db.set_node_property(person, "name", "Alix".into());
        let animal = db.create_node(&["Animal"]);
        db.set_node_property(animal, "name", "Fido".into());

        let nodes: Vec<_> = db.iter_nodes().collect();
        assert_eq!(nodes.len(), 2);

        // Pull the `name` property out of every node that has one
        let names: Vec<_> = nodes
            .iter()
            .filter_map(|node| {
                node.properties
                    .iter()
                    .find(|(key, _)| key.as_str() == "name")
            })
            .map(|(_, value)| value.clone())
            .collect();
        assert!(names.contains(&Value::String("Alix".into())));
        assert!(names.contains(&Value::String("Fido".into())));
    }

    #[test]
    fn test_iter_edges_empty() {
        assert_eq!(GrafeoDB::new_in_memory().iter_edges().count(), 0);
    }

    #[test]
    fn test_iter_edges_returns_all() {
        let db = GrafeoDB::new_in_memory();
        let a = db.create_node(&["A"]);
        let b = db.create_node(&["B"]);
        let c = db.create_node(&["C"]);
        db.create_edge(a, b, "R1");
        db.create_edge(b, c, "R2");

        let edges: Vec<_> = db.iter_edges().collect();
        assert_eq!(edges.len(), 2);

        let types: Vec<_> = edges.iter().map(|edge| edge.edge_type.as_ref()).collect();
        assert!(types.contains(&"R1"));
        assert!(types.contains(&"R2"));
    }

    // --- restore_snapshot() validation ---

    #[test]
    fn test_restore_rejects_unsupported_version() {
        let db = GrafeoDB::new_in_memory();
        db.session()
            .execute("INSERT (:Person {name: 'Alix'})")
            .unwrap();

        let bytes = make_snapshot(99, vec![], vec![]);

        let err = restore_error(&db, &bytes);
        assert!(err.contains("unsupported snapshot version"), "got: {err}");

        // The rejected snapshot must not have touched existing data
        assert_eq!(db.store.node_count(), 1);
    }

    #[test]
    fn test_restore_rejects_duplicate_node_ids() {
        let db = GrafeoDB::new_in_memory();
        db.session()
            .execute("INSERT (:Person {name: 'Alix'})")
            .unwrap();

        // Two nodes sharing ID 0
        let bytes = make_snapshot(
            SNAPSHOT_VERSION,
            vec![
                labeled_node(NodeId::new(0), &["A"]),
                labeled_node(NodeId::new(0), &["B"]),
            ],
            vec![],
        );

        let err = restore_error(&db, &bytes);
        assert!(err.contains("duplicate node ID"), "got: {err}");
        assert_eq!(db.store.node_count(), 1);
    }

    #[test]
    fn test_restore_rejects_duplicate_edge_ids() {
        let db = GrafeoDB::new_in_memory();

        // Two valid nodes, but both edges share ID 0
        let bytes = make_snapshot(
            SNAPSHOT_VERSION,
            vec![
                labeled_node(NodeId::new(0), &[]),
                labeled_node(NodeId::new(1), &[]),
            ],
            vec![
                rel_edge(EdgeId::new(0), NodeId::new(0), NodeId::new(1)),
                rel_edge(EdgeId::new(0), NodeId::new(0), NodeId::new(1)),
            ],
        );

        let err = restore_error(&db, &bytes);
        assert!(err.contains("duplicate edge ID"), "got: {err}");
    }

    #[test]
    fn test_restore_rejects_dangling_source() {
        let db = GrafeoDB::new_in_memory();

        // The edge starts at node 999, which is not part of the snapshot
        let bytes = make_snapshot(
            SNAPSHOT_VERSION,
            vec![labeled_node(NodeId::new(0), &[])],
            vec![rel_edge(EdgeId::new(0), NodeId::new(999), NodeId::new(0))],
        );

        let err = restore_error(&db, &bytes);
        assert!(err.contains("non-existent source node"), "got: {err}");
    }

    #[test]
    fn test_restore_rejects_dangling_destination() {
        let db = GrafeoDB::new_in_memory();

        // The edge ends at node 999, which is not part of the snapshot
        let bytes = make_snapshot(
            SNAPSHOT_VERSION,
            vec![labeled_node(NodeId::new(0), &[])],
            vec![rel_edge(EdgeId::new(0), NodeId::new(0), NodeId::new(999))],
        );

        let err = restore_error(&db, &bytes);
        assert!(err.contains("non-existent destination node"), "got: {err}");
    }

    // --- index metadata roundtrip ---

    #[test]
    fn test_snapshot_roundtrip_property_index() {
        let db = GrafeoDB::new_in_memory();
        db.session()
            .execute("INSERT (:Person {name: 'Alix', email: 'alix@example.com'})")
            .unwrap();
        db.create_property_index("email");
        assert!(db.has_property_index("email"));

        let bytes = db.export_snapshot().unwrap();
        let imported = GrafeoDB::import_snapshot(&bytes).unwrap();

        assert!(imported.has_property_index("email"));

        // The re-created index must also answer lookups
        let email = Value::String("alix@example.com".into());
        let found = imported.find_nodes_by_property("email", &email);
        assert_eq!(found.len(), 1);
    }

    #[cfg(feature = "vector-index")]
    #[test]
    fn test_snapshot_roundtrip_vector_index() {
        use std::sync::Arc;

        let db = GrafeoDB::new_in_memory();

        let first = db.create_node(&["Doc"]);
        db.set_node_property(
            first,
            "embedding",
            Value::Vector(Arc::from([1.0_f32, 0.0, 0.0])),
        );
        let second = db.create_node(&["Doc"]);
        db.set_node_property(
            second,
            "embedding",
            Value::Vector(Arc::from([0.0_f32, 1.0, 0.0])),
        );

        db.create_vector_index("Doc", "embedding", None, Some("cosine"), Some(4), Some(32))
            .unwrap();

        let bytes = db.export_snapshot().unwrap();
        let imported = GrafeoDB::import_snapshot(&bytes).unwrap();

        // Vector search must work on the imported database
        let hits = imported
            .vector_search("Doc", "embedding", &[1.0, 0.0, 0.0], 2, None, None)
            .unwrap();
        assert_eq!(hits.len(), 2);
        // The query equals the first node's vector, so it must rank first
        assert_eq!(hits[0].0, first);
    }

    #[cfg(feature = "text-index")]
    #[test]
    fn test_snapshot_roundtrip_text_index() {
        let db = GrafeoDB::new_in_memory();

        let rust_article = db.create_node(&["Article"]);
        db.set_node_property(
            rust_article,
            "body",
            Value::String("rust graph database".into()),
        );
        let python_article = db.create_node(&["Article"]);
        db.set_node_property(
            python_article,
            "body",
            Value::String("python web framework".into()),
        );

        db.create_text_index("Article", "body").unwrap();

        let bytes = db.export_snapshot().unwrap();
        let imported = GrafeoDB::import_snapshot(&bytes).unwrap();

        // Text search must work on the imported database
        let hits = imported
            .text_search("Article", "body", "graph database", 10)
            .unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].0, rust_article);
    }

    #[test]
    fn test_snapshot_roundtrip_property_index_via_restore() {
        let db = GrafeoDB::new_in_memory();
        let session = db.session();

        session
            .execute("INSERT (:Person {name: 'Alix', email: 'alix@example.com'})")
            .unwrap();
        db.create_property_index("email");

        let saved = db.export_snapshot().unwrap();

        // Diverge from the snapshot: add data and drop the index
        session
            .execute("INSERT (:Person {name: 'Gus', email: 'gus@example.com'})")
            .unwrap();
        db.drop_property_index("email");
        assert!(!db.has_property_index("email"));

        // Restoring must bring the index definition back
        db.restore_snapshot(&saved).unwrap();
        assert!(db.has_property_index("email"));
    }
}