commonware_runtime/utils/buffer/pool/
mod.rs1use crate::{Blob, Error};
27use bytes::{Buf, BufMut};
28use commonware_codec::{EncodeFixed, FixedSize, Read as CodecRead, ReadExt, Write};
29use commonware_cryptography::{crc32, Crc32};
30use commonware_utils::StableBuf;
31
32mod append;
33mod page_cache;
34mod read;
35
36pub use append::Append;
37pub use page_cache::PoolRef;
38pub use read::Replay;
39use tracing::{debug, error};
40
41const CHECKSUM_SIZE: u64 = Checksum::SIZE as u64;
43
44async fn get_page_from_blob(
48 blob: &impl Blob,
49 page_num: u64,
50 logical_page_size: u64,
51) -> Result<StableBuf, Error> {
52 let physical_page_size = logical_page_size + CHECKSUM_SIZE;
53 let physical_page_start = page_num * physical_page_size;
54
55 let mut page = blob
56 .read_at(vec![0; physical_page_size as usize], physical_page_start)
57 .await?;
58
59 let Some(record) = Checksum::validate_page(page.as_ref()) else {
60 return Err(Error::InvalidChecksum);
61 };
62 let (len, _) = record.get_crc();
63
64 page.truncate(len as usize);
65
66 Ok(page)
67}
68
69#[derive(Clone)]
75struct Checksum {
76 len1: u16,
77 crc1: u32,
78 len2: u16,
79 crc2: u32,
80}
81
82impl Checksum {
83 const fn new(len: u16, crc: u32) -> Self {
86 Self {
87 len1: len,
88 crc1: crc,
89 len2: 0,
90 crc2: 0,
91 }
92 }
93
94 fn validate_page(buf: &[u8]) -> Option<Self> {
99 let page_size = buf.len() as u64;
100 if page_size < CHECKSUM_SIZE {
101 error!(
102 page_size,
103 required = CHECKSUM_SIZE,
104 "read page smaller than CRC record"
105 );
106 return None;
107 }
108
109 let crc_start_idx = (page_size - CHECKSUM_SIZE) as usize;
110 let mut crc_bytes = &buf[crc_start_idx..];
111 let mut crc_record = Self::read(&mut crc_bytes).expect("CRC record read should not fail");
112 let (len, crc) = crc_record.get_crc();
113
114 let len_usize = len as usize;
117 if len_usize == 0 {
118 debug!("Invalid CRC: len==0");
120 return None;
121 }
122
123 if len_usize > crc_start_idx {
124 debug!("Invalid CRC: len too long. Using fallback CRC");
126 if crc_record.validate_fallback(buf, crc_start_idx) {
127 return Some(crc_record);
128 }
129 return None;
130 }
131
132 let computed_crc = Crc32::checksum(&buf[..len_usize]);
133 if computed_crc != crc {
134 debug!("Invalid CRC: doesn't match page contents. Using fallback CRC");
135 if crc_record.validate_fallback(buf, crc_start_idx) {
136 return Some(crc_record);
137 }
138 return None;
139 }
140
141 Some(crc_record)
142 }
143
144 fn validate_fallback(&mut self, buf: &[u8], crc_start_idx: usize) -> bool {
148 let (len, crc) = self.get_fallback_crc();
149 if len == 0 {
150 debug!("Invalid fallback CRC: len==0");
152 return false;
153 }
154
155 let len_usize = len as usize;
156
157 if len_usize > crc_start_idx {
158 debug!("Invalid fallback CRC: len too long.");
160 return false;
161 }
162
163 let computed_crc = Crc32::checksum(&buf[..len_usize]);
164 if computed_crc != crc {
165 debug!("Invalid fallback CRC: doesn't match page contents.");
166 return false;
167 }
168
169 true
170 }
171
172 const fn get_crc(&self) -> (u16, u32) {
176 if self.len1 >= self.len2 {
177 (self.len1, self.crc1)
178 } else {
179 (self.len2, self.crc2)
180 }
181 }
182
183 const fn get_fallback_crc(&mut self) -> (u16, u32) {
187 if self.len1 >= self.len2 {
188 self.len1 = 0;
190 self.crc1 = 0;
191 (self.len2, self.crc2)
192 } else {
193 self.len2 = 0;
195 self.crc2 = 0;
196 (self.len1, self.crc1)
197 }
198 }
199
200 fn to_bytes(&self) -> [u8; CHECKSUM_SIZE as usize] {
202 self.encode_fixed()
203 }
204}
205
206impl Write for Checksum {
207 fn write(&self, buf: &mut impl BufMut) {
208 self.len1.write(buf);
209 self.crc1.write(buf);
210 self.len2.write(buf);
211 self.crc2.write(buf);
212 }
213}
214
215impl CodecRead for Checksum {
216 type Cfg = ();
217
218 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
219 Ok(Self {
220 len1: u16::read(buf)?,
221 crc1: u32::read(buf)?,
222 len2: u16::read(buf)?,
223 crc2: u32::read(buf)?,
224 })
225 }
226}
227
228impl FixedSize for Checksum {
229 const SIZE: usize = 2 * u16::SIZE + 2 * crc32::Digest::SIZE;
230}
231
232#[cfg(feature = "arbitrary")]
233impl arbitrary::Arbitrary<'_> for Checksum {
234 fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
235 Ok(Self {
236 len1: u.arbitrary()?,
237 crc1: u.arbitrary()?,
238 len2: u.arbitrary()?,
239 crc2: u.arbitrary()?,
240 })
241 }
242}
243
244#[cfg(test)]
245mod tests {
246 use super::*;
247
248 #[test]
249 fn test_crc_record_encode_read_roundtrip() {
250 let record = Checksum {
251 len1: 0x1234,
252 crc1: 0xAABBCCDD,
253 len2: 0x5678,
254 crc2: 0x11223344,
255 };
256
257 let bytes = record.to_bytes();
258 let restored = Checksum::read(&mut &bytes[..]).unwrap();
259
260 assert_eq!(restored.len1, 0x1234);
261 assert_eq!(restored.crc1, 0xAABBCCDD);
262 assert_eq!(restored.len2, 0x5678);
263 assert_eq!(restored.crc2, 0x11223344);
264 }
265
266 #[test]
267 fn test_crc_record_encoding() {
268 let record = Checksum {
269 len1: 0x0102,
270 crc1: 0x03040506,
271 len2: 0x0708,
272 crc2: 0x090A0B0C,
273 };
274
275 let bytes = record.to_bytes();
276 assert_eq!(
278 bytes,
279 [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C]
280 );
281 }
282
283 #[test]
284 fn test_crc_record_get_crc_len1_larger() {
285 let record = Checksum {
286 len1: 200,
287 crc1: 0xAAAAAAAA,
288 len2: 100,
289 crc2: 0xBBBBBBBB,
290 };
291
292 let (len, crc) = record.get_crc();
293 assert_eq!(len, 200);
294 assert_eq!(crc, 0xAAAAAAAA);
295 }
296
297 #[test]
298 fn test_crc_record_get_crc_len2_larger() {
299 let record = Checksum {
300 len1: 100,
301 crc1: 0xAAAAAAAA,
302 len2: 200,
303 crc2: 0xBBBBBBBB,
304 };
305
306 let (len, crc) = record.get_crc();
307 assert_eq!(len, 200);
308 assert_eq!(crc, 0xBBBBBBBB);
309 }
310
311 #[test]
312 fn test_crc_record_get_crc_equal_lengths() {
313 let record = Checksum {
315 len1: 100,
316 crc1: 0xAAAAAAAA,
317 len2: 100,
318 crc2: 0xBBBBBBBB,
319 };
320
321 let (len, crc) = record.get_crc();
322 assert_eq!(len, 100);
323 assert_eq!(crc, 0xAAAAAAAA);
324 }
325
326 #[test]
327 fn test_validate_page_valid() {
328 let logical_page_size = 64usize;
329 let physical_page_size = logical_page_size + Checksum::SIZE;
330 let mut page = vec![0u8; physical_page_size];
331
332 let data = b"hello world";
334 page[..data.len()].copy_from_slice(data);
335
336 let crc = Crc32::checksum(&page[..data.len()]);
338 let record = Checksum::new(data.len() as u16, crc);
339
340 let crc_start = physical_page_size - Checksum::SIZE;
342 page[crc_start..].copy_from_slice(&record.to_bytes());
343
344 let validated = Checksum::validate_page(&page);
346 assert!(validated.is_some());
347 let (len, _) = validated.unwrap().get_crc();
348 assert_eq!(len as usize, data.len());
349 }
350
351 #[test]
352 fn test_validate_page_invalid_crc() {
353 let logical_page_size = 64usize;
354 let physical_page_size = logical_page_size + Checksum::SIZE;
355 let mut page = vec![0u8; physical_page_size];
356
357 let data = b"hello world";
359 page[..data.len()].copy_from_slice(data);
360
361 let wrong_crc = 0xBADBADBA;
363 let record = Checksum::new(data.len() as u16, wrong_crc);
364
365 let crc_start = physical_page_size - Checksum::SIZE;
366 page[crc_start..].copy_from_slice(&record.to_bytes());
367
368 let validated = Checksum::validate_page(&page);
370 assert!(validated.is_none());
371 }
372
373 #[test]
374 fn test_validate_page_corrupted_data() {
375 let logical_page_size = 64usize;
376 let physical_page_size = logical_page_size + Checksum::SIZE;
377 let mut page = vec![0u8; physical_page_size];
378
379 let data = b"hello world";
381 page[..data.len()].copy_from_slice(data);
382 let crc = Crc32::checksum(&page[..data.len()]);
383 let record = Checksum::new(data.len() as u16, crc);
384
385 let crc_start = physical_page_size - Checksum::SIZE;
386 page[crc_start..].copy_from_slice(&record.to_bytes());
387
388 page[0] = 0xFF;
390
391 let validated = Checksum::validate_page(&page);
393 assert!(validated.is_none());
394 }
395
396 #[test]
397 fn test_validate_page_uses_larger_len() {
398 let logical_page_size = 64usize;
399 let physical_page_size = logical_page_size + Checksum::SIZE;
400 let mut page = vec![0u8; physical_page_size];
401
402 let data = b"hello world, this is longer";
404 page[..data.len()].copy_from_slice(data);
405 let crc = Crc32::checksum(&page[..data.len()]);
406
407 let record = Checksum {
409 len1: 5,
410 crc1: 0xDEADBEEF, len2: data.len() as u16,
412 crc2: crc,
413 };
414
415 let crc_start = physical_page_size - Checksum::SIZE;
416 page[crc_start..].copy_from_slice(&record.to_bytes());
417
418 let validated = Checksum::validate_page(&page);
420 assert!(validated.is_some());
421 let (len, _) = validated.unwrap().get_crc();
422 assert_eq!(len as usize, data.len());
423 }
424
425 #[test]
426 fn test_validate_page_uses_fallback() {
427 let logical_page_size = 64usize;
428 let physical_page_size = logical_page_size + Checksum::SIZE;
429 let mut page = vec![0u8; physical_page_size];
430
431 let data = b"fallback data";
433 page[..data.len()].copy_from_slice(data);
434 let valid_crc = Crc32::checksum(&page[..data.len()]);
435 let valid_len = data.len() as u16;
436
437 let record = Checksum {
441 len1: valid_len + 10, crc1: 0xBAD1DEA, len2: valid_len, crc2: valid_crc, };
446
447 let crc_start = physical_page_size - Checksum::SIZE;
448 page[crc_start..].copy_from_slice(&record.to_bytes());
449
450 let validated = Checksum::validate_page(&page);
452
453 assert!(validated.is_some(), "Should have validated using fallback");
454 let validated = validated.unwrap();
455 let (len, crc) = validated.get_crc();
456 assert_eq!(len, valid_len);
457 assert_eq!(crc, valid_crc);
458
459 assert_eq!(validated.len1, 0);
461 assert_eq!(validated.crc1, 0);
462 }
463
464 #[test]
465 fn test_validate_page_no_fallback_available() {
466 let logical_page_size = 64usize;
467 let physical_page_size = logical_page_size + Checksum::SIZE;
468 let mut page = vec![0u8; physical_page_size];
469
470 let data = b"some data";
472 page[..data.len()].copy_from_slice(data);
473
474 let record = Checksum {
478 len1: data.len() as u16,
479 crc1: 0xBAD1DEA, len2: 0, crc2: 0,
482 };
483
484 let crc_start = physical_page_size - Checksum::SIZE;
485 page[crc_start..].copy_from_slice(&record.to_bytes());
486
487 let validated = Checksum::validate_page(&page);
489 assert!(
490 validated.is_none(),
491 "Should fail when primary is invalid and fallback has len=0"
492 );
493 }
494
495 #[cfg(feature = "arbitrary")]
496 mod conformance {
497 use super::*;
498 use commonware_codec::conformance::CodecConformance;
499
500 commonware_conformance::conformance_tests! {
501 CodecConformance<Checksum>,
502 }
503 }
504}