use crate::{Blob, Buf, BufMut, Error, IoBuf};
use commonware_codec::{EncodeFixed, FixedSize, Read as CodecRead, ReadExt, Write};
use commonware_cryptography::{crc32, Crc32};
mod append;
mod cache;
mod read;
pub use append::Append;
pub use cache::CacheRef;
pub use read::Replay;
use tracing::{debug, error};
/// Encoded size of a [`Checksum`] record in bytes, as a `u64` so it can be used directly in
/// offset arithmetic. A physical page is a logical page followed by this many trailer bytes.
const CHECKSUM_SIZE: u64 = Checksum::SIZE as u64;
async fn get_page_from_blob(
blob: &impl Blob,
page_num: u64,
logical_page_size: u64,
) -> Result<IoBuf, Error> {
let physical_page_size = logical_page_size + CHECKSUM_SIZE;
let physical_page_start = page_num * physical_page_size;
let page = blob
.read_at(physical_page_start, physical_page_size as usize)
.await?
.coalesce();
let Some(record) = Checksum::validate_page(page.as_ref()) else {
return Err(Error::InvalidChecksum);
};
let (len, _) = record.get_crc();
Ok(page.freeze().slice(..len as usize))
}
/// Checksum record stored in the trailing bytes of every physical page.
///
/// The record carries two independent `(len, crc)` slots. `get_crc` treats the slot with the
/// larger `len` as the primary one (slot 1 wins ties) and `get_fallback_crc` yields the other.
/// Each `crc` covers the first `len` bytes of the page.
///
/// NOTE(review): the two slots presumably allow a page to be rewritten with a new length while
/// the previous record remains usable as a fallback — confirm against the append path.
#[derive(Clone)]
struct Checksum {
/// Number of leading page bytes covered by `crc1`; `0` means the slot is unused.
len1: u16,
/// CRC32 of the first `len1` bytes of the page.
crc1: u32,
/// Number of leading page bytes covered by `crc2`; `0` means the slot is unused.
len2: u16,
/// CRC32 of the first `len2` bytes of the page.
crc2: u32,
}
impl Checksum {
    /// Creates a record whose first slot holds `(len, crc)` and whose second slot is zeroed.
    const fn new(len: u16, crc: u32) -> Self {
        Self {
            len1: len,
            crc1: crc,
            len2: 0,
            crc2: 0,
        }
    }

    /// Validates a full physical page (`buf` = payload followed by the checksum record).
    ///
    /// The record is decoded from the last `CHECKSUM_SIZE` bytes of `buf`. The primary slot
    /// (see [`Self::get_crc`]) is checked first; if its length is out of range or its CRC does
    /// not match, the other slot is tried via [`Self::validate_fallback`].
    ///
    /// Returns `None` when `buf` is shorter than the record, when the primary length is zero,
    /// or when neither slot validates. When the fallback slot is the one that validated, the
    /// returned record has its primary slot zeroed, so `get_crc` on it yields the validated
    /// slot.
    ///
    /// # Panics
    ///
    /// Panics if decoding the record from the trailing bytes fails.
    fn validate_page(buf: &[u8]) -> Option<Self> {
        // `page_size` keeps its name because it is emitted as a tracing field below.
        let page_size = buf.len() as u64;
        let Some(payload_len) = page_size.checked_sub(CHECKSUM_SIZE) else {
            error!(
                page_size,
                required = CHECKSUM_SIZE,
                "read page smaller than CRC record"
            );
            return None;
        };

        let crc_start_idx = payload_len as usize;
        let mut trailer = &buf[crc_start_idx..];
        let mut record = Self::read(&mut trailer).expect("CRC record read should not fail");

        let (len, crc) = record.get_crc();
        let claimed = usize::from(len);
        if claimed == 0 {
            // The primary slot has the larger length, so zero here means both slots are empty.
            debug!("Invalid CRC: len==0");
            return None;
        }

        let primary_ok = if claimed > crc_start_idx {
            debug!("Invalid CRC: len too long. Using fallback CRC");
            false
        } else if Crc32::checksum(&buf[..claimed]) != crc {
            debug!("Invalid CRC: doesn't match page contents. Using fallback CRC");
            false
        } else {
            true
        };

        // The fallback is only consulted (and the primary slot only zeroed) on primary failure.
        if primary_ok || record.validate_fallback(buf, crc_start_idx) {
            Some(record)
        } else {
            None
        }
    }

