1use std::{io::Write, path::Path};
18
19use alloy_primitives::B256;
20use redb::{ReadableDatabase, ReadableTable, TableDefinition};
21use serde::{Deserialize, Serialize};
22use tracing::{debug, info, warn};
23
24use crate::error::BootnodeError;
25
/// redb table named `snapshot_headers`: `u64` key to raw bytes.
///
/// `SnapshotStore::put` writes the `rmp_serde`-encoded [`SnapshotHeader`] here,
/// keyed by `header.sequence_no`.
const HEADERS_TABLE: TableDefinition<'_, u64, &[u8]> = TableDefinition::new("snapshot_headers");
/// redb table named `snapshot_bodies`: `u64` key to raw body bytes.
///
/// `SnapshotStore::put` writes the body under the same key it uses for the
/// matching header.
const BODIES_TABLE: TableDefinition<'_, u64, &[u8]> = TableDefinition::new("snapshot_bodies");
28
/// The only header format version `decode_header` accepts; any other value in
/// `SnapshotHeader::format_version` is rejected when a header is read back.
const SNAPSHOT_FORMAT_V1: u8 = 1;
30
/// Upper bound (256 MiB) on the body length accepted by `SnapshotStore::put`;
/// longer bodies are refused with `BootnodeError::SnapshotBodyTooLarge`.
pub const MAX_SNAPSHOT_BODY_BYTES: usize = 256 * 1024 * 1024;
37
/// Metadata record stored alongside each snapshot body.
///
/// Headers are serialized with `rmp_serde` before being written to the headers
/// table.
// NOTE(review): the serde derive encodes fields in declaration order in common
// MessagePack configurations, so reordering or removing fields presumably
// changes the on-disk format — confirm before touching the layout.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SnapshotHeader {
    /// Format tag; must equal `SNAPSHOT_FORMAT_V1` for the header to decode.
    pub format_version: u8,
    /// Key under which the header and its body are stored.
    pub sequence_no: u64,
    /// 32-byte root value carried with the snapshot (presumably the state root
    /// the snapshot was taken at — this file never interprets it).
    pub state_root: B256,
    /// Sealing timestamp; stored and returned verbatim.
    // NOTE(review): units/epoch are not established in this file — confirm.
    pub sealed_at_ts: u64,
    /// Body length in bytes; `put` and `read_body` both require it to match
    /// the actual body length exactly.
    pub body_len: u64,
}
52
/// Persistent store of snapshot headers and bodies backed by a single redb
/// database file.
#[derive(Debug)]
pub struct SnapshotStore {
    /// Underlying redb database holding the header and body tables.
    db: redb::Database,
}
61
62impl SnapshotStore {
63 pub fn open(path: impl AsRef<Path>) -> Result<Self, BootnodeError> {
65 let path = path.as_ref();
66 if let Some(parent) = path.parent() {
67 std::fs::create_dir_all(parent)?;
68 }
69 let db = redb::Database::create(path)?;
70 let txn = db.begin_write()?;
71 txn.open_table(HEADERS_TABLE)?;
72 txn.open_table(BODIES_TABLE)?;
73 txn.commit()?;
74 info!(path = %path.display(), "snapshot store opened");
75 Ok(Self { db })
76 }
77
78 pub fn put(&self, header: &SnapshotHeader, body: &[u8]) -> Result<(), BootnodeError> {
82 if body.len() > MAX_SNAPSHOT_BODY_BYTES {
83 return Err(BootnodeError::SnapshotBodyTooLarge {
84 size: body.len(),
85 max: MAX_SNAPSHOT_BODY_BYTES,
86 });
87 }
88 if header.body_len != body.len() as u64 {
89 return Err(BootnodeError::SnapshotBodyLenMismatch {
90 header_body_len: header.body_len,
91 actual: body.len() as u64,
92 });
93 }
94 let header_bytes = rmp_serde::to_vec(header)?;
95 debug!(
96 sequence_no = header.sequence_no,
97 header_bytes = header_bytes.len(),
98 body_bytes = body.len(),
99 "snapshot put"
100 );
101 let txn = self.db.begin_write()?;
102 {
103 let mut headers = txn.open_table(HEADERS_TABLE)?;
104 let mut bodies = txn.open_table(BODIES_TABLE)?;
105 headers.insert(header.sequence_no, header_bytes.as_slice())?;
106 bodies.insert(header.sequence_no, body)?;
107 }
108 txn.commit()?;
109 Ok(())
110 }
111
112 pub fn get(&self, sequence_no: u64) -> Result<Option<SnapshotHeader>, BootnodeError> {
114 let txn = self.db.begin_read()?;
115 let table = txn.open_table(HEADERS_TABLE)?;
116 match table.get(sequence_no)? {
117 Some(guard) => Ok(Some(decode_header(guard.value())?)),
118 None => Ok(None),
119 }
120 }
121
122 pub fn latest_at_most(&self, up_to: u64) -> Result<Option<SnapshotHeader>, BootnodeError> {
124 let txn = self.db.begin_read()?;
125 let table = txn.open_table(HEADERS_TABLE)?;
126 let mut range = table.range(..=up_to)?;
127 match range.next_back() {
128 Some(Ok((_, guard))) => Ok(Some(decode_header(guard.value())?)),
129 Some(Err(e)) => Err(e.into()),
130 None => Ok(None),
131 }
132 }
133
134 pub fn read_body(&self, sequence_no: u64, writer: &mut impl Write) -> Result<Option<u64>, BootnodeError> {
155 let txn = self.db.begin_read()?;
156 let headers = txn.open_table(HEADERS_TABLE)?;
157 let header = match headers.get(sequence_no)? {
158 Some(guard) => decode_header(guard.value())?,
159 None => return Ok(None),
160 };
161 let bodies = txn.open_table(BODIES_TABLE)?;
162 let Some(guard) = bodies.get(sequence_no)? else {
163 warn!(
164 sequence_no,
165 header_body_len = header.body_len,
166 "orphan snapshot header — body missing on disk"
167 );
168 return Err(BootnodeError::SnapshotBodyLenMismatch {
169 header_body_len: header.body_len,
170 actual: 0,
171 });
172 };
173 let body = guard.value();
174 let actual = body.len() as u64;
175 if actual != header.body_len {
176 warn!(
177 sequence_no,
178 header_body_len = header.body_len,
179 actual,
180 "snapshot body length drift between header and bodies tables"
181 );
182 return Err(BootnodeError::SnapshotBodyLenMismatch {
183 header_body_len: header.body_len,
184 actual,
185 });
186 }
187 writer.write_all(body)?;
188 Ok(Some(actual))
189 }
190}
191
192fn decode_header(bytes: &[u8]) -> Result<SnapshotHeader, BootnodeError> {
193 let hdr: SnapshotHeader = rmp_serde::from_slice(bytes)?;
194 if hdr.format_version != SNAPSHOT_FORMAT_V1 {
195 warn!(version = hdr.format_version, "unsupported snapshot format");
196 return Err(BootnodeError::UnsupportedSnapshotFormat(hdr.format_version));
197 }
198 Ok(hdr)
199}
200
#[cfg(test)]
mod tests {
    use super::*;

