Skip to main content

newton_bootnode/
snapshot_store.rs

1//! Sealed snapshot persistence backed by redb.
2//!
3//! Stores full JMT snapshots keyed by `sequence_no`. The unified tree has a single
4//! snapshot stream (no per-subtree keys) — namespace separation is at the leaf-key
5//! level inside the snapshot body, matching the unified-tree design in the state-tree
6//! crate.
7//!
8//! **Memory:** `get` and `latest_at_most` return only the lightweight
9//! [`SnapshotHeader`] from a dedicated header table. The body lives in a
10//! separate table and is only read via [`SnapshotStore::read_body`], which
11//! streams to a caller-supplied `Write` impl.
12//!
13//! **Staleness (§S.19):** the store is policy-free — `latest_at_most` returns
14//! snapshots of any age. Consumer-side staleness gating (e.g., operator bootstrap
15//! checking `MAX_SNAPSHOT_STALENESS_SECS`) is enforced at the boundary, not here.
16
17use std::{io::Write, path::Path};
18
19use alloy_primitives::B256;
20use redb::{ReadableDatabase, ReadableTable, TableDefinition};
21use serde::{Deserialize, Serialize};
22use tracing::{debug, info, warn};
23
24use crate::error::BootnodeError;
25
26const HEADERS_TABLE: TableDefinition<'_, u64, &[u8]> = TableDefinition::new("snapshot_headers");
27const BODIES_TABLE: TableDefinition<'_, u64, &[u8]> = TableDefinition::new("snapshot_bodies");
28
29const SNAPSHOT_FORMAT_V1: u8 = 1;
30
/// Upper bound on a valid JMT snapshot body: 256 MiB.
///
/// Sized for the largest unified tree projected over the foreseeable
/// deployment window, while still stopping a corrupt record that claims a
/// multi-GB body from causing an OOM. Revisit when tree size projections
/// change.
pub const MAX_SNAPSHOT_BODY_BYTES: usize = 256 << 20;
37
38/// Lightweight metadata for a sealed snapshot — no body allocation.
39#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
40pub struct SnapshotHeader {
41    /// On-disk format version. Current: [`SNAPSHOT_FORMAT_V1`].
42    pub format_version: u8,
43    /// State-tree sequence number at which this snapshot was taken.
44    pub sequence_no: u64,
45    /// State root hash of the unified JMT at `sequence_no`.
46    pub state_root: B256,
47    /// UNIX timestamp (seconds) when the snapshot was sealed.
48    pub sealed_at_ts: u64,
49    /// Size of the serialized body in bytes.
50    pub body_len: u64,
51}
52
53/// redb-backed store for sealed full-tree snapshots.
54///
55/// The store is policy-free: it persists and retrieves snapshots without
56/// filtering by age. Staleness enforcement is the consumer's responsibility.
57#[derive(Debug)]
58pub struct SnapshotStore {
59    db: redb::Database,
60}
61
62impl SnapshotStore {
63    /// Open or create the snapshot store at `path`.
64    pub fn open(path: impl AsRef<Path>) -> Result<Self, BootnodeError> {
65        let path = path.as_ref();
66        if let Some(parent) = path.parent() {
67            std::fs::create_dir_all(parent)?;
68        }
69        let db = redb::Database::create(path)?;
70        let txn = db.begin_write()?;
71        txn.open_table(HEADERS_TABLE)?;
72        txn.open_table(BODIES_TABLE)?;
73        txn.commit()?;
74        info!(path = %path.display(), "snapshot store opened");
75        Ok(Self { db })
76    }
77
78    /// Persist a snapshot. Overwrites any existing snapshot at the same `sequence_no`.
79    ///
80    /// Header and body are written in a single transaction.
81    pub fn put(&self, header: &SnapshotHeader, body: &[u8]) -> Result<(), BootnodeError> {
82        if body.len() > MAX_SNAPSHOT_BODY_BYTES {
83            return Err(BootnodeError::SnapshotBodyTooLarge {
84                size: body.len(),
85                max: MAX_SNAPSHOT_BODY_BYTES,
86            });
87        }
88        if header.body_len != body.len() as u64 {
89            return Err(BootnodeError::SnapshotBodyLenMismatch {
90                header_body_len: header.body_len,
91                actual: body.len() as u64,
92            });
93        }
94        let header_bytes = rmp_serde::to_vec(header)?;
95        debug!(
96            sequence_no = header.sequence_no,
97            header_bytes = header_bytes.len(),
98            body_bytes = body.len(),
99            "snapshot put"
100        );
101        let txn = self.db.begin_write()?;
102        {
103            let mut headers = txn.open_table(HEADERS_TABLE)?;
104            let mut bodies = txn.open_table(BODIES_TABLE)?;
105            headers.insert(header.sequence_no, header_bytes.as_slice())?;
106            bodies.insert(header.sequence_no, body)?;
107        }
108        txn.commit()?;
109        Ok(())
110    }
111
112    /// Retrieve the header at exactly `sequence_no`, if any.
113    pub fn get(&self, sequence_no: u64) -> Result<Option<SnapshotHeader>, BootnodeError> {
114        let txn = self.db.begin_read()?;
115        let table = txn.open_table(HEADERS_TABLE)?;
116        match table.get(sequence_no)? {
117            Some(guard) => Ok(Some(decode_header(guard.value())?)),
118            None => Ok(None),
119        }
120    }
121
122    /// Return the header with the largest `sequence_no <= up_to`, if any.
123    pub fn latest_at_most(&self, up_to: u64) -> Result<Option<SnapshotHeader>, BootnodeError> {
124        let txn = self.db.begin_read()?;
125        let table = txn.open_table(HEADERS_TABLE)?;
126        let mut range = table.range(..=up_to)?;
127        match range.next_back() {
128            Some(Ok((_, guard))) => Ok(Some(decode_header(guard.value())?)),
129            Some(Err(e)) => Err(e.into()),
130            None => Ok(None),
131        }
132    }
133
134    /// Stream the body of snapshot `sequence_no` into `writer`.
135    ///
136    /// Both tables are opened from the same read transaction for snapshot
137    /// isolation. The header is the source of truth for existence:
138    ///
139    /// - **Header missing**: returns `Ok(None)` regardless of `BODIES_TABLE`
140    ///   state. The canonical "not sealed" signal. An orphan body without a
141    ///   matching header is silently inaccessible through this method —
142    ///   correct, because the only insertion path (`put`) writes header+body
143    ///   atomically, so an orphan body would be evidence of either external
144    ///   tampering or a redb-internal bug, neither of which `read_body`
145    ///   should service.
146    /// - **Header present, body missing**: returns
147    ///   [`BootnodeError::SnapshotBodyLenMismatch`] with `actual: 0`. Indicates
148    ///   a torn write or partial restoration.
149    /// - **Header present, body present, lengths differ**: returns
150    ///   [`BootnodeError::SnapshotBodyLenMismatch`] with the actual length.
151    ///   Indicates corruption or a write-side invariant violation.
152    /// - **Header present, body present, lengths match**: streams body to
153    ///   `writer`, returns `Ok(Some(body_len))`.
154    pub fn read_body(&self, sequence_no: u64, writer: &mut impl Write) -> Result<Option<u64>, BootnodeError> {
155        let txn = self.db.begin_read()?;
156        let headers = txn.open_table(HEADERS_TABLE)?;
157        let header = match headers.get(sequence_no)? {
158            Some(guard) => decode_header(guard.value())?,
159            None => return Ok(None),
160        };
161        let bodies = txn.open_table(BODIES_TABLE)?;
162        let Some(guard) = bodies.get(sequence_no)? else {
163            warn!(
164                sequence_no,
165                header_body_len = header.body_len,
166                "orphan snapshot header — body missing on disk"
167            );
168            return Err(BootnodeError::SnapshotBodyLenMismatch {
169                header_body_len: header.body_len,
170                actual: 0,
171            });
172        };
173        let body = guard.value();
174        let actual = body.len() as u64;
175        if actual != header.body_len {
176            warn!(
177                sequence_no,
178                header_body_len = header.body_len,
179                actual,
180                "snapshot body length drift between header and bodies tables"
181            );
182            return Err(BootnodeError::SnapshotBodyLenMismatch {
183                header_body_len: header.body_len,
184                actual,
185            });
186        }
187        writer.write_all(body)?;
188        Ok(Some(actual))
189    }
190}
191
192fn decode_header(bytes: &[u8]) -> Result<SnapshotHeader, BootnodeError> {
193    let hdr: SnapshotHeader = rmp_serde::from_slice(bytes)?;
194    if hdr.format_version != SNAPSHOT_FORMAT_V1 {
195        warn!(version = hdr.format_version, "unsupported snapshot format");
196        return Err(BootnodeError::UnsupportedSnapshotFormat(hdr.format_version));
197    }
198    Ok(hdr)
199}
200
#[cfg(test)]
mod tests {
    use super::*;

