Skip to main content

moloch_storage/
snapshot.rs

1//! Snapshot creation and import for fast sync.
2//!
3//! Snapshots allow new nodes to bootstrap quickly by downloading
4//! a complete state dump rather than replaying all blocks.
5//!
6//! # Format
7//!
8//! Snapshots are stored as compressed archives containing:
9//! - Header: metadata and verification hashes
10//! - Blocks: serialized block data
11//! - MMR: merkle mountain range nodes
12//! - Indexes: secondary indexes (optional)
13//!
14//! # Example
15//!
16//! ```ignore
17//! use moloch_storage::{RocksStorage, SnapshotBuilder, SnapshotImporter};
18//!
19//! // Create a snapshot
20//! let storage = RocksStorage::open("./data")?;
21//! let snapshot = SnapshotBuilder::new(&storage)
22//!     .at_height(1000000)
23//!     .with_indexes(true)
24//!     .build()?;
25//!
26//! snapshot.write_to_file("snapshot-1000000.msnap")?;
27//!
28//! // Import a snapshot
29//! let importer = SnapshotImporter::new("snapshot-1000000.msnap")?;
30//! importer.import_into(&mut new_storage)?;
31//! ```
32
33use std::fs::File;
34use std::io::{BufReader, BufWriter, Read, Write};
35use std::path::Path;
36
37use moloch_core::{BlockHash, Hash};
38use serde::{Deserialize, Serialize};
39
/// Snapshot format version.
///
/// Stored in [`SnapshotHeader::version`]; [`SnapshotHeader::validate`] rejects
/// any header whose version differs from this value.
pub const SNAPSHOT_VERSION: u32 = 1;

/// Magic bytes for snapshot files.
///
/// These eight bytes are the first thing [`Snapshot::write`] emits and the
/// first thing [`SnapshotReader::read`] checks before parsing anything else.
// NOTE(review): the trailing "001" looks like an embedded format version that
// is separate from `SNAPSHOT_VERSION` — confirm whether the two must be bumped
// together.
pub const SNAPSHOT_MAGIC: &[u8; 8] = b"MSNAP001";
45
/// Snapshot metadata header.
///
/// The header is serialized with `bincode` and written, length-prefixed,
/// directly after the magic bytes of a snapshot (see [`Snapshot::write`]).
/// Because the struct is serialized as-is, adding, removing or reordering
/// fields changes the on-disk encoding.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotHeader {
    /// Format version; must equal [`SNAPSHOT_VERSION`] to pass
    /// [`SnapshotHeader::validate`].
    pub version: u32,
    /// Chain ID.
    pub chain_id: String,
    /// Block height at snapshot.
    pub height: u64,
    /// Block hash at snapshot.
    pub block_hash: BlockHash,
    /// MMR root at snapshot.
    pub mmr_root: Hash,
    /// Total number of events.
    pub total_events: u64,
    /// Total number of blocks. A non-zero value combined with `height == 0`
    /// is rejected by [`SnapshotHeader::validate`].
    pub total_blocks: u64,
    /// Whether indexes are included.
    // NOTE(review): nothing in this file cross-checks this flag against the
    // presence of an index section in the payload.
    pub includes_indexes: bool,
    /// Uncompressed data size.
    // NOTE(review): not validated against the section lengths anywhere in
    // this file.
    pub uncompressed_size: u64,
    /// Hash of all data (for integrity check).
    ///
    /// [`SnapshotReader::verify`] compares this against the hash of the block
    /// bytes, then the MMR bytes, then the index bytes (when present),
    /// concatenated in that order.
    pub data_hash: Hash,
    /// Timestamp of snapshot creation.
    // NOTE(review): unit and epoch are not established in this file
    // (presumably Unix time) — confirm with the producer.
    pub created_at: i64,
}
72
73impl SnapshotHeader {
74    /// Verify the header is valid.
75    pub fn validate(&self) -> Result<()> {
76        if self.version != SNAPSHOT_VERSION {
77            return Err(SnapshotError::UnsupportedVersion(self.version));
78        }
79        if self.height == 0 && self.total_blocks > 0 {
80            return Err(SnapshotError::InvalidHeader(
81                "height is 0 but blocks exist".to_string(),
82            ));
83        }
84        Ok(())
85    }
86}
87
/// Errors during snapshot operations.
///
/// This is the error half of the module-level [`Result`] alias.
#[derive(Debug, thiserror::Error)]
pub enum SnapshotError {
    /// IO error.
    ///
    /// Created automatically by `?` from any [`std::io::Error`] (file open /
    /// create, reads, writes, flush).
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Serialization error.
    ///
    /// Carries the stringified `bincode` error from encoding or decoding the
    /// header.
    #[error("serialization error: {0}")]
    Serialization(String),

