use std::{io::Write, path::Path};
use alloy_primitives::B256;
use redb::{ReadableDatabase, ReadableTable, TableDefinition};
use serde::{Deserialize, Serialize};
use tracing::{debug, info, warn};
use crate::error::BootnodeError;
const HEADERS_TABLE: TableDefinition<'_, u64, &[u8]> = TableDefinition::new("snapshot_headers");
const BODIES_TABLE: TableDefinition<'_, u64, &[u8]> = TableDefinition::new("snapshot_bodies");
const SNAPSHOT_FORMAT_V1: u8 = 1;
pub const MAX_SNAPSHOT_BODY_BYTES: usize = 256 * 1024 * 1024;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SnapshotHeader {
pub format_version: u8,
pub sequence_no: u64,
pub state_root: B256,
pub sealed_at_ts: u64,
pub body_len: u64,
}
#[derive(Debug)]
pub struct SnapshotStore {
db: redb::Database,
}
impl SnapshotStore {
pub fn open(path: impl AsRef<Path>) -> Result<Self, BootnodeError> {
let path = path.as_ref();
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
let db = redb::Database::create(path)?;
let txn = db.begin_write()?;
txn.open_table(HEADERS_TABLE)?;
txn.open_table(BODIES_TABLE)?;
txn.commit()?;
info!(path = %path.display(), "snapshot store opened");
Ok(Self { db })
}
pub fn put(&self, header: &SnapshotHeader, body: &[u8]) -> Result<(), BootnodeError> {
if body.len() > MAX_SNAPSHOT_BODY_BYTES {
return Err(BootnodeError::SnapshotBodyTooLarge {
size: body.len(),
max: MAX_SNAPSHOT_BODY_BYTES,
});
}
if header.body_len != body.len() as u64 {
return Err(BootnodeError::SnapshotBodyLenMismatch {
header_body_len: header.body_len,
actual: body.len() as u64,
});
}
let header_bytes = rmp_serde::to_vec(header)?;
debug!(
sequence_no = header.sequence_no,
header_bytes = header_bytes.len(),
body_bytes = body.len(),
"snapshot put"
);
let txn = self.db.begin_write()?;
{
let mut headers = txn.open_table(HEADERS_TABLE)?;
let mut bodies = txn.open_table(BODIES_TABLE)?;
headers.insert(header.sequence_no, header_bytes.as_slice())?;
bodies.insert(header.sequence_no, body)?;
}
txn.commit()?;
Ok(())
}
pub fn get(&self, sequence_no: u64) -> Result<Option<SnapshotHeader>, BootnodeError> {
let txn = self.db.begin_read()?;
let table = txn.open_table(HEADERS_TABLE)?;
match table.get(sequence_no)? {
Some(guard) => Ok(Some(decode_header(guard.value())?)),
None => Ok(None),
}
}
pub fn latest_at_most(&self, up_to: u64) -> Result<Option<SnapshotHeader>, BootnodeError> {
let txn = self.db.begin_read()?;
let table = txn.open_table(HEADERS_TABLE)?;
let mut range = table.range(..=up_to)?;
match range.next_back() {
Some(Ok((_, guard))) => Ok(Some(decode_header(guard.value())?)),
Some(Err(e)) => Err(e.into()),
None => Ok(None),
}
}
pub fn read_body(&self, sequence_no: u64, writer: &mut impl Write) -> Result<Option<u64>, BootnodeError> {
let txn = self.db.begin_read()?;
let headers = txn.open_table(HEADERS_TABLE)?;
let header = match headers.get(sequence_no)? {
Some(guard) => decode_header(guard.value())?,
None => return Ok(None),
};
let bodies = txn.open_table(BODIES_TABLE)?;
let Some(guard) = bodies.get(sequence_no)? else {
warn!(
sequence_no,
header_body_len = header.body_len,
"orphan snapshot header — body missing on disk"
);
return Err(BootnodeError::SnapshotBodyLenMismatch {
header_body_len: header.body_len,
actual: 0,
});
};
let body = guard.value();
let actual = body.len() as u64;
if actual != header.body_len {
warn!(
sequence_no,
header_body_len = header.body_len,
actual,
"snapshot body length drift between header and bodies tables"
);
return Err(BootnodeError::SnapshotBodyLenMismatch {
header_body_len: header.body_len,
actual,
});
}
writer.write_all(body)?;
Ok(Some(actual))
}
}
fn decode_header(bytes: &[u8]) -> Result<SnapshotHeader, BootnodeError> {
let hdr: SnapshotHeader = rmp_serde::from_slice(bytes)?;
if hdr.format_version != SNAPSHOT_FORMAT_V1 {
warn!(version = hdr.format_version, "unsupported snapshot format");
return Err(BootnodeError::UnsupportedSnapshotFormat(hdr.format_version));
}
Ok(hdr)
}
#[cfg(test)]
mod tests {
use super::*;
const TEST_BODY: &[u8] = &[0xAB; 64];
fn test_header(sequence_no: u64, sealed_at_ts: u64) -> SnapshotHeader {
SnapshotHeader {
format_version: SNAPSHOT_FORMAT_V1,
sequence_no,
state_root: B256::from([sequence_no as u8; 32]),
sealed_at_ts,
body_len: TEST_BODY.len() as u64,
}
}
#[test]
fn roundtrip_header() {
let dir = tempfile::tempdir().unwrap();
let store = SnapshotStore::open(dir.path().join("snap.redb")).unwrap();
let hdr = test_header(42, 1_700_000_000);
store.put(&hdr, TEST_BODY).unwrap();
let got = store.get(42).unwrap().expect("should exist");
assert_eq!(got.format_version, SNAPSHOT_FORMAT_V1);
assert_eq!(got.sequence_no, hdr.sequence_no);
assert_eq!(got.state_root, hdr.state_root);
assert_eq!(got.sealed_at_ts, hdr.sealed_at_ts);
assert_eq!(got.body_len, TEST_BODY.len() as u64);
}
#[test]
fn read_body_roundtrip() {
let dir = tempfile::tempdir().unwrap();
let store = SnapshotStore::open(dir.path().join("snap.redb")).unwrap();
let hdr = test_header(42, 1_700_000_000);
store.put(&hdr, TEST_BODY).unwrap();
let mut buf = Vec::new();
let n = store.read_body(42, &mut buf).unwrap().expect("should exist");
assert_eq!(n, TEST_BODY.len() as u64);
assert_eq!(buf, TEST_BODY);
}
#[test]
fn read_body_missing_returns_none() {
let dir = tempfile::tempdir().unwrap();
let store = SnapshotStore::open(dir.path().join("snap.redb")).unwrap();
assert!(store.read_body(99, &mut Vec::new()).unwrap().is_none());
}
#[test]
fn get_missing_returns_none() {
let dir = tempfile::tempdir().unwrap();
let store = SnapshotStore::open(dir.path().join("snap.redb")).unwrap();
assert!(store.get(99).unwrap().is_none());
}
#[test]
fn latest_at_most_finds_right_snap() {
let dir = tempfile::tempdir().unwrap();
let store = SnapshotStore::open(dir.path().join("snap.redb")).unwrap();
for seq in [10, 20, 30, 40] {
store.put(&test_header(seq, 1_000_000 + seq), TEST_BODY).unwrap();
}
let hdr = store.latest_at_most(25).unwrap().expect("should find 20");
assert_eq!(hdr.sequence_no, 20);
let hdr = store.latest_at_most(40).unwrap().expect("should find 40");
assert_eq!(hdr.sequence_no, 40);
assert!(store.latest_at_most(5).unwrap().is_none());
}
#[test]
fn sealed_at_ts_preserved_including_epoch_zero() {
let dir = tempfile::tempdir().unwrap();
let store = SnapshotStore::open(dir.path().join("snap.redb")).unwrap();
store.put(&test_header(1, 0), TEST_BODY).unwrap();
let hdr = store.get(1).unwrap().expect("should exist");
assert_eq!(hdr.sealed_at_ts, 0);
let hdr = store.latest_at_most(100).unwrap().expect("should find it");
assert_eq!(hdr.sealed_at_ts, 0);
}
#[test]
fn sequence_no_zero_roundtrip() {
let dir = tempfile::tempdir().unwrap();
let store = SnapshotStore::open(dir.path().join("snap.redb")).unwrap();
store.put(&test_header(0, 0), TEST_BODY).unwrap();
let hdr = store.get(0).unwrap().expect("seq 0 should exist");
assert_eq!(hdr.sequence_no, 0);
let hdr = store
.latest_at_most(0)
.unwrap()
.expect("latest_at_most(0) should return seq 0");
assert_eq!(hdr.sequence_no, 0);
}
#[test]
fn unsupported_format_version_rejected() {
let dir = tempfile::tempdir().unwrap();
let store = SnapshotStore::open(dir.path().join("snap.redb")).unwrap();
let bad_hdr = SnapshotHeader {
format_version: 99,
sequence_no: 1,
state_root: B256::from([1u8; 32]),
sealed_at_ts: 1_000,
body_len: 64,
};
let bytes = rmp_serde::to_vec(&bad_hdr).unwrap();
let txn = store.db.begin_write().unwrap();
{
let mut table = txn.open_table(HEADERS_TABLE).unwrap();
table.insert(1u64, bytes.as_slice()).unwrap();
}
txn.commit().unwrap();
let err = store.get(1).unwrap_err();
assert!(matches!(err, BootnodeError::UnsupportedSnapshotFormat(99)));
}
#[test]
fn read_body_rejects_cross_table_body_len_drift() {
let dir = tempfile::tempdir().unwrap();
let store = SnapshotStore::open(dir.path().join("snap.redb")).unwrap();
let mut hdr = test_header(7, 1_000);
hdr.body_len = 64;
let header_bytes = rmp_serde::to_vec(&hdr).unwrap();
let txn = store.db.begin_write().unwrap();
{
let mut headers = txn.open_table(HEADERS_TABLE).unwrap();
headers.insert(7u64, header_bytes.as_slice()).unwrap();
let mut bodies = txn.open_table(BODIES_TABLE).unwrap();
let short_body = vec![0xCC; 32];
bodies.insert(7u64, short_body.as_slice()).unwrap();
}
txn.commit().unwrap();
let mut buf = Vec::new();
let err = store.read_body(7, &mut buf).unwrap_err();
assert!(matches!(
err,
BootnodeError::SnapshotBodyLenMismatch {
header_body_len: 64,
actual: 32
}
));
assert!(buf.is_empty(), "writer must not see partial bytes on mismatch");
}
#[test]
fn read_body_rejects_orphan_header() {
let dir = tempfile::tempdir().unwrap();
let store = SnapshotStore::open(dir.path().join("snap.redb")).unwrap();
let hdr = test_header(9, 1_000);
let header_bytes = rmp_serde::to_vec(&hdr).unwrap();
let txn = store.db.begin_write().unwrap();
{
let mut headers = txn.open_table(HEADERS_TABLE).unwrap();
headers.insert(9u64, header_bytes.as_slice()).unwrap();
}
txn.commit().unwrap();
let err = store.read_body(9, &mut Vec::new()).unwrap_err();
assert!(matches!(err, BootnodeError::SnapshotBodyLenMismatch { actual: 0, .. }));
}
#[test]
fn put_rejects_body_len_mismatch() {
let dir = tempfile::tempdir().unwrap();
let store = SnapshotStore::open(dir.path().join("snap.redb")).unwrap();
let mut hdr = test_header(1, 1_000);
hdr.body_len = 999;
let err = store.put(&hdr, TEST_BODY).unwrap_err();
assert!(matches!(err, BootnodeError::SnapshotBodyLenMismatch { .. }));
assert!(store.get(1).unwrap().is_none());
}
#[test]
fn put_rejects_oversized_body() {
let dir = tempfile::tempdir().unwrap();
let store = SnapshotStore::open(dir.path().join("snap.redb")).unwrap();
let big_body = vec![0xCC; MAX_SNAPSHOT_BODY_BYTES + 1];
let hdr = SnapshotHeader {
format_version: SNAPSHOT_FORMAT_V1,
sequence_no: 1,
state_root: B256::ZERO,
sealed_at_ts: 1_000,
body_len: big_body.len() as u64,
};
let err = store.put(&hdr, &big_body).unwrap_err();
assert!(matches!(err, BootnodeError::SnapshotBodyTooLarge { .. }));
assert!(store.get(1).unwrap().is_none());
}
}