commonware_runtime/utils/buffer/paged/
mod.rs1use crate::{Blob, Buf, BufMut, Error, IoBuf};
27use commonware_codec::{EncodeFixed, FixedSize, Read as CodecRead, ReadExt, Write};
28use commonware_cryptography::{crc32, Crc32};
29
30mod append;
31mod cache;
32mod read;
33
34pub use append::Append;
35pub use cache::CacheRef;
36pub use read::Replay;
37use tracing::{debug, error};
38
39const CHECKSUM_SIZE: u64 = Checksum::SIZE as u64;
41const CHECKSUM_SLOT_SIZE: usize = u16::SIZE + crc32::Digest::SIZE;
42
43async fn get_page_from_blob(
47 blob: &impl Blob,
48 page_num: u64,
49 logical_page_size: u64,
50) -> Result<IoBuf, Error> {
51 let (page, _) = get_page_with_checksum_from_blob(blob, page_num, logical_page_size).await?;
52 Ok(page)
53}
54
55async fn get_page_with_checksum_from_blob(
57 blob: &impl Blob,
58 page_num: u64,
59 logical_page_size: u64,
60) -> Result<(IoBuf, Checksum), Error> {
61 let physical_page_size = logical_page_size
62 .checked_add(CHECKSUM_SIZE)
63 .ok_or(Error::OffsetOverflow)?;
64 let physical_page_start = page_num
65 .checked_mul(physical_page_size)
66 .ok_or(Error::OffsetOverflow)?;
67
68 let page = blob
69 .read_at(physical_page_start, physical_page_size as usize)
70 .await?
71 .coalesce();
72
73 let Some(record) = Checksum::validate_page(page.as_ref()) else {
74 return Err(Error::InvalidChecksum);
75 };
76 let (len, _) = record.get_crc();
77
78 Ok((page.freeze().slice(..len as usize), record))
79}
80
/// Checksum record stored in the trailing bytes of every physical page.
///
/// The record holds two (length, CRC32) slots. Validation prefers the slot with the larger
/// length and falls back to the other slot when the preferred one does not match the page
/// contents. Each CRC covers the first `len` bytes of the page.
#[derive(Clone, Debug, PartialEq, Eq)]
struct Checksum {
    /// Number of page bytes covered by the first slot's CRC (0 means the slot is unused).
    len1: u16,
    /// CRC32 of the first `len1` bytes of the page.
    crc1: u32,
    /// Number of page bytes covered by the second slot's CRC (0 means the slot is unused).
    len2: u16,
    /// CRC32 of the first `len2` bytes of the page.
    crc2: u32,
}
93
94impl Checksum {
95 const fn new(len: u16, crc: u32) -> Self {
98 Self {
99 len1: len,
100 crc1: crc,
101 len2: 0,
102 crc2: 0,
103 }
104 }
105
106 fn validate_page(buf: &[u8]) -> Option<Self> {
111 let page_size = buf.len() as u64;
112 if page_size < CHECKSUM_SIZE {
113 error!(
114 page_size,
115 required = CHECKSUM_SIZE,
116 "read page smaller than CRC record"
117 );
118 return None;
119 }
120
121 let crc_start_idx = (page_size - CHECKSUM_SIZE) as usize;
122 let mut crc_bytes = &buf[crc_start_idx..];
123 let mut crc_record = Self::read(&mut crc_bytes).expect("CRC record read should not fail");
124 let (len, crc) = crc_record.get_crc();
125
126 let len_usize = len as usize;
129 if len_usize == 0 {
130 debug!("Invalid CRC: len==0");
132 return None;
133 }
134
135 if len_usize > crc_start_idx {
136 debug!("Invalid CRC: len too long. Using fallback CRC");
138 if crc_record.validate_fallback(buf, crc_start_idx) {
139 return Some(crc_record);
140 }
141 return None;
142 }
143
144 let computed_crc = Crc32::checksum(&buf[..len_usize]);
145 if computed_crc != crc {
146 debug!("Invalid CRC: doesn't match page contents. Using fallback CRC");
147 if crc_record.validate_fallback(buf, crc_start_idx) {
148 return Some(crc_record);
149 }
150 return None;
151 }
152
153 Some(crc_record)
154 }
155
156 fn validate_fallback(&mut self, buf: &[u8], crc_start_idx: usize) -> bool {
160 let (len, crc) = self.get_fallback_crc();
161 if len == 0 {
162 debug!("Invalid fallback CRC: len==0");
164 return false;
165 }
166
167 let len_usize = len as usize;
168
169 if len_usize > crc_start_idx {
170 debug!("Invalid fallback CRC: len too long.");
172 return false;
173 }
174
175 let computed_crc = Crc32::checksum(&buf[..len_usize]);
176 if computed_crc != crc {
177 debug!("Invalid fallback CRC: doesn't match page contents.");
178 return false;
179 }
180
181 true
182 }
183
184 const fn get_crc(&self) -> (u16, u32) {
188 if self.len1 >= self.len2 {
189 (self.len1, self.crc1)
190 } else {
191 (self.len2, self.crc2)
192 }
193 }
194
195 const fn get_fallback_crc(&mut self) -> (u16, u32) {
199 if self.len1 >= self.len2 {
200 self.len1 = 0;
202 self.crc1 = 0;
203 (self.len2, self.crc2)
204 } else {
205 self.len2 = 0;
207 self.crc2 = 0;
208 (self.len1, self.crc1)
209 }
210 }
211
212 fn to_bytes(&self) -> [u8; CHECKSUM_SIZE as usize] {
214 self.encode_fixed()
215 }
216}
217
218impl Write for Checksum {
219 fn write(&self, buf: &mut impl BufMut) {
220 self.len1.write(buf);
221 self.crc1.write(buf);
222 self.len2.write(buf);
223 self.crc2.write(buf);
224 }
225}
226
227impl CodecRead for Checksum {
228 type Cfg = ();
229
230 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
231 Ok(Self {
232 len1: u16::read(buf)?,
233 crc1: u32::read(buf)?,
234 len2: u16::read(buf)?,
235 crc2: u32::read(buf)?,
236 })
237 }
238}
239
240impl FixedSize for Checksum {
241 const SIZE: usize = 2 * u16::SIZE + 2 * crc32::Digest::SIZE;
242}
243
#[cfg(feature = "arbitrary")]
impl arbitrary::Arbitrary<'_> for Checksum {
    /// Draws each field independently from `u`, in declaration order. No attempt is made to
    /// produce a record that is consistent with any page contents.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        let len1: u16 = u.arbitrary()?;
        let crc1: u32 = u.arbitrary()?;
        let len2: u16 = u.arbitrary()?;
        let crc2: u32 = u.arbitrary()?;
        Ok(Self {
            len1,
            crc1,
            len2,
            crc2,
        })
    }
}
255
#[cfg(test)]
mod tests {
    use super::*;