    /// Invalid snapshot format.
    ///
    /// Returned when the leading bytes of the input are not
    /// [`SNAPSHOT_MAGIC`].
    #[error("invalid snapshot: {0}")]
    InvalidFormat(String),

    /// Unsupported version.
    ///
    /// Holds the version number found in the header.
    #[error("unsupported snapshot version: {0}")]
    UnsupportedVersion(u32),

    /// Invalid header.
    ///
    /// The header decoded correctly but its fields are inconsistent.
    #[error("invalid header: {0}")]
    InvalidHeader(String),

    /// Data integrity check failed.
    ///
    /// The hash recomputed over the payload does not match the header's
    /// `data_hash`.
    #[error("data integrity check failed: expected {expected}, got {actual}")]
    IntegrityError {
        /// Expected hash.
        expected: Hash,
        /// Actual hash.
        actual: Hash,
    },

    /// Storage error.
    // NOTE(review): never constructed in this file; presumably used by the
    // code that moves data between a snapshot and the storage backend.
    #[error("storage error: {0}")]
    Storage(String),
}
124
/// Result type for snapshot operations.
///
/// Shorthand for `std::result::Result<T, SnapshotError>`. Note that it shadows
/// the prelude's two-parameter `Result` inside this module.
pub type Result<T> = std::result::Result<T, SnapshotError>;
127
/// Collects the options for taking a snapshot of a storage backend.
///
/// The builder borrows the storage for `'a` and is configured through
/// chained, by-value setters. Unless overridden, no height is selected,
/// indexes are excluded and the chain ID is `"moloch-mainnet"`.
// The fields are only written, never read, within this file, hence the lint
// allowance.
#[allow(dead_code)]
pub struct SnapshotBuilder<'a, S> {
    storage: &'a S,
    height: Option<u64>,
    include_indexes: bool,
    chain_id: String,
}

impl<'a, S> SnapshotBuilder<'a, S> {
    /// Start a builder over `storage` with the default options.
    pub fn new(storage: &'a S) -> Self {
        SnapshotBuilder {
            storage,
            height: None,
            include_indexes: false,
            chain_id: String::from("moloch-mainnet"),
        }
    }

    /// Pin the snapshot to the given block height.
    pub fn at_height(self, height: u64) -> Self {
        Self {
            height: Some(height),
            ..self
        }
    }

    /// Choose whether secondary indexes are part of the snapshot.
    pub fn with_indexes(self, include: bool) -> Self {
        Self {
            include_indexes: include,
            ..self
        }
    }

