1use super::read::{PageReader, Replay};
17use crate::{
18 buffer::{
19 paged::{CacheRef, Checksum, CHECKSUM_SIZE},
20 tip::Buffer,
21 },
22 Blob, Error, IoBuf, IoBufMut, IoBufs,
23};
24use bytes::BufMut;
25use commonware_cryptography::Crc32;
26use commonware_utils::sync::{AsyncRwLock, AsyncRwLockWriteGuard};
27use std::{
28 num::{NonZeroU16, NonZeroUsize},
29 sync::Arc,
30};
31use tracing::warn;
32
/// Identifies which of the two CRC slots in a [`Checksum`] record holds the
/// previously synced (authoritative) CRC for a partial page — the slot that a
/// subsequent flush must leave untouched on disk.
#[derive(Clone, Copy)]
enum ProtectedCrc {
    /// The first slot (`len1`/`crc1`) is authoritative and must not be rewritten.
    First,
    /// The second slot (`len2`/`crc2`) is authoritative and must not be rewritten.
    Second,
}
39
/// Mutable state of the underlying blob, kept under a single lock so the blob
/// handle, page cursor, and partial-page CRC record stay consistent.
#[derive(Clone)]
struct BlobState<B: Blob> {
    /// The underlying blob that checksummed pages are written to.
    blob: B,

    /// Number of full pages already written; equivalently, the page index at
    /// which the next write begins.
    current_page: u64,

    /// CRC record of the trailing partial page, or `None` when the written
    /// data ends exactly on a page boundary.
    partial_page_state: Option<Checksum>,
}
52
/// An append-only wrapper around a [`Blob`] that buffers writes into
/// checksummed pages and serves reads from the tip buffer, the page cache, or
/// the blob itself. Cloning shares all state via `Arc`s.
#[derive(Clone)]
pub struct Append<B: Blob> {
    /// Blob handle plus its write cursor / partial-page CRC, behind one lock.
    blob_state: Arc<AsyncRwLock<BlobState<B>>>,

    /// Identifier keying this blob's pages in the shared page cache.
    id: u64,

    /// Shared handle to the page cache and buffer pool.
    cache_ref: CacheRef,