    /// 64-byte body shared by most tests.
    const TEST_BODY: &[u8] = &[0xAB; 64];

    /// Builds a V1 header whose `body_len` matches [`TEST_BODY`].
    fn test_header(sequence_no: u64, sealed_at_ts: u64) -> SnapshotHeader {
        SnapshotHeader {
            format_version: SNAPSHOT_FORMAT_V1,
            sequence_no,
            state_root: B256::from([sequence_no as u8; 32]),
            sealed_at_ts,
            body_len: TEST_BODY.len() as u64,
        }
    }

    /// Opens a store in a fresh temporary directory. The directory guard is
    /// returned so the caller keeps it alive for the duration of the test.
    fn fresh_store() -> (tempfile::TempDir, SnapshotStore) {
        let dir = tempfile::tempdir().unwrap();
        let store = SnapshotStore::open(dir.path().join("snap.redb")).unwrap();
        (dir, store)
    }

    /// Writes rows straight into the tables in one transaction, bypassing the
    /// validation in `put`. The body row is written only when `body` is `Some`.
    fn write_raw(store: &SnapshotStore, header: &SnapshotHeader, body: Option<&[u8]>) {
        let encoded = rmp_serde::to_vec(header).unwrap();
        let txn = store.db.begin_write().unwrap();
        {
            let mut headers = txn.open_table(HEADERS_TABLE).unwrap();
            headers.insert(header.sequence_no, encoded.as_slice()).unwrap();
            if let Some(raw) = body {
                let mut bodies = txn.open_table(BODIES_TABLE).unwrap();
                bodies.insert(header.sequence_no, raw).unwrap();
            }
        }
        txn.commit().unwrap();
    }

    #[test]
    fn roundtrip_header() {
        let (_dir, store) = fresh_store();

        let written = test_header(42, 1_700_000_000);
        store.put(&written, TEST_BODY).unwrap();

        let read_back = store.get(42).unwrap().expect("should exist");
        assert_eq!(read_back.format_version, SNAPSHOT_FORMAT_V1);
        assert_eq!(read_back.sequence_no, written.sequence_no);
        assert_eq!(read_back.state_root, written.state_root);
        assert_eq!(read_back.sealed_at_ts, written.sealed_at_ts);
        assert_eq!(read_back.body_len, TEST_BODY.len() as u64);
    }

    #[test]
    fn read_body_roundtrip() {
        let (_dir, store) = fresh_store();
        store.put(&test_header(42, 1_700_000_000), TEST_BODY).unwrap();

        let mut sink = Vec::new();
        let written = store.read_body(42, &mut sink).unwrap().expect("should exist");
        assert_eq!(written, TEST_BODY.len() as u64);
        assert_eq!(sink, TEST_BODY);
    }

