1use std::fs::File;
34use std::io::{BufReader, BufWriter, Read, Write};
35use std::path::Path;
36
37use moloch_core::{BlockHash, Hash};
38use serde::{Deserialize, Serialize};
39
/// Snapshot format version understood by this module.
///
/// [`SnapshotHeader::validate`] rejects any header whose `version` differs from this value.
pub const SNAPSHOT_VERSION: u32 = 1;

/// Eight magic bytes that [`Snapshot::write`] emits at the very start of a snapshot stream
/// and that [`SnapshotReader::read`] requires before it parses anything else.
pub const SNAPSHOT_MAGIC: &[u8; 8] = b"MSNAP001";
45
46#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct SnapshotHeader {
49 pub version: u32,
51 pub chain_id: String,
53 pub height: u64,
55 pub block_hash: BlockHash,
57 pub mmr_root: Hash,
59 pub total_events: u64,
61 pub total_blocks: u64,
63 pub includes_indexes: bool,
65 pub uncompressed_size: u64,
67 pub data_hash: Hash,
69 pub created_at: i64,
71}
72
73impl SnapshotHeader {
74 pub fn validate(&self) -> Result<()> {
76 if self.version != SNAPSHOT_VERSION {
77 return Err(SnapshotError::UnsupportedVersion(self.version));
78 }
79 if self.height == 0 && self.total_blocks > 0 {
80 return Err(SnapshotError::InvalidHeader(
81 "height is 0 but blocks exist".to_string(),
82 ));
83 }
84 Ok(())
85 }
86}
87
/// Errors produced while writing, reading, validating, or verifying snapshots.
#[derive(Debug, thiserror::Error)]
pub enum SnapshotError {
    /// An underlying I/O operation failed; converted automatically from [`std::io::Error`].
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Encoding or decoding the header failed; carries the serializer's error text.
    #[error("serialization error: {0}")]
    Serialization(String),

    /// The stream is not a snapshot (for example, the magic bytes do not match).
    #[error("invalid snapshot: {0}")]
    InvalidFormat(String),

    /// The header declares a version other than [`SNAPSHOT_VERSION`]; carries that version.
    #[error("unsupported snapshot version: {0}")]
    UnsupportedVersion(u32),

    /// The header decoded correctly but its contents are inconsistent.
    #[error("invalid header: {0}")]
    InvalidHeader(String),

    /// The payload hash does not match the hash stored in the header.
    #[error("data integrity check failed: expected {expected}, got {actual}")]
    IntegrityError {
        /// Hash recorded in the header (`data_hash`).
        expected: Hash,
        /// Hash computed from the payload that was actually read.
        actual: Hash,
    },

    /// A storage-layer failure described as text.
    // NOTE(review): no code visible in this file constructs this variant — confirm its users.
    #[error("storage error: {0}")]
    Storage(String),
}
124
/// Result alias used throughout this module, with [`SnapshotError`] as the error type.
pub type Result<T> = std::result::Result<T, SnapshotError>;
127
/// Collects the options for producing a snapshot from a storage backend `S`.
///
/// Every setter consumes the builder and returns it, so calls can be chained.
// The fields are only written by the methods visible here; nothing in this part of the file
// reads them yet, hence the lint suppression.
#[allow(dead_code)]
pub struct SnapshotBuilder<'a, S> {
    /// Borrowed storage backend.
    storage: &'a S,
    /// Requested height; `None` until [`SnapshotBuilder::at_height`] is called.
    height: Option<u64>,
    /// Whether index data was requested via [`SnapshotBuilder::with_indexes`].
    include_indexes: bool,
    /// Chain identifier; starts out as `"moloch-mainnet"`.
    chain_id: String,
}

impl<'a, S> SnapshotBuilder<'a, S> {
    /// Creates a builder over `storage` with no explicit height, indexes disabled and the
    /// chain id set to `"moloch-mainnet"`.
    pub fn new(storage: &'a S) -> Self {
        let chain_id = String::from("moloch-mainnet");
        Self {
            storage,
            chain_id,
            include_indexes: false,
            height: None,
        }
    }