    /// In-memory tip buffer holding bytes not yet flushed to the blob.
    buffer: Arc<AsyncRwLock<Buffer>>,
}
70
71fn capacity_with_floor(capacity: usize, page_size: u64) -> usize {
74 let floor = page_size as usize * 2;
75 if capacity < floor {
76 warn!(
77 floor,
78 "requested buffer capacity is too low, increasing it to floor"
79 );
80 floor
81 } else {
82 capacity
83 }
84}
85
86impl<B: Blob> Append<B> {
    /// Opens an append wrapper over `blob`, recovering from any partially
    /// written or corrupt tail.
    ///
    /// Scans backwards for the last page with a valid checksum, truncates any
    /// trailing invalid data, and seeds the tip buffer with the trailing
    /// partial page (if one exists) so appends resume mid-page.
    pub async fn new(
        blob: B,
        original_blob_size: u64,
        capacity: usize,
        cache_ref: CacheRef,
    ) -> Result<Self, Error> {
        let (partial_page_state, pages, invalid_data_found) =
            Self::read_last_valid_page(&blob, original_blob_size, cache_ref.page_size()).await?;
        if invalid_data_found {
            // Truncate to just after the last page that passed validation.
            let new_blob_size = pages * (cache_ref.page_size() + CHECKSUM_SIZE);
            warn!(
                original_blob_size,
                new_blob_size, "truncating blob to remove invalid data"
            );
            blob.resize(new_blob_size).await?;
            blob.sync().await?;
        }

        let capacity = capacity_with_floor(capacity, cache_ref.page_size());

        // A trailing partial page is "re-opened": the page cursor moves back
        // onto it and its logical bytes are replayed into the tip buffer.
        let (blob_state, partial_data) = match partial_page_state {
            Some((partial_page, crc_record)) => (
                BlobState {
                    blob,
                    current_page: pages - 1,
                    partial_page_state: Some(crc_record),
                },
                Some(partial_page),
            ),
            None => (
                BlobState {
                    blob,
                    current_page: pages,
                    partial_page_state: None,
                },
                None,
            ),
        };

        // The tip buffer's offset tracks the logical size of fully written pages.
        let buffer = Buffer::from(
            blob_state.current_page * cache_ref.page_size(),
            partial_data.unwrap_or_default(),
            capacity,
            cache_ref.pool().clone(),
        );

        Ok(Self {
            blob_state: Arc::new(AsyncRwLock::new(blob_state)),
            id: cache_ref.next_id(),
            cache_ref,
            buffer: Arc::new(AsyncRwLock::new(buffer)),
        })
    }
145
    /// Scans backwards from `blob_size` for the last page whose checksum
    /// validates.
    ///
    /// Returns `(partial, pages, invalid_data_found)`:
    /// - `partial`: the logical bytes and CRC record of a trailing partial
    ///   page, or `None` when the last valid page is full (or none exists);
    /// - `pages`: the number of physical pages up to and including the last
    ///   valid one;
    /// - `invalid_data_found`: whether trailing bytes failed validation and
    ///   should be truncated away.
    async fn read_last_valid_page(
        blob: &B,
        blob_size: u64,
        page_size: u64,
    ) -> Result<(Option<(IoBuf, Checksum)>, u64, bool), Error> {
        let physical_page_size = page_size + CHECKSUM_SIZE;
        let partial_bytes = blob_size % physical_page_size;
        let mut last_page_end = blob_size - partial_bytes;

        // Bytes that don't fill a whole physical page are invalid by definition.
        let mut invalid_data_found = partial_bytes != 0;

        // Walk pages back-to-front until one validates.
        while last_page_end != 0 {
            let page_start = last_page_end - physical_page_size;
            let buf = blob
                .read_at(page_start, physical_page_size as usize)
                .await?
                .coalesce()
                .freeze();

            match Checksum::validate_page(buf.as_ref()) {
                Some(crc_record) => {
                    let (len, _) = crc_record.get_crc();
                    let len = len as u64;
                    if len != page_size {
                        // Page validated but holds fewer logical bytes than a
                        // full page: it is the trailing partial page.
                        let logical_bytes = buf.slice(..len as usize);
                        return Ok((
                            Some((logical_bytes, crc_record)),
                            last_page_end / physical_page_size,
                            invalid_data_found,
                        ));
                    }
                    return Ok((None, last_page_end / physical_page_size, invalid_data_found));
                }
                None => {
                    // Invalid page: discard it and keep scanning backwards.
                    last_page_end = page_start;
                    invalid_data_found = true;
                }
            }
        }

        Ok((None, 0, invalid_data_found))
    }
213
214 pub async fn append(&self, buf: &[u8]) -> Result<(), Error> {
216 let mut buffer = self.buffer.write().await;
217
218 if !buffer.append(buf) {
219 return Ok(());
220 }
221
222 self.flush_internal(buffer, false).await
224 }
225
    /// Writes buffered bytes to the blob as checksummed physical pages and
    /// drains what was written from the tip buffer.
    ///
    /// Full pages are always written; the trailing partial page is included
    /// only when `write_partial_page` is true. Takes ownership of the buffer
    /// guard so it can be dropped before blob I/O begins.
    async fn flush_internal(
        &self,
        mut buf_guard: AsyncRwLockWriteGuard<'_, Buffer>,
        write_partial_page: bool,
    ) -> Result<(), Error> {
        let buffer = &mut *buf_guard;

        // Publish buffered bytes to the page cache; the return value is the
        // number of trailing bytes that must remain in the tip buffer.
        let remaining_byte_count = self
            .cache_ref
            .cache(self.id, buffer.as_ref(), buffer.offset);

        // Snapshot the CRC record of the previously flushed partial page; it
        // determines which on-disk regions must not be rewritten below.
        let old_partial_page_state = {
            let blob_state = self.blob_state.read().await;
            blob_state.partial_page_state.clone()
        };

        let (physical_pages, partial_page_state) = self.to_physical_pages(
            &*buffer,
            write_partial_page,
            old_partial_page_state.as_ref(),
        );

        if physical_pages.is_empty() {
            // Nothing to write (no full pages and no partial page requested).
            return Ok(());
        }

        if remaining_byte_count == 0 {
            // Fully drained: release the buffer's backing allocation rather
            // than keeping an empty allocation around.
            let _ = buffer
                .take()
                .expect("take must succeed when flush drains all buffered bytes");
        } else {
            let bytes_to_drain = buffer.len() - remaining_byte_count;
            buffer.drop_prefix(bytes_to_drain);
            buffer.offset += bytes_to_drain as u64;
        }
        let new_offset = buffer.offset;

        // NOTE: the blob write lock is acquired before the buffer guard is
        // released — presumably so concurrent flushes cannot interleave
        // between draining the buffer and writing the blob; confirm if changing.
        let mut blob_state = self.blob_state.write().await;

        drop(buf_guard);

        let logical_page_size = self.cache_ref.page_size() as usize;
        let physical_page_size = logical_page_size + CHECKSUM_SIZE as usize;
        let write_at_offset = blob_state.current_page * physical_page_size as u64;

        // A trailing partial page (if present) occupies the last encoded page
        // and does not advance the full-page cursor.
        let total_pages_in_buffer = physical_pages.len() / physical_page_size;
        let full_pages_written = if partial_page_state.is_some() {
            total_pages_in_buffer.saturating_sub(1)
        } else {
            total_pages_in_buffer
        };

        let protected_regions = Self::identify_protected_regions(old_partial_page_state.as_ref());

        blob_state.current_page += full_pages_written as u64;
        blob_state.partial_page_state = partial_page_state;

        // Invariant: the buffer offset always equals the logical size of the
        // fully written pages.
        assert_eq!(
            blob_state.current_page * self.cache_ref.page_size(),
            new_offset
        );

        if let Some((prefix_len, protected_crc)) = protected_regions {
            // The first page overwrites the previously synced partial page:
            // skip the already-durable data prefix and the authoritative CRC
            // slot (each slot is 6 bytes: a u16 length plus a u32 CRC).
            match protected_crc {
                ProtectedCrc::First => {
                    // Write data after the durable prefix, then everything
                    // after the first (protected) CRC slot.
                    if prefix_len < logical_page_size {
                        let payload = physical_pages.slice(prefix_len..logical_page_size);
                        blob_state
                            .blob
                            .write_at(write_at_offset + prefix_len as u64, payload)
                            .await?;
                    }
                    let second_crc_start = logical_page_size + 6;
                    let payload = physical_pages.slice(second_crc_start..);
                    blob_state
                        .blob
                        .write_at(write_at_offset + second_crc_start as u64, payload)
                        .await?;
                }
                ProtectedCrc::Second => {
                    // Write data plus the first CRC slot, skip the second
                    // (protected) slot, then write any subsequent pages.
                    let first_crc_end = logical_page_size + 6;
                    if prefix_len < first_crc_end {
                        let payload = physical_pages.slice(prefix_len..first_crc_end);
                        blob_state
                            .blob
                            .write_at(write_at_offset + prefix_len as u64, payload)
                            .await?;
                    }
                    if physical_pages.len() > physical_page_size {
                        let payload = physical_pages.slice(physical_page_size..);
                        blob_state
                            .blob
                            .write_at(write_at_offset + physical_page_size as u64, payload)
                            .await?;
                    }
                }
            }
        } else {
            // No protected regions: write everything in one shot.
            blob_state
                .blob
                .write_at(write_at_offset, physical_pages)
                .await?;
        }

        Ok(())
    }
367
368 pub async fn size(&self) -> u64 {
370 let buffer = self.buffer.read().await;
371 buffer.size()
372 }
373
    /// Reads exactly `len` bytes starting at logical `offset`, allocating the
    /// result from the shared buffer pool.
    pub async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufs, Error> {
        // SAFETY: the allocation is fully overwritten by `read_into` before
        // being returned (read_into errors if fewer than `len` bytes exist).
        let mut buf = unsafe { self.cache_ref.pool().alloc_len(len) };
        self.read_into(buf.as_mut(), offset).await?;
        Ok(buf.into())
    }
382
    /// Reads up to `len` bytes starting at `logical_offset` into `bufs`,
    /// truncating to the bytes actually available.
    ///
    /// Returns the (possibly shortened) buffer and the number of bytes read.
    /// A `len` of 0 succeeds with an emptied buffer; a start at or past the
    /// logical end returns [`Error::BlobInsufficientLength`].
    pub async fn read_up_to(
        &self,
        logical_offset: u64,
        len: usize,
        bufs: impl Into<IoBufMut> + Send,
    ) -> Result<(IoBufMut, usize), Error> {
        let mut bufs = bufs.into();
        if len == 0 {
            bufs.truncate(0);
            return Ok((bufs, 0));
        }
        let blob_size = self.size().await;
        let available = (blob_size.saturating_sub(logical_offset) as usize).min(len);
        if available == 0 {
            return Err(Error::BlobInsufficientLength);
        }
        // SAFETY: the first `available` bytes are fully overwritten by
        // `read_into` below before the buffer is returned.
        unsafe { bufs.set_len(available) };
        self.read_into(bufs.as_mut(), logical_offset).await?;

        Ok((bufs, available))
    }
413
    /// Fills `buf` with logical bytes starting at `logical_offset`.
    ///
    /// Bytes are sourced, in order of preference, from the tip buffer, the
    /// page cache, and finally the blob itself. Returns
    /// [`Error::BlobInsufficientLength`] if the range extends past the
    /// logical size, or [`Error::OffsetOverflow`] if the end offset overflows.
    pub async fn read_into(&self, buf: &mut [u8], logical_offset: u64) -> Result<(), Error> {
        let end_offset = logical_offset
            .checked_add(buf.len() as u64)
            .ok_or(Error::OffsetOverflow)?;

        let buffer = self.buffer.read().await;

        if end_offset > buffer.size() {
            return Err(Error::BlobInsufficientLength);
        }

        // Serve the suffix that overlaps the tip buffer (if any). `remaining`
        // is the length of the prefix still to be read from cache/blob.
        let remaining = if end_offset <= buffer.offset {
            // The whole range lies in already-flushed data.
            buf.len()
        } else {
            let overlap_start = buffer.offset.max(logical_offset);
            let dst_start = (overlap_start - logical_offset) as usize;
            let src_start = (overlap_start - buffer.offset) as usize;
            let copied = buf.len() - dst_start;
            buf[dst_start..].copy_from_slice(&buffer.as_ref()[src_start..src_start + copied]);
            dst_start
        };

        // Release the tip buffer lock before touching the cache or blob.
        drop(buffer);

        if remaining == 0 {
            return Ok(());
        }

        // Serve as much of the prefix as possible from the page cache.
        let cached = self
            .cache_ref
            .read_cached(self.id, &mut buf[..remaining], logical_offset);

        if cached == remaining {
            return Ok(());
        }

        // Fall back to the blob for whatever the cache couldn't provide.
        let blob_guard = self.blob_state.read().await;

        let uncached_offset = logical_offset + cached as u64;
        let uncached_len = remaining - cached;
        self.cache_ref
            .read(
                &blob_guard.blob,
                self.id,
                &mut buf[cached..cached + uncached_len],
                uncached_offset,
            )
            .await
    }
480
481 fn identify_protected_regions(
492 partial_page_state: Option<&Checksum>,
493 ) -> Option<(usize, ProtectedCrc)> {
494 let crc_record = partial_page_state?;
495 let (old_len, _) = crc_record.get_crc();
496 let protected_crc = if crc_record.len1 >= crc_record.len2 {
498 ProtectedCrc::First
499 } else {
500 ProtectedCrc::Second
501 };
502 Some((old_len as usize, protected_crc))
503 }
504
    /// Encodes the buffer's logical bytes as physical pages: each logical
    /// page followed by its CRC record.
    ///
    /// When `include_partial_page` is set, any trailing partial page is
    /// zero-padded to the logical page size and appended with its own CRC
    /// record, which is also returned. If the first encoded page overwrites
    /// the previously flushed partial page (`old_crc_record`), its CRC record
    /// carries the old authoritative slot forward.
    fn to_physical_pages(
        &self,
        buffer: &Buffer,
        include_partial_page: bool,
        old_crc_record: Option<&Checksum>,
    ) -> (IoBuf, Option<Checksum>) {
        let logical_page_size = self.cache_ref.page_size() as usize;
        let physical_page_size = logical_page_size + CHECKSUM_SIZE as usize;
        let pages_to_write = buffer.len() / logical_page_size;
        let max_pages_to_write = pages_to_write + if include_partial_page { 1 } else { 0 };
        let mut write_buffer = self
            .cache_ref
            .pool()
            .alloc(max_pages_to_write * physical_page_size);
        let buffer_data = buffer.as_ref();

        // Emit each full page followed by its CRC record.
        for page in 0..pages_to_write {
            let start_read_idx = page * logical_page_size;
            let end_read_idx = start_read_idx + logical_page_size;
            let logical_page = &buffer_data[start_read_idx..end_read_idx];
            write_buffer.put_slice(logical_page);

            let crc = Crc32::checksum(logical_page);
            let logical_page_size_u16 =
                u16::try_from(logical_page_size).expect("page size must fit in u16 for CRC record");

            // Only the very first page can overlap the previously flushed
            // partial page; preserve that page's authoritative CRC slot.
            let crc_record = if let (0, Some(old_crc)) = (page, old_crc_record) {
                Self::build_crc_record_preserving_old(logical_page_size_u16, crc, old_crc)
            } else {
                Checksum::new(logical_page_size_u16, crc)
            };
            write_buffer.put_slice(&crc_record.to_bytes());
        }

        if !include_partial_page {
            return (write_buffer.freeze(), None);
        }

        let partial_page = &buffer_data[pages_to_write * logical_page_size..];
        if partial_page.is_empty() {
            // Buffer ends exactly on a page boundary; nothing partial to add.
            return (write_buffer.freeze(), None);
        }

        // If no full pages precede it and the partial page's length matches
        // the previously flushed one, skip rewriting it entirely.
        if pages_to_write == 0 {
            if let Some(old_crc) = old_crc_record {
                let (old_len, _) = old_crc.get_crc();
                if partial_page.len() == old_len as usize {
                    return (write_buffer.freeze(), None);
                }
            }
        }
        write_buffer.put_slice(partial_page);
        let partial_len = partial_page.len();
        let crc = Crc32::checksum(partial_page);

        // Zero-pad the partial page out to the full logical page size.
        let zero_count = logical_page_size - partial_len;
        if zero_count > 0 {
            write_buffer.put_bytes(0, zero_count);
        }

        // As above: if the partial page is the first page emitted, it
        // replaces the previously flushed partial page — preserve its slot.
        let crc_record = if let (0, Some(old_crc)) = (pages_to_write, old_crc_record) {
            Self::build_crc_record_preserving_old(partial_len as u16, crc, old_crc)
        } else {
            Checksum::new(partial_len as u16, crc)
        };

        write_buffer.put_slice(&crc_record.to_bytes());

        (write_buffer.freeze(), Some(crc_record))
    }
597
598 const fn build_crc_record_preserving_old(
601 new_len: u16,
602 new_crc: u32,
603 old_crc: &Checksum,
604 ) -> Checksum {
605 let (old_len, old_crc_val) = old_crc.get_crc();
606 if old_crc.len1 >= old_crc.len2 {
608 Checksum {
610 len1: old_len,
611 crc1: old_crc_val,
612 len2: new_len,
613 crc2: new_crc,
614 }
615 } else {
616 Checksum {
618 len1: new_len,
619 crc1: new_crc,
620 len2: old_len,
621 crc2: old_crc_val,
622 }
623 }
624 }
625
    /// Flushes all buffered data (including any partial page) and returns a
    /// [`Replay`] reader over the blob's validated contents.
    ///
    /// `buffer_size` is a read-ahead budget in bytes, converted to a whole
    /// number of physical pages (minimum one).
    pub async fn replay(&self, buffer_size: NonZeroUsize) -> Result<Replay<B>, Error> {
        let logical_page_size = self.cache_ref.page_size();
        let logical_page_size_nz =
            NonZeroU16::new(logical_page_size as u16).expect("page_size is non-zero");

        // Ensure everything buffered is in the blob before replaying it.
        {
            let buf_guard = self.buffer.write().await;
            self.flush_internal(buf_guard, true).await?;
        }

        let physical_page_size = logical_page_size + CHECKSUM_SIZE;
        let prefetch_pages = buffer_size.get() / physical_page_size as usize;
        let prefetch_pages = prefetch_pages.max(1);
        let blob_guard = self.blob_state.read().await;

        // Compute the physical and logical extents, accounting for a trailing
        // partial page that occupies a full physical page on disk.
        let (physical_blob_size, logical_blob_size) =
            blob_guard.partial_page_state.as_ref().map_or_else(
                || {
                    let physical = physical_page_size * blob_guard.current_page;
                    let logical = logical_page_size * blob_guard.current_page;
                    (physical, logical)
                },
                |crc_record| {
                    let (partial_len, _) = crc_record.get_crc();
                    let partial_len = partial_len as u64;
                    let physical = physical_page_size * (blob_guard.current_page + 1);
                    let logical = logical_page_size * blob_guard.current_page + partial_len;
                    (physical, logical)
                },
            );

        let reader = PageReader::new(
            blob_guard.blob.clone(),
            physical_blob_size,
            logical_blob_size,
            prefetch_pages,
            logical_page_size_nz,
        );
        Ok(Replay::new(reader))
    }
677}
678
679impl<B: Blob> Append<B> {
    /// Flushes all buffered data (including any partial page) to the blob and
    /// syncs the blob to durable storage.
    pub async fn sync(&self) -> Result<(), Error> {
        let buf_guard = self.buffer.write().await;
        self.flush_internal(buf_guard, true).await?;

        // `flush_internal` consumed the buffer guard; take a fresh read lock
        // on the blob state for the sync call.
        let blob_state = self.blob_state.read().await;
        blob_state.blob.sync().await
    }
691
    /// Resizes the blob to `size` logical bytes.
    ///
    /// Growing appends zeros through the normal append path (so checksums are
    /// maintained). Shrinking syncs first, truncates the blob to the page
    /// containing the cut point, and reloads any surviving trailing partial
    /// page into the tip buffer.
    pub async fn resize(&self, size: u64) -> Result<(), Error> {
        let current_size = self.size().await;

        if size > current_size {
            // Grow: append zeros so pages and CRC records stay consistent.
            let zeros_needed = (size - current_size) as usize;
            let mut zeros = self.cache_ref.pool().alloc(zeros_needed);
            zeros.put_bytes(0, zeros_needed);
            self.append(zeros.as_ref()).await?;
            return Ok(());
        }

        let logical_page_size = self.cache_ref.page_size();
        let physical_page_size = logical_page_size + CHECKSUM_SIZE;

        // Flush + sync so the blob reflects all buffered data before truncating.
        self.sync().await?;

        let mut buf_guard = self.buffer.write().await;
        let mut blob_guard = self.blob_state.write().await;

        let full_pages = size / logical_page_size;
        let partial_bytes = size % logical_page_size;
        // Keep the page containing the cut point so its prefix can be reloaded.
        let new_physical_size = if partial_bytes > 0 {
            (full_pages + 1) * physical_page_size
        } else {
            full_pages * physical_page_size
        };

        blob_guard.blob.resize(new_physical_size).await?;
        // The tip buffer re-owns the trailing bytes; no partial page is
        // considered durable after a shrink.
        blob_guard.partial_page_state = None;

        blob_guard.current_page = full_pages;
        buf_guard.offset = full_pages * logical_page_size;

        if partial_bytes > 0 {
            // Reload the surviving prefix of the truncated page into the tip
            // buffer so subsequent appends continue from `size`.
            let page_data =
                super::get_page_from_blob(&blob_guard.blob, full_pages, logical_page_size).await?;

            if (page_data.len() as u64) < partial_bytes {
                // The on-disk page is shorter than the requested cut point.
                return Err(Error::InvalidChecksum);
            }

            buf_guard.clear();
            let over_capacity = buf_guard.append(&page_data.as_ref()[..partial_bytes as usize]);
            assert!(!over_capacity);
        } else {
            buf_guard.clear();
        }

        Ok(())
    }
778}
779
780#[cfg(test)]
781mod tests {
782 use super::*;
783 use crate::{deterministic, BufferPool, BufferPoolConfig, Runner as _, Storage as _};
784 use commonware_codec::ReadExt;
785 use commonware_macros::test_traced;
786 use commonware_utils::{NZUsize, NZU16};
787 use prometheus_client::registry::Registry;
788 use std::num::NonZeroU16;
789
    // Odd (non-power-of-two) page size to exercise boundary math; the tip
    // buffer holds exactly two logical pages.
    const PAGE_SIZE: NonZeroU16 = NZU16!(103); const BUFFER_SIZE: usize = PAGE_SIZE.get() as usize * 2;
792
    // An empty blob stays empty across sync + reopen and always reports size 0.
    #[test_traced("DEBUG")]
    fn test_append_crc_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));

            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            assert_eq!(append.size().await, 0);

            // Syncing an empty buffer must not write anything to the blob.
            append.sync().await.unwrap();
            drop(append);

            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);
            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            assert_eq!(append.size().await, 0);
        });
    }
