Skip to main content

commonware_runtime/utils/buffer/paged/
mod.rs

1//! Blob wrappers for reading and writing data with integrity guarantees, plus a page cache that
2//! manages read caching over the data.
3//!
4//! # Page-oriented structure
5//!
6//! Blob data is stored in _pages_ having a logical `page_size` dictated by the managing page cache.
7//! A _physical page_ consists of `page_size` bytes of data followed by a 12-byte _CRC
8//! record_ containing:
9//!
10//! ```text
11//! | len1 (2 bytes) |  crc1 (4 bytes) | len2 (2 bytes) | crc2 (4 bytes) |
12//! ```
13//!
14//! Two checksums are stored so that partial pages can be re-written without overwriting a valid
15//! checksum for its previously committed contents. A checksum over a page is computed over the
16//! first [0,len) bytes in the page, with all other bytes in the page ignored. Ordinary partial-page
17//! payload writes 0-pad the range [len, page_size), but recovery does not depend on bytes outside
18//! [0,len). A checksum with length 0 is never considered valid. If both checksums are valid for the
19//! page, the one with the larger `len` is considered authoritative. Partial-page shrink first makes
20//! the shorter checksum durable in the alternate slot, then invalidates the old longer checksum.
21//!
22//! A _full_ page is one whose crc stores a len equal to the logical page size. Otherwise the page
23//! is called _partial_. All pages in a blob are full except for the very last page, which can be
24//! full or partial. A partial page's committed prefix remains recoverable while it is rewritten.
25
26use crate::{Blob, Buf, BufMut, Error, IoBuf};
27use commonware_codec::{EncodeFixed, FixedSize, Read as CodecRead, ReadExt, Write};
28use commonware_cryptography::{crc32, Crc32};
29
30mod append;
31mod cache;
32mod read;
33
34pub use append::Append;
35pub use cache::CacheRef;
36pub use read::Replay;
37use tracing::{debug, error};
38
39// A checksum record contains two slots. Each slot stores one u16 length and one CRC.
40const CHECKSUM_SIZE: u64 = Checksum::SIZE as u64;
41const CHECKSUM_SLOT_SIZE: usize = u16::SIZE + crc32::Digest::SIZE;
42
43/// Read the designated page from the underlying blob and return its logical bytes as a vector if it
44/// passes the integrity check, returning error otherwise. Safely handles partial pages. Caller can
45/// check the length of the returned vector to determine if the page was partial vs full.
46async fn get_page_from_blob(
47    blob: &impl Blob,
48    page_num: u64,
49    logical_page_size: u64,
50) -> Result<IoBuf, Error> {
51    let (page, _) = get_page_with_checksum_from_blob(blob, page_num, logical_page_size).await?;
52    Ok(page)
53}
54
55/// Read the designated page and return both its logical bytes and validated checksum record.
56async fn get_page_with_checksum_from_blob(
57    blob: &impl Blob,
58    page_num: u64,
59    logical_page_size: u64,
60) -> Result<(IoBuf, Checksum), Error> {
61    let physical_page_size = logical_page_size
62        .checked_add(CHECKSUM_SIZE)
63        .ok_or(Error::OffsetOverflow)?;
64    let physical_page_start = page_num
65        .checked_mul(physical_page_size)
66        .ok_or(Error::OffsetOverflow)?;
67
68    let page = blob
69        .read_at(physical_page_start, physical_page_size as usize)
70        .await?
71        .coalesce();
72
73    let Some(record) = Checksum::validate_page(page.as_ref()) else {
74        return Err(Error::InvalidChecksum);
75    };
76    let (len, _) = record.get_crc();
77
78    Ok((page.freeze().slice(..len as usize), record))
79}
80
/// The CRC record stored at the end of every physical page.
///
/// The record has two slots, each pairing a length with the CRC computed over that many leading
/// bytes of the page. The slot with the larger length is treated as authoritative. Keeping two
/// slots lets a partial page be rewritten without overwriting the valid checksum of its
/// previously committed contents. A slot whose length is 0 is never considered valid.
#[derive(Clone, Debug)]
struct Checksum {
    /// Number of leading page bytes covered by `crc1` (0 marks the slot as unused).
    len1: u16,
    /// CRC over the first `len1` bytes of the page.
    crc1: u32,
    /// Number of leading page bytes covered by `crc2` (0 marks the slot as unused).
    len2: u16,
    /// CRC over the first `len2` bytes of the page.
    crc2: u32,
}
93
94impl Checksum {
95    /// Create a new CRC record with the given length and CRC.
96    /// The new CRC is stored in the first slot (len1/crc1), with the second slot zeroed.
97    const fn new(len: u16, crc: u32) -> Self {
98        Self {
99            len1: len,
100            crc1: crc,
101            len2: 0,
102            crc2: 0,
103        }
104    }
105
106    /// Return the CRC record for the page if it is valid. The provided slice is assumed to be
107    /// exactly the size of a physical page. The record may not precisely reflect the bytes written
108    /// if what should have been the most recent CRC doesn't validate, in which case it will be
109    /// zeroed and the other CRC used as a fallback.
110    fn validate_page(buf: &[u8]) -> Option<Self> {
111        let page_size = buf.len() as u64;
112        if page_size < CHECKSUM_SIZE {
113            error!(
114                page_size,
115                required = CHECKSUM_SIZE,
116                "read page smaller than CRC record"
117            );
118            return None;
119        }
120
121        let crc_start_idx = (page_size - CHECKSUM_SIZE) as usize;
122        let mut crc_bytes = &buf[crc_start_idx..];
123        let mut crc_record = Self::read(&mut crc_bytes).expect("CRC record read should not fail");
124        let (len, crc) = crc_record.get_crc();
125
126        // Validate that len is in the valid range [1, logical_page_size].
127        // A page with len=0 is invalid (e.g., all-zero pages from unwritten data).
128        let len_usize = len as usize;
129        if len_usize == 0 {
130            // Both CRCs have 0 length, so there is no fallback possible.
131            debug!("Invalid CRC: len==0");
132            return None;
133        }
134
135        if len_usize > crc_start_idx {
136            // len is too large so this CRC isn't valid. Fall back to the other CRC.
137            debug!("Invalid CRC: len too long. Using fallback CRC");
138            if crc_record.validate_fallback(buf, crc_start_idx) {
139                return Some(crc_record);
140            }
141            return None;
142        }
143
144        let computed_crc = Crc32::checksum(&buf[..len_usize]);
145        if computed_crc != crc {
146            debug!("Invalid CRC: doesn't match page contents. Using fallback CRC");
147            if crc_record.validate_fallback(buf, crc_start_idx) {
148                return Some(crc_record);
149            }
150            return None;
151        }
152
153        Some(crc_record)
154    }
155
156    /// Attempts to validate a CRC record based on its fallback CRC because the primary CRC failed
157    /// validation. The primary CRC is zeroed in the process. Returns false if the fallback CRC
158    /// fails validation.
159    fn validate_fallback(&mut self, buf: &[u8], crc_start_idx: usize) -> bool {
160        let (len, crc) = self.get_fallback_crc();
161        if len == 0 {
162            // No fallback available (only one CRC was ever written to this page).
163            debug!("Invalid fallback CRC: len==0");
164            return false;
165        }
166
167        let len_usize = len as usize;
168
169        if len_usize > crc_start_idx {
170            // len is too large so this CRC isn't valid.
171            debug!("Invalid fallback CRC: len too long.");
172            return false;
173        }
174
175        let computed_crc = Crc32::checksum(&buf[..len_usize]);
176        if computed_crc != crc {
177            debug!("Invalid fallback CRC: doesn't match page contents.");
178            return false;
179        }
180
181        true
182    }
183
184    /// Returns the CRC record with the longer (authoritative) length, without performing any
185    /// validation. If they both have the same length (which should only happen due to data
186    /// corruption) return the first.
187    const fn get_crc(&self) -> (u16, u32) {
188        if self.len1 >= self.len2 {
189            (self.len1, self.crc1)
190        } else {
191            (self.len2, self.crc2)
192        }
193    }
194
195    /// Zeroes the primary CRC (because we assumed it failed validation) and returns the other. This
196    /// should only be called if the primary CRC failed validation. After this returns, get_crc will
197    /// no longer return the invalid primary CRC.
198    const fn get_fallback_crc(&mut self) -> (u16, u32) {
199        if self.len1 >= self.len2 {
200            // First CRC was primary, and must have been invalid. Zero it and return the second.
201            self.len1 = 0;
202            self.crc1 = 0;
203            (self.len2, self.crc2)
204        } else {
205            // Second CRC was primary, and must have been invalid. Zero it and return the first.
206            self.len2 = 0;
207            self.crc2 = 0;
208            (self.len1, self.crc1)
209        }
210    }
211
212    /// Returns the CRC record in its storage representation.
213    fn to_bytes(&self) -> [u8; CHECKSUM_SIZE as usize] {
214        self.encode_fixed()
215    }
216}
217
218impl Write for Checksum {
219    fn write(&self, buf: &mut impl BufMut) {
220        self.len1.write(buf);
221        self.crc1.write(buf);
222        self.len2.write(buf);
223        self.crc2.write(buf);
224    }
225}
226
227impl CodecRead for Checksum {
228    type Cfg = ();
229
230    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
231        Ok(Self {
232            len1: u16::read(buf)?,
233            crc1: u32::read(buf)?,
234            len2: u16::read(buf)?,
235            crc2: u32::read(buf)?,
236        })
237    }
238}
239
240impl FixedSize for Checksum {
241    const SIZE: usize = 2 * u16::SIZE + 2 * crc32::Digest::SIZE;
242}
243
#[cfg(feature = "arbitrary")]
impl arbitrary::Arbitrary<'_> for Checksum {
    /// Generate a record with unconstrained field values, drawn in field order. The result is not
    /// guaranteed to be a valid checksum for any page.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        let len1 = u.arbitrary()?;
        let crc1 = u.arbitrary()?;
        let len2 = u.arbitrary()?;
        let crc2 = u.arbitrary()?;
        Ok(Self {
            len1,
            crc1,
            len2,
            crc2,
        })
    }
}
255
#[cfg(test)]
mod tests {
    use super::*;

