commonware_runtime/utils/buffer/pool/
mod.rs

1//! Blob wrappers for reading and writing data with integrity guarantees, plus a buffer pool that
2//! manages read caching over the data.
3//!
4//! # Page-oriented structure
5//!
6//! Blob data is stored in _pages_ having a logical `page_size` dictated by the managing buffer
7//! pool. A _physical page_ consists of `page_size` bytes of data followed by a 12-byte _CRC
8//! record_ containing:
9//!
10//! ```text
11//! | len1 (2 bytes) |  crc1 (4 bytes) | len2 (2 bytes) | crc2 (4 bytes) |
12//! ```
13//!
14//! Two checksums are stored so that partial pages can be re-written without overwriting a valid
15//! checksum for its previously committed contents. A checksum over a page is computed over the
16//! first [0,len) bytes in the page, with all other bytes in the page ignored. This implementation
17//! always 0-pads the range [len, page_size). A checksum with length 0 is never considered
18//! valid. If both checksums are valid for the page, the one with the larger `len` is considered
19//! authoritative.
20//!
21//! A _full_ page is one whose crc stores a len equal to the logical page size. Otherwise the page
22//! is called _partial_. All pages in a blob are full except for the very last page, which can be
23//! full or partial. A partial page's logical bytes are immutable on commit, and if it's re-written,
24//! it's only to add more bytes after the existing ones.
25
26use crate::{Blob, Error};
27use bytes::{Buf, BufMut};
28use commonware_codec::{EncodeFixed, FixedSize, Read as CodecRead, ReadExt, Write};
29use commonware_cryptography::{crc32, Crc32};
30use commonware_utils::StableBuf;
31
32mod append;
33mod page_cache;
34mod read;
35
36pub use append::Append;
37pub use page_cache::PoolRef;
38pub use read::Replay;
39use tracing::{debug, error};
40
// Size in bytes of the CRC record stored at the end of every physical page, expressed as a `u64`
// so it can be used directly in page-offset arithmetic. The record contains two u16 lengths and
// two CRCs (each 4 bytes).
const CHECKSUM_SIZE: u64 = Checksum::SIZE as u64;
43
44/// Read the designated page from the underlying blob and return its logical bytes as a vector if it
45/// passes the integrity check, returning error otherwise. Safely handles partial pages. Caller can
46/// check the length of the returned vector to determine if the page was partial vs full.
47async fn get_page_from_blob(
48    blob: &impl Blob,
49    page_num: u64,
50    logical_page_size: u64,
51) -> Result<StableBuf, Error> {
52    let physical_page_size = logical_page_size + CHECKSUM_SIZE;
53    let physical_page_start = page_num * physical_page_size;
54
55    let mut page = blob
56        .read_at(vec![0; physical_page_size as usize], physical_page_start)
57        .await?;
58
59    let Some(record) = Checksum::validate_page(page.as_ref()) else {
60        return Err(Error::InvalidChecksum);
61    };
62    let (len, _) = record.get_crc();
63
64    page.truncate(len as usize);
65
66    Ok(page)
67}
68
/// Describes a CRC record stored at the end of a page.
///
/// The record holds two (length, CRC) slots. The CRC accompanied by the larger length is the one
/// that should be treated as authoritative for the page. Two checksums are stored so that partial
/// pages can be written without overwriting a valid checksum for a previously committed partial
/// page.
///
/// A slot whose length is 0 is never considered valid.
#[derive(Clone, Debug, PartialEq, Eq)]
struct Checksum {
    /// Number of leading page bytes covered by `crc1` (0 if the first slot is unused).
    len1: u16,
    /// CRC over the first `len1` bytes of the page.
    crc1: u32,
    /// Number of leading page bytes covered by `crc2` (0 if the second slot is unused).
    len2: u16,
    /// CRC over the first `len2` bytes of the page.
    crc2: u32,
}
81
82impl Checksum {
83    /// Create a new CRC record with the given length and CRC.
84    /// The new CRC is stored in the first slot (len1/crc1), with the second slot zeroed.
85    const fn new(len: u16, crc: u32) -> Self {
86        Self {
87            len1: len,
88            crc1: crc,
89            len2: 0,
90            crc2: 0,
91        }
92    }
93
94    /// Return the CRC record for the page if it is valid. The provided slice is assumed to be
95    /// exactly the size of a physical page. The record may not precisely reflect the bytes written
96    /// if what should have been the most recent CRC doesn't validate, in which case it will be
97    /// zeroed and the other CRC used as a fallback.
98    fn validate_page(buf: &[u8]) -> Option<Self> {
99        let page_size = buf.len() as u64;
100        if page_size < CHECKSUM_SIZE {
101            error!(
102                page_size,
103                required = CHECKSUM_SIZE,
104                "read page smaller than CRC record"
105            );
106            return None;
107        }
108
109        let crc_start_idx = (page_size - CHECKSUM_SIZE) as usize;
110        let mut crc_bytes = &buf[crc_start_idx..];
111        let mut crc_record = Self::read(&mut crc_bytes).expect("CRC record read should not fail");
112        let (len, crc) = crc_record.get_crc();
113
114        // Validate that len is in the valid range [1, logical_page_size].
115        // A page with len=0 is invalid (e.g., all-zero pages from unwritten data).
116        let len_usize = len as usize;
117        if len_usize == 0 {
118            // Both CRCs have 0 length, so there is no fallback possible.
119            debug!("Invalid CRC: len==0");
120            return None;
121        }
122
123        if len_usize > crc_start_idx {
124            // len is too large so this CRC isn't valid. Fall back to the other CRC.
125            debug!("Invalid CRC: len too long. Using fallback CRC");
126            if crc_record.validate_fallback(buf, crc_start_idx) {
127                return Some(crc_record);
128            }
129            return None;
130        }
131
132        let computed_crc = Crc32::checksum(&buf[..len_usize]);
133        if computed_crc != crc {
134            debug!("Invalid CRC: doesn't match page contents. Using fallback CRC");
135            if crc_record.validate_fallback(buf, crc_start_idx) {
136                return Some(crc_record);
137            }
138            return None;
139        }
140
141        Some(crc_record)
142    }
143
144    /// Attempts to validate a CRC record based on its fallback CRC because the primary CRC failed
145    /// validation. The primary CRC is zeroed in the process. Returns false if the fallback CRC
146    /// fails validation.
147    fn validate_fallback(&mut self, buf: &[u8], crc_start_idx: usize) -> bool {
148        let (len, crc) = self.get_fallback_crc();
149        if len == 0 {
150            // No fallback available (only one CRC was ever written to this page).
151            debug!("Invalid fallback CRC: len==0");
152            return false;
153        }
154
155        let len_usize = len as usize;
156
157        if len_usize > crc_start_idx {
158            // len is too large so this CRC isn't valid.
159            debug!("Invalid fallback CRC: len too long.");
160            return false;
161        }
162
163        let computed_crc = Crc32::checksum(&buf[..len_usize]);
164        if computed_crc != crc {
165            debug!("Invalid fallback CRC: doesn't match page contents.");
166            return false;
167        }
168
169        true
170    }
171
172    /// Returns the CRC record with the longer (authoritative) length, without performing any
173    /// validation. If they both have the same length (which should only happen due to data
174    /// corruption) return the first.
175    const fn get_crc(&self) -> (u16, u32) {
176        if self.len1 >= self.len2 {
177            (self.len1, self.crc1)
178        } else {
179            (self.len2, self.crc2)
180        }
181    }
182
183    /// Zeroes the primary CRC (because we assumed it failed validation) and returns the other. This
184    /// should only be called if the primary CRC failed validation. After this returns, get_crc will
185    /// no longer return the invalid primary CRC.
186    const fn get_fallback_crc(&mut self) -> (u16, u32) {
187        if self.len1 >= self.len2 {
188            // First CRC was primary, and must have been invalid. Zero it and return the second.
189            self.len1 = 0;
190            self.crc1 = 0;
191            (self.len2, self.crc2)
192        } else {
193            // Second CRC was primary, and must have been invalid. Zero it and return the first.
194            self.len2 = 0;
195            self.crc2 = 0;
196            (self.len1, self.crc1)
197        }
198    }
199
200    /// Returns the CRC record in its storage representation.
201    fn to_bytes(&self) -> [u8; CHECKSUM_SIZE as usize] {
202        self.encode_fixed()
203    }
204}
205
206impl Write for Checksum {
207    fn write(&self, buf: &mut impl BufMut) {
208        self.len1.write(buf);
209        self.crc1.write(buf);
210        self.len2.write(buf);
211        self.crc2.write(buf);
212    }
213}
214
215impl CodecRead for Checksum {
216    type Cfg = ();
217
218    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
219        Ok(Self {
220            len1: u16::read(buf)?,
221            crc1: u32::read(buf)?,
222            len2: u16::read(buf)?,
223            crc2: u32::read(buf)?,
224        })
225    }
226}
227
228impl FixedSize for Checksum {
229    const SIZE: usize = 2 * u16::SIZE + 2 * crc32::Digest::SIZE;
230}
231
#[cfg(feature = "arbitrary")]
impl arbitrary::Arbitrary<'_> for Checksum {
    /// Generates a record with arbitrary, unvalidated lengths and CRCs, drawing the fields in the
    /// order `len1`, `crc1`, `len2`, `crc2`.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        let len1: u16 = u.arbitrary()?;
        let crc1: u32 = u.arbitrary()?;
        let len2: u16 = u.arbitrary()?;
        let crc2: u32 = u.arbitrary()?;

        Ok(Self {
            len1,
            crc1,
            len2,
            crc2,
        })
    }
}
243
#[cfg(test)]
mod tests {
    use super::*;