    /// Override the chain ID recorded for the snapshot.
    pub fn chain_id(self, id: impl Into<String>) -> Self {
        Self {
            chain_id: id.into(),
            ..self
        }
    }
}
166
/// A complete snapshot ready for writing.
///
/// Holds the header plus the already-serialized payload sections entirely in
/// memory. The sections are opaque byte buffers as far as this type is
/// concerned.
pub struct Snapshot {
    /// Snapshot header.
    pub header: SnapshotHeader,
    /// Serialized blocks data.
    blocks_data: Vec<u8>,
    /// Serialized MMR data.
    mmr_data: Vec<u8>,
    /// Serialized index data (optional).
    index_data: Option<Vec<u8>>,
}
178
179impl Snapshot {
180    /// Create a new snapshot with the given data.
181    pub fn new(
182        header: SnapshotHeader,
183        blocks_data: Vec<u8>,
184        mmr_data: Vec<u8>,
185        index_data: Option<Vec<u8>>,
186    ) -> Self {
187        Self {
188            header,
189            blocks_data,
190            mmr_data,
191            index_data,
192        }
193    }
194
195    /// Write snapshot to a file.
196    pub fn write_to_file(&self, path: impl AsRef<Path>) -> Result<()> {
197        let file = File::create(path)?;
198        let mut writer = BufWriter::new(file);
199        self.write(&mut writer)
200    }
201
202    /// Write snapshot to a writer.
203    pub fn write<W: Write>(&self, writer: &mut W) -> Result<()> {
204        // Write magic bytes
205        writer.write_all(SNAPSHOT_MAGIC)?;
206
207        // Write header
208        let header_bytes = bincode::serialize(&self.header)
209            .map_err(|e| SnapshotError::Serialization(e.to_string()))?;
210        let header_len = header_bytes.len() as u32;
211        writer.write_all(&header_len.to_le_bytes())?;
212        writer.write_all(&header_bytes)?;
213
214        // Write blocks data
215        let blocks_len = self.blocks_data.len() as u64;
216        writer.write_all(&blocks_len.to_le_bytes())?;
217        writer.write_all(&self.blocks_data)?;
218
219        // Write MMR data
220        let mmr_len = self.mmr_data.len() as u64;
221        writer.write_all(&mmr_len.to_le_bytes())?;
222        writer.write_all(&self.mmr_data)?;
223
224        // Write index data if present
225        if let Some(ref index_data) = self.index_data {
226            let index_len = index_data.len() as u64;
227            writer.write_all(&index_len.to_le_bytes())?;
228            writer.write_all(index_data)?;
229        } else {
230            writer.write_all(&0u64.to_le_bytes())?;
231        }
232
233        writer.flush()?;
234        Ok(())
235    }
236
237    /// Total size of the snapshot in bytes.
238    pub fn size(&self) -> usize {
239        SNAPSHOT_MAGIC.len()
240            + 4 // header length
241            + bincode::serialized_size(&self.header).unwrap_or(0) as usize
242            + 8 + self.blocks_data.len()
243            + 8 + self.mmr_data.len()
244            + 8 + self.index_data.as_ref().map(|d| d.len()).unwrap_or(0)
245    }
246}
247
/// Snapshot reader for importing snapshots.
///
/// Holds a fully parsed snapshot in memory: the decoded header plus the raw
/// bytes of each payload section. Parsing does not check the payload hash;
/// call [`SnapshotReader::verify`] for that.
pub struct SnapshotReader {
    /// Snapshot header.
    pub header: SnapshotHeader,
    /// Blocks data.
    blocks_data: Vec<u8>,
    /// MMR data.
    mmr_data: Vec<u8>,
    /// Index data; `None` when the stored index section has length zero.
    index_data: Option<Vec<u8>>,
}
259
260impl SnapshotReader {
261    /// Open a snapshot file for reading.
262    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
263        let file = File::open(path)?;
264        let mut reader = BufReader::new(file);
265        Self::read(&mut reader)
266    }
267
268    /// Read a snapshot from a reader.
269    pub fn read<R: Read>(reader: &mut R) -> Result<Self> {
270        // Read and verify magic bytes
271        let mut magic = [0u8; 8];
272        reader.read_exact(&mut magic)?;
273        if &magic != SNAPSHOT_MAGIC {
274            return Err(SnapshotError::InvalidFormat(
275                "invalid magic bytes".to_string(),
276            ));
277        }
278
279        // Read header
280        let mut header_len_bytes = [0u8; 4];
281        reader.read_exact(&mut header_len_bytes)?;
282        let header_len = u32::from_le_bytes(header_len_bytes) as usize;
283
284        let mut header_bytes = vec![0u8; header_len];
285        reader.read_exact(&mut header_bytes)?;
286        let header: SnapshotHeader = bincode::deserialize(&header_bytes)
287            .map_err(|e| SnapshotError::Serialization(e.to_string()))?;
288        header.validate()?;
289
290        // Read blocks data
291        let mut blocks_len_bytes = [0u8; 8];
292        reader.read_exact(&mut blocks_len_bytes)?;
293        let blocks_len = u64::from_le_bytes(blocks_len_bytes) as usize;
294
295        let mut blocks_data = vec![0u8; blocks_len];
296        reader.read_exact(&mut blocks_data)?;
297
298        // Read MMR data
299        let mut mmr_len_bytes = [0u8; 8];
300        reader.read_exact(&mut mmr_len_bytes)?;
301        let mmr_len = u64::from_le_bytes(mmr_len_bytes) as usize;
302
303        let mut mmr_data = vec![0u8; mmr_len];
304        reader.read_exact(&mut mmr_data)?;
305
306        // Read index data
307        let mut index_len_bytes = [0u8; 8];
308        reader.read_exact(&mut index_len_bytes)?;
309        let index_len = u64::from_le_bytes(index_len_bytes) as usize;
310
311        let index_data = if index_len > 0 {
312            let mut data = vec![0u8; index_len];
313            reader.read_exact(&mut data)?;
314            Some(data)
315        } else {
316            None
317        };
318
319        Ok(Self {
320            header,
321            blocks_data,
322            mmr_data,
323            index_data,
324        })
325    }
326
327    /// Verify data integrity.
328    pub fn verify(&self) -> Result<()> {
329        // Compute hash of all data
330        let mut hasher_data = Vec::new();
331        hasher_data.extend(&self.blocks_data);
332        hasher_data.extend(&self.mmr_data);
333        if let Some(ref index_data) = self.index_data {
334            hasher_data.extend(index_data);
335        }
336
337        let actual_hash = moloch_core::hash(&hasher_data);
338        if actual_hash != self.header.data_hash {
339            return Err(SnapshotError::IntegrityError {
340                expected: self.header.data_hash,
341                actual: actual_hash,
342            });
343        }
344
345        Ok(())
346    }
347
348    /// Get the snapshot height.
349    pub fn height(&self) -> u64 {
350        self.header.height
351    }
352
353    /// Get the blocks data.
354    pub fn blocks_data(&self) -> &[u8] {
355        &self.blocks_data
356    }
357
358    /// Get the MMR data.
359    pub fn mmr_data(&self) -> &[u8] {
360        &self.mmr_data
361    }
362
363    /// Get the index data.
364    pub fn index_data(&self) -> Option<&[u8]> {
365        self.index_data.as_deref()
366    }
367}
368
/// Progress callback for import operations.
///
/// A boxed closure that is handed an [`ImportProgress`] by value each time it
/// is invoked. The `Send` bound lets the boxed callback be moved to another
/// thread.
// NOTE(review): not invoked anywhere in this file; the call sites presumably
// live in the import implementation.
pub type ProgressCallback = Box<dyn Fn(ImportProgress) + Send>;
371
/// A point-in-time report of how far a snapshot import has advanced.
#[derive(Debug, Clone)]
pub struct ImportProgress {
    /// Phase the import is currently in.
    pub phase: ImportPhase,
    /// Number of items handled so far.
    pub processed: u64,
    /// Number of items expected in total.
    pub total: u64,
    /// Number of bytes handled so far.
    pub bytes_processed: u64,
    /// Number of bytes expected in total.
    pub bytes_total: u64,
}