    /// Size of the data region of every page built by these tests.
    const LOGICAL_PAGE_SIZE: usize = 64;

    /// Builds a zero-filled physical page with `data` copied to its start and `record` encoded
    /// into the trailing `Checksum::SIZE` bytes.
    fn build_page(data: &[u8], record: &Checksum) -> Vec<u8> {
        let mut page = vec![0u8; LOGICAL_PAGE_SIZE + Checksum::SIZE];
        page[..data.len()].copy_from_slice(data);
        page[LOGICAL_PAGE_SIZE..].copy_from_slice(&record.to_bytes());
        page
    }

    #[test]
    fn test_crc_record_encode_read_roundtrip() {
        let original = Checksum {
            len1: 0x1234,
            crc1: 0xAABBCCDD,
            len2: 0x5678,
            crc2: 0x11223344,
        };

        let encoded = original.to_bytes();
        let Checksum {
            len1,
            crc1,
            len2,
            crc2,
        } = Checksum::read(&mut &encoded[..]).unwrap();

        assert_eq!(len1, 0x1234);
        assert_eq!(crc1, 0xAABBCCDD);
        assert_eq!(len2, 0x5678);
        assert_eq!(crc2, 0x11223344);
    }

    #[test]
    fn test_crc_record_encoding() {
        let record = Checksum {
            len1: 0x0102,
            crc1: 0x03040506,
            len2: 0x0708,
            crc2: 0x090A0B0C,
        };

        // Fields appear in declaration order, most-significant byte first.
        let expected = [
            0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C,
        ];
        assert_eq!(record.to_bytes(), expected);
    }