    /// Logical page size shared by the page-validation tests.
    const LOGICAL_PAGE_SIZE: usize = 64;

    /// Builds a physical page of `LOGICAL_PAGE_SIZE + Checksum::SIZE` bytes whose logical area
    /// starts with `data` (the remainder is zero-padded) and whose trailing CRC record is the
    /// encoding of `record`.
    fn build_page(data: &[u8], record: &Checksum) -> Vec<u8> {
        let mut page = vec![0u8; LOGICAL_PAGE_SIZE + Checksum::SIZE];
        page[..data.len()].copy_from_slice(data);
        page[LOGICAL_PAGE_SIZE..].copy_from_slice(&record.to_bytes());
        page
    }

    #[test]
    fn test_crc_record_encode_read_roundtrip() {
        let record = Checksum {
            len1: 0x1234,
            crc1: 0xAABBCCDD,
            len2: 0x5678,
            crc2: 0x11223344,
        };

        let bytes = record.to_bytes();
        let restored = Checksum::read(&mut &bytes[..]).unwrap();

        assert_eq!(restored.len1, 0x1234);
        assert_eq!(restored.crc1, 0xAABBCCDD);
        assert_eq!(restored.len2, 0x5678);
        assert_eq!(restored.crc2, 0x11223344);
    }

