Skip to main content

grafeo_engine/database/
persistence.rs

1//! Persistence, snapshots, and data export for GrafeoDB.
2
3use std::path::Path;
4
5use grafeo_common::types::{EdgeId, NodeId, Value};
6use grafeo_common::utils::error::{Error, Result};
7use hashbrown::HashSet;
8
9use crate::config::Config;
10
11#[cfg(feature = "wal")]
12use grafeo_adapters::storage::wal::WalRecord;
13
14/// Binary snapshot format for database export/import.
15#[derive(serde::Serialize, serde::Deserialize)]
16struct Snapshot {
17    version: u8,
18    nodes: Vec<SnapshotNode>,
19    edges: Vec<SnapshotEdge>,
20}
21
22#[derive(serde::Serialize, serde::Deserialize)]
23struct SnapshotNode {
24    id: NodeId,
25    labels: Vec<String>,
26    properties: Vec<(String, Value)>,
27}
28
29#[derive(serde::Serialize, serde::Deserialize)]
30struct SnapshotEdge {
31    id: EdgeId,
32    src: NodeId,
33    dst: NodeId,
34    edge_type: String,
35    properties: Vec<(String, Value)>,
36}
37
38impl super::GrafeoDB {
39    // =========================================================================
40    // ADMIN API: Persistence Control
41    // =========================================================================
42
43    /// Saves the database to a file path.
44    ///
45    /// - If in-memory: creates a new persistent database at path
46    /// - If file-backed: creates a copy at the new path
47    ///
48    /// The original database remains unchanged.
49    ///
50    /// # Errors
51    ///
52    /// Returns an error if the save operation fails.
53    ///
54    /// Requires the `wal` feature for persistence support.
55    #[cfg(feature = "wal")]
56    pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
57        let path = path.as_ref();
58
59        // Create target database with WAL enabled
60        let target_config = Config::persistent(path);
61        let target = Self::with_config(target_config)?;
62
63        // Copy all nodes using WAL-enabled methods
64        for node in self.store.all_nodes() {
65            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
66            target.store.create_node_with_id(node.id, &label_refs);
67
68            // Log to WAL
69            target.log_wal(&WalRecord::CreateNode {
70                id: node.id,
71                labels: node.labels.iter().map(|s| s.to_string()).collect(),
72            })?;
73
74            // Copy properties
75            for (key, value) in node.properties {
76                target
77                    .store
78                    .set_node_property(node.id, key.as_str(), value.clone());
79                target.log_wal(&WalRecord::SetNodeProperty {
80                    id: node.id,
81                    key: key.to_string(),
82                    value,
83                })?;
84            }
85        }
86
87        // Copy all edges using WAL-enabled methods
88        for edge in self.store.all_edges() {
89            target
90                .store
91                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
92
93            // Log to WAL
94            target.log_wal(&WalRecord::CreateEdge {
95                id: edge.id,
96                src: edge.src,
97                dst: edge.dst,
98                edge_type: edge.edge_type.to_string(),
99            })?;
100
101            // Copy properties
102            for (key, value) in edge.properties {
103                target
104                    .store
105                    .set_edge_property(edge.id, key.as_str(), value.clone());
106                target.log_wal(&WalRecord::SetEdgeProperty {
107                    id: edge.id,
108                    key: key.to_string(),
109                    value,
110                })?;
111            }
112        }
113
114        // Checkpoint and close the target database
115        target.close()?;
116
117        Ok(())
118    }
119
120    /// Creates an in-memory copy of this database.
121    ///
122    /// Returns a new database that is completely independent.
123    /// Useful for:
124    /// - Testing modifications without affecting the original
125    /// - Faster operations when persistence isn't needed
126    ///
127    /// # Errors
128    ///
129    /// Returns an error if the copy operation fails.
130    pub fn to_memory(&self) -> Result<Self> {
131        let config = Config::in_memory();
132        let target = Self::with_config(config)?;
133
134        // Copy all nodes
135        for node in self.store.all_nodes() {
136            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
137            target.store.create_node_with_id(node.id, &label_refs);
138
139            // Copy properties
140            for (key, value) in node.properties {
141                target.store.set_node_property(node.id, key.as_str(), value);
142            }
143        }
144
145        // Copy all edges
146        for edge in self.store.all_edges() {
147            target
148                .store
149                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
150
151            // Copy properties
152            for (key, value) in edge.properties {
153                target.store.set_edge_property(edge.id, key.as_str(), value);
154            }
155        }
156
157        Ok(target)
158    }
159
160    /// Opens a database file and loads it entirely into memory.
161    ///
162    /// The returned database has no connection to the original file.
163    /// Changes will NOT be written back to the file.
164    ///
165    /// # Errors
166    ///
167    /// Returns an error if the file can't be opened or loaded.
168    #[cfg(feature = "wal")]
169    pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
170        // Open the source database (triggers WAL recovery)
171        let source = Self::open(path)?;
172
173        // Create in-memory copy
174        let target = source.to_memory()?;
175
176        // Close the source (releases file handles)
177        source.close()?;
178
179        Ok(target)
180    }
181
182    // =========================================================================
183    // ADMIN API: Snapshot Export/Import
184    // =========================================================================
185
186    /// Exports the entire database to a binary snapshot.
187    ///
188    /// The returned bytes can be stored (e.g. in IndexedDB) and later
189    /// restored with [`import_snapshot()`](Self::import_snapshot).
190    ///
191    /// # Errors
192    ///
193    /// Returns an error if serialization fails.
194    pub fn export_snapshot(&self) -> Result<Vec<u8>> {
195        let nodes: Vec<SnapshotNode> = self
196            .store
197            .all_nodes()
198            .map(|n| SnapshotNode {
199                id: n.id,
200                labels: n.labels.iter().map(|l| l.to_string()).collect(),
201                properties: n
202                    .properties
203                    .into_iter()
204                    .map(|(k, v)| (k.to_string(), v))
205                    .collect(),
206            })
207            .collect();
208
209        let edges: Vec<SnapshotEdge> = self
210            .store
211            .all_edges()
212            .map(|e| SnapshotEdge {
213                id: e.id,
214                src: e.src,
215                dst: e.dst,
216                edge_type: e.edge_type.to_string(),
217                properties: e
218                    .properties
219                    .into_iter()
220                    .map(|(k, v)| (k.to_string(), v))
221                    .collect(),
222            })
223            .collect();
224
225        let snapshot = Snapshot {
226            version: 1,
227            nodes,
228            edges,
229        };
230
231        let config = bincode::config::standard();
232        bincode::serde::encode_to_vec(&snapshot, config)
233            .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
234    }
235
236    /// Creates a new in-memory database from a binary snapshot.
237    ///
238    /// The `data` must have been produced by [`export_snapshot()`](Self::export_snapshot).
239    ///
240    /// All edge references are validated before any data is inserted: every
241    /// edge's source and destination must reference a node present in the
242    /// snapshot, and duplicate node/edge IDs are rejected. If validation
243    /// fails, no database is created.
244    ///
245    /// # Errors
246    ///
247    /// Returns an error if the snapshot is invalid, contains dangling edge
248    /// references, has duplicate IDs, or deserialization fails.
249    pub fn import_snapshot(data: &[u8]) -> Result<Self> {
250        let config = bincode::config::standard();
251        let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
252            .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
253
254        if snapshot.version != 1 {
255            return Err(Error::Internal(format!(
256                "unsupported snapshot version: {}",
257                snapshot.version
258            )));
259        }
260
261        // Pre-validate: collect all node IDs and check for duplicates
262        let mut node_ids = HashSet::with_capacity(snapshot.nodes.len());
263        for node in &snapshot.nodes {
264            if !node_ids.insert(node.id) {
265                return Err(Error::Internal(format!(
266                    "snapshot contains duplicate node ID {}",
267                    node.id
268                )));
269            }
270        }
271
272        // Validate edge references and check for duplicate edge IDs
273        let mut edge_ids = HashSet::with_capacity(snapshot.edges.len());
274        for edge in &snapshot.edges {
275            if !edge_ids.insert(edge.id) {
276                return Err(Error::Internal(format!(
277                    "snapshot contains duplicate edge ID {}",
278                    edge.id
279                )));
280            }
281            if !node_ids.contains(&edge.src) {
282                return Err(Error::Internal(format!(
283                    "snapshot edge {} references non-existent source node {}",
284                    edge.id, edge.src
285                )));
286            }
287            if !node_ids.contains(&edge.dst) {
288                return Err(Error::Internal(format!(
289                    "snapshot edge {} references non-existent destination node {}",
290                    edge.id, edge.dst
291                )));
292            }
293        }
294
295        // Validation passed — build the database
296        let db = Self::new_in_memory();
297
298        for node in snapshot.nodes {
299            let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
300            db.store.create_node_with_id(node.id, &label_refs);
301            for (key, value) in node.properties {
302                db.store.set_node_property(node.id, &key, value);
303            }
304        }
305
306        for edge in snapshot.edges {
307            db.store
308                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
309            for (key, value) in edge.properties {
310                db.store.set_edge_property(edge.id, &key, value);
311            }
312        }
313
314        Ok(db)
315    }
316
317    /// Replaces the current database contents with data from a binary snapshot.
318    ///
319    /// The `data` must have been produced by [`export_snapshot()`](Self::export_snapshot).
320    ///
321    /// All validation (duplicate IDs, dangling edge references) is performed
322    /// before any data is modified. If validation fails, the current database
323    /// is left unchanged. If validation passes, the store is cleared and
324    /// rebuilt from the snapshot atomically (from the perspective of
325    /// subsequent queries).
326    ///
327    /// # Errors
328    ///
329    /// Returns an error if the snapshot is invalid, contains dangling edge
330    /// references, has duplicate IDs, or deserialization fails.
331    pub fn restore_snapshot(&self, data: &[u8]) -> Result<()> {
332        let config = bincode::config::standard();
333        let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
334            .map_err(|e| Error::Internal(format!("snapshot restore failed: {e}")))?;
335
336        if snapshot.version != 1 {
337            return Err(Error::Internal(format!(
338                "unsupported snapshot version: {}",
339                snapshot.version
340            )));
341        }
342
343        // Pre-validate: collect all node IDs and check for duplicates
344        let mut node_ids = HashSet::with_capacity(snapshot.nodes.len());
345        for node in &snapshot.nodes {
346            if !node_ids.insert(node.id) {
347                return Err(Error::Internal(format!(
348                    "snapshot contains duplicate node ID {}",
349                    node.id
350                )));
351            }
352        }
353
354        // Validate edge references and check for duplicate edge IDs
355        let mut edge_ids = HashSet::with_capacity(snapshot.edges.len());
356        for edge in &snapshot.edges {
357            if !edge_ids.insert(edge.id) {
358                return Err(Error::Internal(format!(
359                    "snapshot contains duplicate edge ID {}",
360                    edge.id
361                )));
362            }
363            if !node_ids.contains(&edge.src) {
364                return Err(Error::Internal(format!(
365                    "snapshot edge {} references non-existent source node {}",
366                    edge.id, edge.src
367                )));
368            }
369            if !node_ids.contains(&edge.dst) {
370                return Err(Error::Internal(format!(
371                    "snapshot edge {} references non-existent destination node {}",
372                    edge.id, edge.dst
373                )));
374            }
375        }
376
377        // Validation passed: clear and rebuild
378        self.store.clear();
379
380        for node in snapshot.nodes {
381            let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
382            self.store.create_node_with_id(node.id, &label_refs);
383            for (key, value) in node.properties {
384                self.store.set_node_property(node.id, &key, value);
385            }
386        }
387
388        for edge in snapshot.edges {
389            self.store
390                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
391            for (key, value) in edge.properties {
392                self.store.set_edge_property(edge.id, &key, value);
393            }
394        }
395
396        Ok(())
397    }
398
399    // =========================================================================
400    // ADMIN API: Iteration
401    // =========================================================================
402
403    /// Returns an iterator over all nodes in the database.
404    ///
405    /// Useful for dump/export operations.
406    pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
407        self.store.all_nodes()
408    }
409
410    /// Returns an iterator over all edges in the database.
411    ///
412    /// Useful for dump/export operations.
413    pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
414        self.store.all_edges()
415    }
416}
417
418#[cfg(test)]
419mod tests {
420    use grafeo_common::types::{EdgeId, NodeId, Value};
421
422    use super::super::GrafeoDB;
423    use super::{Snapshot, SnapshotEdge, SnapshotNode};
424
425    #[test]
426    fn test_restore_snapshot_basic() {
427        let db = GrafeoDB::new_in_memory();
428        let session = db.session();
429
430        // Populate
431        session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
432        session.execute("INSERT (:Person {name: 'Bob'})").unwrap();
433
434        let snapshot = db.export_snapshot().unwrap();
435
436        // Modify
437        session
438            .execute("INSERT (:Person {name: 'Charlie'})")
439            .unwrap();
440        assert_eq!(db.store.node_count(), 3);
441
442        // Restore original
443        db.restore_snapshot(&snapshot).unwrap();
444
445        assert_eq!(db.store.node_count(), 2);
446        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
447        assert_eq!(result.rows.len(), 2);
448    }
449
450    #[test]
451    fn test_restore_snapshot_validation_failure() {
452        let db = GrafeoDB::new_in_memory();
453        let session = db.session();
454
455        session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
456
457        // Corrupt snapshot: just garbage bytes
458        let result = db.restore_snapshot(b"garbage");
459        assert!(result.is_err());
460
461        // DB should be unchanged
462        assert_eq!(db.store.node_count(), 1);
463    }
464
465    #[test]
466    fn test_restore_snapshot_empty_db() {
467        let db = GrafeoDB::new_in_memory();
468
469        // Export empty snapshot, then populate, then restore to empty
470        let empty_snapshot = db.export_snapshot().unwrap();
471
472        let session = db.session();
473        session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
474        assert_eq!(db.store.node_count(), 1);
475
476        db.restore_snapshot(&empty_snapshot).unwrap();
477        assert_eq!(db.store.node_count(), 0);
478    }
479
480    #[test]
481    fn test_restore_snapshot_with_edges() {
482        let db = GrafeoDB::new_in_memory();
483        let session = db.session();
484
485        session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
486        session.execute("INSERT (:Person {name: 'Bob'})").unwrap();
487        session
488            .execute(
489                "MATCH (a:Person {name: 'Alice'}), (b:Person {name: 'Bob'}) INSERT (a)-[:KNOWS]->(b)",
490            )
491            .unwrap();
492
493        let snapshot = db.export_snapshot().unwrap();
494        assert_eq!(db.store.edge_count(), 1);
495
496        // Modify: add more data
497        session
498            .execute("INSERT (:Person {name: 'Charlie'})")
499            .unwrap();
500
501        // Restore
502        db.restore_snapshot(&snapshot).unwrap();
503        assert_eq!(db.store.node_count(), 2);
504        assert_eq!(db.store.edge_count(), 1);
505    }
506
507    #[test]
508    fn test_restore_snapshot_preserves_sessions() {
509        let db = GrafeoDB::new_in_memory();
510        let session = db.session();
511
512        session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
513        let snapshot = db.export_snapshot().unwrap();
514
515        // Modify
516        session.execute("INSERT (:Person {name: 'Bob'})").unwrap();
517
518        // Restore
519        db.restore_snapshot(&snapshot).unwrap();
520
521        // Session should still work and see restored data
522        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
523        assert_eq!(result.rows.len(), 1);
524    }
525
526    #[test]
527    fn test_export_import_roundtrip() {
528        let db = GrafeoDB::new_in_memory();
529        let session = db.session();
530
531        session
532            .execute("INSERT (:Person {name: 'Alice', age: 30})")
533            .unwrap();
534
535        let snapshot = db.export_snapshot().unwrap();
536        let db2 = GrafeoDB::import_snapshot(&snapshot).unwrap();
537        let session2 = db2.session();
538
539        let result = session2.execute("MATCH (n:Person) RETURN n.name").unwrap();
540        assert_eq!(result.rows.len(), 1);
541    }
542
543    // --- to_memory() ---
544
545    #[test]
546    fn test_to_memory_empty() {
547        let db = GrafeoDB::new_in_memory();
548        let copy = db.to_memory().unwrap();
549        assert_eq!(copy.store.node_count(), 0);
550        assert_eq!(copy.store.edge_count(), 0);
551    }
552
553    #[test]
554    fn test_to_memory_copies_nodes_and_properties() {
555        let db = GrafeoDB::new_in_memory();
556        let session = db.session();
557        session
558            .execute("INSERT (:Person {name: 'Alice', age: 30})")
559            .unwrap();
560        session
561            .execute("INSERT (:Person {name: 'Bob', age: 25})")
562            .unwrap();
563
564        let copy = db.to_memory().unwrap();
565        assert_eq!(copy.store.node_count(), 2);
566
567        let s2 = copy.session();
568        let result = s2
569            .execute("MATCH (p:Person) RETURN p.name ORDER BY p.name")
570            .unwrap();
571        assert_eq!(result.rows.len(), 2);
572        assert_eq!(result.rows[0][0], Value::String("Alice".into()));
573        assert_eq!(result.rows[1][0], Value::String("Bob".into()));
574    }
575
576    #[test]
577    fn test_to_memory_copies_edges_and_properties() {
578        let db = GrafeoDB::new_in_memory();
579        let a = db.create_node(&["Person"]);
580        db.set_node_property(a, "name", "Alice".into());
581        let b = db.create_node(&["Person"]);
582        db.set_node_property(b, "name", "Bob".into());
583        let edge = db.create_edge(a, b, "KNOWS");
584        db.set_edge_property(edge, "since", Value::Int64(2020));
585
586        let copy = db.to_memory().unwrap();
587        assert_eq!(copy.store.node_count(), 2);
588        assert_eq!(copy.store.edge_count(), 1);
589
590        let s2 = copy.session();
591        let result = s2.execute("MATCH ()-[e:KNOWS]->() RETURN e.since").unwrap();
592        assert_eq!(result.rows[0][0], Value::Int64(2020));
593    }
594
595    #[test]
596    fn test_to_memory_is_independent() {
597        let db = GrafeoDB::new_in_memory();
598        let session = db.session();
599        session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
600
601        let copy = db.to_memory().unwrap();
602
603        // Mutating original should not affect copy
604        session.execute("INSERT (:Person {name: 'Bob'})").unwrap();
605        assert_eq!(db.store.node_count(), 2);
606        assert_eq!(copy.store.node_count(), 1);
607    }
608
609    // --- iter_nodes() / iter_edges() ---
610
611    #[test]
612    fn test_iter_nodes_empty() {
613        let db = GrafeoDB::new_in_memory();
614        assert_eq!(db.iter_nodes().count(), 0);
615    }
616
617    #[test]
618    fn test_iter_nodes_returns_all() {
619        let db = GrafeoDB::new_in_memory();
620        let id1 = db.create_node(&["Person"]);
621        db.set_node_property(id1, "name", "Alice".into());
622        let id2 = db.create_node(&["Animal"]);
623        db.set_node_property(id2, "name", "Fido".into());
624
625        let nodes: Vec<_> = db.iter_nodes().collect();
626        assert_eq!(nodes.len(), 2);
627
628        let names: Vec<_> = nodes
629            .iter()
630            .filter_map(|n| n.properties.iter().find(|(k, _)| k.as_str() == "name"))
631            .map(|(_, v)| v.clone())
632            .collect();
633        assert!(names.contains(&Value::String("Alice".into())));
634        assert!(names.contains(&Value::String("Fido".into())));
635    }
636
637    #[test]
638    fn test_iter_edges_empty() {
639        let db = GrafeoDB::new_in_memory();
640        assert_eq!(db.iter_edges().count(), 0);
641    }
642
643    #[test]
644    fn test_iter_edges_returns_all() {
645        let db = GrafeoDB::new_in_memory();
646        let a = db.create_node(&["A"]);
647        let b = db.create_node(&["B"]);
648        let c = db.create_node(&["C"]);
649        db.create_edge(a, b, "R1");
650        db.create_edge(b, c, "R2");
651
652        let edges: Vec<_> = db.iter_edges().collect();
653        assert_eq!(edges.len(), 2);
654
655        let types: Vec<_> = edges.iter().map(|e| e.edge_type.as_ref()).collect();
656        assert!(types.contains(&"R1"));
657        assert!(types.contains(&"R2"));
658    }
659
660    // --- restore_snapshot() validation ---
661
662    fn encode_snapshot(snap: &Snapshot) -> Vec<u8> {
663        bincode::serde::encode_to_vec(snap, bincode::config::standard()).unwrap()
664    }
665
666    #[test]
667    fn test_restore_rejects_unsupported_version() {
668        let db = GrafeoDB::new_in_memory();
669        let session = db.session();
670        session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
671
672        let snap = Snapshot {
673            version: 99,
674            nodes: vec![],
675            edges: vec![],
676        };
677        let bytes = encode_snapshot(&snap);
678
679        let result = db.restore_snapshot(&bytes);
680        assert!(result.is_err());
681        let err = result.unwrap_err().to_string();
682        assert!(err.contains("unsupported snapshot version"), "got: {err}");
683
684        // DB unchanged
685        assert_eq!(db.store.node_count(), 1);
686    }
687
688    #[test]
689    fn test_restore_rejects_duplicate_node_ids() {
690        let db = GrafeoDB::new_in_memory();
691        let session = db.session();
692        session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
693
694        let snap = Snapshot {
695            version: 1,
696            nodes: vec![
697                SnapshotNode {
698                    id: NodeId::new(0),
699                    labels: vec!["A".into()],
700                    properties: vec![],
701                },
702                SnapshotNode {
703                    id: NodeId::new(0),
704                    labels: vec!["B".into()],
705                    properties: vec![],
706                },
707            ],
708            edges: vec![],
709        };
710        let bytes = encode_snapshot(&snap);
711
712        let result = db.restore_snapshot(&bytes);
713        assert!(result.is_err());
714        let err = result.unwrap_err().to_string();
715        assert!(err.contains("duplicate node ID"), "got: {err}");
716        assert_eq!(db.store.node_count(), 1);
717    }
718
719    #[test]
720    fn test_restore_rejects_duplicate_edge_ids() {
721        let db = GrafeoDB::new_in_memory();
722
723        let snap = Snapshot {
724            version: 1,
725            nodes: vec![
726                SnapshotNode {
727                    id: NodeId::new(0),
728                    labels: vec![],
729                    properties: vec![],
730                },
731                SnapshotNode {
732                    id: NodeId::new(1),
733                    labels: vec![],
734                    properties: vec![],
735                },
736            ],
737            edges: vec![
738                SnapshotEdge {
739                    id: EdgeId::new(0),
740                    src: NodeId::new(0),
741                    dst: NodeId::new(1),
742                    edge_type: "REL".into(),
743                    properties: vec![],
744                },
745                SnapshotEdge {
746                    id: EdgeId::new(0),
747                    src: NodeId::new(0),
748                    dst: NodeId::new(1),
749                    edge_type: "REL".into(),
750                    properties: vec![],
751                },
752            ],
753        };
754        let bytes = encode_snapshot(&snap);
755
756        let result = db.restore_snapshot(&bytes);
757        assert!(result.is_err());
758        let err = result.unwrap_err().to_string();
759        assert!(err.contains("duplicate edge ID"), "got: {err}");
760    }
761
762    #[test]
763    fn test_restore_rejects_dangling_source() {
764        let db = GrafeoDB::new_in_memory();
765
766        let snap = Snapshot {
767            version: 1,
768            nodes: vec![SnapshotNode {
769                id: NodeId::new(0),
770                labels: vec![],
771                properties: vec![],
772            }],
773            edges: vec![SnapshotEdge {
774                id: EdgeId::new(0),
775                src: NodeId::new(999),
776                dst: NodeId::new(0),
777                edge_type: "REL".into(),
778                properties: vec![],
779            }],
780        };
781        let bytes = encode_snapshot(&snap);
782
783        let result = db.restore_snapshot(&bytes);
784        assert!(result.is_err());
785        let err = result.unwrap_err().to_string();
786        assert!(err.contains("non-existent source node"), "got: {err}");
787    }
788
789    #[test]
790    fn test_restore_rejects_dangling_destination() {
791        let db = GrafeoDB::new_in_memory();
792
793        let snap = Snapshot {
794            version: 1,
795            nodes: vec![SnapshotNode {
796                id: NodeId::new(0),
797                labels: vec![],
798                properties: vec![],
799            }],
800            edges: vec![SnapshotEdge {
801                id: EdgeId::new(0),
802                src: NodeId::new(0),
803                dst: NodeId::new(999),
804                edge_type: "REL".into(),
805                properties: vec![],
806            }],
807        };
808        let bytes = encode_snapshot(&snap);
809
810        let result = db.restore_snapshot(&bytes);
811        assert!(result.is_err());
812        let err = result.unwrap_err().to_string();
813        assert!(err.contains("non-existent destination node"), "got: {err}");
814    }
815}