    /// Checks the non-primary slot against `buf`, where `crc_start_idx` is the offset at which
    /// the checksum record begins (i.e. the maximum length a slot may cover).
    ///
    /// As a side effect the primary slot of `self` is zeroed (see
    /// [`Self::get_fallback_crc`]), regardless of whether the fallback validates.
    fn validate_fallback(&mut self, buf: &[u8], crc_start_idx: usize) -> bool {
        let (len, crc) = self.get_fallback_crc();
        let covered = usize::from(len);

        if covered == 0 {
            debug!("Invalid fallback CRC: len==0");
            return false;
        }
        if covered > crc_start_idx {
            debug!("Invalid fallback CRC: len too long.");
            return false;
        }

        let matches = Crc32::checksum(&buf[..covered]) == crc;
        if !matches {
            debug!("Invalid fallback CRC: doesn't match page contents.");
        }
        matches
    }

    /// Returns the primary `(len, crc)` slot: the one with the larger length, with slot 1
    /// winning ties.
    const fn get_crc(&self) -> (u16, u32) {
        if self.len2 > self.len1 {
            (self.len2, self.crc2)
        } else {
            (self.len1, self.crc1)
        }
    }

    /// Returns the non-primary `(len, crc)` slot and zeroes the primary one in `self`, so that
    /// subsequent `get_crc` calls select the slot returned here whenever its length is
    /// non-zero.
    const fn get_fallback_crc(&mut self) -> (u16, u32) {
        if self.len2 > self.len1 {
            let fallback = (self.len1, self.crc1);
            self.len2 = 0;
            self.crc2 = 0;
            fallback
        } else {
            let fallback = (self.len2, self.crc2);
            self.len1 = 0;
            self.crc1 = 0;
            fallback
        }
    }

    /// Encodes the record into a fixed-size byte array of `CHECKSUM_SIZE` bytes.
    fn to_bytes(&self) -> [u8; CHECKSUM_SIZE as usize] {
        let encoded: [u8; CHECKSUM_SIZE as usize] = self.encode_fixed();
        encoded
    }
}
impl Write for Checksum {
    /// Writes the four fields to `buf` in declaration order: `len1`, `crc1`, `len2`, `crc2`.
    fn write(&self, buf: &mut impl BufMut) {
        let Self {
            len1,
            crc1,
            len2,
            crc2,
        } = self;
        len1.write(buf);
        crc1.write(buf);
        len2.write(buf);
        crc2.write(buf);
    }
}
impl CodecRead for Checksum {
    type Cfg = ();