826
    // End-to-end append/read/sync/reopen coverage: small appends within one
    // page, a page-spanning append, and an append ending exactly on a page
    // boundary.
    #[test_traced("DEBUG")]
    fn test_append_crc_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));

            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            assert_eq!(append.size().await, 0);

            let data = vec![1, 2, 3, 4, 5];
            append.append(&data).await.unwrap();

            assert_eq!(append.size().await, 5);

            let more_data = vec![6, 7, 8, 9, 10];
            append.append(&more_data).await.unwrap();

            assert_eq!(append.size().await, 10);

            let read_buf = append.read_at(0, 5).await.unwrap().coalesce();
            assert_eq!(read_buf, &data[..]);

            let read_buf = append.read_at(5, 5).await.unwrap().coalesce();
            assert_eq!(read_buf, &more_data[..]);

            let read_buf = append.read_at(0, 10).await.unwrap().coalesce();
            assert_eq!(read_buf, &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

            append.sync().await.unwrap();
            drop(append);

            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            // 10 buffered bytes persist as one physical page: 103 logical
            // bytes (zero-padded) + 12-byte checksum record = 115.
            assert_eq!(blob_size, 115);
            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 10);
            // Spanning append: crosses the first page boundary (103).
            let spanning_data: Vec<u8> = (11..=110).collect();
            append.append(&spanning_data).await.unwrap();
            assert_eq!(append.size().await, 110);

            let read_buf = append.read_at(10, 100).await.unwrap().coalesce();
            assert_eq!(read_buf, &spanning_data[..]);

            let read_buf = append.read_at(0, 110).await.unwrap().coalesce();
            let expected: Vec<u8> = (1..=110).collect();
            assert_eq!(read_buf, &expected[..]);

            append.sync().await.unwrap();
            drop(append);

            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            // Two physical pages: 2 * (103 + 12) = 230.
            assert_eq!(blob_size, 230);
            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 110);

            // Brings the logical size to exactly two full pages (206 = 2*103).
            let boundary_data: Vec<u8> = (111..=206).collect();
            assert_eq!(boundary_data.len(), 96);
            append.append(&boundary_data).await.unwrap();
            assert_eq!(append.size().await, 206);

            let read_buf = append.read_at(0, 206).await.unwrap().coalesce();
            let expected: Vec<u8> = (1..=206).collect();
            assert_eq!(read_buf, &expected[..]);

            append.sync().await.unwrap();
            drop(append);

            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 230);
            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
                .await
                .unwrap();
            assert_eq!(append.size().await, 206);

            let read_buf = append.read_at(0, 206).await.unwrap().coalesce();
            assert_eq!(read_buf, &expected[..]);
        });
    }