    /// Logical page size shared by the page-validation tests.
    const LOGICAL_PAGE_SIZE: usize = 64;

    /// Build a physical page: `data` at the start, zero padding up to the logical page size, then
    /// the encoded `record`.
    fn build_page(data: &[u8], record: &Checksum) -> Vec<u8> {
        let mut page = vec![0u8; LOGICAL_PAGE_SIZE + Checksum::SIZE];
        page[..data.len()].copy_from_slice(data);
        page[LOGICAL_PAGE_SIZE..].copy_from_slice(&record.to_bytes());
        page
    }

    #[test]
    fn test_crc_record_encode_read_roundtrip() {
        let record = Checksum {
            len1: 0x1234,
            crc1: 0xAABBCCDD,
            len2: 0x5678,
            crc2: 0x11223344,
        };

        let bytes = record.to_bytes();
        let restored = Checksum::read(&mut &bytes[..]).unwrap();

        assert_eq!(restored.len1, 0x1234);
        assert_eq!(restored.crc1, 0xAABBCCDD);
        assert_eq!(restored.len2, 0x5678);
        assert_eq!(restored.crc2, 0x11223344);
    }

    #[test]
    fn test_crc_record_encoding() {
        let record = Checksum {
            len1: 0x0102,
            crc1: 0x03040506,
            len2: 0x0708,
            crc2: 0x090A0B0C,
        };

        // Fields are laid out in declaration order using big-endian encoding.
        assert_eq!(
            record.to_bytes(),
            [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C]
        );
    }

