Skip to main content

commonware_runtime/utils/buffer/paged/
mod.rs

1//! Blob wrappers for reading and writing data with integrity guarantees, plus a page cache that
2//! manages read caching over the data.
3//!
4//! # Page-oriented structure
5//!
6//! Blob data is stored in _pages_ having a logical `page_size` dictated by the managing page cache.
7//! A _physical page_ consists of `page_size` bytes of data followed by a 12-byte _CRC
8//! record_ containing:
9//!
10//! ```text
11//! | len1 (2 bytes) |  crc1 (4 bytes) | len2 (2 bytes) | crc2 (4 bytes) |
12//! ```
13//!
//! Two checksums are stored so that a partial page can be re-written without overwriting the valid
//! checksum for its previously committed contents. A checksum over a page is computed over bytes
//! [0, len) of the page, with all other bytes in the page ignored. This implementation always
//! 0-pads the range [len, page_size). A checksum with length 0 is never considered valid. If both
//! checksums are valid for the page, the one with the larger `len` is considered authoritative.
20//!
21//! A _full_ page is one whose crc stores a len equal to the logical page size. Otherwise the page
22//! is called _partial_. All pages in a blob are full except for the very last page, which can be
23//! full or partial. A partial page's logical bytes are immutable on commit, and if it's re-written,
24//! it's only to add more bytes after the existing ones.
25
26use crate::{Blob, Buf, BufMut, Error, IoBuf};
27use commonware_codec::{EncodeFixed, FixedSize, Read as CodecRead, ReadExt, Write};
28use commonware_cryptography::{crc32, Crc32};
29
30mod append;
31mod cache;
32mod read;
33
34pub use append::Append;
35pub use cache::CacheRef;
36pub use read::Replay;
37use tracing::{debug, error};
38
39// A checksum record contains two u16 lengths and two CRCs (each 4 bytes).
40const CHECKSUM_SIZE: u64 = Checksum::SIZE as u64;
41
42/// Read the designated page from the underlying blob and return its logical bytes as a vector if it
43/// passes the integrity check, returning error otherwise. Safely handles partial pages. Caller can
44/// check the length of the returned vector to determine if the page was partial vs full.
45async fn get_page_from_blob(
46    blob: &impl Blob,
47    page_num: u64,
48    logical_page_size: u64,
49) -> Result<IoBuf, Error> {
50    let physical_page_size = logical_page_size + CHECKSUM_SIZE;
51    let physical_page_start = page_num * physical_page_size;
52
53    let page = blob
54        .read_at(physical_page_start, physical_page_size as usize)
55        .await?
56        .coalesce();
57
58    let Some(record) = Checksum::validate_page(page.as_ref()) else {
59        return Err(Error::InvalidChecksum);
60    };
61    let (len, _) = record.get_crc();
62
63    Ok(page.freeze().slice(..len as usize))
64}
65
/// The CRC record stored in the trailing bytes of every physical page.
///
/// The record holds two (length, CRC) slots. Whichever slot carries the larger length is treated
/// as authoritative for the page. Keeping two slots allows a partial page to be re-written without
/// overwriting the checksum that covers its previously committed contents.
#[derive(Clone)]
struct Checksum {
    /// Number of leading page bytes covered by `crc1`. A length of 0 is never considered valid.
    len1: u16,
    /// CRC over the first `len1` bytes of the page.
    crc1: u32,
    /// Number of leading page bytes covered by `crc2`. A length of 0 is never considered valid.
    len2: u16,
    /// CRC over the first `len2` bytes of the page.
    crc2: u32,
}
78
79impl Checksum {
80    /// Create a new CRC record with the given length and CRC.
81    /// The new CRC is stored in the first slot (len1/crc1), with the second slot zeroed.
82    const fn new(len: u16, crc: u32) -> Self {
83        Self {
84            len1: len,
85            crc1: crc,
86            len2: 0,
87            crc2: 0,
88        }
89    }
90
91    /// Return the CRC record for the page if it is valid. The provided slice is assumed to be
92    /// exactly the size of a physical page. The record may not precisely reflect the bytes written
93    /// if what should have been the most recent CRC doesn't validate, in which case it will be
94    /// zeroed and the other CRC used as a fallback.
95    fn validate_page(buf: &[u8]) -> Option<Self> {
96        let page_size = buf.len() as u64;
97        if page_size < CHECKSUM_SIZE {
98            error!(
99                page_size,
100                required = CHECKSUM_SIZE,
101                "read page smaller than CRC record"
102            );
103            return None;
104        }
105
106        let crc_start_idx = (page_size - CHECKSUM_SIZE) as usize;
107        let mut crc_bytes = &buf[crc_start_idx..];
108        let mut crc_record = Self::read(&mut crc_bytes).expect("CRC record read should not fail");
109        let (len, crc) = crc_record.get_crc();
110
111        // Validate that len is in the valid range [1, logical_page_size].
112        // A page with len=0 is invalid (e.g., all-zero pages from unwritten data).
113        let len_usize = len as usize;
114        if len_usize == 0 {
115            // Both CRCs have 0 length, so there is no fallback possible.
116            debug!("Invalid CRC: len==0");
117            return None;
118        }
119
120        if len_usize > crc_start_idx {
121            // len is too large so this CRC isn't valid. Fall back to the other CRC.
122            debug!("Invalid CRC: len too long. Using fallback CRC");
123            if crc_record.validate_fallback(buf, crc_start_idx) {
124                return Some(crc_record);
125            }
126            return None;
127        }
128
129        let computed_crc = Crc32::checksum(&buf[..len_usize]);
130        if computed_crc != crc {
131            debug!("Invalid CRC: doesn't match page contents. Using fallback CRC");
132            if crc_record.validate_fallback(buf, crc_start_idx) {
133                return Some(crc_record);
134            }
135            return None;
136        }
137
138        Some(crc_record)
139    }
140
141    /// Attempts to validate a CRC record based on its fallback CRC because the primary CRC failed
142    /// validation. The primary CRC is zeroed in the process. Returns false if the fallback CRC
143    /// fails validation.
144    fn validate_fallback(&mut self, buf: &[u8], crc_start_idx: usize) -> bool {
145        let (len, crc) = self.get_fallback_crc();
146        if len == 0 {
147            // No fallback available (only one CRC was ever written to this page).
148            debug!("Invalid fallback CRC: len==0");
149            return false;
150        }
151
152        let len_usize = len as usize;
153
154        if len_usize > crc_start_idx {
155            // len is too large so this CRC isn't valid.
156            debug!("Invalid fallback CRC: len too long.");
157            return false;
158        }
159
160        let computed_crc = Crc32::checksum(&buf[..len_usize]);
161        if computed_crc != crc {
162            debug!("Invalid fallback CRC: doesn't match page contents.");
163            return false;
164        }
165
166        true
167    }
168
169    /// Returns the CRC record with the longer (authoritative) length, without performing any
170    /// validation. If they both have the same length (which should only happen due to data
171    /// corruption) return the first.
172    const fn get_crc(&self) -> (u16, u32) {
173        if self.len1 >= self.len2 {
174            (self.len1, self.crc1)
175        } else {
176            (self.len2, self.crc2)
177        }
178    }
179
180    /// Zeroes the primary CRC (because we assumed it failed validation) and returns the other. This
181    /// should only be called if the primary CRC failed validation. After this returns, get_crc will
182    /// no longer return the invalid primary CRC.
183    const fn get_fallback_crc(&mut self) -> (u16, u32) {
184        if self.len1 >= self.len2 {
185            // First CRC was primary, and must have been invalid. Zero it and return the second.
186            self.len1 = 0;
187            self.crc1 = 0;
188            (self.len2, self.crc2)
189        } else {
190            // Second CRC was primary, and must have been invalid. Zero it and return the first.
191            self.len2 = 0;
192            self.crc2 = 0;
193            (self.len1, self.crc1)
194        }
195    }
196
197    /// Returns the CRC record in its storage representation.
198    fn to_bytes(&self) -> [u8; CHECKSUM_SIZE as usize] {
199        self.encode_fixed()
200    }
201}
202
203impl Write for Checksum {
204    fn write(&self, buf: &mut impl BufMut) {
205        self.len1.write(buf);
206        self.crc1.write(buf);
207        self.len2.write(buf);
208        self.crc2.write(buf);
209    }
210}
211
212impl CodecRead for Checksum {
213    type Cfg = ();
214
215    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
216        Ok(Self {
217            len1: u16::read(buf)?,
218            crc1: u32::read(buf)?,
219            len2: u16::read(buf)?,
220            crc2: u32::read(buf)?,
221        })
222    }
223}
224
225impl FixedSize for Checksum {
226    const SIZE: usize = 2 * u16::SIZE + 2 * crc32::Digest::SIZE;
227}
228
#[cfg(feature = "arbitrary")]
impl arbitrary::Arbitrary<'_> for Checksum {
    /// Generate a record whose four fields are each drawn independently from `u`. The result is
    /// not tied to any page contents, so it is not expected to validate against one.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        let len1 = u.arbitrary()?;
        let crc1 = u.arbitrary()?;
        let len2 = u.arbitrary()?;
        let crc2 = u.arbitrary()?;

        Ok(Self {
            len1,
            crc1,
            len2,
            crc2,
        })
    }
}
240
#[cfg(test)]
mod tests {
    use super::*;