    /// Records `height` as the requested snapshot height.
    pub fn at_height(self, height: u64) -> Self {
        Self {
            height: Some(height),
            ..self
        }
    }

    /// Sets whether index data should be included.
    pub fn with_indexes(self, include: bool) -> Self {
        Self {
            include_indexes: include,
            ..self
        }
    }

    /// Replaces the chain identifier.
    pub fn chain_id(self, id: impl Into<String>) -> Self {
        Self {
            chain_id: id.into(),
            ..self
        }
    }
}
166
167pub struct Snapshot {
169 pub header: SnapshotHeader,
171 blocks_data: Vec<u8>,
173 mmr_data: Vec<u8>,
175 index_data: Option<Vec<u8>>,
177}
178
179impl Snapshot {
180 pub fn new(
182 header: SnapshotHeader,
183 blocks_data: Vec<u8>,
184 mmr_data: Vec<u8>,
185 index_data: Option<Vec<u8>>,
186 ) -> Self {
187 Self {
188 header,
189 blocks_data,
190 mmr_data,
191 index_data,
192 }
193 }
194
195 pub fn write_to_file(&self, path: impl AsRef<Path>) -> Result<()> {
197 let file = File::create(path)?;
198 let mut writer = BufWriter::new(file);
199 self.write(&mut writer)
200 }
201
202 pub fn write<W: Write>(&self, writer: &mut W) -> Result<()> {
204 writer.write_all(SNAPSHOT_MAGIC)?;
206
207 let header_bytes = bincode::serialize(&self.header)
209 .map_err(|e| SnapshotError::Serialization(e.to_string()))?;
210 let header_len = header_bytes.len() as u32;
211 writer.write_all(&header_len.to_le_bytes())?;
212 writer.write_all(&header_bytes)?;
213
214 let blocks_len = self.blocks_data.len() as u64;
216 writer.write_all(&blocks_len.to_le_bytes())?;
217 writer.write_all(&self.blocks_data)?;
218
219 let mmr_len = self.mmr_data.len() as u64;
221 writer.write_all(&mmr_len.to_le_bytes())?;
222 writer.write_all(&self.mmr_data)?;
223
224 if let Some(ref index_data) = self.index_data {
226 let index_len = index_data.len() as u64;
227 writer.write_all(&index_len.to_le_bytes())?;
228 writer.write_all(index_data)?;
229 } else {
230 writer.write_all(&0u64.to_le_bytes())?;
231 }
232
233 writer.flush()?;
234 Ok(())
235 }
236
237 pub fn size(&self) -> usize {
239 SNAPSHOT_MAGIC.len()
240 + 4 + bincode::serialized_size(&self.header).unwrap_or(0) as usize
242 + 8 + self.blocks_data.len()
243 + 8 + self.mmr_data.len()
244 + 8 + self.index_data.as_ref().map(|d| d.len()).unwrap_or(0)
245 }
246}
247
248pub struct SnapshotReader {
250 pub header: SnapshotHeader,
252 blocks_data: Vec<u8>,
254 mmr_data: Vec<u8>,
256 index_data: Option<Vec<u8>>,
258}
259
260impl SnapshotReader {
261 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
263 let file = File::open(path)?;
264 let mut reader = BufReader::new(file);
265 Self::read(&mut reader)
266 }
267
268 pub fn read<R: Read>(reader: &mut R) -> Result<Self> {
270 let mut magic = [0u8; 8];
272 reader.read_exact(&mut magic)?;
273 if &magic != SNAPSHOT_MAGIC {
274 return Err(SnapshotError::InvalidFormat(
275 "invalid magic bytes".to_string(),
276 ));
277 }
278
279 let mut header_len_bytes = [0u8; 4];
281 reader.read_exact(&mut header_len_bytes)?;
282 let header_len = u32::from_le_bytes(header_len_bytes) as usize;
283
284 let mut header_bytes = vec![0u8; header_len];
285 reader.read_exact(&mut header_bytes)?;
286 let header: SnapshotHeader = bincode::deserialize(&header_bytes)
287 .map_err(|e| SnapshotError::Serialization(e.to_string()))?;
288 header.validate()?;
289
290 let mut blocks_len_bytes = [0u8; 8];
292 reader.read_exact(&mut blocks_len_bytes)?;
293 let blocks_len = u64::from_le_bytes(blocks_len_bytes) as usize;
294
295 let mut blocks_data = vec![0u8; blocks_len];
296 reader.read_exact(&mut blocks_data)?;
297
298 let mut mmr_len_bytes = [0u8; 8];
300 reader.read_exact(&mut mmr_len_bytes)?;
301 let mmr_len = u64::from_le_bytes(mmr_len_bytes) as usize;
302
303 let mut mmr_data = vec![0u8; mmr_len];
304 reader.read_exact(&mut mmr_data)?;
305
306 let mut index_len_bytes = [0u8; 8];
308 reader.read_exact(&mut index_len_bytes)?;
309 let index_len = u64::from_le_bytes(index_len_bytes) as usize;
310
311 let index_data = if index_len > 0 {
312 let mut data = vec![0u8; index_len];
313 reader.read_exact(&mut data)?;
314 Some(data)
315 } else {
316 None
317 };
318
319 Ok(Self {
320 header,
321 blocks_data,
322 mmr_data,
323 index_data,
324 })
325 }
326
327 pub fn verify(&self) -> Result<()> {
329 let mut hasher_data = Vec::new();
331 hasher_data.extend(&self.blocks_data);
332 hasher_data.extend(&self.mmr_data);
333 if let Some(ref index_data) = self.index_data {
334 hasher_data.extend(index_data);
335 }
336
337 let actual_hash = moloch_core::hash(&hasher_data);
338 if actual_hash != self.header.data_hash {
339 return Err(SnapshotError::IntegrityError {
340 expected: self.header.data_hash,
341 actual: actual_hash,
342 });
343 }
344
345 Ok(())
346 }
347
348 pub fn height(&self) -> u64 {
350 self.header.height
351 }
352
353 pub fn blocks_data(&self) -> &[u8] {
355 &self.blocks_data
356 }
357
358 pub fn mmr_data(&self) -> &[u8] {
360 &self.mmr_data
361 }
362
363 pub fn index_data(&self) -> Option<&[u8]> {
365 self.index_data.as_deref()
366 }
367}
368
/// Boxed callback that receives an [`ImportProgress`] value by value.
///
/// The closure must be `Send`; it is not required to be `Sync`.
pub type ProgressCallback = Box<dyn Fn(ImportProgress) + Send>;
371
372#[derive(Debug, Clone)]
374pub struct ImportProgress {
375 pub phase: ImportPhase,
377 pub processed: u64,
379 pub total: u64,
381 pub bytes_processed: u64,
383 pub bytes_total: u64,
385}
386
387impl ImportProgress {
388 pub fn percent(&self) -> f64 {
390 if self.total == 0 {
391 100.0
392 } else {
393 (self.processed as f64 / self.total as f64) * 100.0
394 }
395 }
396}
397
/// Phase label carried in [`ImportProgress::phase`].
// NOTE(review): the importer that emits these phases is not visible in this part of the
// file; the per-variant notes below are inferred from the names — confirm against it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ImportPhase {
    /// Presumably: the snapshot is being checked before import.
    Verifying,
    /// Presumably: block data is being imported.
    ImportingBlocks,
    /// Presumably: MMR data is being imported.
    ImportingMmr,
    /// Presumably: index data is being imported.
    ImportingIndexes,
    /// Presumably: final bookkeeping after the data has been imported.
    Finalizing,
    /// Presumably: the import has finished.
    Complete,
}
414
/// Options controlling pruning.
// NOTE(review): the pruning code that consumes these options is not visible in this part of
// the file; the field notes are inferred from the names — confirm against it.
#[derive(Debug, Clone)]
pub struct PruneConfig {
    /// Presumably the lowest height to retain. Defaults to `0`.
    pub keep_from_height: u64,
    /// Presumably the number of most recent blocks to retain. Defaults to `10000`.
    pub keep_recent_blocks: u64,
    /// Whether MMR data may be pruned. Defaults to `false`.
    pub prune_mmr: bool,
    /// Whether index data may be pruned. Defaults to `true`.
    pub prune_indexes: bool,
}