    /// Decodes a record by reading `len1`, `crc1`, `len2`, `crc2` from `buf`, in that order.
    /// Fails with the first decoding error encountered. No configuration is required.
    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
        let len1 = u16::read(buf)?;
        let crc1 = u32::read(buf)?;
        let len2 = u16::read(buf)?;
        let crc2 = u32::read(buf)?;
        Ok(Self {
            len1,
            crc1,
            len2,
            crc2,
        })
    }
}
impl FixedSize for Checksum {
    /// Two `(len, crc)` slots, each a `u16` length followed by a CRC32 digest.
    const SIZE: usize = 2 * (u16::SIZE + crc32::Digest::SIZE);
}
#[cfg(feature = "arbitrary")]
impl arbitrary::Arbitrary<'_> for Checksum {
    /// Builds a record with every field drawn independently from `u`, in declaration order.
    /// No relationship between the lengths and CRCs is enforced.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        let len1: u16 = u.arbitrary()?;
        let crc1: u32 = u.arbitrary()?;
        let len2: u16 = u.arbitrary()?;
        let crc2: u32 = u.arbitrary()?;
        Ok(Self {
            len1,
            crc1,
            len2,
            crc2,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Logical (payload) size of the pages built by the validation tests.
    const LOGICAL_PAGE_SIZE: usize = 64;

    /// Returns a zero-filled physical page with `data` copied to the front and `record`
    /// encoded into the trailing `Checksum::SIZE` bytes.
    fn build_page(data: &[u8], record: &Checksum) -> Vec<u8> {
        let mut page = vec![0u8; LOGICAL_PAGE_SIZE + Checksum::SIZE];
        page[..data.len()].copy_from_slice(data);
        page[LOGICAL_PAGE_SIZE..].copy_from_slice(&record.to_bytes());
        page
    }

    /// Encoding a record and decoding it again yields the same field values.
    #[test]
    fn test_crc_record_encode_read_roundtrip() {
        let original = Checksum {
            len1: 0x1234,
            crc1: 0xAABBCCDD,
            len2: 0x5678,
            crc2: 0x11223344,
        };

        let encoded = original.to_bytes();
        let mut cursor = &encoded[..];
        let decoded = Checksum::read(&mut cursor).unwrap();

        assert_eq!(
            (decoded.len1, decoded.crc1, decoded.len2, decoded.crc2),
            (0x1234, 0xAABBCCDD, 0x5678, 0x11223344)
        );
    }

    /// The encoded form is the four fields in declaration order, most significant byte first.
    #[test]
    fn test_crc_record_encoding() {
        let record = Checksum {
            len1: 0x0102,
            crc1: 0x03040506,
            len2: 0x0708,
            crc2: 0x090A0B0C,
        };

        let expected = [
            0x01u8, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C,
        ];
        assert_eq!(record.to_bytes(), expected);
    }

    /// Slot 1 is primary when its length is larger.
    #[test]
    fn test_crc_record_get_crc_len1_larger() {
        let record = Checksum {
            len1: 200,
            crc1: 0xAAAAAAAA,
            len2: 100,
            crc2: 0xBBBBBBBB,
        };
        assert_eq!(record.get_crc(), (200, 0xAAAAAAAA));
    }

    /// Slot 2 is primary when its length is larger.
    #[test]
    fn test_crc_record_get_crc_len2_larger() {
        let record = Checksum {
            len1: 100,
            crc1: 0xAAAAAAAA,
            len2: 200,
            crc2: 0xBBBBBBBB,
        };
        assert_eq!(record.get_crc(), (200, 0xBBBBBBBB));
    }

    /// Slot 1 wins when both lengths are equal.
    #[test]
    fn test_crc_record_get_crc_equal_lengths() {
        let record = Checksum {
            len1: 100,
            crc1: 0xAAAAAAAA,
            len2: 100,
            crc2: 0xBBBBBBBB,
        };
        assert_eq!(record.get_crc(), (100, 0xAAAAAAAA));
    }

    /// A page whose single populated slot matches the data validates.
    #[test]
    fn test_validate_page_valid() {
        let data = b"hello world";
        let record = Checksum::new(data.len() as u16, Crc32::checksum(&data[..]));
        let page = build_page(data, &record);

        let validated = Checksum::validate_page(&page);
        assert!(validated.is_some());
        let (len, _) = validated.unwrap().get_crc();
        assert_eq!(len as usize, data.len());
    }

    /// A wrong CRC with no usable fallback slot is rejected.
    #[test]
    fn test_validate_page_invalid_crc() {
        let data = b"hello world";
        let record = Checksum::new(data.len() as u16, 0xBADBADBA);
        let page = build_page(data, &record);

        assert!(Checksum::validate_page(&page).is_none());
    }

    /// Corrupting the payload after the checksum was computed is detected.
    #[test]
    fn test_validate_page_corrupted_data() {
        let data = b"hello world";
        let record = Checksum::new(data.len() as u16, Crc32::checksum(&data[..]));
        let mut page = build_page(data, &record);

        page[0] = 0xFF;

        assert!(Checksum::validate_page(&page).is_none());
    }

    /// The slot with the larger length is the one that gets validated first.
    #[test]
    fn test_validate_page_uses_larger_len() {
        let data = b"hello world, this is longer";
        let record = Checksum {
            len1: 5,
            crc1: 0xDEADBEEF,
            len2: data.len() as u16,
            crc2: Crc32::checksum(&data[..]),
        };
        let page = build_page(data, &record);

        let validated = Checksum::validate_page(&page);
        assert!(validated.is_some());
        let (len, _) = validated.unwrap().get_crc();
        assert_eq!(len as usize, data.len());
    }

    /// When the primary slot is invalid, the other slot is used and the primary is zeroed.
    #[test]
    fn test_validate_page_uses_fallback() {
        let data = b"fallback data";
        let valid_len = data.len() as u16;
        let valid_crc = Crc32::checksum(&data[..]);
        let record = Checksum {
            len1: valid_len + 10,
            crc1: 0xBAD1DEA,
            len2: valid_len,
            crc2: valid_crc,
        };
        let page = build_page(data, &record);

        let validated = Checksum::validate_page(&page);
        assert!(validated.is_some(), "Should have validated using fallback");
        let validated = validated.unwrap();

        assert_eq!(validated.get_crc(), (valid_len, valid_crc));
        assert_eq!(validated.len1, 0);
        assert_eq!(validated.crc1, 0);
    }

    /// An invalid primary slot with an empty fallback slot is rejected.
    #[test]
    fn test_validate_page_no_fallback_available() {
        let data = b"some data";
        let record = Checksum {
            len1: data.len() as u16,
            crc1: 0xBAD1DEA,
            len2: 0,
            crc2: 0,
        };
        let page = build_page(data, &record);

        let validated = Checksum::validate_page(&page);
        assert!(
            validated.is_none(),
            "Should fail when primary is invalid and fallback has len=0"
        );
    }

    #[cfg(feature = "arbitrary")]
    mod conformance {
        use super::*;
        use commonware_codec::conformance::CodecConformance;

        commonware_conformance::conformance_tests! {
            CodecConformance<Checksum>,
        }
    }
}