943
    // When a flush drains the tip buffer completely, sync must return the
    // tip's pooled backing allocation to the buffer pool (exercised via a
    // pool capped at two allocations per class).
    #[test_traced("DEBUG")]
    fn test_sync_releases_tip_pool_slot_after_full_drain() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let mut registry = Registry::default();
            let pool = BufferPool::new(
                BufferPoolConfig::for_storage().with_max_per_class(NZUsize!(2)),
                &mut registry,
            );
            let cache_ref = CacheRef::new(pool.clone(), PAGE_SIZE, NZUsize!(1));

            let (blob, blob_size) = context
                .open("test_partition", b"release_tip_backing")
                .await
                .unwrap();
            assert_eq!(blob_size, 0);

            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
                .await
                .unwrap();

            // Exactly one full page: fills the tip without triggering a flush.
            append
                .append(&vec![7; PAGE_SIZE.get() as usize])
                .await
                .unwrap();

            assert!(
                matches!(
                    pool.try_alloc(BUFFER_SIZE),
                    Err(crate::iobuf::PoolError::Exhausted)
                ),
                "full-page tip should occupy the remaining pooled slot before sync"
            );

            append.sync().await.unwrap();

            assert!(
                pool.try_alloc(BUFFER_SIZE).is_ok(),
                "sync should release pooled backing when no partial tail remains"
            );
        });
    }
