use std::path::Path;
use redb::{ReadableDatabase, ReadableTable, TableDefinition};
use serde::{Deserialize, Serialize};
use tracing::{debug, info, warn};
use crate::error::BootnodeError;
const DELTAS_TABLE: TableDefinition<'_, u64, &[u8]> = TableDefinition::new("deltas");
const DELTA_FORMAT_V1: u8 = 1;
pub const MAX_DELTA_CERT_BYTES: usize = 4 * 1024 * 1024;
pub const MAX_DELTA_BLOB_BYTES: usize = 64 * 1024 * 1024;
pub const MAX_FETCH_RANGE_LEN: u64 = 1024;
#[derive(Debug, Serialize, Deserialize)]
pub struct DeltaEntry {
pub format_version: u8,
#[serde(with = "serde_bytes")]
pub cert_bytes: Vec<u8>,
#[serde(with = "serde_bytes")]
pub blob: Vec<u8>,
}
#[derive(Debug)]
pub struct DeltaStore {
db: redb::Database,
}
impl DeltaStore {
pub fn open(path: impl AsRef<Path>) -> Result<Self, BootnodeError> {
let path = path.as_ref();
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
let db = redb::Database::create(path)?;
let txn = db.begin_write()?;
txn.open_table(DELTAS_TABLE)?;
txn.commit()?;
info!(path = %path.display(), "delta store opened");
Ok(Self { db })
}
pub fn put(&self, seq: u64, entry: &DeltaEntry) -> Result<(), BootnodeError> {
validate_cert_bytes(&entry.cert_bytes, seq)?;
let bytes = rmp_serde::to_vec(entry)?;
debug!(seq, size_bytes = bytes.len(), "delta put");
let txn = self.db.begin_write()?;
{
let mut table = txn.open_table(DELTAS_TABLE)?;
table.insert(seq, bytes.as_slice())?;
}
txn.commit()?;
Ok(())
}
pub fn put_batch<'a>(&self, entries: impl IntoIterator<Item = (u64, &'a DeltaEntry)>) -> Result<(), BootnodeError> {
let serialized: Vec<(u64, Vec<u8>)> = entries
.into_iter()
.map(|(seq, entry)| {
validate_cert_bytes(&entry.cert_bytes, seq)?;
let bytes = rmp_serde::to_vec(entry)?;
debug!(seq, size_bytes = bytes.len(), "delta put_batch entry");
Ok((seq, bytes))
})
.collect::<Result<_, BootnodeError>>()?;
let txn = self.db.begin_write()?;
{
let mut table = txn.open_table(DELTAS_TABLE)?;
for (seq, bytes) in &serialized {
table.insert(*seq, bytes.as_slice())?;
}
}
txn.commit()?;
Ok(())
}
pub fn get(&self, seq: u64) -> Result<Option<DeltaEntry>, BootnodeError> {
let txn = self.db.begin_read()?;
let table = txn.open_table(DELTAS_TABLE)?;
match table.get(seq)? {
Some(guard) => Ok(Some(decode_delta(guard.value())?)),
None => Ok(None),
}
}
pub fn fetch_range(&self, from_seq: u64, to_seq: u64) -> Result<Vec<(u64, DeltaEntry)>, BootnodeError> {
let requested = to_seq.saturating_sub(from_seq);
if requested > MAX_FETCH_RANGE_LEN {
warn!(
from_seq,
to_seq,
requested,
max = MAX_FETCH_RANGE_LEN,
"fetch_range window exceeds bound"
);
return Err(BootnodeError::FetchRangeTooLarge {
requested,
max: MAX_FETCH_RANGE_LEN,
});
}
let txn = self.db.begin_read()?;
let table = txn.open_table(DELTAS_TABLE)?;
let mut results = Vec::new();
for entry in table.range(from_seq..to_seq)? {
let (key, guard) = entry?;
results.push((key.value(), decode_delta(guard.value())?));
}
Ok(results)
}
}
fn validate_cert_bytes(bytes: &[u8], seq: u64) -> Result<(), BootnodeError> {
if bytes.is_empty() {
warn!(seq, "rejected empty cert bytes");
return Err(BootnodeError::InvalidCertData(format!("empty cert bytes at seq {seq}")));
}
Ok(())
}
fn decode_delta(bytes: &[u8]) -> Result<DeltaEntry, BootnodeError> {
let entry: DeltaEntry = rmp_serde::from_slice(bytes)?;
if entry.format_version != DELTA_FORMAT_V1 {
warn!(version = entry.format_version, "unsupported delta format");
return Err(BootnodeError::UnsupportedDeltaFormat(entry.format_version));
}
if entry.cert_bytes.is_empty() {
return Err(BootnodeError::InvalidCertData("empty cert bytes".into()));
}
if entry.cert_bytes.len() > MAX_DELTA_CERT_BYTES {
return Err(BootnodeError::DeltaFieldTooLarge {
field: "cert_bytes",
size: entry.cert_bytes.len(),
max: MAX_DELTA_CERT_BYTES,
});
}
if entry.blob.len() > MAX_DELTA_BLOB_BYTES {
return Err(BootnodeError::DeltaFieldTooLarge {
field: "blob",
size: entry.blob.len(),
max: MAX_DELTA_BLOB_BYTES,
});
}
Ok(entry)
}
#[cfg(test)]
mod tests {
use super::*;
fn dummy_cert_bytes(seq: u64) -> Vec<u8> {
let tag = seq.to_le_bytes();
let mut bytes = vec![0u8; 48];
bytes[..8].copy_from_slice(&tag);
bytes
}
fn test_entry(seq: u64) -> DeltaEntry {
DeltaEntry {
format_version: DELTA_FORMAT_V1,
cert_bytes: dummy_cert_bytes(seq),
blob: vec![seq as u8; 16],
}
}
#[test]
fn roundtrip() {
let dir = tempfile::tempdir().unwrap();
let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
let entry = test_entry(0xAA);
store.put(42, &entry).unwrap();
let got = store.get(42).unwrap().expect("should exist");
assert_eq!(got.cert_bytes, entry.cert_bytes);
assert_eq!(got.blob, entry.blob);
}
#[test]
fn get_missing_returns_none() {
let dir = tempfile::tempdir().unwrap();
let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
assert!(store.get(99).unwrap().is_none());
}
#[test]
fn fetch_range_ascending_order() {
let dir = tempfile::tempdir().unwrap();
let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
for seq in 100..120 {
store.put(seq, &test_entry(seq)).unwrap();
}
let results = store.fetch_range(105, 115).unwrap();
assert_eq!(results.len(), 10);
for (i, (seq, entry)) in results.iter().enumerate() {
let expected_seq = 105 + i as u64;
assert_eq!(*seq, expected_seq);
assert_eq!(entry.blob, vec![expected_seq as u8; 16]);
}
}
#[test]
fn fetch_range_gap_tolerance() {
let dir = tempfile::tempdir().unwrap();
let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
for seq in [100, 101, 102, 105, 110] {
store.put(seq, &test_entry(seq)).unwrap();
}
let results = store.fetch_range(100, 111).unwrap();
let seqs: Vec<u64> = results.iter().map(|(s, _)| *s).collect();
assert_eq!(seqs, vec![100, 101, 102, 105, 110]);
}
#[test]
fn post_injection_gap() {
let dir = tempfile::tempdir().unwrap();
let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
for seq in [100, 101, 102] {
store.put(seq, &test_entry(seq)).unwrap();
}
store.put(201, &test_entry(201)).unwrap();
let results = store.fetch_range(100, 202).unwrap();
let seqs: Vec<u64> = results.iter().map(|(s, _)| *s).collect();
assert_eq!(seqs, vec![100, 101, 102, 201]);
}
#[test]
fn put_no_predecessor_required() {
let dir = tempfile::tempdir().unwrap();
let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
let entry = DeltaEntry {
format_version: DELTA_FORMAT_V1,
cert_bytes: dummy_cert_bytes(0xFF),
blob: b"lone delta".to_vec(),
};
store.put(500, &entry).unwrap();
let got = store.get(500).unwrap().expect("should exist");
assert_eq!(got.blob, b"lone delta");
}
#[test]
fn put_batch_single_transaction() {
let dir = tempfile::tempdir().unwrap();
let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
let entries: Vec<DeltaEntry> = (0..5)
.map(|i| DeltaEntry {
format_version: DELTA_FORMAT_V1,
cert_bytes: dummy_cert_bytes(i),
blob: vec![i as u8; 32],
})
.collect();
let batch: Vec<(u64, &DeltaEntry)> = entries.iter().enumerate().map(|(i, e)| (100 + i as u64, e)).collect();
store.put_batch(batch).unwrap();
let results = store.fetch_range(100, 105).unwrap();
assert_eq!(results.len(), 5);
for (i, (seq, entry)) in results.iter().enumerate() {
assert_eq!(*seq, 100 + i as u64);
assert_eq!(entry.blob, vec![i as u8; 32]);
}
}
#[test]
fn put_rejects_empty_cert_bytes() {
let dir = tempfile::tempdir().unwrap();
let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
let entry = DeltaEntry {
format_version: DELTA_FORMAT_V1,
cert_bytes: vec![],
blob: b"payload".to_vec(),
};
let err = store.put(1, &entry).unwrap_err();
assert!(matches!(err, BootnodeError::InvalidCertData(_)));
}
#[test]
fn put_batch_rejects_empty_cert_bytes() {
let dir = tempfile::tempdir().unwrap();
let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
let good = DeltaEntry {
format_version: DELTA_FORMAT_V1,
cert_bytes: vec![0xAA; 48],
blob: b"ok".to_vec(),
};
let bad = DeltaEntry {
format_version: DELTA_FORMAT_V1,
cert_bytes: vec![],
blob: b"bad".to_vec(),
};
let err = store.put_batch(vec![(1, &good), (2, &bad)]).unwrap_err();
assert!(matches!(err, BootnodeError::InvalidCertData(_)));
assert!(store.get(1).unwrap().is_none());
}
#[test]
fn fetch_range_empty_when_from_gte_to() {
let dir = tempfile::tempdir().unwrap();
let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
store.put(100, &test_entry(1)).unwrap();
assert!(store.fetch_range(100, 100).unwrap().is_empty());
assert!(store.fetch_range(110, 100).unwrap().is_empty());
assert!(store.fetch_range(u64::MAX, 0).unwrap().is_empty());
}
#[test]
fn fetch_range_rejects_oversized_window() {
let dir = tempfile::tempdir().unwrap();
let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
let err = store.fetch_range(0, MAX_FETCH_RANGE_LEN + 1).unwrap_err();
assert!(matches!(
err,
BootnodeError::FetchRangeTooLarge { requested, max }
if requested == MAX_FETCH_RANGE_LEN + 1 && max == MAX_FETCH_RANGE_LEN
));
store
.fetch_range(0, MAX_FETCH_RANGE_LEN)
.expect("at-limit window must succeed");
let err = store.fetch_range(0, u64::MAX).unwrap_err();
assert!(matches!(err, BootnodeError::FetchRangeTooLarge { .. }));
}
#[test]
fn unsupported_format_version_rejected() {
let dir = tempfile::tempdir().unwrap();
let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
let entry = DeltaEntry {
format_version: 42,
cert_bytes: vec![0xAA; 48],
blob: vec![0xBB; 16],
};
let bytes = rmp_serde::to_vec(&entry).unwrap();
let txn = store.db.begin_write().unwrap();
{
let mut table = txn.open_table(DELTAS_TABLE).unwrap();
table.insert(1u64, bytes.as_slice()).unwrap();
}
txn.commit().unwrap();
let err = store.get(1).unwrap_err();
assert!(matches!(err, BootnodeError::UnsupportedDeltaFormat(42)));
}
#[test]
fn oversized_cert_bytes_rejected() {
let dir = tempfile::tempdir().unwrap();
let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
let entry = DeltaEntry {
format_version: DELTA_FORMAT_V1,
cert_bytes: vec![0xAA; MAX_DELTA_CERT_BYTES + 1],
blob: vec![0xBB; 16],
};
let bytes = rmp_serde::to_vec(&entry).unwrap();
let txn = store.db.begin_write().unwrap();
{
let mut table = txn.open_table(DELTAS_TABLE).unwrap();
table.insert(1u64, bytes.as_slice()).unwrap();
}
txn.commit().unwrap();
let err = store.get(1).unwrap_err();
assert!(matches!(
err,
BootnodeError::DeltaFieldTooLarge {
field: "cert_bytes",
..
}
));
}
#[test]
fn oversized_blob_rejected() {
let dir = tempfile::tempdir().unwrap();
let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
let entry = DeltaEntry {
format_version: DELTA_FORMAT_V1,
cert_bytes: vec![0xAA; 48],
blob: vec![0xBB; MAX_DELTA_BLOB_BYTES + 1],
};
let bytes = rmp_serde::to_vec(&entry).unwrap();
let txn = store.db.begin_write().unwrap();
{
let mut table = txn.open_table(DELTAS_TABLE).unwrap();
table.insert(1u64, bytes.as_slice()).unwrap();
}
txn.commit().unwrap();
let err = store.get(1).unwrap_err();
assert!(matches!(err, BootnodeError::DeltaFieldTooLarge { field: "blob", .. }));
}
}