    /// 64-byte body shared by every test that does not need a custom one.
    const TEST_BODY: &[u8] = &[0xAB; 64];

    /// Open a store inside a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the store so the directory
    /// stays alive for the whole test.
    fn fresh_store() -> (tempfile::TempDir, SnapshotStore) {
        let dir = tempfile::tempdir().unwrap();
        let store = SnapshotStore::open(dir.path().join("snap.redb")).unwrap();
        (dir, store)
    }

    /// Build a valid V1 header whose `body_len` matches [`TEST_BODY`].
    fn test_header(sequence_no: u64, sealed_at_ts: u64) -> SnapshotHeader {
        SnapshotHeader {
            format_version: SNAPSHOT_FORMAT_V1,
            sequence_no,
            state_root: B256::from([sequence_no as u8; 32]),
            sealed_at_ts,
            body_len: TEST_BODY.len() as u64,
        }
    }

    /// Write `header` straight into `HEADERS_TABLE` under its own
    /// `sequence_no`, bypassing `put` and leaving `BODIES_TABLE` untouched.
    fn insert_raw_header(store: &SnapshotStore, header: &SnapshotHeader) {
        let encoded = rmp_serde::to_vec(header).unwrap();
        let txn = store.db.begin_write().unwrap();
        {
            let mut table = txn.open_table(HEADERS_TABLE).unwrap();
            table.insert(header.sequence_no, encoded.as_slice()).unwrap();
        }
        txn.commit().unwrap();
    }