988
    // A zero-length read_up_to must succeed, return 0 bytes, and truncate the
    // caller-provided buffer rather than leaving stale contents in it.
    #[test_traced("DEBUG")]
    fn test_read_up_to_zero_len_truncates_buffer() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context
                .open("test_partition", b"read_up_to_zero_len")
                .await
                .unwrap();
            assert_eq!(blob_size, 0);

            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));

            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
                .await
                .unwrap();
            append.append(&[1, 2, 3, 4]).await.unwrap();

            // Pre-populated buffer: its bytes must NOT survive the call.
            let stale = vec![9, 8, 7, 6];
            let (buf, read) = append.read_up_to(0, 0, stale).await.unwrap();

            assert_eq!(read, 0);
            assert_eq!(buf.len(), 0, "read_up_to must truncate returned buffer");
            assert_eq!(buf.freeze().as_ref(), b"");
        });
    }
1018
    /// Decodes the trailing [`Checksum`] record from a full physical page.
    fn read_crc_record_from_page(page_bytes: &[u8]) -> Checksum {
        let crc_start = page_bytes.len() - CHECKSUM_SIZE as usize;
        Checksum::read(&mut &page_bytes[crc_start..]).unwrap()
    }
1024
    // Recognizable 6-byte sentinel used to mangle one CRC slot on disk
    // (a slot is a 2-byte length followed by a 4-byte CRC).
    const DUMMY_MARKER: [u8; 6] = [0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF];
1028
    // Tie-break: when both slots record the same length, the first slot is
    // the protected one.
    #[test]
    fn test_identify_protected_regions_equal_lengths() {
        let record = Checksum {
            len1: 50,
            crc1: 0xAAAAAAAA,
            len2: 50,
            crc2: 0xBBBBBBBB,
        };

        let result =
            Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record));
        assert!(result.is_some());
        let (prefix_len, protected_crc) = result.unwrap();
        assert_eq!(prefix_len, 50);
        assert!(
            matches!(protected_crc, ProtectedCrc::First),
            "First CRC should be protected when lengths are equal"
        );
    }
1049
    // When slot 1 records the larger length, it is authoritative/protected.
    #[test]
    fn test_identify_protected_regions_len1_larger() {
        let record = Checksum {
            len1: 100,
            crc1: 0xAAAAAAAA,
            len2: 50,
            crc2: 0xBBBBBBBB,
        };

        let result =
            Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record));
        assert!(result.is_some());
        let (prefix_len, protected_crc) = result.unwrap();
        assert_eq!(prefix_len, 100);
        assert!(
            matches!(protected_crc, ProtectedCrc::First),
            "First CRC should be protected when len1 > len2"
        );
    }
1070
    // When slot 2 records the larger length, it is authoritative/protected.
    #[test]
    fn test_identify_protected_regions_len2_larger() {
        let record = Checksum {
            len1: 50,
            crc1: 0xAAAAAAAA,
            len2: 100,
            crc2: 0xBBBBBBBB,
        };

        let result =
            Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record));
        assert!(result.is_some());
        let (prefix_len, protected_crc) = result.unwrap();
        assert_eq!(prefix_len, 100);
        assert!(
            matches!(protected_crc, ProtectedCrc::Second),
            "Second CRC should be protected when len2 > len1"
        );
    }
1091
    // Crash-safety of the dual-CRC scheme when slot 1 is authoritative:
    // after two append+sync rounds slot 1 holds the newest CRC. Mangling
    // slot 0 on disk and appending again must rewrite only slot 0 (with the
    // new CRC) while leaving slot 1's bytes untouched.
    #[test_traced("DEBUG")]
    fn test_crc_slot1_protected() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
            // CRC slot offsets within the first physical page.
            let slot0_offset = PAGE_SIZE.get() as u64;
            let slot1_offset = PAGE_SIZE.get() as u64 + 6;

            // Round 1: 10 bytes -> partial page, CRC lands in slot 0.
            let (blob, _) = context.open("test_partition", b"slot1_prot").await.unwrap();
            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            // Round 2: grow the partial page -> new CRC lands in slot 1.
            let (blob, size) = context.open("test_partition", b"slot1_prot").await.unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(11..=30).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context.open("test_partition", b"slot1_prot").await.unwrap();
            let page = blob
                .read_at(0, physical_page_size)
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert!(
                crc.len2 > crc.len1,
                "Slot 1 should be authoritative (len2={} > len1={})",
                crc.len2,
                crc.len1
            );

            let slot1_before: Vec<u8> = blob
                .read_at(slot1_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();

            // Corrupt the non-authoritative slot 0 to prove it gets rewritten.
            blob.write_at(slot0_offset, DUMMY_MARKER.to_vec())
                .await
                .unwrap();
            blob.sync().await.unwrap();

            let slot0_mangled: Vec<u8> = blob
                .read_at(slot0_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_eq!(slot0_mangled, DUMMY_MARKER, "Mangle failed");

            // Round 3: appending again must write the new CRC into slot 0
            // while leaving the protected slot 1 byte-identical.
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(31..=50).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, _) = context.open("test_partition", b"slot1_prot").await.unwrap();

            let slot0_after: Vec<u8> = blob
                .read_at(slot0_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_ne!(
                slot0_after, DUMMY_MARKER,
                "Slot 0 should have been overwritten with new CRC"
            );

            let slot1_after: Vec<u8> = blob
                .read_at(slot1_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_eq!(
                slot1_before, slot1_after,
                "Slot 1 was modified! Protected region violated."
            );

            let page = blob
                .read_at(0, physical_page_size)
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert_eq!(crc.len1, 50, "Slot 0 should have len=50");
        });
    }