    #[test]
    fn test_crc_record_encoding() {
        let record = Checksum {
            len1: 0x0102,
            crc1: 0x03040506,
            len2: 0x0708,
            crc2: 0x090A0B0C,
        };

        let bytes = record.to_bytes();
        // Verify big-endian encoding
        assert_eq!(
            bytes,
            [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C]
        );
    }

    #[test]
    fn test_crc_record_get_crc_len1_larger() {
        let record = Checksum {
            len1: 200,
            crc1: 0xAAAAAAAA,
            len2: 100,
            crc2: 0xBBBBBBBB,
        };

        let (len, crc) = record.get_crc();
        assert_eq!(len, 200);
        assert_eq!(crc, 0xAAAAAAAA);
    }

    #[test]
    fn test_crc_record_get_crc_len2_larger() {
        let record = Checksum {
            len1: 100,
            crc1: 0xAAAAAAAA,
            len2: 200,
            crc2: 0xBBBBBBBB,
        };

        let (len, crc) = record.get_crc();
        assert_eq!(len, 200);
        assert_eq!(crc, 0xBBBBBBBB);
    }

    #[test]
    fn test_crc_record_get_crc_equal_lengths() {
        // When lengths are equal, len1/crc1 is returned (first slot wins ties).
        let record = Checksum {
            len1: 100,
            crc1: 0xAAAAAAAA,
            len2: 100,
            crc2: 0xBBBBBBBB,
        };

        let (len, crc) = record.get_crc();
        assert_eq!(len, 100);
        assert_eq!(crc, 0xAAAAAAAA);
    }