impl Default for PruneConfig {
    /// Keeps from height `0`, keeps `10000` recent blocks, leaves the MMR alone and allows
    /// index pruning.
    fn default() -> Self {
        let keep_recent_blocks: u64 = 10_000;
        Self {
            prune_indexes: true,
            prune_mmr: false,
            keep_recent_blocks,
            keep_from_height: 0,
        }
    }
}
438
/// Counters describing the outcome of a pruning run. `Default` yields all zeros.
// NOTE(review): the code that fills these counters is not visible in this part of the file.
#[derive(Debug, Clone, Default)]
pub struct PruneStats {
    /// Number of blocks pruned.
    pub blocks_pruned: u64,
    /// Number of events pruned.
    pub events_pruned: u64,
    /// Number of MMR nodes pruned.
    pub mmr_nodes_pruned: u64,
    /// Number of index entries pruned.
    pub index_entries_pruned: u64,
    /// Amount of space freed (unit presumably bytes, per the name — confirm).
    pub bytes_freed: u64,
}
453
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline header for the tests: current version, chain id `"test"`, and every counter,
    /// hash and timestamp zeroed. Individual tests override fields with struct-update syntax.
    fn zeroed_header() -> SnapshotHeader {
        SnapshotHeader {
            version: SNAPSHOT_VERSION,
            chain_id: "test".to_string(),
            height: 0,
            block_hash: BlockHash(Hash::ZERO),
            mmr_root: Hash::ZERO,
            total_events: 0,
            total_blocks: 0,
            includes_indexes: false,
            uncompressed_size: 0,
            data_hash: Hash::ZERO,
            created_at: 0,
        }
    }

    /// A header with the current version and a non-zero height validates.
    #[test]
    fn test_snapshot_header_validation() {
        let header = SnapshotHeader {
            height: 1000,
            total_events: 50000,
            total_blocks: 1000,
            uncompressed_size: 1024 * 1024,
            ..zeroed_header()
        };

        assert!(header.validate().is_ok());
    }

    /// An unknown version is rejected and the offending version is reported back.
    #[test]
    fn test_snapshot_header_invalid_version() {
        let header = SnapshotHeader {
            version: 999,
            ..zeroed_header()
        };

        let outcome = header.validate();
        assert!(matches!(
            outcome,
            Err(SnapshotError::UnsupportedVersion(999))
        ));
    }

    /// Writing a snapshot to a buffer and reading it back preserves header and sections.
    #[test]
    fn test_snapshot_roundtrip() {
        let header = SnapshotHeader {
            height: 100,
            total_events: 1000,
            total_blocks: 100,
            uncompressed_size: 1024,
            ..zeroed_header()
        };
        let snapshot = Snapshot::new(header, vec![1, 2, 3, 4], vec![5, 6, 7, 8], None);

        let mut buffer = Vec::new();
        snapshot.write(&mut buffer).unwrap();

        let reader = SnapshotReader::read(&mut std::io::Cursor::new(buffer)).unwrap();

        assert_eq!(reader.header.height, 100);
        assert_eq!(reader.blocks_data(), &[1, 2, 3, 4]);
        assert_eq!(reader.mmr_data(), &[5, 6, 7, 8]);
    }

    /// The default prune configuration keeps 10000 recent blocks and leaves the MMR alone.
    #[test]
    fn test_prune_config_default() {
        let PruneConfig {
            keep_recent_blocks,
            prune_mmr,
            ..
        } = PruneConfig::default();

        assert_eq!(keep_recent_blocks, 10000);
        assert!(!prune_mmr);
    }
}