1217
    // Mirror of test_crc_slot1_protected: after three append+sync rounds
    // slot 0 is authoritative. Mangling slot 1 on disk and appending again
    // must rewrite only slot 1 (with the new CRC, len=70) while leaving
    // slot 0's bytes untouched.
    #[test_traced("DEBUG")]
    fn test_crc_slot0_protected() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
            // CRC slot offsets within the first physical page.
            let slot0_offset = PAGE_SIZE.get() as u64;
            let slot1_offset = PAGE_SIZE.get() as u64 + 6;

            // Round 1: CRC in slot 0.
            let (blob, _) = context.open("test_partition", b"slot0_prot").await.unwrap();
            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            // Round 2: CRC in slot 1.
            let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(11..=30).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            // Round 3: CRC back in slot 0, which becomes authoritative.
            let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(31..=50).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
            let page = blob
                .read_at(0, physical_page_size)
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert!(
                crc.len1 > crc.len2,
                "Slot 0 should be authoritative (len1={} > len2={})",
                crc.len1,
                crc.len2
            );

            let slot0_before: Vec<u8> = blob
                .read_at(slot0_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();

            // Corrupt the non-authoritative slot 1 to prove it gets rewritten.
            blob.write_at(slot1_offset, DUMMY_MARKER.to_vec())
                .await
                .unwrap();
            blob.sync().await.unwrap();

            let slot1_mangled: Vec<u8> = blob
                .read_at(slot1_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_eq!(slot1_mangled, DUMMY_MARKER, "Mangle failed");

            // Round 4: new CRC goes into slot 1; slot 0 must stay untouched.
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(51..=70).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, _) = context.open("test_partition", b"slot0_prot").await.unwrap();

            let slot1_after: Vec<u8> = blob
                .read_at(slot1_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_ne!(
                slot1_after, DUMMY_MARKER,
                "Slot 1 should have been overwritten with new CRC"
            );

            let slot0_after: Vec<u8> = blob
                .read_at(slot0_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_eq!(
                slot0_before, slot0_after,
                "Slot 0 was modified! Protected region violated."
            );

            let page = blob
                .read_at(0, physical_page_size)
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert_eq!(crc.len2, 70, "Slot 1 should have len=70");
        });
    }
1355
    // The already-durable data prefix of a partial page must never be
    // rewritten when the page grows: mangle a byte in the padding area (past
    // the durable prefix) and verify the prefix survives while the padding is
    // overwritten by new data.
    #[test_traced("DEBUG")]
    fn test_data_prefix_not_overwritten() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;

            // Write a 20-byte partial page and make it durable.
            let (blob, _) = context
                .open("test_partition", b"prefix_test")
                .await
                .unwrap();
            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            let data1: Vec<u8> = (1..=20).collect();
            append.append(&data1).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context
                .open("test_partition", b"prefix_test")
                .await
                .unwrap();
            assert_eq!(size, physical_page_size as u64);

            let prefix_before: Vec<u8> = blob
                .read_at(0, 20)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();

            // Mangle bytes in the zero-padding area (offset 25 > prefix 20).
            blob.write_at(25, DUMMY_MARKER.to_vec()).await.unwrap();
            blob.sync().await.unwrap();

            // Grow the partial page; the flush must start after byte 20.
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(21..=40).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, _) = context
                .open("test_partition", b"prefix_test")
                .await
                .unwrap();

            let prefix_after: Vec<u8> = blob
                .read_at(0, 20)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_eq!(prefix_before, prefix_after, "Data prefix was modified!");

            let overwritten: Vec<u8> = blob
                .read_at(25, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_eq!(
                overwritten,
                vec![26, 27, 28, 29, 30, 31],
                "New data should overwrite padding area"
            );
        });
    }
1442
    // Exercises the dual-CRC-slot protocol when an append crosses a page
    // boundary: the slot holding the authoritative (larger-length) record must
    // not be rewritten while the page is being completed, and the other slot
    // must end up with the full-page record.
    //
    // NOTE(review): offsets assume each CRC slot occupies 6 bytes (2-byte len +
    // 4-byte CRC) immediately after the logical page — confirm against the
    // checksum record layout.
    #[test_traced("DEBUG")]
    fn test_crc_slot_protection_across_page_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            // A physical page = logical page bytes plus the trailing checksum record.
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
            // Slot 0 starts right after the logical page; slot 1 follows 6 bytes later.
            let slot0_offset = PAGE_SIZE.get() as u64;
            let slot1_offset = PAGE_SIZE.get() as u64 + 6;

            // Session 1: write 50 bytes — slot 0 records len=50.
            let (blob, _) = context.open("test_partition", b"boundary").await.unwrap();
            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append.append(&(1..=50).collect::<Vec<u8>>()).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            // Session 2: extend to 80 bytes — slot 1 becomes authoritative (len=80).
            let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(51..=80).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            // Confirm slot 1 is now the authoritative record on page 0.
            let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
            let page = blob
                .read_at(0, physical_page_size)
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert!(crc.len2 > crc.len1, "Slot 1 should be authoritative");

            // Snapshot slot 1's bytes so we can prove they are never touched.
            let slot1_before: Vec<u8> = blob
                .read_at(slot1_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();

            // Plant a marker in slot 0; the next session is expected to
            // overwrite it with the full-page record.
            blob.write_at(slot0_offset, DUMMY_MARKER.to_vec())
                .await
                .unwrap();
            blob.sync().await.unwrap();

            // Session 3: append 40 more bytes (total 120), which completes
            // page 0 and spills onto page 1 — the boundary crossing under test.
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(81..=120).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
            assert_eq!(size, (physical_page_size * 2) as u64, "Should have 2 pages");

            // Slot 0 must now carry the full-page record, not our marker.
            let slot0_after: Vec<u8> = blob
                .read_at(slot0_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_ne!(
                slot0_after, DUMMY_MARKER,
                "Slot 0 should have full-page CRC"
            );

            // Slot 1 (the previously authoritative record) must be untouched.
            let slot1_after: Vec<u8> = blob
                .read_at(slot1_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_eq!(
                slot1_before, slot1_after,
                "Slot 1 was modified during page boundary crossing!"
            );

            // Slot 0's length field must equal the full logical page size.
            let page0 = blob
                .read_at(0, physical_page_size)
                .await
                .unwrap()
                .coalesce();
            let crc0 = read_crc_record_from_page(page0.as_ref());
            assert_eq!(
                crc0.len1,
                PAGE_SIZE.get(),
                "Slot 0 should have full page length"
            );

            // Final sanity: all 120 bytes (1..=120) read back intact.
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 120);
            let all_data: Vec<u8> = append.read_at(0, 120).await.unwrap().coalesce().into();
            let expected: Vec<u8> = (1..=120).collect();
            assert_eq!(all_data, expected);
        });
    }
