use super::read::{PageReader, Replay};
use crate::{
    buffer::{
        paged::{CacheRef, Checksum, CHECKSUM_SIZE},
        tip::Buffer,
    },
    Blob, Error, IoBufMut, IoBufs, IoBufsMut, RwLock, RwLockWriteGuard,
};
use commonware_cryptography::Crc32;
use std::{
    num::{NonZeroU16, NonZeroUsize},
    sync::Arc,
};
use tracing::warn;

/// Identifies which CRC slot within a [Checksum] record is protected and must not be rewritten.
#[derive(Clone, Copy)]
enum ProtectedCrc {
    First,
    Second,
}

/// State guarding access to the underlying blob.
#[derive(Clone)]
struct BlobState<B: Blob> {
    blob: B,

    /// The index of the first page that has not yet been fully written to the blob.
    current_page: u64,

    /// The CRC record of the trailing partial page, if any.
    partial_page_state: Option<Checksum>,
}

/// An append-only wrapper around a [Blob] that buffers writes and stores data in fixed-size
/// pages, each followed by a CRC record.
#[derive(Clone)]
pub struct Append<B: Blob> {
    /// The underlying blob and its write position.
    blob_state: Arc<RwLock<BlobState<B>>>,

    /// Identifier used to key this blob's pages in the shared page cache.
    id: u64,

    /// Handle to the shared page cache.
    cache_ref: CacheRef,

    /// In-memory buffer holding the not-yet-flushed tip of the blob.
    buffer: Arc<RwLock<Buffer>>,
}

/// Returns `capacity`, raised to a floor of two logical pages if the requested value is smaller.
fn capacity_with_floor(capacity: usize, page_size: u64) -> usize {
    let floor = page_size as usize * 2;
    if capacity < floor {
        warn!(
            floor,
            "requested buffer capacity is too low, increasing it to floor"
        );
        floor
    } else {
        capacity
    }
}

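// A minimal usage sketch, mirroring the tests at the bottom of this file (the runtime `context`,
// partition name, and sizes are illustrative):
//
//     let (blob, size) = context.open("partition", b"name").await?;
//     let cache_ref = CacheRef::new(NZU16!(103), NZUsize!(206));
//     let append = Append::new(blob, size, 206, cache_ref).await?;
//     append.append(&[1, 2, 3]).await?;
//     append.sync().await?;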
impl<B: Blob> Append<B> {
    /// Creates an [Append] over `blob`, recovering the last valid page and truncating any
    /// trailing invalid data found at the end of the blob.
    pub async fn new(
        blob: B,
        original_blob_size: u64,
        capacity: usize,
        cache_ref: CacheRef,
    ) -> Result<Self, Error> {
        let (partial_page_state, pages, invalid_data_found) =
            Self::read_last_valid_page(&blob, original_blob_size, cache_ref.page_size()).await?;
        if invalid_data_found {
            let new_blob_size = pages * (cache_ref.page_size() + CHECKSUM_SIZE);
            warn!(
                original_blob_size,
                new_blob_size, "truncating blob to remove invalid data"
            );
            blob.resize(new_blob_size).await?;
            blob.sync().await?;
        }

        let capacity = capacity_with_floor(capacity, cache_ref.page_size());

        let (blob_state, data) = match partial_page_state {
            Some((mut partial_page, crc_record)) => {
                partial_page.reserve(capacity - partial_page.len());
                (
                    BlobState {
                        blob,
                        current_page: pages - 1,
                        partial_page_state: Some(crc_record),
                    },
                    partial_page,
                )
            }
            None => (
                BlobState {
                    blob,
                    current_page: pages,
                    partial_page_state: None,
                },
                Vec::with_capacity(capacity),
            ),
        };

        let buffer = Buffer {
            offset: blob_state.current_page * cache_ref.page_size(),
            data,
            capacity,
            immutable: false,
        };

        Ok(Self {
            blob_state: Arc::new(RwLock::new(blob_state)),
            id: cache_ref.next_id().await,
            cache_ref,
            buffer: Arc::new(RwLock::new(buffer)),
        })
    }

    /// Creates an [Append] over `blob` in the immutable state. Returns
    /// [Error::InvalidChecksum] if any trailing invalid data is found.
    pub async fn new_immutable(
        blob: B,
        blob_size: u64,
        capacity: usize,
        cache_ref: CacheRef,
    ) -> Result<Self, Error> {
        let (partial_page_state, pages, invalid_data_found) =
            Self::read_last_valid_page(&blob, blob_size, cache_ref.page_size()).await?;
        if invalid_data_found {
            return Err(Error::InvalidChecksum);
        }

        let capacity = capacity_with_floor(capacity, cache_ref.page_size());

        let (blob_state, data) = match partial_page_state {
            Some((mut partial_page, crc_record)) => {
                partial_page.shrink_to_fit();
                (
                    BlobState {
                        blob,
                        current_page: pages - 1,
                        partial_page_state: Some(crc_record),
                    },
                    partial_page,
                )
            }
            None => (
                BlobState {
                    blob,
                    current_page: pages,
                    partial_page_state: None,
                },
                vec![],
            ),
        };
        let buffer = Buffer {
            data,
            capacity,
            offset: blob_state.current_page * cache_ref.page_size(),
            immutable: true,
        };

        Ok(Self {
            blob_state: Arc::new(RwLock::new(blob_state)),
            id: cache_ref.next_id().await,
            cache_ref,
            buffer: Arc::new(RwLock::new(buffer)),
        })
    }

    /// Returns whether this blob is in the immutable state.
    pub async fn is_immutable(&self) -> bool {
        let buffer = self.buffer.read().await;

        buffer.immutable
    }

    /// Flushes any buffered data and transitions this blob into the immutable state.
    pub async fn to_immutable(&self) -> Result<(), Error> {
        let mut buf_guard = self.buffer.write().await;
        if buf_guard.immutable {
            return Ok(());
        }
        buf_guard.immutable = true;
        self.flush_internal(buf_guard, true).await?;

        // Re-acquire the buffer lock to release any excess capacity.
        {
            let mut buf_guard = self.buffer.write().await;
            buf_guard.data.shrink_to_fit();
        }

        // Ensure the flushed data is durable.
        let blob_state = self.blob_state.read().await;
        blob_state.blob.sync().await
    }

    /// Transitions this blob back into the mutable state.
    pub async fn to_mutable(&self) {
        let mut buffer = self.buffer.write().await;
        if !buffer.immutable {
            return;
        }
        buffer.immutable = false;
    }

    /// Scans backwards from the end of `blob` for the last page with a valid checksum.
    ///
    /// Returns the logical bytes and checksum record of that page if it is only partially full,
    /// the number of physical pages up to and including the last valid page, and whether any
    /// invalid data was encountered during the scan.
    async fn read_last_valid_page(
        blob: &B,
        blob_size: u64,
        page_size: u64,
    ) -> Result<(Option<(Vec<u8>, Checksum)>, u64, bool), Error> {
        let physical_page_size = page_size + CHECKSUM_SIZE;
        let partial_bytes = blob_size % physical_page_size;
        let mut last_page_end = blob_size - partial_bytes;

        // Any trailing bytes that don't form a complete physical page are invalid.
        let mut invalid_data_found = partial_bytes != 0;

        while last_page_end != 0 {
            let page_start = last_page_end - physical_page_size;
            let buf = blob
                .read_at(page_start, IoBufMut::zeroed(physical_page_size as usize))
                .await?
                .coalesce()
                .freeze();

            match Checksum::validate_page(buf.as_ref()) {
                Some(crc_record) => {
                    let (len, _) = crc_record.get_crc();
                    let len = len as u64;
                    if len != page_size {
                        // The last valid page is partial: return its logical bytes.
                        let logical_bytes = buf.slice(..len as usize).into();
                        return Ok((
                            Some((logical_bytes, crc_record)),
                            last_page_end / physical_page_size,
                            invalid_data_found,
                        ));
                    }
                    return Ok((None, last_page_end / physical_page_size, invalid_data_found));
                }
                None => {
                    // Invalid page: keep scanning backwards.
                    last_page_end = page_start;
                    invalid_data_found = true;
                }
            }
        }

        Ok((None, 0, invalid_data_found))
    }

    /// Appends `buf` to the blob, flushing the internal buffer to the underlying blob if it
    /// becomes full. Returns [Error::ImmutableBlob] if the blob is immutable.
    pub async fn append(&self, buf: &[u8]) -> Result<(), Error> {
        let mut buffer = self.buffer.write().await;
        if buffer.immutable {
            return Err(Error::ImmutableBlob);
        }

        // `Buffer::append` returns true when the buffer has filled and needs to be flushed.
        if !buffer.append(buf) {
            return Ok(());
        }

        self.flush_internal(buffer, false).await
    }

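    // On-disk layout sketch (inferred from the slot offsets used below and the test constants at
    // the bottom of this file): each logical page is followed by a CHECKSUM_SIZE-byte record made
    // of two 6-byte (len: u16, crc: u32) slots. The slot recording the longer prefix is treated
    // as authoritative and is never rewritten in place; updates to a growing partial page land in
    // the other slot.
    //
    //     | logical page (page_size bytes) | len1 crc1 | len2 crc2 |
    //                                        ^ page_size  ^ page_size + 6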
    /// Flushes buffered data to the underlying blob, consuming the provided write guard.
    ///
    /// If `write_partial_page` is false, only full pages are written and any trailing partial
    /// page remains buffered.
    async fn flush_internal(
        &self,
        mut buf_guard: RwLockWriteGuard<'_, Buffer>,
        write_partial_page: bool,
    ) -> Result<(), Error> {
        let buffer = &mut *buf_guard;

        // Cache the buffered data; the return value is the number of trailing bytes that will
        // remain buffered.
        let remaining_byte_count = self
            .cache_ref
            .cache(self.id, &buffer.data, buffer.offset)
            .await;

        // Snapshot the previously written partial-page record before computing the new pages.
        let old_partial_page_state = {
            let blob_state = self.blob_state.read().await;
            blob_state.partial_page_state.clone()
        };

        let (physical_pages, partial_page_state) = self.to_physical_pages(
            &*buffer,
            write_partial_page,
            old_partial_page_state.as_ref(),
        );

        if physical_pages.is_empty() {
            return Ok(());
        }

        // Drop the bytes now covered by full physical pages from the buffer.
        let bytes_to_drain = buffer.data.len() - remaining_byte_count;
        buffer.data.drain(0..bytes_to_drain);
        buffer.offset += bytes_to_drain as u64;
        let new_offset = buffer.offset;

        // Acquire the blob lock before releasing the buffer lock so flushes apply in order.
        let mut blob_state = self.blob_state.write().await;

        drop(buf_guard);

        let logical_page_size = self.cache_ref.page_size() as usize;
        let physical_page_size = logical_page_size + CHECKSUM_SIZE as usize;
        let write_at_offset = blob_state.current_page * physical_page_size as u64;

        // If a partial page is included, it is the final page in `physical_pages`.
        let total_pages_in_buffer = physical_pages.len() / physical_page_size;
        let full_pages_written = if partial_page_state.is_some() {
            total_pages_in_buffer.saturating_sub(1)
        } else {
            total_pages_in_buffer
        };

        let protected_regions = Self::identify_protected_regions(old_partial_page_state.as_ref());

        blob_state.current_page += full_pages_written as u64;
        blob_state.partial_page_state = partial_page_state;

        assert_eq!(
            blob_state.current_page * self.cache_ref.page_size(),
            new_offset
        );

        if let Some((prefix_len, protected_crc)) = protected_regions {
            match protected_crc {
                ProtectedCrc::First => {
                    // Skip the already-durable data prefix and the protected first CRC slot;
                    // write everything after it.
                    if prefix_len < logical_page_size {
                        blob_state
                            .blob
                            .write_at(
                                write_at_offset + prefix_len as u64,
                                physical_pages[prefix_len..logical_page_size].to_vec(),
                            )
                            .await?;
                    }
                    let second_crc_start = logical_page_size + 6;
                    blob_state
                        .blob
                        .write_at(
                            write_at_offset + second_crc_start as u64,
                            physical_pages[second_crc_start..].to_vec(),
                        )
                        .await?;
                }
                ProtectedCrc::Second => {
                    // Write up to the end of the first CRC slot, skip the protected second slot,
                    // then write any following pages.
                    let first_crc_end = logical_page_size + 6;
                    if prefix_len < first_crc_end {
                        blob_state
                            .blob
                            .write_at(
                                write_at_offset + prefix_len as u64,
                                physical_pages[prefix_len..first_crc_end].to_vec(),
                            )
                            .await?;
                    }
                    if physical_pages.len() > physical_page_size {
                        blob_state
                            .blob
                            .write_at(
                                write_at_offset + physical_page_size as u64,
                                physical_pages[physical_page_size..].to_vec(),
                            )
                            .await?;
                    }
                }
            }
        } else {
            // No partial page was previously written at this offset: write everything.
            blob_state
                .blob
                .write_at(write_at_offset, physical_pages)
                .await?;
        }

        Ok(())
    }

    /// Returns the logical size of the blob, including any buffered bytes.
    pub async fn size(&self) -> u64 {
        let buffer = self.buffer.read().await;
        buffer.size()
    }

    /// Reads up to `buf.len()` bytes starting at `logical_offset`, returning the (possibly
    /// truncated) buffer and the number of bytes read. Returns
    /// [Error::BlobInsufficientLength] if nothing can be read at `logical_offset`.
    pub async fn read_up_to(
        &self,
        buf: impl Into<IoBufMut> + Send,
        logical_offset: u64,
    ) -> Result<(IoBufMut, usize), Error> {
        let mut buf = buf.into();
        if buf.is_empty() {
            return Ok((buf, 0));
        }
        let blob_size = self.size().await;
        let available = (blob_size.saturating_sub(logical_offset) as usize).min(buf.len());
        if available == 0 {
            return Err(Error::BlobInsufficientLength);
        }
        buf.truncate(available);
        self.read_into(buf.as_mut(), logical_offset).await?;

        Ok((buf, available))
    }

    /// Fills `buf` with the bytes starting at `logical_offset`, reading from the tip buffer, the
    /// page cache, and finally the underlying blob as needed.
    pub async fn read_into(&self, buf: &mut [u8], logical_offset: u64) -> Result<(), Error> {
        let end_offset = logical_offset
            .checked_add(buf.len() as u64)
            .ok_or(Error::OffsetOverflow)?;

        let buffer = self.buffer.read().await;

        if end_offset > buffer.size() {
            return Err(Error::BlobInsufficientLength);
        }

        // Serve whatever overlaps the tip buffer; `remaining` bytes at the front are unserved.
        let remaining = buffer.extract(buf.as_mut(), logical_offset);

        drop(buffer);

        if remaining == 0 {
            return Ok(());
        }

        // Serve as much of the remainder as possible from the page cache.
        let cached = self
            .cache_ref
            .read_cached(self.id, &mut buf[..remaining], logical_offset)
            .await;

        if cached == remaining {
            return Ok(());
        }

        // Read the rest through the cache from the underlying blob.
        let blob_guard = self.blob_state.read().await;

        let uncached_offset = logical_offset + cached as u64;
        let uncached_len = remaining - cached;
        self.cache_ref
            .read(
                &blob_guard.blob,
                self.id,
                &mut buf[cached..cached + uncached_len],
                uncached_offset,
            )
            .await
    }

    /// Determines which portion of the first page in a pending write is protected and must not
    /// be overwritten: the previously written logical bytes (the prefix) and the CRC slot that
    /// covers them.
    ///
    /// Returns `None` if there is no previously written partial page.
    fn identify_protected_regions(
        partial_page_state: Option<&Checksum>,
    ) -> Option<(usize, ProtectedCrc)> {
        let crc_record = partial_page_state?;
        let (old_len, _) = crc_record.get_crc();
        // The slot holding the longer (authoritative) record is the one that must be preserved.
        let protected_crc = if crc_record.len1 >= crc_record.len2 {
            ProtectedCrc::First
        } else {
            ProtectedCrc::Second
        };
        Some((old_len as usize, protected_crc))
    }

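    // Worked example (using the 103-byte test page size from the bottom of this file): a buffer
    // holding 110 logical bytes packs into one full physical page (103 data bytes plus the CRC
    // record) and, when the partial page is requested, a second physical page holding the
    // remaining 7 bytes, zero padding, and a CRC record with len = 7. This is why the tests see
    // a 230-byte blob after 110 appended bytes.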
    /// Converts the buffered logical bytes into physical pages (each logical page followed by
    /// its CRC record), returning the bytes to write and the CRC record of the trailing partial
    /// page, if one is included.
    fn to_physical_pages(
        &self,
        buffer: &Buffer,
        include_partial_page: bool,
        old_crc_record: Option<&Checksum>,
    ) -> (Vec<u8>, Option<Checksum>) {
        let logical_page_size = self.cache_ref.page_size() as usize;
        let physical_page_size = logical_page_size + CHECKSUM_SIZE as usize;
        let pages_to_write = buffer.data.len() / logical_page_size;
        let mut write_buffer = Vec::with_capacity(pages_to_write * physical_page_size);

        for page in 0..pages_to_write {
            let start_read_idx = page * logical_page_size;
            let end_read_idx = start_read_idx + logical_page_size;
            let logical_page = &buffer.data[start_read_idx..end_read_idx];
            write_buffer.extend_from_slice(logical_page);

            let crc = Crc32::checksum(logical_page);
            let logical_page_size_u16 =
                u16::try_from(logical_page_size).expect("page size must fit in u16 for CRC record");

            // The first page may be extending a previously written partial page, in which case
            // the old record's authoritative slot must be carried forward.
            let crc_record = if let (0, Some(old_crc)) = (page, old_crc_record) {
                Self::build_crc_record_preserving_old(logical_page_size_u16, crc, old_crc)
            } else {
                Checksum::new(logical_page_size_u16, crc)
            };
            write_buffer.extend_from_slice(&crc_record.to_bytes());
        }

        if !include_partial_page {
            return (write_buffer, None);
        }

        let partial_page = &buffer.data[pages_to_write * logical_page_size..];
        if partial_page.is_empty() {
            return (write_buffer, None);
        }

        // If the partial page hasn't grown since it was last written, there is nothing to do.
        if pages_to_write == 0 {
            if let Some(old_crc) = old_crc_record {
                let (old_len, _) = old_crc.get_crc();
                if partial_page.len() == old_len as usize {
                    return (write_buffer, None);
                }
            }
        }
        write_buffer.extend_from_slice(partial_page);
        let partial_len = partial_page.len();
        let crc = Crc32::checksum(partial_page);

        // Zero-pad the partial page out to a full logical page before appending its CRC record.
        write_buffer.resize(write_buffer.len() + (logical_page_size - partial_len), 0);

        let crc_record = if let (0, Some(old_crc)) = (pages_to_write, old_crc_record) {
            Self::build_crc_record_preserving_old(partial_len as u16, crc, old_crc)
        } else {
            Checksum::new(partial_len as u16, crc)
        };

        write_buffer.extend_from_slice(&crc_record.to_bytes());

        (write_buffer, Some(crc_record))
    }

    /// Builds a CRC record for a page that extends a previously written partial page, keeping
    /// the old record's authoritative slot in place and writing the new length and CRC into the
    /// other slot.
    const fn build_crc_record_preserving_old(
        new_len: u16,
        new_crc: u32,
        old_crc: &Checksum,
    ) -> Checksum {
        let (old_len, old_crc_val) = old_crc.get_crc();
        if old_crc.len1 >= old_crc.len2 {
            // The first slot is authoritative: preserve it and place the new record second.
            Checksum {
                len1: old_len,
                crc1: old_crc_val,
                len2: new_len,
                crc2: new_crc,
            }
        } else {
            // The second slot is authoritative: preserve it and place the new record first.
            Checksum {
                len1: new_len,
                crc1: new_crc,
                len2: old_len,
                crc2: old_crc_val,
            }
        }
    }

    /// Returns a [Replay] over the blob's contents, flushing any buffered data first (unless the
    /// blob is immutable). `buffer_size` bounds how many bytes are prefetched at a time.
    pub async fn replay(&self, buffer_size: NonZeroUsize) -> Result<Replay<B>, Error> {
        let logical_page_size = self.cache_ref.page_size();
        let logical_page_size_nz =
            NonZeroU16::new(logical_page_size as u16).expect("page_size is non-zero");

        // Flush any buffered data so the replay observes everything appended so far.
        {
            let buf_guard = self.buffer.write().await;
            if !buf_guard.immutable {
                self.flush_internal(buf_guard, true).await?;
            }
        }

        let physical_page_size = logical_page_size + CHECKSUM_SIZE;

        // Prefetch at least one page per read.
        let prefetch_pages = buffer_size.get() / physical_page_size as usize;
        let prefetch_pages = prefetch_pages.max(1);

        let blob_guard = self.blob_state.read().await;

        let (physical_blob_size, logical_blob_size) =
            blob_guard.partial_page_state.as_ref().map_or_else(
                || {
                    let physical = physical_page_size * blob_guard.current_page;
                    let logical = logical_page_size * blob_guard.current_page;
                    (physical, logical)
                },
                |crc_record| {
                    let (partial_len, _) = crc_record.get_crc();
                    let partial_len = partial_len as u64;
                    let physical = physical_page_size * (blob_guard.current_page + 1);
                    let logical = logical_page_size * blob_guard.current_page + partial_len;
                    (physical, logical)
                },
            );

        let reader = PageReader::new(
            blob_guard.blob.clone(),
            physical_blob_size,
            logical_blob_size,
            prefetch_pages,
            logical_page_size_nz,
        );
        Ok(Replay::new(reader))
    }
}

impl<B: Blob> Blob for Append<B> {
    async fn read_at(
        &self,
        logical_offset: u64,
        buf: impl Into<IoBufsMut> + Send,
    ) -> Result<IoBufsMut, Error> {
        let buf = buf.into();
        let len = buf.len();
        match buf {
            IoBufsMut::Single(mut single) => {
                self.read_into(single.as_mut(), logical_offset).await?;
                Ok(IoBufsMut::Single(single))
            }
            IoBufsMut::Chunked(mut chunks) => {
                // Read into a contiguous temporary buffer, then scatter into the chunks.
                let mut temp = vec![0u8; len];
                self.read_into(&mut temp, logical_offset).await?;
                let mut pos = 0;
                for chunk in chunks.iter_mut() {
                    let chunk_len = chunk.len();
                    chunk.as_mut().copy_from_slice(&temp[pos..pos + chunk_len]);
                    pos += chunk_len;
                }
                Ok(IoBufsMut::Chunked(chunks))
            }
        }
    }

    async fn sync(&self) -> Result<(), Error> {
        // An immutable blob has nothing buffered that isn't already durable.
        let buf_guard = self.buffer.write().await;
        if buf_guard.immutable {
            return Ok(());
        }
        self.flush_internal(buf_guard, true).await?;

        let blob_state = self.blob_state.read().await;
        blob_state.blob.sync().await
    }

    async fn write_at(&self, _offset: u64, _buf: impl Into<IoBufs> + Send) -> Result<(), Error> {
        unimplemented!("append-only blob type does not support write_at")
    }

    async fn resize(&self, size: u64) -> Result<(), Error> {
        let current_size = self.size().await;

        // Growing the blob is equivalent to appending zeros.
        if size > current_size {
            let zeros_needed = (size - current_size) as usize;
            let zeros = vec![0u8; zeros_needed];
            self.append(&zeros).await?;
            return Ok(());
        }

        // Shrinking: truncate the underlying blob to the physical page containing the new end.
        let logical_page_size = self.cache_ref.page_size();
        let physical_page_size = logical_page_size + CHECKSUM_SIZE;

        // Flush any buffered data before modifying the underlying blob.
        self.sync().await?;

        let mut buf_guard = self.buffer.write().await;
        if buf_guard.immutable {
            return Err(Error::ImmutableBlob);
        }
        let mut blob_guard = self.blob_state.write().await;

        let full_pages = size / logical_page_size;
        let partial_bytes = size % logical_page_size;
        let new_physical_size = if partial_bytes > 0 {
            // Keep the physical page that contains the new trailing partial page.
            (full_pages + 1) * physical_page_size
        } else {
            full_pages * physical_page_size
        };

        blob_guard.blob.resize(new_physical_size).await?;
        blob_guard.partial_page_state = None;

        blob_guard.current_page = full_pages;
        buf_guard.offset = full_pages * logical_page_size;

        if partial_bytes > 0 {
            // Reload the new trailing partial page's prefix into the tip buffer.
            let page_data =
                super::get_page_from_blob(&blob_guard.blob, full_pages, logical_page_size).await?;

            if (page_data.len() as u64) < partial_bytes {
                return Err(Error::InvalidChecksum);
            }

            buf_guard.data = page_data[..partial_bytes as usize].to_vec();
        } else {
            buf_guard.data = vec![];
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{deterministic, IoBufMut, Runner as _, Storage as _};
    use commonware_codec::ReadExt;
    use commonware_macros::test_traced;
    use commonware_utils::{NZUsize, NZU16};
    use std::num::NonZeroU16;

    const PAGE_SIZE: NonZeroU16 = NZU16!(103);
    const BUFFER_SIZE: usize = PAGE_SIZE.get() as usize * 2;

    #[test_traced("DEBUG")]
    fn test_append_crc_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));

            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            assert_eq!(append.size().await, 0);

            append.sync().await.unwrap();
            drop(append);

            // Reopen the (still empty) blob and confirm the recovered state.
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);
            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            assert_eq!(append.size().await, 0);
        });
    }

    #[test_traced("DEBUG")]
    fn test_append_crc_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));

            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            assert_eq!(append.size().await, 0);

            let data = vec![1, 2, 3, 4, 5];
            append.append(&data).await.unwrap();

            assert_eq!(append.size().await, 5);

            let more_data = vec![6, 7, 8, 9, 10];
            append.append(&more_data).await.unwrap();

            assert_eq!(append.size().await, 10);

            let read_buf = append
                .read_at(0, IoBufMut::zeroed(5))
                .await
                .unwrap()
                .coalesce();
            assert_eq!(read_buf, &data[..]);

            let read_buf = append
                .read_at(5, IoBufMut::zeroed(5))
                .await
                .unwrap()
                .coalesce();
            assert_eq!(read_buf, &more_data[..]);

            let read_buf = append
                .read_at(0, IoBufMut::zeroed(10))
                .await
                .unwrap()
                .coalesce();
            assert_eq!(read_buf, &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

            append.sync().await.unwrap();
            drop(append);

            // One physical page (103 logical bytes + 12-byte CRC record) should be on disk.
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 115);
            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 10);

            // Append data that spans the first page boundary.
            let spanning_data: Vec<u8> = (11..=110).collect();
            append.append(&spanning_data).await.unwrap();
            assert_eq!(append.size().await, 110);

            let read_buf = append
                .read_at(10, IoBufMut::zeroed(100))
                .await
                .unwrap()
                .coalesce();
            assert_eq!(read_buf, &spanning_data[..]);

            let read_buf = append
                .read_at(0, IoBufMut::zeroed(110))
                .await
                .unwrap()
                .coalesce();
            let expected: Vec<u8> = (1..=110).collect();
            assert_eq!(read_buf, &expected[..]);

            append.sync().await.unwrap();
            drop(append);

            // Two physical pages should now be on disk.
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 230);
            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 110);

            // Append data that ends exactly on a page boundary (206 = 2 * 103).
            let boundary_data: Vec<u8> = (111..=206).collect();
            assert_eq!(boundary_data.len(), 96);
            append.append(&boundary_data).await.unwrap();
            assert_eq!(append.size().await, 206);

            let read_buf = append
                .read_at(0, IoBufMut::zeroed(206))
                .await
                .unwrap()
                .coalesce();
            let expected: Vec<u8> = (1..=206).collect();
            assert_eq!(read_buf, &expected[..]);

            append.sync().await.unwrap();
            drop(append);

            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 230);
            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
                .await
                .unwrap();
            assert_eq!(append.size().await, 206);

            let read_buf = append
                .read_at(0, IoBufMut::zeroed(206))
                .await
                .unwrap()
                .coalesce();
            assert_eq!(read_buf, &expected[..]);
        });
    }

    /// Extracts the CRC record from the end of a physical page.
    fn read_crc_record_from_page(page_bytes: &[u8]) -> Checksum {
        let crc_start = page_bytes.len() - CHECKSUM_SIZE as usize;
        Checksum::read(&mut &page_bytes[crc_start..]).unwrap()
    }

    /// A 6-byte marker written over a CRC slot to detect whether that slot is later rewritten.
    const DUMMY_MARKER: [u8; 6] = [0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF];

    #[test]
    fn test_identify_protected_regions_equal_lengths() {
        let record = Checksum {
            len1: 50,
            crc1: 0xAAAAAAAA,
            len2: 50,
            crc2: 0xBBBBBBBB,
        };

        let result =
            Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record));
        assert!(result.is_some());
        let (prefix_len, protected_crc) = result.unwrap();
        assert_eq!(prefix_len, 50);
        assert!(
            matches!(protected_crc, ProtectedCrc::First),
            "First CRC should be protected when lengths are equal"
        );
    }

    #[test]
    fn test_identify_protected_regions_len1_larger() {
        let record = Checksum {
            len1: 100,
            crc1: 0xAAAAAAAA,
            len2: 50,
            crc2: 0xBBBBBBBB,
        };

        let result =
            Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record));
        assert!(result.is_some());
        let (prefix_len, protected_crc) = result.unwrap();
        assert_eq!(prefix_len, 100);
        assert!(
            matches!(protected_crc, ProtectedCrc::First),
            "First CRC should be protected when len1 > len2"
        );
    }

    #[test]
    fn test_identify_protected_regions_len2_larger() {
        let record = Checksum {
            len1: 50,
            crc1: 0xAAAAAAAA,
            len2: 100,
            crc2: 0xBBBBBBBB,
        };

        let result =
            Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record));
        assert!(result.is_some());
        let (prefix_len, protected_crc) = result.unwrap();
        assert_eq!(prefix_len, 100);
        assert!(
            matches!(protected_crc, ProtectedCrc::Second),
            "Second CRC should be protected when len2 > len1"
        );
    }

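    // An additional check (not in the original suite) for `build_crc_record_preserving_old`: the
    // old record's authoritative slot is assumed to stay in place while the new (len, crc) pair
    // is written into the other slot.
    #[test]
    fn test_build_crc_record_preserving_old() {
        // Slot 1 is authoritative (len2 > len1), so it must be preserved.
        let old = Checksum {
            len1: 10,
            crc1: 0xAAAAAAAA,
            len2: 30,
            crc2: 0xBBBBBBBB,
        };
        let updated = Append::<crate::storage::memory::Blob>::build_crc_record_preserving_old(
            50, 0xCCCCCCCC, &old,
        );
        assert_eq!(updated.len1, 50);
        assert_eq!(updated.crc1, 0xCCCCCCCC);
        assert_eq!(updated.len2, 30);
        assert_eq!(updated.crc2, 0xBBBBBBBB);

        // Slot 0 is authoritative (len1 >= len2), so the new record lands in slot 1.
        let old = Checksum {
            len1: 100,
            crc1: 0xAAAAAAAA,
            len2: 50,
            crc2: 0xBBBBBBBB,
        };
        let updated = Append::<crate::storage::memory::Blob>::build_crc_record_preserving_old(
            103, 0xCCCCCCCC, &old,
        );
        assert_eq!(updated.len1, 100);
        assert_eq!(updated.crc1, 0xAAAAAAAA);
        assert_eq!(updated.len2, 103);
        assert_eq!(updated.crc2, 0xCCCCCCCC);
    }
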
    /// After a 10-byte append followed by a 20-byte append, CRC slot 1 is authoritative.
    /// Mangling slot 0 and appending more data must rewrite slot 0 but never touch slot 1.
    #[test_traced("DEBUG")]
    fn test_crc_slot1_protected() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
            let slot0_offset = PAGE_SIZE.get() as u64;
            let slot1_offset = PAGE_SIZE.get() as u64 + 6;

            let (blob, _) = context.open("test_partition", b"slot1_prot").await.unwrap();
            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context.open("test_partition", b"slot1_prot").await.unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(11..=30).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context.open("test_partition", b"slot1_prot").await.unwrap();
            let page = blob
                .read_at(0, IoBufMut::zeroed(physical_page_size))
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert!(
                crc.len2 > crc.len1,
                "Slot 1 should be authoritative (len2={} > len1={})",
                crc.len2,
                crc.len1
            );

            let slot1_before: Vec<u8> = blob
                .read_at(slot1_offset, IoBufMut::zeroed(6))
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();

            // Mangle the non-authoritative slot 0.
            blob.write_at(slot0_offset, DUMMY_MARKER.to_vec())
                .await
                .unwrap();
            blob.sync().await.unwrap();

            let slot0_mangled: Vec<u8> = blob
                .read_at(slot0_offset, IoBufMut::zeroed(6))
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_eq!(slot0_mangled, DUMMY_MARKER, "Mangle failed");

            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(31..=50).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, _) = context.open("test_partition", b"slot1_prot").await.unwrap();

            let slot0_after: Vec<u8> = blob
                .read_at(slot0_offset, IoBufMut::zeroed(6))
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_ne!(
                slot0_after, DUMMY_MARKER,
                "Slot 0 should have been overwritten with new CRC"
            );

            let slot1_after: Vec<u8> = blob
                .read_at(slot1_offset, IoBufMut::zeroed(6))
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_eq!(
                slot1_before, slot1_after,
                "Slot 1 was modified! Protected region violated."
            );

            let page = blob
                .read_at(0, IoBufMut::zeroed(physical_page_size))
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert_eq!(crc.len1, 50, "Slot 0 should have len=50");
        });
    }

    /// After three appends totaling 50 bytes, CRC slot 0 is authoritative. Mangling slot 1 and
    /// appending more data must rewrite slot 1 but never touch slot 0.
    #[test_traced("DEBUG")]
    fn test_crc_slot0_protected() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
            let slot0_offset = PAGE_SIZE.get() as u64;
            let slot1_offset = PAGE_SIZE.get() as u64 + 6;

            let (blob, _) = context.open("test_partition", b"slot0_prot").await.unwrap();
            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(11..=30).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(31..=50).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
            let page = blob
                .read_at(0, IoBufMut::zeroed(physical_page_size))
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert!(
                crc.len1 > crc.len2,
                "Slot 0 should be authoritative (len1={} > len2={})",
                crc.len1,
                crc.len2
            );

            let slot0_before: Vec<u8> = blob
                .read_at(slot0_offset, IoBufMut::zeroed(6))
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();

            // Mangle the non-authoritative slot 1.
            blob.write_at(slot1_offset, DUMMY_MARKER.to_vec())
                .await
                .unwrap();
            blob.sync().await.unwrap();

            let slot1_mangled: Vec<u8> = blob
                .read_at(slot1_offset, IoBufMut::zeroed(6))
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_eq!(slot1_mangled, DUMMY_MARKER, "Mangle failed");

            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(51..=70).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, _) = context.open("test_partition", b"slot0_prot").await.unwrap();

            let slot1_after: Vec<u8> = blob
                .read_at(slot1_offset, IoBufMut::zeroed(6))
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_ne!(
                slot1_after, DUMMY_MARKER,
                "Slot 1 should have been overwritten with new CRC"
            );

            let slot0_after: Vec<u8> = blob
                .read_at(slot0_offset, IoBufMut::zeroed(6))
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_eq!(
                slot0_before, slot0_after,
                "Slot 0 was modified! Protected region violated."
            );

            let page = blob
                .read_at(0, IoBufMut::zeroed(physical_page_size))
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert_eq!(crc.len2, 70, "Slot 1 should have len=70");
        });
    }

    /// Verifies that extending a partial page leaves the already-written 20-byte prefix
    /// untouched while the new data overwrites what used to be zero padding.
    #[test_traced("DEBUG")]
    fn test_data_prefix_not_overwritten() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;

            let (blob, _) = context
                .open("test_partition", b"prefix_test")
                .await
                .unwrap();
            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            let data1: Vec<u8> = (1..=20).collect();
            append.append(&data1).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context
                .open("test_partition", b"prefix_test")
                .await
                .unwrap();
            assert_eq!(size, physical_page_size as u64);

            let prefix_before: Vec<u8> = blob
                .read_at(0, IoBufMut::zeroed(20))
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();

            // Mangle bytes inside the page's zero padding.
            blob.write_at(25, DUMMY_MARKER.to_vec()).await.unwrap();
            blob.sync().await.unwrap();

            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(21..=40).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, _) = context
                .open("test_partition", b"prefix_test")
                .await
                .unwrap();

            let prefix_after: Vec<u8> = blob
                .read_at(0, IoBufMut::zeroed(20))
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_eq!(prefix_before, prefix_after, "Data prefix was modified!");

            let overwritten: Vec<u8> = blob
                .read_at(25, IoBufMut::zeroed(6))
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_eq!(
                overwritten,
                vec![26, 27, 28, 29, 30, 31],
                "New data should overwrite padding area"
            );
        });
    }

    /// Verifies the protected CRC slot is preserved when an append grows the partial page into a
    /// full page and spills into a second page.
    #[test_traced("DEBUG")]
    fn test_crc_slot_protection_across_page_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
            let slot0_offset = PAGE_SIZE.get() as u64;
            let slot1_offset = PAGE_SIZE.get() as u64 + 6;

            let (blob, _) = context.open("test_partition", b"boundary").await.unwrap();
            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append.append(&(1..=50).collect::<Vec<u8>>()).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(51..=80).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
            let page = blob
                .read_at(0, IoBufMut::zeroed(physical_page_size))
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert!(crc.len2 > crc.len1, "Slot 1 should be authoritative");

            let slot1_before: Vec<u8> = blob
                .read_at(slot1_offset, IoBufMut::zeroed(6))
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();

            // Mangle the non-authoritative slot 0 before crossing the page boundary.
            blob.write_at(slot0_offset, DUMMY_MARKER.to_vec())
                .await
                .unwrap();
            blob.sync().await.unwrap();

            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(81..=120).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
            assert_eq!(size, (physical_page_size * 2) as u64, "Should have 2 pages");

            let slot0_after: Vec<u8> = blob
                .read_at(slot0_offset, IoBufMut::zeroed(6))
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_ne!(
                slot0_after, DUMMY_MARKER,
                "Slot 0 should have full-page CRC"
            );

            let slot1_after: Vec<u8> = blob
                .read_at(slot1_offset, IoBufMut::zeroed(6))
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_eq!(
                slot1_before, slot1_after,
                "Slot 1 was modified during page boundary crossing!"
            );

            let page0 = blob
                .read_at(0, IoBufMut::zeroed(physical_page_size))
                .await
                .unwrap()
                .coalesce();
            let crc0 = read_crc_record_from_page(page0.as_ref());
            assert_eq!(
                crc0.len1,
                PAGE_SIZE.get(),
                "Slot 0 should have full page length"
            );

            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 120);
            let all_data: Vec<u8> = append
                .read_at(0, IoBufMut::zeroed(120))
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            let expected: Vec<u8> = (1..=120).collect();
            assert_eq!(all_data, expected);
        });
    }

    /// Corrupts the CRC of the authoritative slot on the last page and verifies recovery falls
    /// back to the 10 bytes covered by the older slot.
    #[test_traced("DEBUG")]
    fn test_crc_fallback_on_corrupted_primary() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
            let crc2_offset = PAGE_SIZE.get() as u64 + 8;

            let (blob, _) = context
                .open("test_partition", b"crc_fallback")
                .await
                .unwrap();
            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            let data1: Vec<u8> = (1..=10).collect();
            append.append(&data1).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context
                .open("test_partition", b"crc_fallback")
                .await
                .unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(11..=30).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context
                .open("test_partition", b"crc_fallback")
                .await
                .unwrap();
            assert_eq!(size, physical_page_size as u64);

            let page = blob
                .read_at(0, IoBufMut::zeroed(physical_page_size))
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert!(
                crc.len2 > crc.len1,
                "Slot 1 should be authoritative (len2={} > len1={})",
                crc.len2,
                crc.len1
            );
            assert_eq!(crc.len2, 30, "Slot 1 should have len=30");
            assert_eq!(crc.len1, 10, "Slot 0 should have len=10");

            let append = Append::new(blob.clone(), size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 30);
            let all_data: Vec<u8> = append
                .read_at(0, IoBufMut::zeroed(30))
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            let expected: Vec<u8> = (1..=30).collect();
            assert_eq!(all_data, expected);
            drop(append);

            // Corrupt the CRC of the authoritative slot.
            blob.write_at(crc2_offset, vec![0xDE, 0xAD, 0xBE, 0xEF])
                .await
                .unwrap();
            blob.sync().await.unwrap();

            let page = blob
                .read_at(0, IoBufMut::zeroed(physical_page_size))
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert_eq!(crc.len2, 30, "len2 should still be 30 after corruption");
            assert_eq!(crc.crc2, 0xDEADBEEF, "crc2 should be our corrupted value");

            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            assert_eq!(
                append.size().await,
                10,
                "Should fall back to slot 0's 10 bytes after primary CRC corruption"
            );

            let fallback_data: Vec<u8> = append
                .read_at(0, IoBufMut::zeroed(10))
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_eq!(
                fallback_data, data1,
                "Fallback data should match original 10 bytes"
            );

            // Reading past the recovered 10 bytes must fail.
            let result = append.read_at(0, IoBufMut::zeroed(11)).await;
            assert!(result.is_err(), "Reading beyond fallback size should fail");
        });
    }

    /// Corrupts the full-page CRC of a non-last page and verifies that reads and replay fail
    /// rather than silently falling back to that page's stale partial record.
    #[test_traced("DEBUG")]
    fn test_non_last_page_rejects_partial_fallback() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
            let page0_crc2_offset = PAGE_SIZE.get() as u64 + 8;

            let (blob, _) = context
                .open("test_partition", b"non_last_page")
                .await
                .unwrap();
            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context
                .open("test_partition", b"non_last_page")
                .await
                .unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            // Fill the first page exactly.
            append
                .append(&(11..=PAGE_SIZE.get() as u8).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context
                .open("test_partition", b"non_last_page")
                .await
                .unwrap();
            let page = blob
                .read_at(0, IoBufMut::zeroed(physical_page_size))
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert_eq!(crc.len1, 10, "Slot 0 should have len=10");
            assert_eq!(
                crc.len2,
                PAGE_SIZE.get(),
                "Slot 1 should have len=103 (full page)"
            );
            assert!(crc.len2 > crc.len1, "Slot 1 should be authoritative");

            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            // Start a second page.
            append
                .append(&(104..=113).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context
                .open("test_partition", b"non_last_page")
                .await
                .unwrap();
            assert_eq!(
                size,
                (physical_page_size * 2) as u64,
                "Should have 2 physical pages"
            );

            let append = Append::new(blob.clone(), size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 113);
            let all_data: Vec<u8> = append
                .read_at(0, IoBufMut::zeroed(113))
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            let expected: Vec<u8> = (1..=113).collect();
            assert_eq!(all_data, expected);
            drop(append);

            // Corrupt the full-page CRC (slot 1) of the first, non-last page.
            blob.write_at(page0_crc2_offset, vec![0xDE, 0xAD, 0xBE, 0xEF])
                .await
                .unwrap();
            blob.sync().await.unwrap();

            let page = blob
                .read_at(0, IoBufMut::zeroed(physical_page_size))
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert_eq!(crc.len2, PAGE_SIZE.get(), "len2 should still be 103");
            assert_eq!(crc.crc2, 0xDEADBEEF, "crc2 should be corrupted");
            assert_eq!(crc.len1, 10, "Fallback slot 0 has partial length");

            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            // Recovery only validates the last page, so the reported size is unchanged.
            assert_eq!(append.size().await, 113);

            // Reading through the corrupted non-last page must fail rather than return stale data.
            let result = append.read_at(0, IoBufMut::zeroed(10)).await;
            assert!(
                result.is_err(),
                "Reading from corrupted non-last page via Append should fail, but got: {:?}",
                result
            );
            drop(append);

            let (blob, size) = context
                .open("test_partition", b"non_last_page")
                .await
                .unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            let mut replay = append.replay(NZUsize!(1024)).await.unwrap();

            let result = replay.ensure(1).await;
            assert!(
                result.is_err(),
                "Reading from corrupted non-last page via Replay should fail, but got: {:?}",
                result
            );
        });
    }

    #[test]
    fn test_resize_shrink_validates_crc() {
        // Shrinking the blob onto a corrupted page must surface `InvalidChecksum`.
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;

            let (blob, size) = context
                .open("test_partition", b"resize_crc_test")
                .await
                .unwrap();

            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            // 250 bytes = two full 103-byte pages plus a 44-byte partial page.
            let data: Vec<u8> = (0..=249).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();
            assert_eq!(append.size().await, 250);
            drop(append);

            let (blob, size) = context
                .open("test_partition", b"resize_crc_test")
                .await
                .unwrap();
            assert_eq!(size as usize, physical_page_size * 3);

            // Corrupt the CRC record of the middle (full) page.
            let page1_crc_offset = (physical_page_size * 2 - CHECKSUM_SIZE as usize) as u64;
            blob.write_at(page1_crc_offset, vec![0xFF; CHECKSUM_SIZE as usize])
                .await
                .unwrap();
            blob.sync().await.unwrap();

            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 250);

            // Shrinking to 150 bytes requires re-reading page 1, which now fails validation.
            let result = append.resize(150).await;
            assert!(
                matches!(result, Err(crate::Error::InvalidChecksum)),
                "Expected InvalidChecksum when shrinking to corrupted page, got: {:?}",
                result
            );
        });
    }

    #[test]
    fn test_immutable_blob_rejects_append_and_resize() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            const PAGE_SIZE: NonZeroU16 = NZU16!(64);
            const BUFFER_SIZE: usize = 256;

            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(4));

            let (blob, size) = context
                .open("test_partition", b"immutable_test")
                .await
                .unwrap();

            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            append.append(&[1, 2, 3, 4, 5]).await.unwrap();
            append.sync().await.unwrap();
            assert_eq!(append.size().await, 5);

            // Freeze the blob, then verify mutating operations are rejected.
            append.to_immutable().await.unwrap();
            assert!(append.is_immutable().await);

            let result = append.append(&[6, 7, 8]).await;
            assert!(
                matches!(result, Err(crate::Error::ImmutableBlob)),
                "Expected ImmutableBlob error from append(), got: {:?}",
                result
            );

            let result = append.resize(100).await;
            assert!(
                matches!(result, Err(crate::Error::ImmutableBlob)),
                "Expected ImmutableBlob error from resize(), got: {:?}",
                result
            );

            let result = append.sync().await;
            assert!(
                result.is_ok(),
                "sync() on immutable blob should return Ok, got: {:?}",
                result
            );

            let data: Vec<u8> = append
                .read_at(0, IoBufMut::zeroed(5))
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_eq!(data, vec![1, 2, 3, 4, 5]);
        });
    }

    #[test]
    fn test_corrupted_crc_len_too_large() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;

            let (blob, size) = context
                .open("test_partition", b"crc_len_test")
                .await
                .unwrap();

            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            append.append(&[0x42; 50]).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context
                .open("test_partition", b"crc_len_test")
                .await
                .unwrap();
            assert_eq!(size as usize, physical_page_size);

            let crc_offset = PAGE_SIZE.get() as u64;

            // Slot 0 claims an out-of-range length with a garbage CRC; slot 1 is zeroed.
            let bad_crc_record: [u8; 12] = [
                0xFF, 0xFF, 0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
            ];
            blob.write_at(crc_offset, bad_crc_record.to_vec())
                .await
                .unwrap();
            blob.sync().await.unwrap();

            let result = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()).await;

            // Recovery may either truncate the corrupted page or report InvalidChecksum.
            match result {
                Ok(append) => {
                    let recovered_size = append.size().await;
                    assert_eq!(
                        recovered_size, 0,
                        "Corrupted page should be truncated, size should be 0"
                    );
                }
                Err(e) => {
                    assert!(
                        matches!(e, crate::Error::InvalidChecksum),
                        "Expected InvalidChecksum error, got: {:?}",
                        e
                    );
                }
            }
        });
    }

    #[test]
    fn test_corrupted_crc_both_slots_len_too_large() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));

            let (blob, size) = context
                .open("test_partition", b"crc_both_bad")
                .await
                .unwrap();

            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            append.append(&[0x42; 50]).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context
                .open("test_partition", b"crc_both_bad")
                .await
                .unwrap();

            let crc_offset = PAGE_SIZE.get() as u64;

            // A CRC record in which neither slot validates against the page contents.
            let bad_crc_record: [u8; 12] = [
                0x01, 0x00, 0xDE, 0xAD, 0xBE, 0xEF, 0x02, 0x00, 0xCA, 0xFE, 0xBA, 0xBE,
            ];
            blob.write_at(crc_offset, bad_crc_record.to_vec())
                .await
                .unwrap();
            blob.sync().await.unwrap();

            let result = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()).await;

            match result {
                Ok(append) => {
                    assert_eq!(append.size().await, 0);
                }
                Err(e) => {
                    assert!(
                        matches!(e, crate::Error::InvalidChecksum),
                        "Expected InvalidChecksum, got: {:?}",
                        e
                    );
                }
            }
        });
    }
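
    // An additional check (not in the original suite): `capacity_with_floor` raises undersized
    // capacities to two logical pages and passes larger values through unchanged.
    #[test]
    fn test_capacity_with_floor() {
        let page_size = PAGE_SIZE.get() as u64;
        assert_eq!(capacity_with_floor(1, page_size), 206);
        assert_eq!(capacity_with_floor(206, page_size), 206);
        assert_eq!(capacity_with_floor(500, page_size), 500);
    }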
}