    #[test]
    fn test_crc_record_get_crc_len1_larger() {
        let record = Checksum {
            len1: 200,
            crc1: 0xAAAAAAAA,
            len2: 100,
            crc2: 0xBBBBBBBB,
        };

        assert_eq!(record.get_crc(), (200, 0xAAAAAAAA));
    }

    #[test]
    fn test_crc_record_get_crc_len2_larger() {
        let record = Checksum {
            len1: 100,
            crc1: 0xAAAAAAAA,
            len2: 200,
            crc2: 0xBBBBBBBB,
        };

        assert_eq!(record.get_crc(), (200, 0xBBBBBBBB));
    }

    #[test]
    fn test_crc_record_get_crc_equal_lengths() {
        // When lengths are equal, len1/crc1 is returned (first slot wins ties).
        let record = Checksum {
            len1: 100,
            crc1: 0xAAAAAAAA,
            len2: 100,
            crc2: 0xBBBBBBBB,
        };

        assert_eq!(record.get_crc(), (100, 0xAAAAAAAA));
    }

    #[test]
    fn test_validate_page_valid() {
        let data: &[u8] = b"hello world";
        let record = Checksum::new(data.len() as u16, Crc32::checksum(data));
        let page = build_page(data, &record);

        let validated =
            Checksum::validate_page(&page).expect("page with a matching CRC should validate");
        let (len, _) = validated.get_crc();
        assert_eq!(len as usize, data.len());
    }

    #[test]
    fn test_validate_page_full_page() {
        // Boundary case: the checksum covers the entire logical page.
        let data = [0xA5u8; LOGICAL_PAGE_SIZE];
        let record = Checksum::new(LOGICAL_PAGE_SIZE as u16, Crc32::checksum(&data[..]));
        let page = build_page(&data, &record);

        let validated = Checksum::validate_page(&page).expect("full page should validate");
        let (len, _) = validated.get_crc();
        assert_eq!(len as usize, LOGICAL_PAGE_SIZE);
    }