1567
    // Verifies the CRC fallback path on the last page: when the authoritative
    // slot (slot 1, len=30) is corrupted, recovery must fall back to the older
    // slot 0 record (len=10) and expose only those 10 bytes.
    //
    // NOTE(review): crc2_offset assumes the record layout is
    // [len1:2][crc1:4][len2:2][crc2:4], putting crc2 8 bytes into the trailer —
    // confirm against the checksum record format.
    #[test_traced("DEBUG")]
    fn test_crc_fallback_on_corrupted_primary() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            // A physical page = logical page bytes plus the trailing checksum record.
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
            // Byte offset of slot 1's CRC word within the single-page blob.
            let crc2_offset = PAGE_SIZE.get() as u64 + 8;

            // Session 1: persist bytes 1..=10 — slot 0 records len=10.
            let (blob, _) = context
                .open("test_partition", b"crc_fallback")
                .await
                .unwrap();
            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            let data1: Vec<u8> = (1..=10).collect();
            append.append(&data1).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            // Session 2: extend to 30 bytes — slot 1 records len=30 and
            // becomes authoritative.
            let (blob, size) = context
                .open("test_partition", b"crc_fallback")
                .await
                .unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(11..=30).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            // Reopen and confirm the expected slot layout: len1=10, len2=30.
            let (blob, size) = context
                .open("test_partition", b"crc_fallback")
                .await
                .unwrap();
            assert_eq!(size, physical_page_size as u64);

            let page = blob
                .read_at(0, physical_page_size)
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert!(
                crc.len2 > crc.len1,
                "Slot 1 should be authoritative (len2={} > len1={})",
                crc.len2,
                crc.len1
            );
            assert_eq!(crc.len2, 30, "Slot 1 should have len=30");
            assert_eq!(crc.len1, 10, "Slot 0 should have len=10");

            // Healthy baseline: all 30 bytes recover and read back correctly.
            let append = Append::new(blob.clone(), size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 30);
            let all_data: Vec<u8> = append.read_at(0, 30).await.unwrap().coalesce().into();
            let expected: Vec<u8> = (1..=30).collect();
            assert_eq!(all_data, expected);
            drop(append);

            // Corrupt slot 1's CRC word only (len2 stays 30).
            blob.write_at(crc2_offset, vec![0xDE, 0xAD, 0xBE, 0xEF])
                .await
                .unwrap();
            blob.sync().await.unwrap();

            // Confirm the corruption landed where intended.
            let page = blob
                .read_at(0, physical_page_size)
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert_eq!(crc.len2, 30, "len2 should still be 30 after corruption");
            assert_eq!(crc.crc2, 0xDEADBEEF, "crc2 should be our corrupted value");

            // Recovery must now fall back to slot 0's 10 bytes.
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            assert_eq!(
                append.size().await,
                10,
                "Should fall back to slot 0's 10 bytes after primary CRC corruption"
            );

            // The fallback bytes must match what session 1 originally wrote.
            let fallback_data: Vec<u8> = append.read_at(0, 10).await.unwrap().coalesce().into();
            assert_eq!(
                fallback_data, data1,
                "Fallback data should match original 10 bytes"
            );

            // Anything past the fallback length is unreadable.
            let result = append.read_at(0, 11).await;
            assert!(result.is_err(), "Reading beyond fallback size should fail");
        });
    }
1686
    // Verifies that the partial-length fallback slot is honored only on the
    // *last* page of a blob. Once page 0 is followed by page 1, a partial
    // slot-0 record on page 0 can no longer describe valid data, so reads
    // through both `Append` and `Replay` must fail when the authoritative
    // full-page CRC is corrupted.
    //
    // NOTE(review): page0_crc2_offset assumes crc2 sits 8 bytes into the
    // trailer ([len1:2][crc1:4][len2:2][crc2:4]) — confirm against the record
    // format. The "len=103" wording implies PAGE_SIZE is 103 in this module.
    #[test_traced("DEBUG")]
    fn test_non_last_page_rejects_partial_fallback() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            // A physical page = logical page bytes plus the trailing checksum record.
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
            // Byte offset of page 0's slot-1 CRC word.
            let page0_crc2_offset = PAGE_SIZE.get() as u64 + 8;

            // Session 1: persist bytes 1..=10 — page 0 slot 0 records len=10.
            let (blob, _) = context
                .open("test_partition", b"non_last_page")
                .await
                .unwrap();
            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            // Session 2: fill page 0 to the full logical page size — slot 1
            // records the full-page length and becomes authoritative.
            let (blob, size) = context
                .open("test_partition", b"non_last_page")
                .await
                .unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(11..=PAGE_SIZE.get() as u8).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            // Confirm the slot layout on page 0: len1=10 (partial), len2=full page.
            let (blob, size) = context
                .open("test_partition", b"non_last_page")
                .await
                .unwrap();
            let page = blob
                .read_at(0, physical_page_size)
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert_eq!(crc.len1, 10, "Slot 0 should have len=10");
            assert_eq!(
                crc.len2,
                PAGE_SIZE.get(),
                "Slot 1 should have len=103 (full page)"
            );
            assert!(crc.len2 > crc.len1, "Slot 1 should be authoritative");

            // Session 3: append 10 more bytes so a second page exists —
            // page 0 is now a non-last page.
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(104..=113).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context
                .open("test_partition", b"non_last_page")
                .await
                .unwrap();
            assert_eq!(
                size,
                (physical_page_size * 2) as u64,
                "Should have 2 physical pages"
            );

            // Healthy baseline: all 113 bytes recover and read back correctly.
            let append = Append::new(blob.clone(), size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 113);
            let all_data: Vec<u8> = append.read_at(0, 113).await.unwrap().coalesce().into();
            let expected: Vec<u8> = (1..=113).collect();
            assert_eq!(all_data, expected);
            drop(append);

            // Corrupt page 0's authoritative (slot 1) CRC word.
            blob.write_at(page0_crc2_offset, vec![0xDE, 0xAD, 0xBE, 0xEF])
                .await
                .unwrap();
            blob.sync().await.unwrap();

            // Confirm the corruption: len2 intact, crc2 clobbered, slot 0
            // still holds only a partial length.
            let page = blob
                .read_at(0, physical_page_size)
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert_eq!(crc.len2, PAGE_SIZE.get(), "len2 should still be 103");
            assert_eq!(crc.crc2, 0xDEADBEEF, "crc2 should be corrupted");
            assert_eq!(crc.len1, 10, "Fallback slot 0 has partial length");

            // Recovery still reports the full logical size (the tail page is
            // fine), but reading page 0 must fail rather than silently fall
            // back to the 10-byte partial record.
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            assert_eq!(append.size().await, 113);

            let result = append.read_at(0, 10).await;
            assert!(
                result.is_err(),
                "Reading from corrupted non-last page via Append should fail, but got: {:?}",
                result
            );
            drop(append);

            // The Replay path must reject the corrupted non-last page as well.
            let (blob, size) = context
                .open("test_partition", b"non_last_page")
                .await
                .unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            let mut replay = append.replay(NZUsize!(1024)).await.unwrap();

            let result = replay.ensure(1).await;
            assert!(
                result.is_err(),
                "Reading from corrupted non-last page via Replay should fail, but got: {:?}",
                result
            );
        });
    }
