use super::read::{PageReader, Replay};
use crate::{
    buffer::{
        pool::{Checksum, PoolRef, CHECKSUM_SIZE},
        tip::Buffer,
    },
    Blob, Error, RwLock, RwLockWriteGuard,
};
use commonware_cryptography::Crc32;
use commonware_utils::StableBuf;
use std::{
    num::{NonZeroU16, NonZeroUsize},
    sync::Arc,
};
use tracing::warn;

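/// Identifies which slot of a partial page's checksum record holds the
/// authoritative CRC and therefore must not be overwritten when the page is
/// extended in place. Each physical page is laid out as `page_size` data
/// bytes followed by a [Checksum] record of two 6-byte slots, each holding a
/// u16 length and a u32 CRC.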
#[derive(Clone, Copy)]
enum ProtectedCrc {
    First,
    Second,
}

/// State of the underlying blob and how much of it has been persisted.
#[derive(Clone)]
struct BlobState<B: Blob> {
    /// The underlying blob.
    blob: B,

    /// Index of the first page not yet fully persisted; the write buffer
    /// begins at this page.
    current_page: u64,

    /// Checksum record of the trailing partial page, or `None` if the
    /// persisted data ends on a page boundary.
    partial_page_state: Option<Checksum>,
}

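/// An append-only wrapper around a [Blob] that checksums data at page
/// granularity. Appended bytes accumulate in an in-memory buffer; full pages
/// are flushed to the underlying blob with a checksum record attached, and
/// recently used pages are cached in a shared pool.
///
/// A minimal usage sketch (mirroring the tests below; `context` is assumed to
/// be a runtime implementing `Storage`, and the constants are placeholders):
///
/// ```ignore
/// let (blob, size) = context.open("my_partition", b"my_blob").await?;
/// let pool_ref = PoolRef::new(PAGE_SIZE, POOL_CAPACITY);
/// let blob = Append::new(blob, size, BUFFER_SIZE, pool_ref).await?;
/// blob.append(b"hello").await?;
/// blob.sync().await?;
/// ```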
#[derive(Clone)]
pub struct Append<B: Blob> {
    /// State of the underlying blob, shared by all clones.
    blob_state: Arc<RwLock<BlobState<B>>>,

    /// Unique id that keys this blob's pages in the shared page cache.
    id: u64,

    /// Reference to the shared page pool used to cache pages.
    pool_ref: PoolRef,

    /// The write buffer holding the (possibly unpersisted) tip of the blob.
    buffer: Arc<RwLock<Buffer>>,
}

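/// Returns `capacity`, raised (with a warning) to a floor of two pages if the
/// requested value was smaller. With the 103-byte page size used in the tests
/// below, for example, the floor is 206 bytes.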
fn capacity_with_floor(capacity: usize, page_size: u64) -> usize {
    let floor = page_size as usize * 2;
    if capacity < floor {
        warn!(
            floor,
            "requested buffer capacity is too low, increasing it to floor"
        );
        floor
    } else {
        capacity
    }
}

impl<B: Blob> Append<B> {
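    /// Opens an [Append] over `blob`, whose current size must be
    /// `original_blob_size`. Recovery scans backward for the last page with a
    /// valid checksum and truncates any invalid data found after it.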
    pub async fn new(
        blob: B,
        original_blob_size: u64,
        capacity: usize,
        pool_ref: PoolRef,
    ) -> Result<Self, Error> {
        let (partial_page_state, pages, invalid_data_found) =
            Self::read_last_valid_page(&blob, original_blob_size, pool_ref.page_size()).await?;
        if invalid_data_found {
            // Truncate the blob to the last page that validated.
            let new_blob_size = pages * (pool_ref.page_size() + CHECKSUM_SIZE);
            warn!(
                original_blob_size,
                new_blob_size, "truncating blob to remove invalid data"
            );
            blob.resize(new_blob_size).await?;
            blob.sync().await?;
        }

        let capacity = capacity_with_floor(capacity, pool_ref.page_size());

        let (blob_state, data) = match partial_page_state {
            Some((mut partial_page, crc_record)) => {
                // Seed the buffer with the partial page so it can keep growing.
                partial_page.reserve(capacity - partial_page.len());
                (
                    BlobState {
                        blob,
                        current_page: pages - 1,
                        partial_page_state: Some(crc_record),
                    },
                    partial_page,
                )
            }
            None => (
                BlobState {
                    blob,
                    current_page: pages,
                    partial_page_state: None,
                },
                Vec::with_capacity(capacity),
            ),
        };

        let buffer = Buffer {
            offset: blob_state.current_page * pool_ref.page_size(),
            data,
            capacity,
            immutable: false,
        };

        Ok(Self {
            blob_state: Arc::new(RwLock::new(blob_state)),
            id: pool_ref.next_id().await,
            pool_ref,
            buffer: Arc::new(RwLock::new(buffer)),
        })
    }

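    /// Like [Self::new], but the returned blob starts in immutable mode, and
    /// any invalid data at the end of the blob results in
    /// [Error::InvalidChecksum] instead of truncation.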
    pub async fn new_immutable(
        blob: B,
        blob_size: u64,
        capacity: usize,
        pool_ref: PoolRef,
    ) -> Result<Self, Error> {
        let (partial_page_state, pages, invalid_data_found) =
            Self::read_last_valid_page(&blob, blob_size, pool_ref.page_size()).await?;
        if invalid_data_found {
            // Immutable blobs must open clean; never truncate.
            return Err(Error::InvalidChecksum);
        }

        let capacity = capacity_with_floor(capacity, pool_ref.page_size());

        let (blob_state, data) = match partial_page_state {
            Some((mut partial_page, crc_record)) => {
                // The buffer will never grow, so drop any excess capacity.
                partial_page.shrink_to_fit();
                (
                    BlobState {
                        blob,
                        current_page: pages - 1,
                        partial_page_state: Some(crc_record),
                    },
                    partial_page,
                )
            }
            None => (
                BlobState {
                    blob,
                    current_page: pages,
                    partial_page_state: None,
                },
                vec![],
            ),
        };
        let buffer = Buffer {
            data,
            capacity,
            offset: blob_state.current_page * pool_ref.page_size(),
            immutable: true,
        };

        Ok(Self {
            blob_state: Arc::new(RwLock::new(blob_state)),
            id: pool_ref.next_id().await,
            pool_ref,
            buffer: Arc::new(RwLock::new(buffer)),
        })
    }

    /// Returns whether this blob is in immutable mode.
    pub async fn is_immutable(&self) -> bool {
        let buffer = self.buffer.read().await;
        buffer.immutable
    }

    /// Converts this blob to immutable mode, flushing buffered data (including
    /// any partial page) and syncing the underlying blob. A no-op if the blob
    /// is already immutable.
    pub async fn to_immutable(&self) -> Result<(), Error> {
        let mut buf_guard = self.buffer.write().await;
        if buf_guard.immutable {
            return Ok(());
        }
        buf_guard.immutable = true;
        self.flush_internal(buf_guard, true).await?;

        // Reacquire the buffer and drop capacity it no longer needs.
        {
            let mut buf_guard = self.buffer.write().await;
            buf_guard.data.shrink_to_fit();
        }

        let blob_state = self.blob_state.read().await;
        blob_state.blob.sync().await
    }

    /// Converts this blob back to mutable mode. A no-op if already mutable.
    pub async fn to_mutable(&self) {
        let mut buffer = self.buffer.write().await;
        if !buffer.immutable {
            return;
        }
        buffer.immutable = false;
    }

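    /// Scans backward from the end of `blob` to find the last page whose
    /// checksum record validates.
    ///
    /// Returns a tuple of:
    /// 1. The logical bytes and checksum record of the trailing partial page,
    ///    or `None` if the last valid page is full (or the blob is empty).
    /// 2. The number of physical pages in the blob, counting a trailing
    ///    partial page as one page.
    /// 3. Whether any invalid data was found after the last valid page.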
    async fn read_last_valid_page(
        blob: &B,
        blob_size: u64,
        page_size: u64,
    ) -> Result<(Option<(Vec<u8>, Checksum)>, u64, bool), Error> {
        let physical_page_size = page_size + CHECKSUM_SIZE;
        let partial_bytes = blob_size % physical_page_size;
        let mut last_page_end = blob_size - partial_bytes;

        // A trailing fragment of a physical page can never validate.
        let mut invalid_data_found = partial_bytes != 0;

        while last_page_end != 0 {
            let page_start = last_page_end - physical_page_size;
            let buf = vec![0; physical_page_size as usize];
            let buf = blob.read_at(buf, page_start).await?;

            match Checksum::validate_page(buf.as_ref()) {
                Some(crc_record) => {
                    let (len, _) = crc_record.get_crc();
                    let len = len as u64;
                    if len != page_size {
                        // The last valid page is partial: return its logical bytes.
                        let buf: Vec<u8> = buf.into();
                        let logical_bytes = buf[..(len as usize)].to_vec();
                        return Ok((
                            Some((logical_bytes, crc_record)),
                            last_page_end / physical_page_size,
                            invalid_data_found,
                        ));
                    }
                    // The last valid page is full.
                    return Ok((None, last_page_end / physical_page_size, invalid_data_found));
                }
                None => {
                    // This page failed validation; step back and try the previous one.
                    last_page_end = page_start;
                    invalid_data_found = true;
                }
            }
        }

        Ok((None, 0, invalid_data_found))
    }

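    /// Appends `buf` to the blob. Data accumulates in the write buffer, and
    /// full pages are flushed to the underlying blob once the buffer reaches
    /// capacity.
    ///
    /// Returns [Error::ImmutableBlob] if the blob is in immutable mode.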
    pub async fn append(&self, buf: &[u8]) -> Result<(), Error> {
        let mut buffer = self.buffer.write().await;
        if buffer.immutable {
            return Err(Error::ImmutableBlob);
        }

        // A `true` return from `Buffer::append` signals the buffer is full
        // and must be flushed.
        if !buffer.append(buf) {
            return Ok(());
        }

        self.flush_internal(buffer, false).await
    }

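    /// Flushes buffered data to the underlying blob, checksumming each page.
    /// Full pages are always written; the trailing partial page is written
    /// only when `write_partial_page` is true. Consumes the buffer guard so
    /// the buffer lock can be released as soon as it is safe to do so.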
    async fn flush_internal(
        &self,
        mut buf_guard: RwLockWriteGuard<'_, Buffer>,
        write_partial_page: bool,
    ) -> Result<(), Error> {
        let buffer = &mut *buf_guard;

        // Cache the buffered data in the pool. `remaining_byte_count` is the
        // number of trailing bytes the pool could not cache, which must stay
        // in the buffer.
        let remaining_byte_count = self
            .pool_ref
            .cache(self.id, &buffer.data, buffer.offset)
            .await;

        // Snapshot the persisted partial-page state so we know which CRC slot
        // (and how much of the data prefix) must survive a rewrite.
        let old_partial_page_state = {
            let blob_state = self.blob_state.read().await;
            blob_state.partial_page_state.clone()
        };

        let (physical_pages, partial_page_state) = self.to_physical_pages(
            &*buffer,
            write_partial_page,
            old_partial_page_state.as_ref(),
        );

        if physical_pages.is_empty() {
            return Ok(());
        }

        // Drain the bytes being persisted and advance the buffer's offset.
        let bytes_to_drain = buffer.data.len() - remaining_byte_count;
        buffer.data.drain(0..bytes_to_drain);
        buffer.offset += bytes_to_drain as u64;
        let new_offset = buffer.offset;

        // Acquire the blob lock before releasing the buffer lock so flushes
        // reach the blob in buffer order.
        let mut blob_state = self.blob_state.write().await;

        drop(buf_guard);

        let logical_page_size = self.pool_ref.page_size() as usize;
        let physical_page_size = logical_page_size + CHECKSUM_SIZE as usize;
        let write_at_offset = blob_state.current_page * physical_page_size as u64;

        // A trailing partial page does not count as fully written.
        let total_pages_in_buffer = physical_pages.len() / physical_page_size;
        let full_pages_written = if partial_page_state.is_some() {
            total_pages_in_buffer.saturating_sub(1)
        } else {
            total_pages_in_buffer
        };

        let protected_regions = Self::identify_protected_regions(old_partial_page_state.as_ref());

        blob_state.current_page += full_pages_written as u64;
        blob_state.partial_page_state = partial_page_state;

        // The buffer's new offset must agree with the updated page counter.
        assert_eq!(
            blob_state.current_page * self.pool_ref.page_size(),
            new_offset
        );

        if let Some((prefix_len, protected_crc)) = protected_regions {
            // The first page extends a persisted partial page: skip its
            // protected data prefix and protected CRC slot when writing.
            match protected_crc {
                ProtectedCrc::First => {
                    // Write the new data after the protected prefix, if any.
                    if prefix_len < logical_page_size {
                        blob_state
                            .blob
                            .write_at(
                                physical_pages[prefix_len..logical_page_size].to_vec(),
                                write_at_offset + prefix_len as u64,
                            )
                            .await?;
                    }
                    // Skip the protected first slot (6 bytes: u16 length and
                    // u32 CRC) and write everything from the second slot on.
                    let second_crc_start = logical_page_size + 6;
                    blob_state
                        .blob
                        .write_at(
                            physical_pages[second_crc_start..].to_vec(),
                            write_at_offset + second_crc_start as u64,
                        )
                        .await?;
                }
                ProtectedCrc::Second => {
                    // Write the new data and the first CRC slot, stopping at
                    // the protected second slot.
                    let first_crc_end = logical_page_size + 6;
                    if prefix_len < first_crc_end {
                        blob_state
                            .blob
                            .write_at(
                                physical_pages[prefix_len..first_crc_end].to_vec(),
                                write_at_offset + prefix_len as u64,
                            )
                            .await?;
                    }
                    // Write any pages that follow the rewritten partial page.
                    if physical_pages.len() > physical_page_size {
                        blob_state
                            .blob
                            .write_at(
                                physical_pages[physical_page_size..].to_vec(),
                                write_at_offset + physical_page_size as u64,
                            )
                            .await?;
                    }
                }
            }
        } else {
            // No protected regions: write everything in one shot.
            blob_state
                .blob
                .write_at(physical_pages, write_at_offset)
                .await?;
        }

        Ok(())
    }

    /// Returns the current logical size of the blob, including buffered bytes
    /// that have not yet been persisted.
    pub async fn size(&self) -> u64 {
        let buffer = self.buffer.read().await;
        buffer.size()
    }

    /// Reads up to `buf.len()` bytes starting at `logical_offset`, truncating
    /// `buf` if fewer bytes are available. Returns the (possibly truncated)
    /// buffer and the number of bytes read, or
    /// [Error::BlobInsufficientLength] if nothing is available at
    /// `logical_offset`.
    pub async fn read_up_to(
        &self,
        buf: impl Into<StableBuf> + Send,
        logical_offset: u64,
    ) -> Result<(StableBuf, usize), Error> {
        let mut buf = buf.into();
        if buf.is_empty() {
            return Ok((buf, 0));
        }
        let blob_size = self.size().await;
        let available = (blob_size.saturating_sub(logical_offset) as usize).min(buf.len());
        if available == 0 {
            return Err(Error::BlobInsufficientLength);
        }
        if buf.len() > available {
            buf.truncate(available);
        }
        self.read_into(buf.as_mut(), logical_offset).await?;

        Ok((buf, available))
    }

    /// Fills `buf` with bytes starting at `logical_offset`, consulting (in
    /// order) the write buffer, the page cache, and the underlying blob.
    pub async fn read_into(&self, buf: &mut [u8], logical_offset: u64) -> Result<(), Error> {
        let end_offset = logical_offset
            .checked_add(buf.len() as u64)
            .ok_or(Error::OffsetOverflow)?;

        let buffer = self.buffer.read().await;

        if end_offset > buffer.size() {
            return Err(Error::BlobInsufficientLength);
        }

        // Serve the tail of the range from the write buffer; `remaining` is
        // the number of leading bytes it could not provide.
        let remaining = buffer.extract(buf.as_mut(), logical_offset);

        drop(buffer);

        if remaining == 0 {
            return Ok(());
        }

        // Next, try the page cache.
        let cached = self
            .pool_ref
            .read_cached(self.id, &mut buf[..remaining], logical_offset)
            .await;

        if cached == remaining {
            return Ok(());
        }

        // Fall back to the underlying blob for whatever is still missing.
        let blob_guard = self.blob_state.read().await;

        let uncached_offset = logical_offset + cached as u64;
        let uncached_len = remaining - cached;
        self.pool_ref
            .read(
                &blob_guard.blob,
                self.id,
                &mut buf[cached..cached + uncached_len],
                uncached_offset,
            )
            .await
    }

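    /// Identifies the regions of a previously persisted partial page that
    /// must not be overwritten when the page is extended: the `prefix_len`
    /// data bytes already covered by the authoritative CRC, and the slot
    /// holding that CRC. The slot with the larger length is authoritative,
    /// with ties going to the first slot.
    ///
    /// For example, a record with `len1: 50, len2: 30` protects the first 50
    /// data bytes and the first CRC slot; the next flush writes the new data
    /// suffix and records its CRC in the second slot.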
    fn identify_protected_regions(
        partial_page_state: Option<&Checksum>,
    ) -> Option<(usize, ProtectedCrc)> {
        let crc_record = partial_page_state?;
        let (old_len, _) = crc_record.get_crc();
        let protected_crc = if crc_record.len1 >= crc_record.len2 {
            ProtectedCrc::First
        } else {
            ProtectedCrc::Second
        };
        Some((old_len as usize, protected_crc))
    }

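    /// Converts the buffer's logical bytes into physical pages, appending a
    /// checksum record to each logical page. Returns the encoded pages along
    /// with the checksum record of the trailing partial page, if one was
    /// included.
    ///
    /// When the first encoded page extends a previously persisted partial
    /// page, `old_crc_record` is folded into its checksum record so the old
    /// authoritative CRC slot survives the rewrite.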
    fn to_physical_pages(
        &self,
        buffer: &Buffer,
        include_partial_page: bool,
        old_crc_record: Option<&Checksum>,
    ) -> (Vec<u8>, Option<Checksum>) {
        let logical_page_size = self.pool_ref.page_size() as usize;
        let physical_page_size = logical_page_size + CHECKSUM_SIZE as usize;
        let pages_to_write = buffer.data.len() / logical_page_size;
        let mut write_buffer = Vec::with_capacity(pages_to_write * physical_page_size);

        // Encode each full logical page followed by its checksum record.
        for page in 0..pages_to_write {
            let start_read_idx = page * logical_page_size;
            let end_read_idx = start_read_idx + logical_page_size;
            let logical_page = &buffer.data[start_read_idx..end_read_idx];
            write_buffer.extend_from_slice(logical_page);

            let crc = Crc32::checksum(logical_page);
            let logical_page_size_u16 =
                u16::try_from(logical_page_size).expect("page size must fit in u16 for CRC record");

            // If the first page extends a persisted partial page, preserve the
            // old CRC in its slot and put the new CRC in the other slot.
            let crc_record = if let (0, Some(old_crc)) = (page, old_crc_record) {
                Self::build_crc_record_preserving_old(logical_page_size_u16, crc, old_crc)
            } else {
                Checksum::new(logical_page_size_u16, crc)
            };
            write_buffer.extend_from_slice(&crc_record.to_bytes());
        }

        if !include_partial_page {
            return (write_buffer, None);
        }

        let partial_page = &buffer.data[pages_to_write * logical_page_size..];
        if partial_page.is_empty() {
            // The buffer ends exactly on a page boundary.
            return (write_buffer, None);
        }

        // If the partial page is unchanged since it was last persisted, there
        // is nothing new to write.
        if pages_to_write == 0 {
            if let Some(old_crc) = old_crc_record {
                let (old_len, _) = old_crc.get_crc();
                if partial_page.len() == old_len as usize {
                    return (write_buffer, None);
                }
            }
        }
        write_buffer.extend_from_slice(partial_page);
        let partial_len = partial_page.len();
        let crc = Crc32::checksum(partial_page);

        // Zero-pad the partial page out to a full logical page.
        write_buffer.resize(write_buffer.len() + (logical_page_size - partial_len), 0);

        // As above, preserve the old CRC slot if this partial page extends a
        // previously persisted one.
        let crc_record = if let (0, Some(old_crc)) = (pages_to_write, old_crc_record) {
            Self::build_crc_record_preserving_old(partial_len as u16, crc, old_crc)
        } else {
            Checksum::new(partial_len as u16, crc)
        };

        write_buffer.extend_from_slice(&crc_record.to_bytes());

        (write_buffer, Some(crc_record))
    }

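    /// Builds a checksum record that stores the new (length, CRC) pair in the
    /// slot *not* currently holding the authoritative (longer) old pair, so a
    /// crash mid-write never destroys the old recoverable state.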
    const fn build_crc_record_preserving_old(
        new_len: u16,
        new_crc: u32,
        old_crc: &Checksum,
    ) -> Checksum {
        let (old_len, old_crc_val) = old_crc.get_crc();
        if old_crc.len1 >= old_crc.len2 {
            // Slot 0 is authoritative: keep it, put the new CRC in slot 1.
            Checksum {
                len1: old_len,
                crc1: old_crc_val,
                len2: new_len,
                crc2: new_crc,
            }
        } else {
            // Slot 1 is authoritative: keep it, put the new CRC in slot 0.
            Checksum {
                len1: new_len,
                crc1: new_crc,
                len2: old_len,
                crc2: old_crc_val,
            }
        }
    }

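    /// Returns a [Replay] that iterates over the blob's pages, prefetching up
    /// to `buffer_size` bytes of physical pages at a time. If the blob is
    /// mutable, buffered data (including any partial page) is flushed first.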
    pub async fn replay(&self, buffer_size: NonZeroUsize) -> Result<Replay<B>, Error> {
        let logical_page_size = self.pool_ref.page_size();
        let logical_page_size_nz =
            NonZeroU16::new(logical_page_size as u16).expect("page_size is non-zero");

        // Flush any buffered data so the replay sees everything appended.
        {
            let buf_guard = self.buffer.write().await;
            if !buf_guard.immutable {
                self.flush_internal(buf_guard, true).await?;
            }
        }

        let physical_page_size = logical_page_size + CHECKSUM_SIZE;

        // Prefetch at least one page per read.
        let prefetch_pages = buffer_size.get() / physical_page_size as usize;
        let prefetch_pages = prefetch_pages.max(1);

        let blob_guard = self.blob_state.read().await;

        // Compute the physical and logical sizes, counting a trailing partial
        // page if one exists.
        let (physical_blob_size, logical_blob_size) =
            blob_guard.partial_page_state.as_ref().map_or_else(
                || {
                    let physical = physical_page_size * blob_guard.current_page;
                    let logical = logical_page_size * blob_guard.current_page;
                    (physical, logical)
                },
                |crc_record| {
                    let (partial_len, _) = crc_record.get_crc();
                    let partial_len = partial_len as u64;
                    let physical = physical_page_size * (blob_guard.current_page + 1);
                    let logical = logical_page_size * blob_guard.current_page + partial_len;
                    (physical, logical)
                },
            );

        let reader = PageReader::new(
            blob_guard.blob.clone(),
            physical_blob_size,
            logical_blob_size,
            prefetch_pages,
            logical_page_size_nz,
        );
        Ok(Replay::new(reader))
    }
}

impl<B: Blob> Blob for Append<B> {
    async fn read_at(
        &self,
        buf: impl Into<StableBuf> + Send,
        logical_offset: u64,
    ) -> Result<StableBuf, Error> {
        let mut buf = buf.into();
        self.read_into(buf.as_mut(), logical_offset).await?;
        Ok(buf)
    }

    async fn sync(&self) -> Result<(), Error> {
        // An immutable blob was flushed and synced when it was made
        // immutable, so there is nothing to do.
        let buf_guard = self.buffer.write().await;
        if buf_guard.immutable {
            return Ok(());
        }
        self.flush_internal(buf_guard, true).await?;

        let blob_state = self.blob_state.read().await;
        blob_state.blob.sync().await
    }

    async fn write_at(&self, _buf: impl Into<StableBuf> + Send, _offset: u64) -> Result<(), Error> {
        unimplemented!("append-only blob type does not support write_at")
    }

    /// Resizes the blob to `size` logical bytes. Growing appends zeros;
    /// shrinking truncates to the physical page containing the new size and
    /// reloads the surviving partial page into the write buffer.
    async fn resize(&self, size: u64) -> Result<(), Error> {
        let current_size = self.size().await;

        // Growing is just appending zeros.
        if size > current_size {
            let zeros_needed = (size - current_size) as usize;
            let zeros = vec![0u8; zeros_needed];
            self.append(&zeros).await?;
            return Ok(());
        }

        let logical_page_size = self.pool_ref.page_size();
        let physical_page_size = logical_page_size + CHECKSUM_SIZE;

        // Flush buffered data so the blob holds the full logical size.
        self.sync().await?;

        let mut buf_guard = self.buffer.write().await;
        if buf_guard.immutable {
            return Err(Error::ImmutableBlob);
        }
        let mut blob_guard = self.blob_state.write().await;

        // Truncate to the physical page containing the new logical size.
        let full_pages = size / logical_page_size;
        let partial_bytes = size % logical_page_size;
        let new_physical_size = if partial_bytes > 0 {
            (full_pages + 1) * physical_page_size
        } else {
            full_pages * physical_page_size
        };

        blob_guard.blob.resize(new_physical_size).await?;
        blob_guard.partial_page_state = None;

        blob_guard.current_page = full_pages;
        buf_guard.offset = full_pages * logical_page_size;

        if partial_bytes > 0 {
            // Reload the surviving prefix of the (validated) last page into
            // the write buffer.
            let page_data =
                super::get_page_from_blob(&blob_guard.blob, full_pages, logical_page_size).await?;

            if (page_data.len() as u64) < partial_bytes {
                return Err(Error::InvalidChecksum);
            }

            buf_guard.data = page_data.as_ref()[..partial_bytes as usize].to_vec();
        } else {
            buf_guard.data = vec![];
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{deterministic, Runner as _, Storage as _};
    use commonware_codec::ReadExt;
    use commonware_macros::test_traced;
    use commonware_utils::{NZUsize, NZU16};
    use std::num::NonZeroU16;

    // A deliberately odd page size so test data rarely lines up with page
    // boundaries.
    const PAGE_SIZE: NonZeroU16 = NZU16!(103);
    const BUFFER_SIZE: usize = PAGE_SIZE.get() as usize * 2;

    #[test_traced("DEBUG")]
    fn test_append_crc_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            let pool_ref = PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));

            let append = Append::new(blob, blob_size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();

            assert_eq!(append.size().await, 0);

            // Syncing an empty blob should persist nothing.
            append.sync().await.unwrap();
            drop(append);

            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            // Reopening the empty blob should also yield an empty Append.
            let append = Append::new(blob, blob_size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();

            assert_eq!(append.size().await, 0);
        });
    }

    #[test_traced("DEBUG")]
    fn test_append_crc_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            let pool_ref = PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));

            let append = Append::new(blob, blob_size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();

            assert_eq!(append.size().await, 0);

            let data = vec![1, 2, 3, 4, 5];
            append.append(&data).await.unwrap();

            assert_eq!(append.size().await, 5);

            let more_data = vec![6, 7, 8, 9, 10];
            append.append(&more_data).await.unwrap();

            assert_eq!(append.size().await, 10);

            let read_buf = vec![0u8; 5];
            let read_buf = append.read_at(read_buf, 0).await.unwrap();
            assert_eq!(read_buf.as_ref(), &data[..]);

            let read_buf = vec![0u8; 5];
            let read_buf = append.read_at(read_buf, 5).await.unwrap();
            assert_eq!(read_buf.as_ref(), &more_data[..]);

            let read_buf = vec![0u8; 10];
            let read_buf = append.read_at(read_buf, 0).await.unwrap();
            assert_eq!(read_buf.as_ref(), &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

            // Sync and reopen: the 10 buffered bytes become one partial page.
            append.sync().await.unwrap();
            drop(append);

            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            // One physical page: 103 data bytes + 12 checksum bytes.
            assert_eq!(blob_size, 115);
            let append = Append::new(blob, blob_size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 10);

            // Append 100 bytes so the data spans the first page boundary.
            let spanning_data: Vec<u8> = (11..=110).collect();
            append.append(&spanning_data).await.unwrap();
            assert_eq!(append.size().await, 110);

            let read_buf = vec![0u8; 100];
            let read_buf = append.read_at(read_buf, 10).await.unwrap();
            assert_eq!(read_buf.as_ref(), &spanning_data[..]);

            let read_buf = vec![0u8; 110];
            let read_buf = append.read_at(read_buf, 0).await.unwrap();
            let expected: Vec<u8> = (1..=110).collect();
            assert_eq!(read_buf.as_ref(), &expected[..]);

            append.sync().await.unwrap();
            drop(append);

            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            // Two physical pages.
            assert_eq!(blob_size, 230);
            let append = Append::new(blob, blob_size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 110);

            // Append data that ends exactly on the second page boundary
            // (206 = 2 * 103).
            let boundary_data: Vec<u8> = (111..=206).collect();
            assert_eq!(boundary_data.len(), 96);
            append.append(&boundary_data).await.unwrap();
            assert_eq!(append.size().await, 206);

            let read_buf = vec![0u8; 206];
            let read_buf = append.read_at(read_buf, 0).await.unwrap();
            let expected: Vec<u8> = (1..=206).collect();
            assert_eq!(read_buf.as_ref(), &expected[..]);

            append.sync().await.unwrap();
            drop(append);

            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 230);
            let append = Append::new(blob, blob_size, BUFFER_SIZE, pool_ref)
                .await
                .unwrap();
            assert_eq!(append.size().await, 206);

            let read_buf = vec![0u8; 206];
            let read_buf = append.read_at(read_buf, 0).await.unwrap();
            assert_eq!(read_buf.as_ref(), &expected[..]);
        });
    }

    /// Reads the checksum record from the tail of a physical page.
    fn read_crc_record_from_page(page_bytes: &[u8]) -> Checksum {
        let crc_start = page_bytes.len() - CHECKSUM_SIZE as usize;
        Checksum::read(&mut &page_bytes[crc_start..]).unwrap()
    }

    /// A 6-byte pattern used to overwrite a CRC slot: a zero length followed
    /// by a recognizable garbage CRC.
    const DUMMY_MARKER: [u8; 6] = [0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF];
1076
1077 #[test]
1078 fn test_identify_protected_regions_equal_lengths() {
1079 let record = Checksum {
1081 len1: 50,
1082 crc1: 0xAAAAAAAA,
1083 len2: 50,
1084 crc2: 0xBBBBBBBB,
1085 };
1086
1087 let result =
1088 Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record));
1089 assert!(result.is_some());
1090 let (prefix_len, protected_crc) = result.unwrap();
1091 assert_eq!(prefix_len, 50);
1092 assert!(
1093 matches!(protected_crc, ProtectedCrc::First),
1094 "First CRC should be protected when lengths are equal"
1095 );
1096 }
1097
1098 #[test]
1099 fn test_identify_protected_regions_len1_larger() {
1100 let record = Checksum {
1102 len1: 100,
1103 crc1: 0xAAAAAAAA,
1104 len2: 50,
1105 crc2: 0xBBBBBBBB,
1106 };
1107
1108 let result =
1109 Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record));
1110 assert!(result.is_some());
1111 let (prefix_len, protected_crc) = result.unwrap();
1112 assert_eq!(prefix_len, 100);
1113 assert!(
1114 matches!(protected_crc, ProtectedCrc::First),
1115 "First CRC should be protected when len1 > len2"
1116 );
1117 }
1118
1119 #[test]
1120 fn test_identify_protected_regions_len2_larger() {
1121 let record = Checksum {
1123 len1: 50,
1124 crc1: 0xAAAAAAAA,
1125 len2: 100,
1126 crc2: 0xBBBBBBBB,
1127 };
1128
1129 let result =
1130 Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record));
1131 assert!(result.is_some());
1132 let (prefix_len, protected_crc) = result.unwrap();
1133 assert_eq!(prefix_len, 100);
1134 assert!(
1135 matches!(protected_crc, ProtectedCrc::Second),
1136 "Second CRC should be protected when len2 > len1"
1137 );
1138 }

    #[test_traced("DEBUG")]
    // Builds a partial page whose authoritative CRC sits in slot 1, mangles
    // slot 0, and verifies a further append rewrites slot 0 while leaving the
    // protected slot 1 untouched.
    fn test_crc_slot1_protected() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let pool_ref = PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
            let slot0_offset = PAGE_SIZE.get() as u64;
            let slot1_offset = PAGE_SIZE.get() as u64 + 6;

            // First append: 10 bytes persisted as a partial page.
            let (blob, _) = context.open("test_partition", b"slot1_prot").await.unwrap();
            let append = Append::new(blob, 0, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            // Second append: the 30-byte CRC lands in slot 1.
            let (blob, size) = context.open("test_partition", b"slot1_prot").await.unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            append
                .append(&(11..=30).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context.open("test_partition", b"slot1_prot").await.unwrap();
            let page = blob
                .read_at(vec![0u8; physical_page_size], 0)
                .await
                .unwrap();
            let crc = read_crc_record_from_page(page.as_ref());
            assert!(
                crc.len2 > crc.len1,
                "Slot 1 should be authoritative (len2={} > len1={})",
                crc.len2,
                crc.len1
            );

            let slot1_before: Vec<u8> = blob
                .read_at(vec![0u8; 6], slot1_offset)
                .await
                .unwrap()
                .into();

            // Mangle the unprotected slot 0.
            blob.write_at(DUMMY_MARKER.to_vec(), slot0_offset)
                .await
                .unwrap();
            blob.sync().await.unwrap();

            let slot0_mangled: Vec<u8> = blob
                .read_at(vec![0u8; 6], slot0_offset)
                .await
                .unwrap()
                .into();
            assert_eq!(slot0_mangled, DUMMY_MARKER, "Mangle failed");

            // Append again: the new CRC must go into slot 0.
            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            append
                .append(&(31..=50).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, _) = context.open("test_partition", b"slot1_prot").await.unwrap();

            let slot0_after: Vec<u8> = blob
                .read_at(vec![0u8; 6], slot0_offset)
                .await
                .unwrap()
                .into();
            assert_ne!(
                slot0_after, DUMMY_MARKER,
                "Slot 0 should have been overwritten with new CRC"
            );

            let slot1_after: Vec<u8> = blob
                .read_at(vec![0u8; 6], slot1_offset)
                .await
                .unwrap()
                .into();
            assert_eq!(
                slot1_before, slot1_after,
                "Slot 1 was modified! Protected region violated."
            );

            let page = blob
                .read_at(vec![0u8; physical_page_size], 0)
                .await
                .unwrap();
            let crc = read_crc_record_from_page(page.as_ref());
            assert_eq!(crc.len1, 50, "Slot 0 should have len=50");
        });
    }

    #[test_traced("DEBUG")]
    // Builds a partial page whose authoritative CRC sits in slot 0, mangles
    // slot 1, and verifies a further append writes its CRC to slot 1 while
    // leaving the protected slot 0 untouched.
    fn test_crc_slot0_protected() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let pool_ref = PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
            let slot0_offset = PAGE_SIZE.get() as u64;
            let slot1_offset = PAGE_SIZE.get() as u64 + 6;

            // Three appends (to 10, 30, then 50 bytes) leave the 50-byte CRC
            // in slot 0.
            let (blob, _) = context.open("test_partition", b"slot0_prot").await.unwrap();
            let append = Append::new(blob, 0, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            append
                .append(&(11..=30).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            append
                .append(&(31..=50).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
            let page = blob
                .read_at(vec![0u8; physical_page_size], 0)
                .await
                .unwrap();
            let crc = read_crc_record_from_page(page.as_ref());
            assert!(
                crc.len1 > crc.len2,
                "Slot 0 should be authoritative (len1={} > len2={})",
                crc.len1,
                crc.len2
            );

            let slot0_before: Vec<u8> = blob
                .read_at(vec![0u8; 6], slot0_offset)
                .await
                .unwrap()
                .into();

            // Mangle the unprotected slot 1.
            blob.write_at(DUMMY_MARKER.to_vec(), slot1_offset)
                .await
                .unwrap();
            blob.sync().await.unwrap();

            let slot1_mangled: Vec<u8> = blob
                .read_at(vec![0u8; 6], slot1_offset)
                .await
                .unwrap()
                .into();
            assert_eq!(slot1_mangled, DUMMY_MARKER, "Mangle failed");

            // Append again: the new CRC must go into slot 1.
            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            append
                .append(&(51..=70).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, _) = context.open("test_partition", b"slot0_prot").await.unwrap();

            let slot1_after: Vec<u8> = blob
                .read_at(vec![0u8; 6], slot1_offset)
                .await
                .unwrap()
                .into();
            assert_ne!(
                slot1_after, DUMMY_MARKER,
                "Slot 1 should have been overwritten with new CRC"
            );

            let slot0_after: Vec<u8> = blob
                .read_at(vec![0u8; 6], slot0_offset)
                .await
                .unwrap()
                .into();
            assert_eq!(
                slot0_before, slot0_after,
                "Slot 0 was modified! Protected region violated."
            );

            let page = blob
                .read_at(vec![0u8; physical_page_size], 0)
                .await
                .unwrap();
            let crc = read_crc_record_from_page(page.as_ref());
            assert_eq!(crc.len2, 70, "Slot 1 should have len=70");
        });
    }

    #[test_traced("DEBUG")]
    // Verifies that extending a partial page never rewrites the data prefix
    // already covered by the authoritative CRC, while bytes in the zero
    // padding past the prefix are fair game.
    fn test_data_prefix_not_overwritten() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let pool_ref = PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;

            let (blob, _) = context
                .open("test_partition", b"prefix_test")
                .await
                .unwrap();
            let append = Append::new(blob, 0, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            let data1: Vec<u8> = (1..=20).collect();
            append.append(&data1).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context
                .open("test_partition", b"prefix_test")
                .await
                .unwrap();
            assert_eq!(size, physical_page_size as u64);

            let prefix_before: Vec<u8> = blob.read_at(vec![0u8; 20], 0).await.unwrap().into();

            // Mangle bytes inside the zero padding past the 20-byte prefix.
            blob.write_at(DUMMY_MARKER.to_vec(), 25).await.unwrap();
            blob.sync().await.unwrap();

            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            append
                .append(&(21..=40).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, _) = context
                .open("test_partition", b"prefix_test")
                .await
                .unwrap();

            let prefix_after: Vec<u8> = blob.read_at(vec![0u8; 20], 0).await.unwrap().into();
            assert_eq!(prefix_before, prefix_after, "Data prefix was modified!");

            let overwritten: Vec<u8> = blob.read_at(vec![0u8; 6], 25).await.unwrap().into();
            assert_eq!(
                overwritten,
                vec![26, 27, 28, 29, 30, 31],
                "New data should overwrite padding area"
            );
        });
    }

    #[test_traced("DEBUG")]
    // Extends a protected partial page with enough data to cross the page
    // boundary, and verifies the protected CRC slot survives while the page
    // is completed with a full-page CRC in the other slot.
    fn test_crc_slot_protection_across_page_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let pool_ref = PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
            let slot0_offset = PAGE_SIZE.get() as u64;
            let slot1_offset = PAGE_SIZE.get() as u64 + 6;

            let (blob, _) = context.open("test_partition", b"boundary").await.unwrap();
            let append = Append::new(blob, 0, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            append.append(&(1..=50).collect::<Vec<u8>>()).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            append
                .append(&(51..=80).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
            let page = blob
                .read_at(vec![0u8; physical_page_size], 0)
                .await
                .unwrap();
            let crc = read_crc_record_from_page(page.as_ref());
            assert!(crc.len2 > crc.len1, "Slot 1 should be authoritative");

            let slot1_before: Vec<u8> = blob
                .read_at(vec![0u8; 6], slot1_offset)
                .await
                .unwrap()
                .into();

            // Mangle the unprotected slot 0.
            blob.write_at(DUMMY_MARKER.to_vec(), slot0_offset)
                .await
                .unwrap();
            blob.sync().await.unwrap();

            // Append 40 bytes: page 0 fills completely and page 1 begins.
            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            append
                .append(&(81..=120).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
            assert_eq!(size, (physical_page_size * 2) as u64, "Should have 2 pages");

            let slot0_after: Vec<u8> = blob
                .read_at(vec![0u8; 6], slot0_offset)
                .await
                .unwrap()
                .into();
            assert_ne!(
                slot0_after, DUMMY_MARKER,
                "Slot 0 should have full-page CRC"
            );

            let slot1_after: Vec<u8> = blob
                .read_at(vec![0u8; 6], slot1_offset)
                .await
                .unwrap()
                .into();
            assert_eq!(
                slot1_before, slot1_after,
                "Slot 1 was modified during page boundary crossing!"
            );

            let page0 = blob
                .read_at(vec![0u8; physical_page_size], 0)
                .await
                .unwrap();
            let crc0 = read_crc_record_from_page(page0.as_ref());
            assert_eq!(
                crc0.len1,
                PAGE_SIZE.get(),
                "Slot 0 should have full page length"
            );

            // All 120 bytes must read back intact.
            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 120);
            let all_data: Vec<u8> = append.read_at(vec![0u8; 120], 0).await.unwrap().into();
            let expected: Vec<u8> = (1..=120).collect();
            assert_eq!(all_data, expected);
        });
    }

    #[test_traced("DEBUG")]
    // Corrupts the CRC value in the authoritative slot of the last page and
    // verifies recovery falls back to the older state recorded in the other
    // slot, shrinking the blob from 30 recoverable bytes to 10.
    fn test_crc_fallback_on_corrupted_primary() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let pool_ref = PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
            // len2 sits at page_size + 6, so its CRC value (crc2) starts at + 8.
            let crc2_offset = PAGE_SIZE.get() as u64 + 8;

            let (blob, _) = context
                .open("test_partition", b"crc_fallback")
                .await
                .unwrap();
            let append = Append::new(blob, 0, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            let data1: Vec<u8> = (1..=10).collect();
            append.append(&data1).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context
                .open("test_partition", b"crc_fallback")
                .await
                .unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            append
                .append(&(11..=30).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context
                .open("test_partition", b"crc_fallback")
                .await
                .unwrap();
            assert_eq!(size, physical_page_size as u64);

            let page = blob
                .read_at(vec![0u8; physical_page_size], 0)
                .await
                .unwrap();
            let crc = read_crc_record_from_page(page.as_ref());
            assert!(
                crc.len2 > crc.len1,
                "Slot 1 should be authoritative (len2={} > len1={})",
                crc.len2,
                crc.len1
            );
            assert_eq!(crc.len2, 30, "Slot 1 should have len=30");
            assert_eq!(crc.len1, 10, "Slot 0 should have len=10");

            // Before corruption, recovery sees all 30 bytes.
            let append = Append::new(blob.clone(), size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 30);
            let all_data: Vec<u8> = append.read_at(vec![0u8; 30], 0).await.unwrap().into();
            let expected: Vec<u8> = (1..=30).collect();
            assert_eq!(all_data, expected);
            drop(append);

            // Corrupt the CRC value in the authoritative slot 1.
            blob.write_at(vec![0xDE, 0xAD, 0xBE, 0xEF], crc2_offset)
                .await
                .unwrap();
            blob.sync().await.unwrap();

            let page = blob
                .read_at(vec![0u8; physical_page_size], 0)
                .await
                .unwrap();
            let crc = read_crc_record_from_page(page.as_ref());
            assert_eq!(crc.len2, 30, "len2 should still be 30 after corruption");
            assert_eq!(crc.crc2, 0xDEADBEEF, "crc2 should be our corrupted value");

            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();

            assert_eq!(
                append.size().await,
                10,
                "Should fall back to slot 0's 10 bytes after primary CRC corruption"
            );

            let fallback_data: Vec<u8> = append.read_at(vec![0u8; 10], 0).await.unwrap().into();
            assert_eq!(
                fallback_data, data1,
                "Fallback data should match original 10 bytes"
            );

            let result = append.read_at(vec![0u8; 11], 0).await;
            assert!(result.is_err(), "Reading beyond fallback size should fail");
        });
    }

    #[test_traced("DEBUG")]
    // A stale partial-length CRC slot is only a valid fallback for the *last*
    // page of the blob. Here page 0 retains a stale slot (len=10) from before
    // it was filled; after its authoritative full-page CRC is corrupted,
    // recovery must not shrink the blob to that partial state, and reads of
    // page 0 must fail instead.
    fn test_non_last_page_rejects_partial_fallback() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let pool_ref = PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
            // Page 0's crc2 value lives at page_size + 8.
            let page0_crc2_offset = PAGE_SIZE.get() as u64 + 8;

            let (blob, _) = context
                .open("test_partition", b"non_last_page")
                .await
                .unwrap();
            let append = Append::new(blob, 0, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context
                .open("test_partition", b"non_last_page")
                .await
                .unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            // Fill page 0 exactly to the 103-byte boundary.
            append
                .append(&(11..=PAGE_SIZE.get() as u8).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context
                .open("test_partition", b"non_last_page")
                .await
                .unwrap();
            let page = blob
                .read_at(vec![0u8; physical_page_size], 0)
                .await
                .unwrap();
            let crc = read_crc_record_from_page(page.as_ref());
            assert_eq!(crc.len1, 10, "Slot 0 should have len=10");
            assert_eq!(
                crc.len2,
                PAGE_SIZE.get(),
                "Slot 1 should have len=103 (full page)"
            );
            assert!(crc.len2 > crc.len1, "Slot 1 should be authoritative");

            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            // Start page 1 with 10 more bytes.
            append
                .append(&(104..=113).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context
                .open("test_partition", b"non_last_page")
                .await
                .unwrap();
            assert_eq!(
                size,
                (physical_page_size * 2) as u64,
                "Should have 2 physical pages"
            );

            let append = Append::new(blob.clone(), size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 113);
            let all_data: Vec<u8> = append.read_at(vec![0u8; 113], 0).await.unwrap().into();
            let expected: Vec<u8> = (1..=113).collect();
            assert_eq!(all_data, expected);
            drop(append);

            // Corrupt page 0's authoritative full-page CRC.
            blob.write_at(vec![0xDE, 0xAD, 0xBE, 0xEF], page0_crc2_offset)
                .await
                .unwrap();
            blob.sync().await.unwrap();

            let page = blob
                .read_at(vec![0u8; physical_page_size], 0)
                .await
                .unwrap();
            let crc = read_crc_record_from_page(page.as_ref());
            assert_eq!(crc.len2, PAGE_SIZE.get(), "len2 should still be 103");
            assert_eq!(crc.crc2, 0xDEADBEEF, "crc2 should be corrupted");
            assert_eq!(crc.len1, 10, "Fallback slot 0 has partial length");

            // Recovery only validates the final page, which is intact, so the
            // blob opens at full size; the corruption surfaces on read.
            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();

            assert_eq!(append.size().await, 113);

            let result = append.read_at(vec![0u8; 10], 0).await;
            assert!(
                result.is_err(),
                "Reading from corrupted non-last page via Append should fail, but got: {:?}",
                result
            );
            drop(append);

            // Replay must also surface the corruption.
            let (blob, size) = context
                .open("test_partition", b"non_last_page")
                .await
                .unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            let mut replay = append.replay(NZUsize!(1024)).await.unwrap();

            let result = replay.ensure(1).await;
            assert!(
                result.is_err(),
                "Reading from corrupted non-last page via Replay should fail, but got: {:?}",
                result
            );
        });
    }

    #[test]
    fn test_resize_shrink_validates_crc() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let pool_ref = PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;

            let (blob, size) = context
                .open("test_partition", b"resize_crc_test")
                .await
                .unwrap();

            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();

            // 250 bytes: two full 103-byte pages plus a 44-byte partial page.
            let data: Vec<u8> = (0..=249).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();
            assert_eq!(append.size().await, 250);
            drop(append);

            let (blob, size) = context
                .open("test_partition", b"resize_crc_test")
                .await
                .unwrap();
            assert_eq!(size as usize, physical_page_size * 3);

            // Corrupt the checksum record of page 1 (the middle page).
            let page1_crc_offset = (physical_page_size * 2 - CHECKSUM_SIZE as usize) as u64;
            blob.write_at(vec![0xFF; CHECKSUM_SIZE as usize], page1_crc_offset)
                .await
                .unwrap();
            blob.sync().await.unwrap();

            // Recovery only validates the last page, so the blob still opens.
            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 250);

            // Shrinking to 150 bytes needs page 1's data for the new partial
            // page, so the corruption must surface as InvalidChecksum.
            let result = append.resize(150).await;
            assert!(
                matches!(result, Err(crate::Error::InvalidChecksum)),
                "Expected InvalidChecksum when shrinking to corrupted page, got: {:?}",
                result
            );
        });
    }

    #[test]
    fn test_immutable_blob_rejects_append_and_resize() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            const PAGE_SIZE: NonZeroU16 = NZU16!(64);
            const BUFFER_SIZE: usize = 256;

            let pool_ref = PoolRef::new(PAGE_SIZE, NZUsize!(4));

            let (blob, size) = context
                .open("test_partition", b"immutable_test")
                .await
                .unwrap();

            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();

            append.append(&[1, 2, 3, 4, 5]).await.unwrap();
            append.sync().await.unwrap();
            assert_eq!(append.size().await, 5);

            append.to_immutable().await.unwrap();
            assert!(append.is_immutable().await);

            let result = append.append(&[6, 7, 8]).await;
            assert!(
                matches!(result, Err(crate::Error::ImmutableBlob)),
                "Expected ImmutableBlob error from append(), got: {:?}",
                result
            );

            let result = append.resize(100).await;
            assert!(
                matches!(result, Err(crate::Error::ImmutableBlob)),
                "Expected ImmutableBlob error from resize(), got: {:?}",
                result
            );

            // sync() on an immutable blob is a harmless no-op.
            let result = append.sync().await;
            assert!(
                result.is_ok(),
                "sync() on immutable blob should return Ok, got: {:?}",
                result
            );

            // Reads still work after the blob becomes immutable.
            let data: Vec<u8> = append.read_at(vec![0u8; 5], 0).await.unwrap().into();
            assert_eq!(data, vec![1, 2, 3, 4, 5]);
        });
    }

    #[test]
    fn test_corrupted_crc_len_too_large() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let pool_ref = PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;

            let (blob, size) = context
                .open("test_partition", b"crc_len_test")
                .await
                .unwrap();

            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();

            append.append(&[0x42; 50]).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context
                .open("test_partition", b"crc_len_test")
                .await
                .unwrap();
            assert_eq!(size as usize, physical_page_size);

            // The checksum record starts right after the page's data bytes.
            let crc_offset = PAGE_SIZE.get() as u64;

            // Slot 0 claims a length (0xFFFF) far larger than the page size;
            // slot 1 is zeroed.
            let bad_crc_record: [u8; 12] = [
                0xFF, 0xFF, 0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
            ];
            blob.write_at(bad_crc_record.to_vec(), crc_offset)
                .await
                .unwrap();
            blob.sync().await.unwrap();

            let result = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone()).await;

            // Recovery must either truncate the corrupted page or report the
            // bad checksum; it must never trust the oversized length.
            match result {
                Ok(append) => {
                    let recovered_size = append.size().await;
                    assert_eq!(
                        recovered_size, 0,
                        "Corrupted page should be truncated, size should be 0"
                    );
                }
                Err(e) => {
                    assert!(
                        matches!(e, crate::Error::InvalidChecksum),
                        "Expected InvalidChecksum error, got: {:?}",
                        e
                    );
                }
            }
        });
    }

    #[test]
    fn test_corrupted_crc_both_slots_len_too_large() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let pool_ref = PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));

            let (blob, size) = context
                .open("test_partition", b"crc_both_bad")
                .await
                .unwrap();

            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();

            append.append(&[0x42; 50]).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context
                .open("test_partition", b"crc_both_bad")
                .await
                .unwrap();

            let crc_offset = PAGE_SIZE.get() as u64;

            // Both slots claim lengths larger than the 103-byte page (256 and
            // 512 big-endian), with garbage CRCs.
            let bad_crc_record: [u8; 12] = [
                0x01, 0x00, 0xDE, 0xAD, 0xBE, 0xEF, 0x02, 0x00, 0xCA, 0xFE, 0xBA, 0xBE,
            ];
            blob.write_at(bad_crc_record.to_vec(), crc_offset)
                .await
                .unwrap();
            blob.sync().await.unwrap();

            let result = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone()).await;

            match result {
                Ok(append) => {
                    assert_eq!(append.size().await, 0);
                }
                Err(e) => {
                    assert!(
                        matches!(e, crate::Error::InvalidChecksum),
                        "Expected InvalidChecksum, got: {:?}",
                        e
                    );
                }
            }
        });
    }
}