impl ImportProgress {
    /// Completion as a percentage of items (`processed` over `total`).
    ///
    /// Byte counters are not consulted. When `total` is zero the result is
    /// `100.0`. The value is not clamped, so it exceeds `100.0` whenever
    /// `processed` is larger than `total`.
    pub fn percent(&self) -> f64 {
        match self.total {
            // Nothing to do counts as done; this also avoids dividing by zero.
            0 => 100.0,
            total => (self.processed as f64 / total as f64) * 100.0,
        }
    }
}

/// The stages a snapshot import reports through [`ImportProgress::phase`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ImportPhase {
    /// Snapshot integrity is being checked.
    Verifying,
    /// Blocks are being imported.
    ImportingBlocks,
    /// MMR nodes are being imported.
    ImportingMmr,
    /// Indexes are being imported.
    ImportingIndexes,
    /// The import is being finalized.
    Finalizing,
    /// The import has finished.
    Complete,
}
414
/// Options controlling what a state-pruning pass is allowed to delete.
// NOTE(review): how `keep_from_height` and `keep_recent_blocks` combine is
// decided by the pruning code, which is not part of this file.
#[derive(Debug, Clone)]
pub struct PruneConfig {
    /// Blocks at or above this height are kept.
    pub keep_from_height: u64,
    /// Minimum number of most recent blocks to keep.
    pub keep_recent_blocks: u64,
    /// Whether MMR nodes belonging to old blocks are pruned.
    pub prune_mmr: bool,
    /// Whether index entries belonging to old events are pruned.
    pub prune_indexes: bool,
}