1847
1848 #[test]
1849 fn test_resize_shrink_validates_crc() {
1850 let executor = deterministic::Runner::default();
1853
1854 executor.start(|context| async move {
1855 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1856 let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
1857
1858 let (blob, size) = context
1859 .open("test_partition", b"resize_crc_test")
1860 .await
1861 .unwrap();
1862
1863 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1864 .await
1865 .unwrap();
1866
1867 let data: Vec<u8> = (0..=249).collect();
1870 append.append(&data).await.unwrap();
1871 append.sync().await.unwrap();
1872 assert_eq!(append.size().await, 250);
1873 drop(append);
1874
1875 let (blob, size) = context
1877 .open("test_partition", b"resize_crc_test")
1878 .await
1879 .unwrap();
1880 assert_eq!(size as usize, physical_page_size * 3);
1881
1882 let page1_crc_offset = (physical_page_size * 2 - CHECKSUM_SIZE as usize) as u64;
1884 blob.write_at(page1_crc_offset, vec![0xFF; CHECKSUM_SIZE as usize])
1885 .await
1886 .unwrap();
1887 blob.sync().await.unwrap();
1888
1889 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1892 .await
1893 .unwrap();
1894 assert_eq!(append.size().await, 250);
1895
1896 let result = append.resize(150).await;
1900 assert!(
1901 matches!(result, Err(crate::Error::InvalidChecksum)),
1902 "Expected InvalidChecksum when shrinking to corrupted page, got: {:?}",
1903 result
1904 );
1905 });
1906 }
1907
1908 #[test]
1909 fn test_reopen_partial_tail_append_and_resize() {
1910 let executor = deterministic::Runner::default();
1911
1912 executor.start(|context| async move {
1913 const PAGE_SIZE: NonZeroU16 = NZU16!(64);
1914 const BUFFER_SIZE: usize = 256;
1915
1916 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(4));
1917
1918 let (blob, size) = context
1919 .open("test_partition", b"partial_tail_test")
1920 .await
1921 .unwrap();
1922
1923 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1924 .await
1925 .unwrap();
1926
1927 append.append(&[1, 2, 3, 4, 5]).await.unwrap();
1929 append.sync().await.unwrap();
1930 assert_eq!(append.size().await, 5);
1931 drop(append);
1932
1933 let (blob, size) = context
1934 .open("test_partition", b"partial_tail_test")
1935 .await
1936 .unwrap();
1937
1938 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1939 .await
1940 .unwrap();
1941 assert_eq!(append.size().await, 5);
1942
1943 append.append(&[6, 7, 8]).await.unwrap();
1944 append.resize(6).await.unwrap();
1945 append.sync().await.unwrap();
1946
1947 let data: Vec<u8> = append.read_at(0, 6).await.unwrap().coalesce().into();
1948 assert_eq!(data, vec![1, 2, 3, 4, 5, 6]);
1949 });
1950 }
1951
1952 #[test]
1953 fn test_corrupted_crc_len_too_large() {
1954 let executor = deterministic::Runner::default();
1955
1956 executor.start(|context| async move {
1957 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1958 let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
1959
1960 let (blob, size) = context
1962 .open("test_partition", b"crc_len_test")
1963 .await
1964 .unwrap();
1965
1966 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1967 .await
1968 .unwrap();
1969
1970 append.append(&[0x42; 50]).await.unwrap();
1971 append.sync().await.unwrap();
1972 drop(append);
1973
1974 let (blob, size) = context
1976 .open("test_partition", b"crc_len_test")
1977 .await
1978 .unwrap();
1979 assert_eq!(size as usize, physical_page_size);
1980
1981 let crc_offset = PAGE_SIZE.get() as u64;
1983
1984 let bad_crc_record: [u8; 12] = [
1987 0xFF, 0xFF, 0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, ];
1992 blob.write_at(crc_offset, bad_crc_record.to_vec())
1993 .await
1994 .unwrap();
1995 blob.sync().await.unwrap();
1996
1997 let result = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()).await;
1999
2000 match result {
2003 Ok(append) => {
2004 let recovered_size = append.size().await;
2006 assert_eq!(
2007 recovered_size, 0,
2008 "Corrupted page should be truncated, size should be 0"
2009 );
2010 }
2011 Err(e) => {
2012 assert!(
2014 matches!(e, crate::Error::InvalidChecksum),
2015 "Expected InvalidChecksum error, got: {:?}",
2016 e
2017 );
2018 }
2019 }
2020 });
2021 }
2022
2023 #[test]
2024 fn test_corrupted_crc_both_slots_len_too_large() {
2025 let executor = deterministic::Runner::default();
2026
2027 executor.start(|context| async move {
2028 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
2029
2030 let (blob, size) = context
2032 .open("test_partition", b"crc_both_bad")
2033 .await
2034 .unwrap();
2035
2036 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2037 .await
2038 .unwrap();
2039
2040 append.append(&[0x42; 50]).await.unwrap();
2041 append.sync().await.unwrap();
2042 drop(append);
2043
2044 let (blob, size) = context
2046 .open("test_partition", b"crc_both_bad")
2047 .await
2048 .unwrap();
2049
2050 let crc_offset = PAGE_SIZE.get() as u64;
2051
2052 let bad_crc_record: [u8; 12] = [
2054 0x01, 0x00, 0xDE, 0xAD, 0xBE, 0xEF, 0x02, 0x00, 0xCA, 0xFE, 0xBA, 0xBE, ];
2059 blob.write_at(crc_offset, bad_crc_record.to_vec())
2060 .await
2061 .unwrap();
2062 blob.sync().await.unwrap();
2063
2064 let result = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()).await;
2066
2067 match result {
2068 Ok(append) => {
2069 assert_eq!(append.size().await, 0);
2071 }
2072 Err(e) => {
2073 assert!(
2074 matches!(e, crate::Error::InvalidChecksum),
2075 "Expected InvalidChecksum, got: {:?}",
2076 e
2077 );
2078 }
2079 }
2080 });
2081 }
2082}