    /// Logical page size used by every page-validation test in this module.
    const LOGICAL_PAGE_SIZE: usize = 64;

    /// Assemble a physical page: `data` at the start, zero padding up to the logical page size,
    /// then `record` encoded into the trailing CRC-record bytes.
    fn build_page(data: &[u8], record: &Checksum) -> Vec<u8> {
        let mut page = vec![0u8; LOGICAL_PAGE_SIZE + Checksum::SIZE];
        page[..data.len()].copy_from_slice(data);
        page[LOGICAL_PAGE_SIZE..].copy_from_slice(&record.to_bytes());
        page
    }

    #[test]
    fn test_crc_record_encode_read_roundtrip() {
        let encoded = Checksum {
            len1: 0x1234,
            crc1: 0xAABBCCDD,
            len2: 0x5678,
            crc2: 0x11223344,
        }
        .to_bytes();

        let Checksum {
            len1,
            crc1,
            len2,
            crc2,
        } = Checksum::read(&mut &encoded[..]).unwrap();

        assert_eq!(len1, 0x1234);
        assert_eq!(crc1, 0xAABBCCDD);
        assert_eq!(len2, 0x5678);
        assert_eq!(crc2, 0x11223344);
    }

    #[test]
    fn test_crc_record_encoding() {
        let encoded = Checksum {
            len1: 0x0102,
            crc1: 0x03040506,
            len2: 0x0708,
            crc2: 0x090A0B0C,
        }
        .to_bytes();

        // Fields appear in declaration order, each encoded big-endian.
        let expected = [
            0x01u8, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C,
        ];
        assert_eq!(encoded, expected);
    }

