Skip to main content

grafeo_engine/database/
persistence.rs

1//! Persistence, snapshots, and data export for GrafeoDB.
2
3#[cfg(feature = "wal")]
4use std::path::Path;
5
6use grafeo_common::types::{EdgeId, NodeId, Value};
7use grafeo_common::utils::error::{Error, Result};
8use hashbrown::HashSet;
9
10use crate::config::Config;
11
12#[cfg(feature = "wal")]
13use grafeo_adapters::storage::wal::WalRecord;
14
15/// Binary snapshot format for database export/import.
16#[derive(serde::Serialize, serde::Deserialize)]
17struct Snapshot {
18    version: u8,
19    nodes: Vec<SnapshotNode>,
20    edges: Vec<SnapshotEdge>,
21}
22
23#[derive(serde::Serialize, serde::Deserialize)]
24struct SnapshotNode {
25    id: NodeId,
26    labels: Vec<String>,
27    properties: Vec<(String, Value)>,
28}
29
30#[derive(serde::Serialize, serde::Deserialize)]
31struct SnapshotEdge {
32    id: EdgeId,
33    src: NodeId,
34    dst: NodeId,
35    edge_type: String,
36    properties: Vec<(String, Value)>,
37}
38
39impl super::GrafeoDB {
40    // =========================================================================
41    // ADMIN API: Persistence Control
42    // =========================================================================
43
44    /// Saves the database to a file path.
45    ///
46    /// - If in-memory: creates a new persistent database at path
47    /// - If file-backed: creates a copy at the new path
48    ///
49    /// The original database remains unchanged.
50    ///
51    /// # Errors
52    ///
53    /// Returns an error if the save operation fails.
54    ///
55    /// Requires the `wal` feature for persistence support.
56    #[cfg(feature = "wal")]
57    pub fn save(&self, path: impl AsRef<Path>) -> Result<()> {
58        let path = path.as_ref();
59
60        // Create target database with WAL enabled
61        let target_config = Config::persistent(path);
62        let target = Self::with_config(target_config)?;
63
64        // Copy all nodes using WAL-enabled methods
65        for node in self.store.all_nodes() {
66            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
67            target.store.create_node_with_id(node.id, &label_refs);
68
69            // Log to WAL
70            target.log_wal(&WalRecord::CreateNode {
71                id: node.id,
72                labels: node.labels.iter().map(|s| s.to_string()).collect(),
73            })?;
74
75            // Copy properties
76            for (key, value) in node.properties {
77                target
78                    .store
79                    .set_node_property(node.id, key.as_str(), value.clone());
80                target.log_wal(&WalRecord::SetNodeProperty {
81                    id: node.id,
82                    key: key.to_string(),
83                    value,
84                })?;
85            }
86        }
87
88        // Copy all edges using WAL-enabled methods
89        for edge in self.store.all_edges() {
90            target
91                .store
92                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
93
94            // Log to WAL
95            target.log_wal(&WalRecord::CreateEdge {
96                id: edge.id,
97                src: edge.src,
98                dst: edge.dst,
99                edge_type: edge.edge_type.to_string(),
100            })?;
101
102            // Copy properties
103            for (key, value) in edge.properties {
104                target
105                    .store
106                    .set_edge_property(edge.id, key.as_str(), value.clone());
107                target.log_wal(&WalRecord::SetEdgeProperty {
108                    id: edge.id,
109                    key: key.to_string(),
110                    value,
111                })?;
112            }
113        }
114
115        // Checkpoint and close the target database
116        target.close()?;
117
118        Ok(())
119    }
120
121    /// Creates an in-memory copy of this database.
122    ///
123    /// Returns a new database that is completely independent.
124    /// Useful for:
125    /// - Testing modifications without affecting the original
126    /// - Faster operations when persistence isn't needed
127    ///
128    /// # Errors
129    ///
130    /// Returns an error if the copy operation fails.
131    pub fn to_memory(&self) -> Result<Self> {
132        let config = Config::in_memory();
133        let target = Self::with_config(config)?;
134
135        // Copy all nodes
136        for node in self.store.all_nodes() {
137            let label_refs: Vec<&str> = node.labels.iter().map(|s| &**s).collect();
138            target.store.create_node_with_id(node.id, &label_refs);
139
140            // Copy properties
141            for (key, value) in node.properties {
142                target.store.set_node_property(node.id, key.as_str(), value);
143            }
144        }
145
146        // Copy all edges
147        for edge in self.store.all_edges() {
148            target
149                .store
150                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
151
152            // Copy properties
153            for (key, value) in edge.properties {
154                target.store.set_edge_property(edge.id, key.as_str(), value);
155            }
156        }
157
158        Ok(target)
159    }
160
161    /// Opens a database file and loads it entirely into memory.
162    ///
163    /// The returned database has no connection to the original file.
164    /// Changes will NOT be written back to the file.
165    ///
166    /// # Errors
167    ///
168    /// Returns an error if the file can't be opened or loaded.
169    #[cfg(feature = "wal")]
170    pub fn open_in_memory(path: impl AsRef<Path>) -> Result<Self> {
171        // Open the source database (triggers WAL recovery)
172        let source = Self::open(path)?;
173
174        // Create in-memory copy
175        let target = source.to_memory()?;
176
177        // Close the source (releases file handles)
178        source.close()?;
179
180        Ok(target)
181    }
182
183    // =========================================================================
184    // ADMIN API: Snapshot Export/Import
185    // =========================================================================
186
187    /// Exports the entire database to a binary snapshot.
188    ///
189    /// The returned bytes can be stored (e.g. in IndexedDB) and later
190    /// restored with [`import_snapshot()`](Self::import_snapshot).
191    ///
192    /// # Errors
193    ///
194    /// Returns an error if serialization fails.
195    pub fn export_snapshot(&self) -> Result<Vec<u8>> {
196        let nodes: Vec<SnapshotNode> = self
197            .store
198            .all_nodes()
199            .map(|n| SnapshotNode {
200                id: n.id,
201                labels: n.labels.iter().map(|l| l.to_string()).collect(),
202                properties: n
203                    .properties
204                    .into_iter()
205                    .map(|(k, v)| (k.to_string(), v))
206                    .collect(),
207            })
208            .collect();
209
210        let edges: Vec<SnapshotEdge> = self
211            .store
212            .all_edges()
213            .map(|e| SnapshotEdge {
214                id: e.id,
215                src: e.src,
216                dst: e.dst,
217                edge_type: e.edge_type.to_string(),
218                properties: e
219                    .properties
220                    .into_iter()
221                    .map(|(k, v)| (k.to_string(), v))
222                    .collect(),
223            })
224            .collect();
225
226        let snapshot = Snapshot {
227            version: 1,
228            nodes,
229            edges,
230        };
231
232        let config = bincode::config::standard();
233        bincode::serde::encode_to_vec(&snapshot, config)
234            .map_err(|e| Error::Internal(format!("snapshot export failed: {e}")))
235    }
236
237    /// Creates a new in-memory database from a binary snapshot.
238    ///
239    /// The `data` must have been produced by [`export_snapshot()`](Self::export_snapshot).
240    ///
241    /// All edge references are validated before any data is inserted: every
242    /// edge's source and destination must reference a node present in the
243    /// snapshot, and duplicate node/edge IDs are rejected. If validation
244    /// fails, no database is created.
245    ///
246    /// # Errors
247    ///
248    /// Returns an error if the snapshot is invalid, contains dangling edge
249    /// references, has duplicate IDs, or deserialization fails.
250    pub fn import_snapshot(data: &[u8]) -> Result<Self> {
251        let config = bincode::config::standard();
252        let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
253            .map_err(|e| Error::Internal(format!("snapshot import failed: {e}")))?;
254
255        if snapshot.version != 1 {
256            return Err(Error::Internal(format!(
257                "unsupported snapshot version: {}",
258                snapshot.version
259            )));
260        }
261
262        // Pre-validate: collect all node IDs and check for duplicates
263        let mut node_ids = HashSet::with_capacity(snapshot.nodes.len());
264        for node in &snapshot.nodes {
265            if !node_ids.insert(node.id) {
266                return Err(Error::Internal(format!(
267                    "snapshot contains duplicate node ID {}",
268                    node.id
269                )));
270            }
271        }
272
273        // Validate edge references and check for duplicate edge IDs
274        let mut edge_ids = HashSet::with_capacity(snapshot.edges.len());
275        for edge in &snapshot.edges {
276            if !edge_ids.insert(edge.id) {
277                return Err(Error::Internal(format!(
278                    "snapshot contains duplicate edge ID {}",
279                    edge.id
280                )));
281            }
282            if !node_ids.contains(&edge.src) {
283                return Err(Error::Internal(format!(
284                    "snapshot edge {} references non-existent source node {}",
285                    edge.id, edge.src
286                )));
287            }
288            if !node_ids.contains(&edge.dst) {
289                return Err(Error::Internal(format!(
290                    "snapshot edge {} references non-existent destination node {}",
291                    edge.id, edge.dst
292                )));
293            }
294        }
295
296        // Validation passed — build the database
297        let db = Self::new_in_memory();
298
299        for node in snapshot.nodes {
300            let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
301            db.store.create_node_with_id(node.id, &label_refs);
302            for (key, value) in node.properties {
303                db.store.set_node_property(node.id, &key, value);
304            }
305        }
306
307        for edge in snapshot.edges {
308            db.store
309                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
310            for (key, value) in edge.properties {
311                db.store.set_edge_property(edge.id, &key, value);
312            }
313        }
314
315        Ok(db)
316    }
317
318    /// Replaces the current database contents with data from a binary snapshot.
319    ///
320    /// The `data` must have been produced by [`export_snapshot()`](Self::export_snapshot).
321    ///
322    /// All validation (duplicate IDs, dangling edge references) is performed
323    /// before any data is modified. If validation fails, the current database
324    /// is left unchanged. If validation passes, the store is cleared and
325    /// rebuilt from the snapshot atomically (from the perspective of
326    /// subsequent queries).
327    ///
328    /// # Errors
329    ///
330    /// Returns an error if the snapshot is invalid, contains dangling edge
331    /// references, has duplicate IDs, or deserialization fails.
332    pub fn restore_snapshot(&self, data: &[u8]) -> Result<()> {
333        let config = bincode::config::standard();
334        let (snapshot, _): (Snapshot, _) = bincode::serde::decode_from_slice(data, config)
335            .map_err(|e| Error::Internal(format!("snapshot restore failed: {e}")))?;
336
337        if snapshot.version != 1 {
338            return Err(Error::Internal(format!(
339                "unsupported snapshot version: {}",
340                snapshot.version
341            )));
342        }
343
344        // Pre-validate: collect all node IDs and check for duplicates
345        let mut node_ids = HashSet::with_capacity(snapshot.nodes.len());
346        for node in &snapshot.nodes {
347            if !node_ids.insert(node.id) {
348                return Err(Error::Internal(format!(
349                    "snapshot contains duplicate node ID {}",
350                    node.id
351                )));
352            }
353        }
354
355        // Validate edge references and check for duplicate edge IDs
356        let mut edge_ids = HashSet::with_capacity(snapshot.edges.len());
357        for edge in &snapshot.edges {
358            if !edge_ids.insert(edge.id) {
359                return Err(Error::Internal(format!(
360                    "snapshot contains duplicate edge ID {}",
361                    edge.id
362                )));
363            }
364            if !node_ids.contains(&edge.src) {
365                return Err(Error::Internal(format!(
366                    "snapshot edge {} references non-existent source node {}",
367                    edge.id, edge.src
368                )));
369            }
370            if !node_ids.contains(&edge.dst) {
371                return Err(Error::Internal(format!(
372                    "snapshot edge {} references non-existent destination node {}",
373                    edge.id, edge.dst
374                )));
375            }
376        }
377
378        // Validation passed: clear and rebuild
379        self.store.clear();
380
381        for node in snapshot.nodes {
382            let label_refs: Vec<&str> = node.labels.iter().map(|s| s.as_str()).collect();
383            self.store.create_node_with_id(node.id, &label_refs);
384            for (key, value) in node.properties {
385                self.store.set_node_property(node.id, &key, value);
386            }
387        }
388
389        for edge in snapshot.edges {
390            self.store
391                .create_edge_with_id(edge.id, edge.src, edge.dst, &edge.edge_type);
392            for (key, value) in edge.properties {
393                self.store.set_edge_property(edge.id, &key, value);
394            }
395        }
396
397        Ok(())
398    }
399
400    // =========================================================================
401    // ADMIN API: Iteration
402    // =========================================================================
403
404    /// Returns an iterator over all nodes in the database.
405    ///
406    /// Useful for dump/export operations.
407    pub fn iter_nodes(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Node> + '_ {
408        self.store.all_nodes()
409    }
410
411    /// Returns an iterator over all edges in the database.
412    ///
413    /// Useful for dump/export operations.
414    pub fn iter_edges(&self) -> impl Iterator<Item = grafeo_core::graph::lpg::Edge> + '_ {
415        self.store.all_edges()
416    }
417}
418
419#[cfg(test)]
420mod tests {
421    use grafeo_common::types::{EdgeId, NodeId, Value};
422
423    use super::super::GrafeoDB;
424    use super::{Snapshot, SnapshotEdge, SnapshotNode};
425
426    #[test]
427    fn test_restore_snapshot_basic() {
428        let db = GrafeoDB::new_in_memory();
429        let session = db.session();
430
431        // Populate
432        session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
433        session.execute("INSERT (:Person {name: 'Bob'})").unwrap();
434
435        let snapshot = db.export_snapshot().unwrap();
436
437        // Modify
438        session
439            .execute("INSERT (:Person {name: 'Charlie'})")
440            .unwrap();
441        assert_eq!(db.store.node_count(), 3);
442
443        // Restore original
444        db.restore_snapshot(&snapshot).unwrap();
445
446        assert_eq!(db.store.node_count(), 2);
447        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
448        assert_eq!(result.rows.len(), 2);
449    }
450
451    #[test]
452    fn test_restore_snapshot_validation_failure() {
453        let db = GrafeoDB::new_in_memory();
454        let session = db.session();
455
456        session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
457
458        // Corrupt snapshot: just garbage bytes
459        let result = db.restore_snapshot(b"garbage");
460        assert!(result.is_err());
461
462        // DB should be unchanged
463        assert_eq!(db.store.node_count(), 1);
464    }
465
466    #[test]
467    fn test_restore_snapshot_empty_db() {
468        let db = GrafeoDB::new_in_memory();
469
470        // Export empty snapshot, then populate, then restore to empty
471        let empty_snapshot = db.export_snapshot().unwrap();
472
473        let session = db.session();
474        session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
475        assert_eq!(db.store.node_count(), 1);
476
477        db.restore_snapshot(&empty_snapshot).unwrap();
478        assert_eq!(db.store.node_count(), 0);
479    }
480
481    #[test]
482    fn test_restore_snapshot_with_edges() {
483        let db = GrafeoDB::new_in_memory();
484        let session = db.session();
485
486        session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
487        session.execute("INSERT (:Person {name: 'Bob'})").unwrap();
488        session
489            .execute(
490                "MATCH (a:Person {name: 'Alice'}), (b:Person {name: 'Bob'}) INSERT (a)-[:KNOWS]->(b)",
491            )
492            .unwrap();
493
494        let snapshot = db.export_snapshot().unwrap();
495        assert_eq!(db.store.edge_count(), 1);
496
497        // Modify: add more data
498        session
499            .execute("INSERT (:Person {name: 'Charlie'})")
500            .unwrap();
501
502        // Restore
503        db.restore_snapshot(&snapshot).unwrap();
504        assert_eq!(db.store.node_count(), 2);
505        assert_eq!(db.store.edge_count(), 1);
506    }
507
508    #[test]
509    fn test_restore_snapshot_preserves_sessions() {
510        let db = GrafeoDB::new_in_memory();
511        let session = db.session();
512
513        session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
514        let snapshot = db.export_snapshot().unwrap();
515
516        // Modify
517        session.execute("INSERT (:Person {name: 'Bob'})").unwrap();
518
519        // Restore
520        db.restore_snapshot(&snapshot).unwrap();
521
522        // Session should still work and see restored data
523        let result = session.execute("MATCH (n:Person) RETURN n.name").unwrap();
524        assert_eq!(result.rows.len(), 1);
525    }
526
527    #[test]
528    fn test_export_import_roundtrip() {
529        let db = GrafeoDB::new_in_memory();
530        let session = db.session();
531
532        session
533            .execute("INSERT (:Person {name: 'Alice', age: 30})")
534            .unwrap();
535
536        let snapshot = db.export_snapshot().unwrap();
537        let db2 = GrafeoDB::import_snapshot(&snapshot).unwrap();
538        let session2 = db2.session();
539
540        let result = session2.execute("MATCH (n:Person) RETURN n.name").unwrap();
541        assert_eq!(result.rows.len(), 1);
542    }
543
544    // --- to_memory() ---
545
546    #[test]
547    fn test_to_memory_empty() {
548        let db = GrafeoDB::new_in_memory();
549        let copy = db.to_memory().unwrap();
550        assert_eq!(copy.store.node_count(), 0);
551        assert_eq!(copy.store.edge_count(), 0);
552    }
553
554    #[test]
555    fn test_to_memory_copies_nodes_and_properties() {
556        let db = GrafeoDB::new_in_memory();
557        let session = db.session();
558        session
559            .execute("INSERT (:Person {name: 'Alice', age: 30})")
560            .unwrap();
561        session
562            .execute("INSERT (:Person {name: 'Bob', age: 25})")
563            .unwrap();
564
565        let copy = db.to_memory().unwrap();
566        assert_eq!(copy.store.node_count(), 2);
567
568        let s2 = copy.session();
569        let result = s2
570            .execute("MATCH (p:Person) RETURN p.name ORDER BY p.name")
571            .unwrap();
572        assert_eq!(result.rows.len(), 2);
573        assert_eq!(result.rows[0][0], Value::String("Alice".into()));
574        assert_eq!(result.rows[1][0], Value::String("Bob".into()));
575    }
576
577    #[test]
578    fn test_to_memory_copies_edges_and_properties() {
579        let db = GrafeoDB::new_in_memory();
580        let a = db.create_node(&["Person"]);
581        db.set_node_property(a, "name", "Alice".into());
582        let b = db.create_node(&["Person"]);
583        db.set_node_property(b, "name", "Bob".into());
584        let edge = db.create_edge(a, b, "KNOWS");
585        db.set_edge_property(edge, "since", Value::Int64(2020));
586
587        let copy = db.to_memory().unwrap();
588        assert_eq!(copy.store.node_count(), 2);
589        assert_eq!(copy.store.edge_count(), 1);
590
591        let s2 = copy.session();
592        let result = s2.execute("MATCH ()-[e:KNOWS]->() RETURN e.since").unwrap();
593        assert_eq!(result.rows[0][0], Value::Int64(2020));
594    }
595
596    #[test]
597    fn test_to_memory_is_independent() {
598        let db = GrafeoDB::new_in_memory();
599        let session = db.session();
600        session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
601
602        let copy = db.to_memory().unwrap();
603
604        // Mutating original should not affect copy
605        session.execute("INSERT (:Person {name: 'Bob'})").unwrap();
606        assert_eq!(db.store.node_count(), 2);
607        assert_eq!(copy.store.node_count(), 1);
608    }
609
610    // --- iter_nodes() / iter_edges() ---
611
612    #[test]
613    fn test_iter_nodes_empty() {
614        let db = GrafeoDB::new_in_memory();
615        assert_eq!(db.iter_nodes().count(), 0);
616    }
617
618    #[test]
619    fn test_iter_nodes_returns_all() {
620        let db = GrafeoDB::new_in_memory();
621        let id1 = db.create_node(&["Person"]);
622        db.set_node_property(id1, "name", "Alice".into());
623        let id2 = db.create_node(&["Animal"]);
624        db.set_node_property(id2, "name", "Fido".into());
625
626        let nodes: Vec<_> = db.iter_nodes().collect();
627        assert_eq!(nodes.len(), 2);
628
629        let names: Vec<_> = nodes
630            .iter()
631            .filter_map(|n| n.properties.iter().find(|(k, _)| k.as_str() == "name"))
632            .map(|(_, v)| v.clone())
633            .collect();
634        assert!(names.contains(&Value::String("Alice".into())));
635        assert!(names.contains(&Value::String("Fido".into())));
636    }
637
638    #[test]
639    fn test_iter_edges_empty() {
640        let db = GrafeoDB::new_in_memory();
641        assert_eq!(db.iter_edges().count(), 0);
642    }
643
644    #[test]
645    fn test_iter_edges_returns_all() {
646        let db = GrafeoDB::new_in_memory();
647        let a = db.create_node(&["A"]);
648        let b = db.create_node(&["B"]);
649        let c = db.create_node(&["C"]);
650        db.create_edge(a, b, "R1");
651        db.create_edge(b, c, "R2");
652
653        let edges: Vec<_> = db.iter_edges().collect();
654        assert_eq!(edges.len(), 2);
655
656        let types: Vec<_> = edges.iter().map(|e| e.edge_type.as_ref()).collect();
657        assert!(types.contains(&"R1"));
658        assert!(types.contains(&"R2"));
659    }
660
661    // --- restore_snapshot() validation ---
662
663    fn encode_snapshot(snap: &Snapshot) -> Vec<u8> {
664        bincode::serde::encode_to_vec(snap, bincode::config::standard()).unwrap()
665    }
666
667    #[test]
668    fn test_restore_rejects_unsupported_version() {
669        let db = GrafeoDB::new_in_memory();
670        let session = db.session();
671        session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
672
673        let snap = Snapshot {
674            version: 99,
675            nodes: vec![],
676            edges: vec![],
677        };
678        let bytes = encode_snapshot(&snap);
679
680        let result = db.restore_snapshot(&bytes);
681        assert!(result.is_err());
682        let err = result.unwrap_err().to_string();
683        assert!(err.contains("unsupported snapshot version"), "got: {err}");
684
685        // DB unchanged
686        assert_eq!(db.store.node_count(), 1);
687    }
688
689    #[test]
690    fn test_restore_rejects_duplicate_node_ids() {
691        let db = GrafeoDB::new_in_memory();
692        let session = db.session();
693        session.execute("INSERT (:Person {name: 'Alice'})").unwrap();
694
695        let snap = Snapshot {
696            version: 1,
697            nodes: vec![
698                SnapshotNode {
699                    id: NodeId::new(0),
700                    labels: vec!["A".into()],
701                    properties: vec![],
702                },
703                SnapshotNode {
704                    id: NodeId::new(0),
705                    labels: vec!["B".into()],
706                    properties: vec![],
707                },
708            ],
709            edges: vec![],
710        };
711        let bytes = encode_snapshot(&snap);
712
713        let result = db.restore_snapshot(&bytes);
714        assert!(result.is_err());
715        let err = result.unwrap_err().to_string();
716        assert!(err.contains("duplicate node ID"), "got: {err}");
717        assert_eq!(db.store.node_count(), 1);
718    }
719
720    #[test]
721    fn test_restore_rejects_duplicate_edge_ids() {
722        let db = GrafeoDB::new_in_memory();
723
724        let snap = Snapshot {
725            version: 1,
726            nodes: vec![
727                SnapshotNode {
728                    id: NodeId::new(0),
729                    labels: vec![],
730                    properties: vec![],
731                },
732                SnapshotNode {
733                    id: NodeId::new(1),
734                    labels: vec![],
735                    properties: vec![],
736                },
737            ],
738            edges: vec![
739                SnapshotEdge {
740                    id: EdgeId::new(0),
741                    src: NodeId::new(0),
742                    dst: NodeId::new(1),
743                    edge_type: "REL".into(),
744                    properties: vec![],
745                },
746                SnapshotEdge {
747                    id: EdgeId::new(0),
748                    src: NodeId::new(0),
749                    dst: NodeId::new(1),
750                    edge_type: "REL".into(),
751                    properties: vec![],
752                },
753            ],
754        };
755        let bytes = encode_snapshot(&snap);
756
757        let result = db.restore_snapshot(&bytes);
758        assert!(result.is_err());
759        let err = result.unwrap_err().to_string();
760        assert!(err.contains("duplicate edge ID"), "got: {err}");
761    }
762
763    #[test]
764    fn test_restore_rejects_dangling_source() {
765        let db = GrafeoDB::new_in_memory();
766
767        let snap = Snapshot {
768            version: 1,
769            nodes: vec![SnapshotNode {
770                id: NodeId::new(0),
771                labels: vec![],
772                properties: vec![],
773            }],
774            edges: vec![SnapshotEdge {
775                id: EdgeId::new(0),
776                src: NodeId::new(999),
777                dst: NodeId::new(0),
778                edge_type: "REL".into(),
779                properties: vec![],
780            }],
781        };
782        let bytes = encode_snapshot(&snap);
783
784        let result = db.restore_snapshot(&bytes);
785        assert!(result.is_err());
786        let err = result.unwrap_err().to_string();
787        assert!(err.contains("non-existent source node"), "got: {err}");
788    }
789
790    #[test]
791    fn test_restore_rejects_dangling_destination() {
792        let db = GrafeoDB::new_in_memory();
793
794        let snap = Snapshot {
795            version: 1,
796            nodes: vec![SnapshotNode {
797                id: NodeId::new(0),
798                labels: vec![],
799                properties: vec![],
800            }],
801            edges: vec![SnapshotEdge {
802                id: EdgeId::new(0),
803                src: NodeId::new(0),
804                dst: NodeId::new(999),
805                edge_type: "REL".into(),
806                properties: vec![],
807            }],
808        };
809        let bytes = encode_snapshot(&snap);
810
811        let result = db.restore_snapshot(&bytes);
812        assert!(result.is_err());
813        let err = result.unwrap_err().to_string();
814        assert!(err.contains("non-existent destination node"), "got: {err}");
815    }
816}