    #[test]
    fn test_validate_page_invalid_crc() {
        let data: &[u8] = b"hello world";
        let record = Checksum::new(data.len() as u16, 0xBADBADBA);
        let page = build_page(data, &record);

        // The only slot has a wrong CRC and there is no fallback.
        assert!(Checksum::validate_page(&page).is_none());
    }

    #[test]
    fn test_validate_page_corrupted_data() {
        let data: &[u8] = b"hello world";
        let record = Checksum::new(data.len() as u16, Crc32::checksum(data));
        let mut page = build_page(data, &record);

        // Corrupt the payload after the (correct) CRC was recorded.
        page[0] = 0xFF;

        assert!(Checksum::validate_page(&page).is_none());
    }

    #[test]
    fn test_validate_page_all_zero() {
        // An unwritten (all-zero) page decodes to two zero-length slots, which are never valid.
        let page = vec![0u8; LOGICAL_PAGE_SIZE + Checksum::SIZE];
        assert!(Checksum::validate_page(&page).is_none());
    }

    #[test]
    fn test_validate_page_buffer_smaller_than_record() {
        // A buffer that cannot even hold the CRC record is rejected rather than panicking.
        let page = vec![0u8; Checksum::SIZE - 1];
        assert!(Checksum::validate_page(&page).is_none());
        assert!(Checksum::validate_page(&[]).is_none());
    }

    #[test]
    fn test_validate_page_uses_larger_len() {
        let data: &[u8] = b"hello world, this is longer";
        let crc = Crc32::checksum(data);

        // The second slot carries the valid CRC for the longer data.
        let record = Checksum {
            len1: 5,
            crc1: 0xDEADBEEF, // Invalid CRC for shorter data
            len2: data.len() as u16,
            crc2: crc,
        };
        let page = build_page(data, &record);

        // Should validate using len2/crc2 since len2 > len1.
        let validated =
            Checksum::validate_page(&page).expect("slot with larger len should validate");
        let (len, _) = validated.get_crc();
        assert_eq!(len as usize, data.len());
    }

    #[test]
    fn test_validate_page_uses_fallback() {
        let data: &[u8] = b"fallback data";
        let valid_crc = Crc32::checksum(data);
        let valid_len = data.len() as u16;

        // len1 is larger (primary) but INVALID; len2 is smaller (fallback) but VALID.
        let record = Checksum {
            len1: valid_len + 10,
            crc1: 0xBAD1DEA,
            len2: valid_len,
            crc2: valid_crc,
        };
        let page = build_page(data, &record);

        let validated =
            Checksum::validate_page(&page).expect("Should have validated using fallback");
        assert_eq!(validated.get_crc(), (valid_len, valid_crc));

        // The invalid primary must have been zeroed out.
        assert_eq!(validated.len1, 0);
        assert_eq!(validated.crc1, 0);
    }

    #[test]
    fn test_validate_page_primary_len_too_long_uses_fallback() {
        let data: &[u8] = b"fallback data";
        let valid_crc = Crc32::checksum(data);
        let valid_len = data.len() as u16;

        // The primary length exceeds the logical page size, so it is rejected before any CRC is
        // computed and the (valid) second slot is used instead.
        let record = Checksum {
            len1: LOGICAL_PAGE_SIZE as u16 + 1,
            crc1: 0xBAD1DEA,
            len2: valid_len,
            crc2: valid_crc,
        };
        let page = build_page(data, &record);

        let validated = Checksum::validate_page(&page)
            .expect("out-of-range primary should fall back to the valid slot");
        assert_eq!(validated.get_crc(), (valid_len, valid_crc));
        assert_eq!(validated.len1, 0);
        assert_eq!(validated.crc1, 0);
    }

    #[test]
    fn test_validate_page_no_fallback_available() {
        let data: &[u8] = b"some data";

        // len1 > 0 (primary) but with an INVALID CRC; len2 = 0 means no fallback is available.
        let record = Checksum {
            len1: data.len() as u16,
            crc1: 0xBAD1DEA,
            len2: 0,
            crc2: 0,
        };
        let page = build_page(data, &record);

        assert!(
            Checksum::validate_page(&page).is_none(),
            "Should fail when primary is invalid and fallback has len=0"
        );
    }

    #[cfg(feature = "arbitrary")]
    mod conformance {
        use super::*;
        use commonware_codec::conformance::CodecConformance;

        commonware_conformance::conformance_tests! {
            CodecConformance<Checksum>,
        }
    }
}