1use std::path::Path;
16
17use redb::{ReadableDatabase, ReadableTable, TableDefinition};
18use serde::{Deserialize, Serialize};
19use tracing::{debug, info, warn};
20
21use crate::error::BootnodeError;
22
23const DELTAS_TABLE: TableDefinition<'_, u64, &[u8]> = TableDefinition::new("deltas");
24
25const DELTA_FORMAT_V1: u8 = 1;
26
27pub const MAX_DELTA_CERT_BYTES: usize = 4 * 1024 * 1024;
32
33pub const MAX_DELTA_BLOB_BYTES: usize = 64 * 1024 * 1024;
38
39pub const MAX_FETCH_RANGE_LEN: u64 = 1024;
53
54#[derive(Debug, Serialize, Deserialize)]
56pub struct DeltaEntry {
57 pub format_version: u8,
59 #[serde(with = "serde_bytes")]
61 pub cert_bytes: Vec<u8>,
62 #[serde(with = "serde_bytes")]
64 pub blob: Vec<u8>,
65}
66
67#[derive(Debug)]
73pub struct DeltaStore {
74 db: redb::Database,
75}
76
77impl DeltaStore {
78 pub fn open(path: impl AsRef<Path>) -> Result<Self, BootnodeError> {
80 let path = path.as_ref();
81 if let Some(parent) = path.parent() {
82 std::fs::create_dir_all(parent)?;
83 }
84 let db = redb::Database::create(path)?;
85 let txn = db.begin_write()?;
86 txn.open_table(DELTAS_TABLE)?;
87 txn.commit()?;
88 info!(path = %path.display(), "delta store opened");
89 Ok(Self { db })
90 }
91
92 pub fn put(&self, seq: u64, entry: &DeltaEntry) -> Result<(), BootnodeError> {
97 validate_cert_bytes(&entry.cert_bytes, seq)?;
98 let bytes = rmp_serde::to_vec(entry)?;
99 debug!(seq, size_bytes = bytes.len(), "delta put");
100 let txn = self.db.begin_write()?;
101 {
102 let mut table = txn.open_table(DELTAS_TABLE)?;
103 table.insert(seq, bytes.as_slice())?;
104 }
105 txn.commit()?;
106 Ok(())
107 }
108
109 pub fn put_batch<'a>(&self, entries: impl IntoIterator<Item = (u64, &'a DeltaEntry)>) -> Result<(), BootnodeError> {
111 let serialized: Vec<(u64, Vec<u8>)> = entries
112 .into_iter()
113 .map(|(seq, entry)| {
114 validate_cert_bytes(&entry.cert_bytes, seq)?;
115 let bytes = rmp_serde::to_vec(entry)?;
116 debug!(seq, size_bytes = bytes.len(), "delta put_batch entry");
117 Ok((seq, bytes))
118 })
119 .collect::<Result<_, BootnodeError>>()?;
120
121 let txn = self.db.begin_write()?;
122 {
123 let mut table = txn.open_table(DELTAS_TABLE)?;
124 for (seq, bytes) in &serialized {
125 table.insert(*seq, bytes.as_slice())?;
126 }
127 }
128 txn.commit()?;
129 Ok(())
130 }
131
132 pub fn get(&self, seq: u64) -> Result<Option<DeltaEntry>, BootnodeError> {
134 let txn = self.db.begin_read()?;
135 let table = txn.open_table(DELTAS_TABLE)?;
136 match table.get(seq)? {
137 Some(guard) => Ok(Some(decode_delta(guard.value())?)),
138 None => Ok(None),
139 }
140 }
141
142 pub fn fetch_range(&self, from_seq: u64, to_seq: u64) -> Result<Vec<(u64, DeltaEntry)>, BootnodeError> {
151 let requested = to_seq.saturating_sub(from_seq);
152 if requested > MAX_FETCH_RANGE_LEN {
153 warn!(
154 from_seq,
155 to_seq,
156 requested,
157 max = MAX_FETCH_RANGE_LEN,
158 "fetch_range window exceeds bound"
159 );
160 return Err(BootnodeError::FetchRangeTooLarge {
161 requested,
162 max: MAX_FETCH_RANGE_LEN,
163 });
164 }
165 let txn = self.db.begin_read()?;
166 let table = txn.open_table(DELTAS_TABLE)?;
167 let mut results = Vec::new();
168 for entry in table.range(from_seq..to_seq)? {
169 let (key, guard) = entry?;
170 results.push((key.value(), decode_delta(guard.value())?));
171 }
172 Ok(results)
173 }
174}
175
176fn validate_cert_bytes(bytes: &[u8], seq: u64) -> Result<(), BootnodeError> {
177 if bytes.is_empty() {
178 warn!(seq, "rejected empty cert bytes");
179 return Err(BootnodeError::InvalidCertData(format!("empty cert bytes at seq {seq}")));
180 }
181 Ok(())
182}
183
184fn decode_delta(bytes: &[u8]) -> Result<DeltaEntry, BootnodeError> {
185 let entry: DeltaEntry = rmp_serde::from_slice(bytes)?;
186 if entry.format_version != DELTA_FORMAT_V1 {
187 warn!(version = entry.format_version, "unsupported delta format");
188 return Err(BootnodeError::UnsupportedDeltaFormat(entry.format_version));
189 }
190 if entry.cert_bytes.is_empty() {
191 return Err(BootnodeError::InvalidCertData("empty cert bytes".into()));
192 }
193 if entry.cert_bytes.len() > MAX_DELTA_CERT_BYTES {
194 return Err(BootnodeError::DeltaFieldTooLarge {
195 field: "cert_bytes",
196 size: entry.cert_bytes.len(),
197 max: MAX_DELTA_CERT_BYTES,
198 });
199 }
200 if entry.blob.len() > MAX_DELTA_BLOB_BYTES {
201 return Err(BootnodeError::DeltaFieldTooLarge {
202 field: "blob",
203 size: entry.blob.len(),
204 max: MAX_DELTA_BLOB_BYTES,
205 });
206 }
207 Ok(entry)
208}
209
210#[cfg(test)]
211mod tests {
212 use super::*;
213
214 fn dummy_cert_bytes(seq: u64) -> Vec<u8> {
215 let tag = seq.to_le_bytes();
216 let mut bytes = vec![0u8; 48];
217 bytes[..8].copy_from_slice(&tag);
218 bytes
219 }
220
221 fn test_entry(seq: u64) -> DeltaEntry {
222 DeltaEntry {
223 format_version: DELTA_FORMAT_V1,
224 cert_bytes: dummy_cert_bytes(seq),
225 blob: vec![seq as u8; 16],
226 }
227 }
228
229 #[test]
230 fn roundtrip() {
231 let dir = tempfile::tempdir().unwrap();
232 let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
233
234 let entry = test_entry(0xAA);
235 store.put(42, &entry).unwrap();
236
237 let got = store.get(42).unwrap().expect("should exist");
238 assert_eq!(got.cert_bytes, entry.cert_bytes);
239 assert_eq!(got.blob, entry.blob);
240 }
241
242 #[test]
243 fn get_missing_returns_none() {
244 let dir = tempfile::tempdir().unwrap();
245 let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
246 assert!(store.get(99).unwrap().is_none());
247 }
248
249 #[test]
250 fn fetch_range_ascending_order() {
251 let dir = tempfile::tempdir().unwrap();
252 let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
253
254 for seq in 100..120 {
255 store.put(seq, &test_entry(seq)).unwrap();
256 }
257
258 let results = store.fetch_range(105, 115).unwrap();
259 assert_eq!(results.len(), 10);
260 for (i, (seq, entry)) in results.iter().enumerate() {
261 let expected_seq = 105 + i as u64;
262 assert_eq!(*seq, expected_seq);
263 assert_eq!(entry.blob, vec![expected_seq as u8; 16]);
264 }
265 }
266
267 #[test]
268 fn fetch_range_gap_tolerance() {
269 let dir = tempfile::tempdir().unwrap();
270 let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
271
272 for seq in [100, 101, 102, 105, 110] {
273 store.put(seq, &test_entry(seq)).unwrap();
274 }
275
276 let results = store.fetch_range(100, 111).unwrap();
277 let seqs: Vec<u64> = results.iter().map(|(s, _)| *s).collect();
278 assert_eq!(seqs, vec![100, 101, 102, 105, 110]);
279 }
280
281 #[test]
282 fn post_injection_gap() {
283 let dir = tempfile::tempdir().unwrap();
284 let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
285
286 for seq in [100, 101, 102] {
287 store.put(seq, &test_entry(seq)).unwrap();
288 }
289
290 store.put(201, &test_entry(201)).unwrap();
291
292 let results = store.fetch_range(100, 202).unwrap();
293 let seqs: Vec<u64> = results.iter().map(|(s, _)| *s).collect();
294 assert_eq!(seqs, vec![100, 101, 102, 201]);
295 }
296
297 #[test]
298 fn put_no_predecessor_required() {
299 let dir = tempfile::tempdir().unwrap();
300 let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
301
302 let entry = DeltaEntry {
303 format_version: DELTA_FORMAT_V1,
304 cert_bytes: dummy_cert_bytes(0xFF),
305 blob: b"lone delta".to_vec(),
306 };
307 store.put(500, &entry).unwrap();
308 let got = store.get(500).unwrap().expect("should exist");
309 assert_eq!(got.blob, b"lone delta");
310 }
311
312 #[test]
313 fn put_batch_single_transaction() {
314 let dir = tempfile::tempdir().unwrap();
315 let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
316
317 let entries: Vec<DeltaEntry> = (0..5)
318 .map(|i| DeltaEntry {
319 format_version: DELTA_FORMAT_V1,
320 cert_bytes: dummy_cert_bytes(i),
321 blob: vec![i as u8; 32],
322 })
323 .collect();
324
325 let batch: Vec<(u64, &DeltaEntry)> = entries.iter().enumerate().map(|(i, e)| (100 + i as u64, e)).collect();
326
327 store.put_batch(batch).unwrap();
328
329 let results = store.fetch_range(100, 105).unwrap();
330 assert_eq!(results.len(), 5);
331 for (i, (seq, entry)) in results.iter().enumerate() {
332 assert_eq!(*seq, 100 + i as u64);
333 assert_eq!(entry.blob, vec![i as u8; 32]);
334 }
335 }
336
337 #[test]
338 fn put_rejects_empty_cert_bytes() {
339 let dir = tempfile::tempdir().unwrap();
340 let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
341
342 let entry = DeltaEntry {
343 format_version: DELTA_FORMAT_V1,
344 cert_bytes: vec![],
345 blob: b"payload".to_vec(),
346 };
347 let err = store.put(1, &entry).unwrap_err();
348 assert!(matches!(err, BootnodeError::InvalidCertData(_)));
349 }
350
351 #[test]
352 fn put_batch_rejects_empty_cert_bytes() {
353 let dir = tempfile::tempdir().unwrap();
354 let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
355
356 let good = DeltaEntry {
357 format_version: DELTA_FORMAT_V1,
358 cert_bytes: vec![0xAA; 48],
359 blob: b"ok".to_vec(),
360 };
361 let bad = DeltaEntry {
362 format_version: DELTA_FORMAT_V1,
363 cert_bytes: vec![],
364 blob: b"bad".to_vec(),
365 };
366
367 let err = store.put_batch(vec![(1, &good), (2, &bad)]).unwrap_err();
368 assert!(matches!(err, BootnodeError::InvalidCertData(_)));
369
370 assert!(store.get(1).unwrap().is_none());
371 }
372
373 #[test]
374 fn fetch_range_empty_when_from_gte_to() {
375 let dir = tempfile::tempdir().unwrap();
376 let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
377
378 store.put(100, &test_entry(1)).unwrap();
379
380 assert!(store.fetch_range(100, 100).unwrap().is_empty());
381 assert!(store.fetch_range(110, 100).unwrap().is_empty());
382 assert!(store.fetch_range(u64::MAX, 0).unwrap().is_empty());
383 }
384
385 #[test]
386 fn fetch_range_rejects_oversized_window() {
387 let dir = tempfile::tempdir().unwrap();
388 let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
389
390 let err = store.fetch_range(0, MAX_FETCH_RANGE_LEN + 1).unwrap_err();
391 assert!(matches!(
392 err,
393 BootnodeError::FetchRangeTooLarge { requested, max }
394 if requested == MAX_FETCH_RANGE_LEN + 1 && max == MAX_FETCH_RANGE_LEN
395 ));
396
397 store
398 .fetch_range(0, MAX_FETCH_RANGE_LEN)
399 .expect("at-limit window must succeed");
400
401 let err = store.fetch_range(0, u64::MAX).unwrap_err();
402 assert!(matches!(err, BootnodeError::FetchRangeTooLarge { .. }));
403 }
404
405 #[test]
406 fn unsupported_format_version_rejected() {
407 let dir = tempfile::tempdir().unwrap();
408 let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
409
410 let entry = DeltaEntry {
411 format_version: 42,
412 cert_bytes: vec![0xAA; 48],
413 blob: vec![0xBB; 16],
414 };
415 let bytes = rmp_serde::to_vec(&entry).unwrap();
416 let txn = store.db.begin_write().unwrap();
417 {
418 let mut table = txn.open_table(DELTAS_TABLE).unwrap();
419 table.insert(1u64, bytes.as_slice()).unwrap();
420 }
421 txn.commit().unwrap();
422
423 let err = store.get(1).unwrap_err();
424 assert!(matches!(err, BootnodeError::UnsupportedDeltaFormat(42)));
425 }
426
427 #[test]
428 fn oversized_cert_bytes_rejected() {
429 let dir = tempfile::tempdir().unwrap();
430 let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
431
432 let entry = DeltaEntry {
433 format_version: DELTA_FORMAT_V1,
434 cert_bytes: vec![0xAA; MAX_DELTA_CERT_BYTES + 1],
435 blob: vec![0xBB; 16],
436 };
437 let bytes = rmp_serde::to_vec(&entry).unwrap();
438 let txn = store.db.begin_write().unwrap();
439 {
440 let mut table = txn.open_table(DELTAS_TABLE).unwrap();
441 table.insert(1u64, bytes.as_slice()).unwrap();
442 }
443 txn.commit().unwrap();
444
445 let err = store.get(1).unwrap_err();
446 assert!(matches!(
447 err,
448 BootnodeError::DeltaFieldTooLarge {
449 field: "cert_bytes",
450 ..
451 }
452 ));
453 }
454
455 #[test]
456 fn oversized_blob_rejected() {
457 let dir = tempfile::tempdir().unwrap();
458 let store = DeltaStore::open(dir.path().join("delta.redb")).unwrap();
459
460 let entry = DeltaEntry {
461 format_version: DELTA_FORMAT_V1,
462 cert_bytes: vec![0xAA; 48],
463 blob: vec![0xBB; MAX_DELTA_BLOB_BYTES + 1],
464 };
465 let bytes = rmp_serde::to_vec(&entry).unwrap();
466 let txn = store.db.begin_write().unwrap();
467 {
468 let mut table = txn.open_table(DELTAS_TABLE).unwrap();
469 table.insert(1u64, bytes.as_slice()).unwrap();
470 }
471 txn.commit().unwrap();
472
473 let err = store.get(1).unwrap_err();
474 assert!(matches!(err, BootnodeError::DeltaFieldTooLarge { field: "blob", .. }));
475 }
476}