    #[test]
    fn test_crc_record_get_crc_len1_larger() {
        let record = Checksum {
            len1: 200,
            crc1: 0xAAAAAAAA,
            len2: 100,
            crc2: 0xBBBBBBBB,
        };

        assert_eq!(record.get_crc(), (200, 0xAAAAAAAA));
    }

    #[test]
    fn test_crc_record_get_crc_len2_larger() {
        let record = Checksum {
            len1: 100,
            crc1: 0xAAAAAAAA,
            len2: 200,
            crc2: 0xBBBBBBBB,
        };

        assert_eq!(record.get_crc(), (200, 0xBBBBBBBB));
    }

    #[test]
    fn test_crc_record_get_crc_equal_lengths() {
        // On a tie the first slot (len1/crc1) is the one reported.
        let record = Checksum {
            len1: 100,
            crc1: 0xAAAAAAAA,
            len2: 100,
            crc2: 0xBBBBBBBB,
        };

        assert_eq!(record.get_crc(), (100, 0xAAAAAAAA));
    }

    #[test]
    fn test_validate_page_valid() {
        // A single-slot record whose CRC matches the data it covers.
        let data: &[u8] = b"hello world";
        let record = Checksum::new(data.len() as u16, Crc32::checksum(data));
        let page = build_page(data, &record);

        // Validation succeeds and reports the data length.
        let validated = Checksum::validate_page(&page);
        assert!(validated.is_some());
        let (len, _) = validated.unwrap().get_crc();
        assert_eq!(usize::from(len), data.len());
    }