    #[test]
    fn test_validate_page_valid() {
        // A page whose single CRC slot matches its data.
        let data: &[u8] = b"hello world";
        let crc = Crc32::checksum(data);
        let page = build_page(data, &Checksum::new(data.len() as u16, crc));

        // Validate - should return Some with the Checksum
        let validated = Checksum::validate_page(&page);
        assert!(validated.is_some());
        let (len, _) = validated.unwrap().get_crc();
        assert_eq!(len as usize, data.len());
    }

    #[test]
    fn test_validate_page_invalid_crc() {
        // Write a record with wrong CRC
        let data: &[u8] = b"hello world";
        let wrong_crc = 0xBADBADBA;
        let page = build_page(data, &Checksum::new(data.len() as u16, wrong_crc));

        // Should fail validation (return None)
        let validated = Checksum::validate_page(&page);
        assert!(validated.is_none());
    }

    #[test]
    fn test_validate_page_corrupted_data() {
        // Write some data with its correct CRC
        let data: &[u8] = b"hello world";
        let crc = Crc32::checksum(data);
        let mut page = build_page(data, &Checksum::new(data.len() as u16, crc));

        // Corrupt the data
        page[0] = 0xFF;

        // Should fail validation (return None)
        let validated = Checksum::validate_page(&page);
        assert!(validated.is_none());
    }

    #[test]
    fn test_validate_page_uses_larger_len() {
        // Compute the CRC for the larger portion
        let data: &[u8] = b"hello world, this is longer";
        let crc = Crc32::checksum(data);

        // Create a record where len2 has the valid CRC for longer data
        let record = Checksum {
            len1: 5,
            crc1: 0xDEADBEEF, // Invalid CRC for shorter data
            len2: data.len() as u16,
            crc2: crc,
        };
        let page = build_page(data, &record);

        // Should validate using len2/crc2 since len2 > len1
        let validated = Checksum::validate_page(&page);
        assert!(validated.is_some());
        let (len, _) = validated.unwrap().get_crc();
        assert_eq!(len as usize, data.len());
    }

    #[test]
    fn test_validate_page_uses_fallback() {
        let data: &[u8] = b"fallback data";
        let valid_crc = Crc32::checksum(data);
        let valid_len = data.len() as u16;

        // Create a record where:
        // len1 is larger (primary) but INVALID
        // len2 is smaller (fallback) but VALID
        let record = Checksum {
            len1: valid_len + 10, // Larger, so it's primary
            crc1: 0xBAD1DEA,      // Invalid CRC
            len2: valid_len,      // Smaller, so it's fallback
            crc2: valid_crc,      // Valid CRC
        };
        let page = build_page(data, &record);

        // Should validate using the fallback (len2)
        let validated = Checksum::validate_page(&page);

        assert!(validated.is_some(), "Should have validated using fallback");
        let validated = validated.unwrap();
        let (len, crc) = validated.get_crc();
        assert_eq!(len, valid_len);
        assert_eq!(crc, valid_crc);

        // Verify that the invalid primary was zeroed out
        assert_eq!(validated.len1, 0);
        assert_eq!(validated.crc1, 0);
    }