impl Default for PruneConfig {
    /// Keep the latest 10 000 blocks, prune indexes, leave the MMR intact.
    fn default() -> Self {
        PruneConfig {
            prune_indexes: true,
            prune_mmr: false,
            keep_recent_blocks: 10_000,
            keep_from_height: 0,
        }
    }
}
438
/// Statistics from a pruning operation.
///
/// Plain counters; the derived `Default` yields a value with every counter
/// set to zero.
#[derive(Debug, Clone, Default)]
pub struct PruneStats {
    /// Blocks pruned.
    pub blocks_pruned: u64,
    /// Events pruned.
    pub events_pruned: u64,
    /// MMR nodes pruned.
    pub mmr_nodes_pruned: u64,
    /// Index entries pruned.
    pub index_entries_pruned: u64,
    /// Bytes freed.
    pub bytes_freed: u64,
}
453
#[cfg(test)]
mod tests {
    use super::*;

    /// Header for chain `"test"` at the current format version with every
    /// counter zeroed; individual tests override the fields they care about.
    fn base_header() -> SnapshotHeader {
        SnapshotHeader {
            version: SNAPSHOT_VERSION,
            chain_id: "test".to_string(),
            height: 0,
            block_hash: BlockHash(Hash::ZERO),
            mmr_root: Hash::ZERO,
            total_events: 0,
            total_blocks: 0,
            includes_indexes: false,
            uncompressed_size: 0,
            data_hash: Hash::ZERO,
            created_at: 0,
        }
    }

    /// A header at the current version with a consistent height passes.
    #[test]
    fn test_snapshot_header_validation() {
        let header = SnapshotHeader {
            height: 1000,
            total_events: 50000,
            total_blocks: 1000,
            uncompressed_size: 1024 * 1024,
            ..base_header()
        };

        assert!(header.validate().is_ok());
    }

    /// An unknown version is rejected and echoed back in the error.
    #[test]
    fn test_snapshot_header_invalid_version() {
        let header = SnapshotHeader {
            version: 999,
            ..base_header()
        };

        let outcome = header.validate();
        assert!(matches!(
            outcome,
            Err(SnapshotError::UnsupportedVersion(999))
        ));
    }

    /// Writing a snapshot and reading it back preserves header and payload.
    #[test]
    fn test_snapshot_roundtrip() {
        let header = SnapshotHeader {
            height: 100,
            total_events: 1000,
            total_blocks: 100,
            uncompressed_size: 1024,
            ..base_header()
        };
        let blocks = vec![1, 2, 3, 4];
        let mmr = vec![5, 6, 7, 8];

        let mut encoded = Vec::new();
        Snapshot::new(header, blocks, mmr, None)
            .write(&mut encoded)
            .unwrap();

        let mut input = std::io::Cursor::new(encoded);
        let decoded = SnapshotReader::read(&mut input).unwrap();

        assert_eq!(decoded.header.height, 100);
        assert_eq!(decoded.blocks_data(), &[1, 2, 3, 4]);
        assert_eq!(decoded.mmr_data(), &[5, 6, 7, 8]);
    }

    /// The default pruning policy keeps 10 000 blocks and leaves the MMR alone.
    #[test]
    fn test_prune_config_default() {
        let PruneConfig {
            keep_recent_blocks,
            prune_mmr,
            ..
        } = PruneConfig::default();

        assert_eq!(keep_recent_blocks, 10000);
        assert!(!prune_mmr);
    }
}