    #[test]
    fn read_body_missing_returns_none() {
        let (_dir, store) = fresh_store();

        let outcome = store.read_body(99, &mut Vec::new()).unwrap();
        assert!(outcome.is_none());
    }

    #[test]
    fn get_missing_returns_none() {
        let (_dir, store) = fresh_store();

        let outcome = store.get(99).unwrap();
        assert!(outcome.is_none());
    }

    #[test]
    fn latest_at_most_finds_right_snap() {
        let (_dir, store) = fresh_store();

        for seq in [10, 20, 30, 40] {
            let header = test_header(seq, 1_000_000 + seq);
            store.put(&header, TEST_BODY).unwrap();
        }

        // (upper bound, expected sequence number, message if nothing is found)
        let cases = [(25, 20, "should find 20"), (40, 40, "should find 40")];
        for (bound, expected_seq, msg) in cases {
            let found = store.latest_at_most(bound).unwrap().expect(msg);
            assert_eq!(found.sequence_no, expected_seq);
        }

        assert!(store.latest_at_most(5).unwrap().is_none());
    }

    #[test]
    fn sealed_at_ts_preserved_including_epoch_zero() {
        let (_dir, store) = fresh_store();
        store.put(&test_header(1, 0), TEST_BODY).unwrap();

        let by_key = store.get(1).unwrap().expect("should exist");
        assert_eq!(by_key.sealed_at_ts, 0);

        let by_range = store.latest_at_most(100).unwrap().expect("should find it");
        assert_eq!(by_range.sealed_at_ts, 0);
    }

    #[test]
    fn sequence_no_zero_roundtrip() {
        let (_dir, store) = fresh_store();
        store.put(&test_header(0, 0), TEST_BODY).unwrap();

        let by_key = store.get(0).unwrap().expect("seq 0 should exist");
        assert_eq!(by_key.sequence_no, 0);

        let by_range = store
            .latest_at_most(0)
            .unwrap()
            .expect("latest_at_most(0) should return seq 0");
        assert_eq!(by_range.sequence_no, 0);
    }

    #[test]
    fn unsupported_format_version_rejected() {
        let (_dir, store) = fresh_store();

        let unsupported = SnapshotHeader {
            format_version: 99,
            sequence_no: 1,
            state_root: B256::from([1u8; 32]),
            sealed_at_ts: 1_000,
            body_len: 64,
        };
        write_raw(&store, &unsupported, None);

        let failure = store.get(1).unwrap_err();
        assert!(matches!(failure, BootnodeError::UnsupportedSnapshotFormat(99)));
    }

    #[test]
    fn read_body_rejects_cross_table_body_len_drift() {
        let (_dir, store) = fresh_store();

        // Header claims 64 bytes, but only 32 are stored in the bodies table.
        let header = SnapshotHeader {
            body_len: 64,
            ..test_header(7, 1_000)
        };
        let short_body = [0xCC_u8; 32];
        write_raw(&store, &header, Some(&short_body[..]));

        let mut sink = Vec::new();
        let failure = store.read_body(7, &mut sink).unwrap_err();
        assert!(matches!(
            failure,
            BootnodeError::SnapshotBodyLenMismatch {
                header_body_len: 64,
                actual: 32
            }
        ));
        assert!(sink.is_empty(), "writer must not see partial bytes on mismatch");
    }

    #[test]
    fn read_body_rejects_orphan_header() {
        let (_dir, store) = fresh_store();

        // Only the header row is written; the body row is left absent.
        write_raw(&store, &test_header(9, 1_000), None);

        let failure = store.read_body(9, &mut Vec::new()).unwrap_err();
        assert!(matches!(failure, BootnodeError::SnapshotBodyLenMismatch { actual: 0, .. }));
    }

    #[test]
    fn put_rejects_body_len_mismatch() {
        let (_dir, store) = fresh_store();

        let lying_header = SnapshotHeader {
            body_len: 999,
            ..test_header(1, 1_000)
        };

        let failure = store.put(&lying_header, TEST_BODY).unwrap_err();
        assert!(matches!(failure, BootnodeError::SnapshotBodyLenMismatch { .. }));
        assert!(store.get(1).unwrap().is_none());
    }

    #[test]
    fn put_rejects_oversized_body() {
        let (_dir, store) = fresh_store();

        let oversized = vec![0xCC; MAX_SNAPSHOT_BODY_BYTES + 1];
        let header = SnapshotHeader {
            state_root: B256::ZERO,
            body_len: oversized.len() as u64,
            ..test_header(1, 1_000)
        };

        let failure = store.put(&header, &oversized).unwrap_err();
        assert!(matches!(failure, BootnodeError::SnapshotBodyTooLarge { .. }));
        assert!(store.get(1).unwrap().is_none());
    }
}