    #[test]
    fn test_validate_page_no_fallback_available() {
        let data: &[u8] = b"some data";

        // Create a record where:
        // len1 > 0 (primary) but with INVALID CRC
        // len2 = 0 (no fallback available)
        let record = Checksum {
            len1: data.len() as u16,
            crc1: 0xBAD1DEA, // Invalid CRC
            len2: 0,         // No fallback
            crc2: 0,
        };
        let page = build_page(data, &record);

        // Should fail validation since primary is invalid and no fallback exists
        let validated = Checksum::validate_page(&page);
        assert!(
            validated.is_none(),
            "Should fail when primary is invalid and fallback has len=0"
        );
    }

    #[test]
    fn test_validate_page_buffer_smaller_than_record() {
        // A buffer that cannot even hold the CRC record must be rejected rather than panic.
        let buf = vec![0u8; Checksum::SIZE - 1];
        assert!(Checksum::validate_page(&buf).is_none());
        assert!(Checksum::validate_page(&[]).is_none());
    }

    #[test]
    fn test_validate_page_all_zero() {
        // An unwritten (all-zero) page has len == 0 in both slots, which is never valid.
        let page = vec![0u8; LOGICAL_PAGE_SIZE + Checksum::SIZE];
        assert!(Checksum::validate_page(&page).is_none());
    }

    #[test]
    fn test_validate_page_full_page() {
        // len == logical page size is the upper bound of the valid range and must be accepted.
        let data = [0xABu8; LOGICAL_PAGE_SIZE];
        let crc = Crc32::checksum(&data[..]);
        let page = build_page(&data, &Checksum::new(LOGICAL_PAGE_SIZE as u16, crc));

        let validated = Checksum::validate_page(&page);
        assert!(validated.is_some());
        let (len, _) = validated.unwrap().get_crc();
        assert_eq!(len as usize, LOGICAL_PAGE_SIZE);
    }

    #[test]
    fn test_validate_page_len_too_long_uses_fallback() {
        let data: &[u8] = b"fallback data";
        let valid_crc = Crc32::checksum(data);
        let valid_len = data.len() as u16;

        // The primary slot claims more bytes than the logical page can hold, so it is rejected
        // without computing a CRC; the fallback slot is valid.
        let record = Checksum {
            len1: LOGICAL_PAGE_SIZE as u16 + 1,
            crc1: 0xBAD1DEA,
            len2: valid_len,
            crc2: valid_crc,
        };
        let page = build_page(data, &record);

        let validated = Checksum::validate_page(&page);
        assert!(validated.is_some(), "Should have validated using fallback");
        let validated = validated.unwrap();
        let (len, crc) = validated.get_crc();
        assert_eq!(len, valid_len);
        assert_eq!(crc, valid_crc);

        // The out-of-range primary must have been zeroed out.
        assert_eq!(validated.len1, 0);
        assert_eq!(validated.crc1, 0);
    }

    #[test]
    fn test_validate_page_len_too_long_no_fallback() {
        let data: &[u8] = b"some data";

        // The primary slot is out of range and the other slot was never written.
        let record = Checksum {
            len1: LOGICAL_PAGE_SIZE as u16 + 1,
            crc1: 0xBAD1DEA,
            len2: 0,
            crc2: 0,
        };
        let page = build_page(data, &record);

        assert!(Checksum::validate_page(&page).is_none());
    }

    #[test]
    fn test_validate_page_primary_and_fallback_invalid() {
        let data: &[u8] = b"some data";
        let valid_len = data.len() as u16;

        // Both slots carry a plausible length but neither CRC matches the page contents.
        let record = Checksum {
            len1: valid_len + 10,
            crc1: 0xBAD1DEA,
            len2: valid_len,
            crc2: 0xBAD2DEA,
        };
        let page = build_page(data, &record);

        assert!(Checksum::validate_page(&page).is_none());
    }

    #[cfg(feature = "arbitrary")]
    mod conformance {
        use super::*;
        use commonware_codec::conformance::CodecConformance;

        commonware_conformance::conformance_tests! {
            CodecConformance<Checksum>,
        }
    }
}