    #[test]
    fn roundtrip_header() {
        let (_dir, store) = fresh_store();
        let hdr = test_header(42, 1_700_000_000);
        store.put(&hdr, TEST_BODY).unwrap();

        let got = store.get(42).unwrap().expect("should exist");
        assert_eq!(got, hdr);
        assert_eq!(got.format_version, SNAPSHOT_FORMAT_V1);
        assert_eq!(got.body_len, TEST_BODY.len() as u64);
    }

    #[test]
    fn read_body_roundtrip() {
        let (_dir, store) = fresh_store();
        store.put(&test_header(42, 1_700_000_000), TEST_BODY).unwrap();

        let mut sink = Vec::new();
        let written = store.read_body(42, &mut sink).unwrap().expect("should exist");
        assert_eq!(written, TEST_BODY.len() as u64);
        assert_eq!(sink, TEST_BODY);
    }

    #[test]
    fn read_body_missing_returns_none() {
        let (_dir, store) = fresh_store();
        let mut sink = Vec::new();
        assert!(store.read_body(99, &mut sink).unwrap().is_none());
    }

    #[test]
    fn get_missing_returns_none() {
        let (_dir, store) = fresh_store();
        assert!(store.get(99).unwrap().is_none());
    }

    #[test]
    fn latest_at_most_finds_right_snap() {
        let (_dir, store) = fresh_store();
        for seq in [10u64, 20, 30, 40] {
            store.put(&test_header(seq, 1_000_000 + seq), TEST_BODY).unwrap();
        }

        // (query bound, sequence number expected back)
        let cases: [(u64, Option<u64>); 3] = [(25, Some(20)), (40, Some(40)), (5, None)];
        for (up_to, expected) in cases {
            let found = store.latest_at_most(up_to).unwrap().map(|hdr| hdr.sequence_no);
            assert_eq!(found, expected, "latest_at_most({up_to})");
        }
    }

