Skip to main content

newton_bootnode/
delta_store.rs

1//! Delta log persistence backed by redb.
2//!
3//! Stores `(DaCert, delta_blob)` pairs keyed by `sequence_no` so bootstrap
4//! consumers get both the EigenDA attestation and the raw delta in one
5//! round-trip.  Range fetch returns entries in ascending sequence order for
6//! deterministic replay.
7//!
8//! **Gap tolerance (§S.19):** `put` accepts any sequence number without
9//! requiring predecessors to exist.  `fetch_range` returns the entries that
10//! *do* exist; gaps are signalled by missing sequence numbers in the result
11//! and are never treated as errors.  This is required for post-injection
12//! recovery where an admin-seeded snapshot may create discontinuities in the
13//! delta stream.
14
15use std::path::Path;
16
17use redb::{ReadableDatabase, ReadableTable, TableDefinition};
18use serde::{Deserialize, Serialize};
19use tracing::{debug, info, warn};
20
21use crate::error::BootnodeError;
22
23const DELTAS_TABLE: TableDefinition<'_, u64, &[u8]> = TableDefinition::new("deltas");
24
25const DELTA_FORMAT_V1: u8 = 1;
26
27/// 4 MiB — upper bound on a serialized EigenDA certificate.
28///
29/// Real certs today are under 1 KiB; 4 MiB gives headroom for future proof
30/// format changes while preventing OOM from a corrupt record.
31pub const MAX_DELTA_CERT_BYTES: usize = 4 * 1024 * 1024;
32
33/// 64 MiB — upper bound on a single delta blob.
34///
35/// Sized to the EigenDA per-blob ceiling. A corrupt record claiming a multi-GB
36/// blob would OOM the bootnode at deserialize time without this check.
37pub const MAX_DELTA_BLOB_BYTES: usize = 64 * 1024 * 1024;
38
39/// Maximum number of delta entries returnable from a single `fetch_range` call.
40///
41/// `fetch_range` materializes the result into a `Vec`, so an unbounded window
42/// would let a misbehaving caller request `[0, u64::MAX)` and OOM the bootnode.
43/// Bootstrap consumers walk the delta stream in fixed-size chunks; 1024 entries
44/// per call is well above the chunk sizes used in §S.19 replay loops.
45///
46/// **Layering:** this is an entry-count bound at the store boundary. Worst-case
47/// bytes returned are `MAX_FETCH_RANGE_LEN * MAX_DELTA_BLOB_BYTES` ≈ 64 GiB,
48/// but real deltas are KB-scale state-tree mutations and never approach the
49/// per-blob ceiling. A bytes-budget belongs at the server boundary (HTTP
50/// response chunking / `body_size_limit`), not here — the store cannot reason
51/// about wire-format overhead or client-side buffering.
52pub const MAX_FETCH_RANGE_LEN: u64 = 1024;
53
54/// A delta record: format version, EigenDA certificate bytes, and raw blob.
55#[derive(Debug, Serialize, Deserialize)]
56pub struct DeltaEntry {
57    /// On-disk format version. Current: [`DELTA_FORMAT_V1`].
58    pub format_version: u8,
59    /// Serialized EigenDA dispersal certificate.
60    #[serde(with = "serde_bytes")]
61    pub cert_bytes: Vec<u8>,
62    /// Raw delta blob.
63    #[serde(with = "serde_bytes")]
64    pub blob: Vec<u8>,
65}
66
67/// redb-backed store for delta blobs keyed by sequence number.
68///
69/// The store is policy-free and gap-tolerant: it persists and retrieves deltas
70/// without enforcing continuity.  Staleness and correctness gates live at
71/// consumer boundaries.
72#[derive(Debug)]
73pub struct DeltaStore {
74    db: redb::Database,
75}
76
77impl DeltaStore {
78    /// Open or create the delta store at `path`.
79    pub fn open(path: impl AsRef<Path>) -> Result<Self, BootnodeError> {
80        let path = path.as_ref();
81        if let Some(parent) = path.parent() {
82            std::fs::create_dir_all(parent)?;
83        }
84        let db = redb::Database::create(path)?;
85        let txn = db.begin_write()?;
86        txn.open_table(DELTAS_TABLE)?;
87        txn.commit()?;
88        info!(path = %path.display(), "delta store opened");
89        Ok(Self { db })
90    }
91
92    /// Persist a delta.  Overwrites any existing entry at the same `seq`.
93    ///
94    /// Accepts any sequence number without requiring predecessors — gaps are a
95    /// normal post-recovery state per §S.19.
96    pub fn put(&self, seq: u64, entry: &DeltaEntry) -> Result<(), BootnodeError> {
97        validate_cert_bytes(&entry.cert_bytes, seq)?;
98        let bytes = rmp_serde::to_vec(entry)?;
99        debug!(seq, size_bytes = bytes.len(), "delta put");
100        let txn = self.db.begin_write()?;
101        {
102            let mut table = txn.open_table(DELTAS_TABLE)?;
103            table.insert(seq, bytes.as_slice())?;
104        }
105        txn.commit()?;
106        Ok(())
107    }
108
109    /// Persist multiple deltas in a single write transaction.
110    pub fn put_batch<'a>(&self, entries: impl IntoIterator<Item = (u64, &'a DeltaEntry)>) -> Result<(), BootnodeError> {
111        let serialized: Vec<(u64, Vec<u8>)> = entries
112            .into_iter()
113            .map(|(seq, entry)| {
114                validate_cert_bytes(&entry.cert_bytes, seq)?;
115                let bytes = rmp_serde::to_vec(entry)?;
116                debug!(seq, size_bytes = bytes.len(), "delta put_batch entry");
117                Ok((seq, bytes))
118            })
119            .collect::<Result<_, BootnodeError>>()?;
120
121        let txn = self.db.begin_write()?;
122        {
123            let mut table = txn.open_table(DELTAS_TABLE)?;
124            for (seq, bytes) in &serialized {
125                table.insert(*seq, bytes.as_slice())?;
126            }
127        }
128        txn.commit()?;
129        Ok(())
130    }
131
132    /// Retrieve the delta at exactly `seq`, if any.
133    pub fn get(&self, seq: u64) -> Result<Option<DeltaEntry>, BootnodeError> {
134        let txn = self.db.begin_read()?;
135        let table = txn.open_table(DELTAS_TABLE)?;
136        match table.get(seq)? {
137            Some(guard) => Ok(Some(decode_delta(guard.value())?)),
138            None => Ok(None),
139        }
140    }
141
142    /// Return all deltas with sequence numbers in `[from_seq, to_seq)` that
143    /// exist in the store, in ascending sequence order.
144    ///
145    /// Gaps are signalled by missing sequence numbers in the result — the
146    /// consumer detects them, not the store.
147    ///
148    /// Returns [`BootnodeError::FetchRangeTooLarge`] if `to_seq - from_seq`
149    /// exceeds [`MAX_FETCH_RANGE_LEN`].
150    pub fn fetch_range(&self, from_seq: u64, to_seq: u64) -> Result<Vec<(u64, DeltaEntry)>, BootnodeError> {
151        let requested = to_seq.saturating_sub(from_seq);
152        if requested > MAX_FETCH_RANGE_LEN {
153            warn!(
154                from_seq,
155                to_seq,
156                requested,
157                max = MAX_FETCH_RANGE_LEN,
158                "fetch_range window exceeds bound"
159            );
160            return Err(BootnodeError::FetchRangeTooLarge {
161                requested,
162                max: MAX_FETCH_RANGE_LEN,
163            });
164        }
165        let txn = self.db.begin_read()?;
166        let table = txn.open_table(DELTAS_TABLE)?;
167        let mut results = Vec::new();
168        for entry in table.range(from_seq..to_seq)? {
169            let (key, guard) = entry?;
170            results.push((key.value(), decode_delta(guard.value())?));
171        }
172        Ok(results)
173    }
174}
175
176fn validate_cert_bytes(bytes: &[u8], seq: u64) -> Result<(), BootnodeError> {
177    if bytes.is_empty() {
178        warn!(seq, "rejected empty cert bytes");
179        return Err(BootnodeError::InvalidCertData(format!("empty cert bytes at seq {seq}")));
180    }
181    Ok(())
182}
183
184fn decode_delta(bytes: &[u8]) -> Result<DeltaEntry, BootnodeError> {
185    let entry: DeltaEntry = rmp_serde::from_slice(bytes)?;
186    if entry.format_version != DELTA_FORMAT_V1 {
187        warn!(version = entry.format_version, "unsupported delta format");
188        return Err(BootnodeError::UnsupportedDeltaFormat(entry.format_version));
189    }
190    if entry.cert_bytes.is_empty() {
191        return Err(BootnodeError::InvalidCertData("empty cert bytes".into()));
192    }
193    if entry.cert_bytes.len() > MAX_DELTA_CERT_BYTES {
194        return Err(BootnodeError::DeltaFieldTooLarge {
195            field: "cert_bytes",
196            size: entry.cert_bytes.len(),
197            max: MAX_DELTA_CERT_BYTES,
198        });
199    }
200    if entry.blob.len() > MAX_DELTA_BLOB_BYTES {
201        return Err(BootnodeError::DeltaFieldTooLarge {
202            field: "blob",
203            size: entry.blob.len(),
204            max: MAX_DELTA_BLOB_BYTES,
205        });
206    }
207    Ok(entry)
208}
209
210#[cfg(test)]
211mod tests {
212    use super::*;
213
214    fn dummy_cert_bytes(seq: u64) -> Vec<u8> {
215        let tag = seq.to_le_bytes();
216        let mut bytes = vec![0u8; 48];
217        bytes[..8].copy_from_slice(&tag);
218        bytes
219    }
220
221    fn test_entry(seq: u64) -> DeltaEntry {
222        DeltaEntry {
223            format_version: DELTA_FORMAT_V1,
224            cert_bytes: dummy_cert_bytes(seq),
225            blob: vec![seq as u8; 16],
226        }
227    }
228
229    #[test]
230    fn roundtrip() {
231        let dir = tempfile::tempdir().unwrap();
232        let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
233
234        let entry = test_entry(0xAA);
235        store.put(42, &entry).unwrap();
236
237        let got = store.get(42).unwrap().expect("should exist");
238        assert_eq!(got.cert_bytes, entry.cert_bytes);
239        assert_eq!(got.blob, entry.blob);
240    }
241
242    #[test]
243    fn get_missing_returns_none() {
244        let dir = tempfile::tempdir().unwrap();
245        let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
246        assert!(store.get(99).unwrap().is_none());
247    }
248
249    #[test]
250    fn fetch_range_ascending_order() {
251        let dir = tempfile::tempdir().unwrap();
252        let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
253
254        for seq in 100..120 {
255            store.put(seq, &test_entry(seq)).unwrap();
256        }
257
258        let results = store.fetch_range(105, 115).unwrap();
259        assert_eq!(results.len(), 10);
260        for (i, (seq, entry)) in results.iter().enumerate() {
261            let expected_seq = 105 + i as u64;
262            assert_eq!(*seq, expected_seq);
263            assert_eq!(entry.blob, vec![expected_seq as u8; 16]);
264        }
265    }
266
267    #[test]
268    fn fetch_range_gap_tolerance() {
269        let dir = tempfile::tempdir().unwrap();
270        let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
271
272        for seq in [100, 101, 102, 105, 110] {
273            store.put(seq, &test_entry(seq)).unwrap();
274        }
275
276        let results = store.fetch_range(100, 111).unwrap();
277        let seqs: Vec<u64> = results.iter().map(|(s, _)| *s).collect();
278        assert_eq!(seqs, vec![100, 101, 102, 105, 110]);
279    }
280
281    #[test]
282    fn post_injection_gap() {
283        let dir = tempfile::tempdir().unwrap();
284        let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
285
286        for seq in [100, 101, 102] {
287            store.put(seq, &test_entry(seq)).unwrap();
288        }
289
290        store.put(201, &test_entry(201)).unwrap();
291
292        let results = store.fetch_range(100, 202).unwrap();
293        let seqs: Vec<u64> = results.iter().map(|(s, _)| *s).collect();
294        assert_eq!(seqs, vec![100, 101, 102, 201]);
295    }
296
297    #[test]
298    fn put_no_predecessor_required() {
299        let dir = tempfile::tempdir().unwrap();
300        let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
301
302        let entry = DeltaEntry {
303            format_version: DELTA_FORMAT_V1,
304            cert_bytes: dummy_cert_bytes(0xFF),
305            blob: b"lone delta".to_vec(),
306        };
307        store.put(500, &entry).unwrap();
308        let got = store.get(500).unwrap().expect("should exist");
309        assert_eq!(got.blob, b"lone delta");
310    }
311
312    #[test]
313    fn put_batch_single_transaction() {
314        let dir = tempfile::tempdir().unwrap();
315        let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
316
317        let entries: Vec<DeltaEntry> = (0..5)
318            .map(|i| DeltaEntry {
319                format_version: DELTA_FORMAT_V1,
320                cert_bytes: dummy_cert_bytes(i),
321                blob: vec![i as u8; 32],
322            })
323            .collect();
324
325        let batch: Vec<(u64, &DeltaEntry)> = entries.iter().enumerate().map(|(i, e)| (100 + i as u64, e)).collect();
326
327        store.put_batch(batch).unwrap();
328
329        let results = store.fetch_range(100, 105).unwrap();
330        assert_eq!(results.len(), 5);
331        for (i, (seq, entry)) in results.iter().enumerate() {
332            assert_eq!(*seq, 100 + i as u64);
333            assert_eq!(entry.blob, vec![i as u8; 32]);
334        }
335    }
336
337    #[test]
338    fn put_rejects_empty_cert_bytes() {
339        let dir = tempfile::tempdir().unwrap();
340        let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
341
342        let entry = DeltaEntry {
343            format_version: DELTA_FORMAT_V1,
344            cert_bytes: vec![],
345            blob: b"payload".to_vec(),
346        };
347        let err = store.put(1, &entry).unwrap_err();
348        assert!(matches!(err, BootnodeError::InvalidCertData(_)));
349    }
350
351    #[test]
352    fn put_batch_rejects_empty_cert_bytes() {
353        let dir = tempfile::tempdir().unwrap();
354        let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
355
356        let good = DeltaEntry {
357            format_version: DELTA_FORMAT_V1,
358            cert_bytes: vec![0xAA; 48],
359            blob: b"ok".to_vec(),
360        };
361        let bad = DeltaEntry {
362            format_version: DELTA_FORMAT_V1,
363            cert_bytes: vec![],
364            blob: b"bad".to_vec(),
365        };
366
367        let err = store.put_batch(vec![(1, &good), (2, &bad)]).unwrap_err();
368        assert!(matches!(err, BootnodeError::InvalidCertData(_)));
369
370        assert!(store.get(1).unwrap().is_none());
371    }
372
373    #[test]
374    fn fetch_range_empty_when_from_gte_to() {
375        let dir = tempfile::tempdir().unwrap();
376        let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
377
378        store.put(100, &test_entry(1)).unwrap();
379
380        assert!(store.fetch_range(100, 100).unwrap().is_empty());
381        assert!(store.fetch_range(110, 100).unwrap().is_empty());
382        assert!(store.fetch_range(u64::MAX, 0).unwrap().is_empty());
383    }
384
385    #[test]
386    fn fetch_range_rejects_oversized_window() {
387        let dir = tempfile::tempdir().unwrap();
388        let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
389
390        let err = store.fetch_range(0, MAX_FETCH_RANGE_LEN + 1).unwrap_err();
391        assert!(matches!(
392            err,
393            BootnodeError::FetchRangeTooLarge { requested, max }
394                if requested == MAX_FETCH_RANGE_LEN + 1 && max == MAX_FETCH_RANGE_LEN
395        ));
396
397        store
398            .fetch_range(0, MAX_FETCH_RANGE_LEN)
399            .expect("at-limit window must succeed");
400
401        let err = store.fetch_range(0, u64::MAX).unwrap_err();
402        assert!(matches!(err, BootnodeError::FetchRangeTooLarge { .. }));
403    }
404
405    #[test]
406    fn unsupported_format_version_rejected() {
407        let dir = tempfile::tempdir().unwrap();
408        let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
409
410        let entry = DeltaEntry {
411            format_version: 42,
412            cert_bytes: vec![0xAA; 48],
413            blob: vec![0xBB; 16],
414        };
415        let bytes = rmp_serde::to_vec(&entry).unwrap();
416        let txn = store.db.begin_write().unwrap();
417        {
418            let mut table = txn.open_table(DELTAS_TABLE).unwrap();
419            table.insert(1u64, bytes.as_slice()).unwrap();
420        }
421        txn.commit().unwrap();
422
423        let err = store.get(1).unwrap_err();
424        assert!(matches!(err, BootnodeError::UnsupportedDeltaFormat(42)));
425    }
426
427    #[test]
428    fn oversized_cert_bytes_rejected() {
429        let dir = tempfile::tempdir().unwrap();
430        let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
431
432        let entry = DeltaEntry {
433            format_version: DELTA_FORMAT_V1,
434            cert_bytes: vec![0xAA; MAX_DELTA_CERT_BYTES + 1],
435            blob: vec![0xBB; 16],
436        };
437        let bytes = rmp_serde::to_vec(&entry).unwrap();
438        let txn = store.db.begin_write().unwrap();
439        {
440            let mut table = txn.open_table(DELTAS_TABLE).unwrap();
441            table.insert(1u64, bytes.as_slice()).unwrap();
442        }
443        txn.commit().unwrap();
444
445        let err = store.get(1).unwrap_err();
446        assert!(matches!(
447            err,
448            BootnodeError::DeltaFieldTooLarge {
449                field: "cert_bytes",
450                ..
451            }
452        ));
453    }
454
455    #[test]
456    fn oversized_blob_rejected() {
457        let dir = tempfile::tempdir().unwrap();
458        let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
459
460        let entry = DeltaEntry {
461            format_version: DELTA_FORMAT_V1,
462            cert_bytes: vec![0xAA; 48],
463            blob: vec![0xBB; MAX_DELTA_BLOB_BYTES + 1],
464        };
465        let bytes = rmp_serde::to_vec(&entry).unwrap();
466        let txn = store.db.begin_write().unwrap();
467        {
468            let mut table = txn.open_table(DELTAS_TABLE).unwrap();
469            table.insert(1u64, bytes.as_slice()).unwrap();
470        }
471        txn.commit().unwrap();
472
473        let err = store.get(1).unwrap_err();
474        assert!(matches!(err, BootnodeError::DeltaFieldTooLarge { field: "blob", .. }));
475    }
476}