    #[test]
    fn test_validate_page_invalid_crc() {
        // The record has the right length but a CRC that doesn't match the data.
        let data: &[u8] = b"hello world";
        let record = Checksum::new(data.len() as u16, 0xBADBADBA);
        let page = build_page(data, &record);

        // No fallback slot exists, so validation fails.
        assert!(Checksum::validate_page(&page).is_none());
    }

    #[test]
    fn test_validate_page_corrupted_data() {
        // Start from a page whose record is correct for its data...
        let data: &[u8] = b"hello world";
        let record = Checksum::new(data.len() as u16, Crc32::checksum(data));
        let mut page = build_page(data, &record);

        // ...then corrupt a covered byte so the stored CRC no longer matches.
        page[0] = 0xFF;

        assert!(Checksum::validate_page(&page).is_none());
    }

    #[test]
    fn test_validate_page_uses_larger_len() {
        let data: &[u8] = b"hello world, this is longer";

        // The second slot has the larger length and a valid CRC; the first slot is shorter and
        // carries a CRC that is invalid for that shorter range.
        let record = Checksum {
            len1: 5,
            crc1: 0xDEADBEEF,
            len2: data.len() as u16,
            crc2: Crc32::checksum(data),
        };
        let page = build_page(data, &record);

        // len2 > len1, so the second slot is primary and must be the one validated.
        let validated = Checksum::validate_page(&page);
        assert!(validated.is_some());
        let (len, _) = validated.unwrap().get_crc();
        assert_eq!(usize::from(len), data.len());
    }

    #[test]
    fn test_validate_page_uses_fallback() {
        let data: &[u8] = b"fallback data";
        let valid_len = data.len() as u16;
        let valid_crc = Crc32::checksum(data);

        // The first slot is primary (larger length) but invalid; the second slot is the smaller,
        // valid fallback.
        let record = Checksum {
            len1: valid_len + 10,
            crc1: 0xBAD1DEA,
            len2: valid_len,
            crc2: valid_crc,
        };
        let page = build_page(data, &record);

        let validated = Checksum::validate_page(&page);
        assert!(validated.is_some(), "Should have validated using fallback");
        let validated = validated.unwrap();

        // The fallback slot is now the one reported...
        assert_eq!(validated.get_crc(), (valid_len, valid_crc));

        // ...and the failed primary slot has been zeroed.
        assert_eq!(validated.len1, 0);
        assert_eq!(validated.crc1, 0);
    }

    #[test]
    fn test_validate_page_no_fallback_available() {
        let data: &[u8] = b"some data";

        // The primary slot has a non-zero length but an invalid CRC, and the second slot is
        // empty, so there is nothing to fall back to.
        let record = Checksum {
            len1: data.len() as u16,
            crc1: 0xBAD1DEA,
            len2: 0,
            crc2: 0,
        };
        let page = build_page(data, &record);

        let validated = Checksum::validate_page(&page);
        assert!(
            validated.is_none(),
            "Should fail when primary is invalid and fallback has len=0"
        );
    }

    #[cfg(feature = "arbitrary")]
    mod conformance {
        use super::*;
        use commonware_codec::conformance::CodecConformance;

        commonware_conformance::conformance_tests! {
            CodecConformance<Checksum>,
        }
    }
}