    #[test]
    fn sealed_at_ts_preserved_including_epoch_zero() {
        let (_dir, store) = fresh_store();
        store.put(&test_header(1, 0), TEST_BODY).unwrap();

        let exact = store.get(1).unwrap().expect("should exist");
        assert_eq!(exact.sealed_at_ts, 0);

        let ranged = store.latest_at_most(100).unwrap().expect("should find it");
        assert_eq!(ranged.sealed_at_ts, 0);
    }

    #[test]
    fn sequence_no_zero_roundtrip() {
        let (_dir, store) = fresh_store();
        store.put(&test_header(0, 0), TEST_BODY).unwrap();

        let exact = store.get(0).unwrap().expect("seq 0 should exist");
        assert_eq!(exact.sequence_no, 0);

        let ranged = store
            .latest_at_most(0)
            .unwrap()
            .expect("latest_at_most(0) should return seq 0");
        assert_eq!(ranged.sequence_no, 0);
    }

    #[test]
    fn unsupported_format_version_rejected() {
        let (_dir, store) = fresh_store();

        // Same fields as a regular test header, but with an unknown version.
        let bad_hdr = SnapshotHeader {
            format_version: 99,
            ..test_header(1, 1_000)
        };
        insert_raw_header(&store, &bad_hdr);

        let err = store.get(1).unwrap_err();
        assert!(matches!(err, BootnodeError::UnsupportedSnapshotFormat(99)));
    }

    #[test]
    fn read_body_rejects_cross_table_body_len_drift() {
        let (_dir, store) = fresh_store();

        // The header claims 64 bytes while BODIES_TABLE gets a 32-byte body.
        // Both rows are written directly, skipping `put`, so the cross-table
        // invariant is broken on disk.
        let hdr = SnapshotHeader {
            body_len: 64,
            ..test_header(7, 1_000)
        };
        let encoded = rmp_serde::to_vec(&hdr).unwrap();
        let short_body = vec![0xCC; 32];

        let txn = store.db.begin_write().unwrap();
        {
            let mut headers = txn.open_table(HEADERS_TABLE).unwrap();
            headers.insert(7u64, encoded.as_slice()).unwrap();
            let mut bodies = txn.open_table(BODIES_TABLE).unwrap();
            bodies.insert(7u64, short_body.as_slice()).unwrap();
        }
        txn.commit().unwrap();

        let mut sink = Vec::new();
        let err = store.read_body(7, &mut sink).unwrap_err();
        assert!(matches!(
            err,
            BootnodeError::SnapshotBodyLenMismatch {
                header_body_len: 64,
                actual: 32
            }
        ));
        assert!(sink.is_empty(), "writer must not see partial bytes on mismatch");
    }

    #[test]
    fn read_body_rejects_orphan_header() {
        let (_dir, store) = fresh_store();

        // Header row only — simulates a torn write with no body.
        insert_raw_header(&store, &test_header(9, 1_000));

        let mut sink = Vec::new();
        let err = store.read_body(9, &mut sink).unwrap_err();
        assert!(matches!(err, BootnodeError::SnapshotBodyLenMismatch { actual: 0, .. }));
    }

    #[test]
    fn put_rejects_body_len_mismatch() {
        let (_dir, store) = fresh_store();
        let hdr = SnapshotHeader {
            body_len: 999,
            ..test_header(1, 1_000)
        };

        let err = store.put(&hdr, TEST_BODY).unwrap_err();
        assert!(matches!(err, BootnodeError::SnapshotBodyLenMismatch { .. }));
        assert!(store.get(1).unwrap().is_none());
    }

    #[test]
    fn put_rejects_oversized_body() {
        let (_dir, store) = fresh_store();

        let big_body = vec![0xCC; MAX_SNAPSHOT_BODY_BYTES + 1];
        let hdr = SnapshotHeader {
            state_root: B256::ZERO,
            body_len: big_body.len() as u64,
            ..test_header(1, 1_000)
        };

        let err = store.put(&hdr, &big_body).unwrap_err();
        assert!(matches!(err, BootnodeError::SnapshotBodyTooLarge { .. }));
        assert!(store.get(1).unwrap().is_none());
    }
}
421}