    #[test]
    fn test_crc_record_get_crc_len1_larger() {
        let record = Checksum {
            len1: 200,
            crc1: 0xAAAAAAAA,
            len2: 100,
            crc2: 0xBBBBBBBB,
        };

        assert_eq!(record.get_crc(), (200, 0xAAAAAAAA));
    }

    #[test]
    fn test_crc_record_get_crc_len2_larger() {
        let record = Checksum {
            len1: 100,
            crc1: 0xAAAAAAAA,
            len2: 200,
            crc2: 0xBBBBBBBB,
        };

        assert_eq!(record.get_crc(), (200, 0xBBBBBBBB));
    }

    #[test]
    fn test_crc_record_get_crc_equal_lengths() {
        // On a tie the first slot wins.
        let record = Checksum {
            len1: 100,
            crc1: 0xAAAAAAAA,
            len2: 100,
            crc2: 0xBBBBBBBB,
        };

        assert_eq!(record.get_crc(), (100, 0xAAAAAAAA));
    }

    #[test]
    fn test_validate_page_valid() {
        let data: &[u8] = b"hello world";
        let record = Checksum::new(data.len() as u16, Crc32::checksum(data));
        let page = build_page(data, &record);

        let validated =
            Checksum::validate_page(&page).expect("page with a matching CRC should validate");
        let (len, _) = validated.get_crc();
        assert_eq!(usize::from(len), data.len());
    }

    #[test]
    fn test_validate_page_invalid_crc() {
        let data: &[u8] = b"hello world";
        let record = Checksum::new(data.len() as u16, 0xBADBADBA);
        let page = build_page(data, &record);

        assert!(Checksum::validate_page(&page).is_none());
    }

    #[test]
    fn test_validate_page_corrupted_data() {
        let data: &[u8] = b"hello world";
        let record = Checksum::new(data.len() as u16, Crc32::checksum(data));
        let mut page = build_page(data, &record);

        // Flip the first data byte after the CRC was computed.
        page[0] = 0xFF;

        assert!(Checksum::validate_page(&page).is_none());
    }

    #[test]
    fn test_validate_page_uses_larger_len() {
        let data: &[u8] = b"hello world, this is longer";

        // Slot 1 is shorter and carries a bogus CRC; slot 2 is longer and correct.
        let record = Checksum {
            len1: 5,
            crc1: 0xDEADBEEF,
            len2: data.len() as u16,
            crc2: Crc32::checksum(data),
        };
        let page = build_page(data, &record);

        let validated =
            Checksum::validate_page(&page).expect("the longer, valid slot should be used");
        let (len, _) = validated.get_crc();
        assert_eq!(usize::from(len), data.len());
    }

    #[test]
    fn test_validate_page_uses_fallback() {
        let data: &[u8] = b"fallback data";
        let valid_len = data.len() as u16;
        let valid_crc = Crc32::checksum(data);

        // Slot 1 is preferred (larger length) but its CRC is wrong; slot 2 is correct.
        let record = Checksum {
            len1: valid_len + 10,
            crc1: 0xBAD1DEA,
            len2: valid_len,
            crc2: valid_crc,
        };
        let page = build_page(data, &record);

        let validated =
            Checksum::validate_page(&page).expect("Should have validated using fallback");
        assert_eq!(validated.get_crc(), (valid_len, valid_crc));

        // The rejected slot must have been cleared.
        assert_eq!(validated.len1, 0);
        assert_eq!(validated.crc1, 0);
    }

    #[test]
    fn test_validate_page_no_fallback_available() {
        let data: &[u8] = b"some data";

        // Slot 1 has a wrong CRC and slot 2 is empty.
        let record = Checksum {
            len1: data.len() as u16,
            crc1: 0xBAD1DEA,
            len2: 0,
            crc2: 0,
        };
        let page = build_page(data, &record);

        let validated = Checksum::validate_page(&page);
        assert!(
            validated.is_none(),
            "Should fail when primary is invalid and fallback has len=0"
        );
    }

    #[cfg(feature = "arbitrary")]
    mod conformance {
        use super::*;
        use commonware_codec::conformance::CodecConformance;

        commonware_conformance::conformance_tests! {
            CodecConformance<Checksum>,
        }
    }
}