1use super::read::{PageReader, Replay};
17use crate::{
18 buffer::{
19 paged::{CacheRef, Checksum, CHECKSUM_SIZE},
20 tip::Buffer,
21 },
22 Blob, Error, IoBuf, IoBufMut, IoBufs,
23};
24use bytes::BufMut;
25use commonware_cryptography::Crc32;
26use commonware_utils::sync::{AsyncRwLock, AsyncRwLockWriteGuard};
27use std::{
28 num::{NonZeroU16, NonZeroUsize},
29 sync::Arc,
30};
31use tracing::warn;
32
/// Identifies which CRC slot of a [`Checksum`] record is currently
/// authoritative (already durable on disk) and therefore must NOT be
/// overwritten when the containing page is rewritten.
#[derive(Clone, Copy)]
enum ProtectedCrc {
    /// Slot 0 (`len1`/`crc1`) is protected.
    First,
    /// Slot 1 (`len2`/`crc2`) is protected.
    Second,
}
39
/// Mutable per-blob state, guarded by an `AsyncRwLock` inside [`Append`].
#[derive(Clone)]
struct BlobState<B: Blob> {
    /// The underlying storage blob.
    blob: B,

    /// Number of full logical pages flushed so far; the next physical write
    /// begins at this page index (invariant: `current_page * page_size`
    /// equals the tip buffer's offset — see the assert in `flush_internal`).
    current_page: u64,

    /// CRC record of the trailing partial page, if the blob currently ends in
    /// one; `None` when the blob ends exactly on a page boundary.
    partial_page_state: Option<Checksum>,
}
52
/// An append-only, checksummed, page-oriented writer over a [`Blob`].
///
/// Appended bytes accumulate in an in-memory tip [`Buffer`]; full pages are
/// flushed to the blob, each followed by a [`Checksum`] record, and are
/// mirrored into the shared page cache for fast reads.
#[derive(Clone)]
pub struct Append<B: Blob> {
    /// Blob plus its write cursor and partial-page CRC state.
    blob_state: Arc<AsyncRwLock<BlobState<B>>>,

    /// Identity of this blob within the page cache.
    id: u64,

    /// Shared page-cache handle (also supplies page size and the buffer pool).
    cache_ref: CacheRef,

    /// In-memory tip holding not-yet-flushed bytes.
    buffer: Arc<AsyncRwLock<Buffer>>,
}
70
71fn capacity_with_floor(capacity: usize, page_size: u64) -> usize {
74 let floor = page_size as usize * 2;
75 if capacity < floor {
76 warn!(
77 floor,
78 "requested buffer capacity is too low, increasing it to floor"
79 );
80 floor
81 } else {
82 capacity
83 }
84}
85
86impl<B: Blob> Append<B> {
    /// Opens an append writer over `blob`, recovering state from its existing
    /// contents.
    ///
    /// Scans backwards from `original_blob_size` for the last page whose
    /// checksum validates; any trailing invalid data is truncated (and synced)
    /// away. If the last valid page is partial, its logical bytes are reloaded
    /// into the tip buffer so appends continue exactly where the blob left off.
    pub async fn new(
        blob: B,
        original_blob_size: u64,
        capacity: usize,
        cache_ref: CacheRef,
    ) -> Result<Self, Error> {
        let (partial_page_state, pages, invalid_data_found) =
            Self::read_last_valid_page(&blob, original_blob_size, cache_ref.page_size()).await?;
        if invalid_data_found {
            // Truncate to the end of the last validating page and sync so the
            // truncation is durable before any new appends land.
            let new_blob_size = pages * (cache_ref.page_size() + CHECKSUM_SIZE);
            warn!(
                original_blob_size,
                new_blob_size, "truncating blob to remove invalid data"
            );
            blob.resize(new_blob_size).await?;
            blob.sync().await?;
        }

        let capacity = capacity_with_floor(capacity, cache_ref.page_size());

        // A trailing partial page stays "open": its bytes are re-buffered in
        // memory and it is excluded from `current_page` so the next flush
        // rewrites it in place.
        let (blob_state, partial_data) = match partial_page_state {
            Some((partial_page, crc_record)) => (
                BlobState {
                    blob,
                    current_page: pages - 1,
                    partial_page_state: Some(crc_record),
                },
                Some(partial_page),
            ),
            None => (
                BlobState {
                    blob,
                    current_page: pages,
                    partial_page_state: None,
                },
                None,
            ),
        };

        let buffer = Buffer::from(
            blob_state.current_page * cache_ref.page_size(),
            partial_data.unwrap_or_default(),
            capacity,
            cache_ref.pool().clone(),
        );

        Ok(Self {
            blob_state: Arc::new(AsyncRwLock::new(blob_state)),
            id: cache_ref.next_id(),
            cache_ref,
            buffer: Arc::new(AsyncRwLock::new(buffer)),
        })
    }
145
    /// Scans the blob backwards from its end for the last page whose checksum
    /// validates.
    ///
    /// Returns `(partial, pages, invalid_data_found)`:
    /// - `partial` is `Some((logical_bytes, crc_record))` when the last valid
    ///   page is partial (logical length < `page_size`), else `None`;
    /// - `pages` is the number of valid physical pages, counting a trailing
    ///   partial page;
    /// - `invalid_data_found` is true when trailing bytes failed validation
    ///   and should be truncated by the caller.
    async fn read_last_valid_page(
        blob: &B,
        blob_size: u64,
        page_size: u64,
    ) -> Result<(Option<(IoBuf, Checksum)>, u64, bool), Error> {
        let physical_page_size = page_size + CHECKSUM_SIZE;
        let partial_bytes = blob_size % physical_page_size;
        let mut last_page_end = blob_size - partial_bytes;

        // A size that is not a multiple of the physical page size already
        // implies torn/invalid trailing data.
        let mut invalid_data_found = partial_bytes != 0;

        while last_page_end != 0 {
            let page_start = last_page_end - physical_page_size;
            let buf = blob
                .read_at(page_start, physical_page_size as usize)
                .await?
                .coalesce()
                .freeze();

            match Checksum::validate_page(buf.as_ref()) {
                Some(crc_record) => {
                    let (len, _) = crc_record.get_crc();
                    let len = len as u64;
                    if len != page_size {
                        // Last valid page is partial: hand back its logical prefix.
                        let logical_bytes = buf.slice(..len as usize);
                        return Ok((
                            Some((logical_bytes, crc_record)),
                            last_page_end / physical_page_size,
                            invalid_data_found,
                        ));
                    }
                    // Last valid page is full; nothing partial to recover.
                    return Ok((None, last_page_end / physical_page_size, invalid_data_found));
                }
                None => {
                    // Invalid page: discard it and keep scanning backwards.
                    last_page_end = page_start;
                    invalid_data_found = true;
                }
            }
        }

        Ok((None, 0, invalid_data_found))
    }
213
    /// Appends `buf` to the blob.
    ///
    /// Bytes are staged in the in-memory tip; a flush of full pages to the
    /// underlying blob is triggered only once the tip exceeds its capacity.
    pub async fn append(&self, buf: &[u8]) -> Result<(), Error> {
        let mut buffer = self.buffer.write().await;

        // `Buffer::append` returns true when the buffer went over capacity
        // (see the `over_capacity` usage in the tests below).
        if !buffer.append(buf) {
            return Ok(());
        }

        // Over capacity: hand the write guard to the flush, which drains full
        // pages and releases the guard at the appropriate point.
        self.flush_internal(buffer, false).await
    }
225
    /// Flushes buffered data to the blob.
    ///
    /// Converts buffered bytes into physical (checksummed) pages, mirrors full
    /// pages into the page cache, advances the tip's offset, and writes the
    /// pages to the blob. When `write_partial_page` is true, a trailing partial
    /// page is also written, with a CRC record that preserves the previously
    /// durable CRC slot so a torn write cannot invalidate already-durable data.
    ///
    /// Takes ownership of the buffer write guard and drops it once the
    /// in-memory state is consistent, allowing appends to proceed while the
    /// blob write is in flight (the blob-state write lock serializes writers).
    async fn flush_internal(
        &self,
        mut buf_guard: AsyncRwLockWriteGuard<'_, Buffer>,
        write_partial_page: bool,
    ) -> Result<(), Error> {
        let buffer = &mut *buf_guard;

        // Snapshot the partial-page CRC state left by the previous flush; it
        // determines which CRC slot (and byte ranges) must not be overwritten.
        let old_partial_page_state = {
            let blob_state = self.blob_state.read().await;
            blob_state.partial_page_state.clone()
        };

        let (mut physical_pages, partial_page_state) = self.to_physical_pages(
            &*buffer,
            write_partial_page,
            old_partial_page_state.as_ref(),
        );

        if physical_pages.is_empty() {
            return Ok(());
        }

        // Only whole logical pages are drained from the tip and mirrored into
        // the cache; a trailing partial page stays buffered in memory.
        let logical_page_size = self.cache_ref.page_size() as usize;
        let pages_to_cache = buffer.len() / logical_page_size;
        let bytes_to_drain = pages_to_cache * logical_page_size;

        let cache_pages = if pages_to_cache > 0 {
            Some((buffer.offset, buffer.slice(..bytes_to_drain)))
        } else {
            None
        };

        if bytes_to_drain == buffer.len() && bytes_to_drain != 0 {
            // Fully drained: release the tip's pooled backing entirely.
            let _ = buffer
                .take()
                .expect("take must succeed when flush drains all buffered bytes");
        } else if bytes_to_drain != 0 {
            buffer.drop_prefix(bytes_to_drain);
            buffer.offset += bytes_to_drain as u64;
        }
        let new_offset = buffer.offset;

        if let Some((cache_offset, pages)) = cache_pages {
            let remaining = self.cache_ref.cache(self.id, pages.as_ref(), cache_offset);
            assert_eq!(remaining, 0, "cached full-page prefix must be page-aligned");
        }

        // Acquire the blob write lock BEFORE releasing the buffer guard so a
        // concurrent flush cannot interleave between the in-memory update and
        // the physical write.
        let mut blob_state = self.blob_state.write().await;

        drop(buf_guard);

        let physical_page_size = logical_page_size + CHECKSUM_SIZE as usize;
        let write_at_offset = blob_state.current_page * physical_page_size as u64;

        let protected_regions = Self::identify_protected_regions(old_partial_page_state.as_ref());

        blob_state.current_page += pages_to_cache as u64;
        blob_state.partial_page_state = partial_page_state;

        // Invariant: the tip's offset equals the logical bytes below it.
        assert_eq!(
            blob_state.current_page * self.cache_ref.page_size(),
            new_offset
        );

        if let Some((prefix_len, protected_crc)) = protected_regions {
            // The first physical page overlaps a previously-written partial
            // page: skip the already-durable payload prefix and the protected
            // CRC slot (6 bytes: 2-byte len + 4-byte CRC, per the slot offsets
            // used in the tests below) so a torn write cannot corrupt data
            // that validated before.
            match protected_crc {
                ProtectedCrc::First => {
                    if prefix_len < logical_page_size {
                        // Write the new payload bytes after the durable prefix.
                        let _ = physical_pages.split_to(prefix_len);
                        let first_payload = physical_pages.split_to(logical_page_size - prefix_len);
                        blob_state
                            .blob
                            .write_at(write_at_offset + prefix_len as u64, first_payload)
                            .await?;
                    } else {
                        let _ = physical_pages.split_to(logical_page_size);
                    }

                    // Skip the protected slot 0, then write everything after it.
                    if physical_pages.len() > 6 {
                        let _ = physical_pages.split_to(6);
                        blob_state
                            .blob
                            .write_at(
                                write_at_offset + (logical_page_size + 6) as u64,
                                physical_pages,
                            )
                            .await?;
                    }
                }
                ProtectedCrc::Second => {
                    // Write up to (but not including) the protected slot 1,
                    // which begins right after slot 0.
                    let first_crc_end = logical_page_size + 6;
                    if prefix_len < first_crc_end {
                        let _ = physical_pages.split_to(prefix_len);
                        let first_payload = physical_pages.split_to(first_crc_end - prefix_len);
                        blob_state
                            .blob
                            .write_at(write_at_offset + prefix_len as u64, first_payload)
                            .await?;
                    } else {
                        let _ = physical_pages.split_to(first_crc_end);
                    }

                    // Skip the protected slot 1, then resume at the next
                    // physical page boundary.
                    let skip = physical_page_size - first_crc_end;
                    if physical_pages.len() > skip {
                        let _ = physical_pages.split_to(skip);
                        blob_state
                            .blob
                            .write_at(write_at_offset + physical_page_size as u64, physical_pages)
                            .await?;
                    }
                }
            }
        } else {
            // No protected regions: write everything in one shot.
            blob_state
                .blob
                .write_at(write_at_offset, physical_pages)
                .await?;
        }

        Ok(())
    }
390
391 pub async fn size(&self) -> u64 {
393 let buffer = self.buffer.read().await;
394 buffer.size()
395 }
396
397 pub fn try_size(&self) -> Option<u64> {
402 let buffer = self.buffer.try_read().ok()?;
403 Some(buffer.size())
404 }
405
406 pub fn try_read_sync(&self, offset: u64, buf: &mut [u8]) -> bool {
411 self.cache_ref.read_cached(self.id, buf, offset) == buf.len()
412 }
413
    /// Reads `len` bytes starting at logical `offset`.
    pub async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufs, Error> {
        // SAFETY-NOTE(review): `alloc_len` presumably hands back `len`
        // uninitialized bytes; `read_into` below either fills every byte or
        // errors before the buffer escapes — confirm `alloc_len`'s contract.
        let mut buf = unsafe { self.cache_ref.pool().alloc_len(len) };
        self.read_into(buf.as_mut(), offset).await?;
        Ok(buf.into())
    }
422
    /// Reads up to `len` bytes at `logical_offset` into `bufs`.
    ///
    /// Returns the buffer (truncated to exactly the bytes read) and the read
    /// count. Errors with [`Error::BlobInsufficientLength`] when `len > 0` but
    /// no bytes are available at the offset.
    pub async fn read_up_to(
        &self,
        logical_offset: u64,
        len: usize,
        bufs: impl Into<IoBufMut> + Send,
    ) -> Result<(IoBufMut, usize), Error> {
        let mut bufs = bufs.into();
        if len == 0 {
            // Truncate so the caller never sees stale bytes left in `bufs`.
            bufs.truncate(0);
            return Ok((bufs, 0));
        }
        let blob_size = self.size().await;
        let available = (blob_size.saturating_sub(logical_offset) as usize).min(len);
        if available == 0 {
            return Err(Error::BlobInsufficientLength);
        }
        // SAFETY-NOTE(review): length is set before the bytes are initialized;
        // `read_into` overwrites all `available` bytes on success and we error
        // out otherwise — confirm `set_len`'s contract matches this usage.
        unsafe { bufs.set_len(available) };
        self.read_into(bufs.as_mut(), logical_offset).await?;

        Ok((bufs, available))
    }
453
    /// Fills `buf` with the bytes at `logical_offset`.
    ///
    /// Errors with [`Error::BlobInsufficientLength`] if the request extends
    /// past the logical end. Read path: in-memory tip first, then the page
    /// cache, then the blob itself.
    pub async fn read_into(&self, buf: &mut [u8], logical_offset: u64) -> Result<(), Error> {
        let end_offset = logical_offset
            .checked_add(buf.len() as u64)
            .ok_or(Error::OffsetOverflow)?;

        let buffer = self.buffer.read().await;

        if end_offset > buffer.size() {
            return Err(Error::BlobInsufficientLength);
        }

        // Serve any suffix of the request that overlaps the tip buffer;
        // `remaining` is the prefix length still owed by the cache/blob.
        let remaining = if end_offset <= buffer.offset {
            // Entirely below the tip: everything comes from cache/blob.
            buf.len()
        } else {
            let overlap_start = buffer.offset.max(logical_offset);
            let dst_start = (overlap_start - logical_offset) as usize;
            let src_start = (overlap_start - buffer.offset) as usize;
            let copied = buf.len() - dst_start;
            buf[dst_start..].copy_from_slice(&buffer.as_ref()[src_start..src_start + copied]);
            dst_start
        };

        // Release the tip lock before doing any I/O.
        drop(buffer);

        if remaining == 0 {
            return Ok(());
        }

        let cached = self
            .cache_ref
            .read_cached(self.id, &mut buf[..remaining], logical_offset);

        if cached == remaining {
            return Ok(());
        }

        // Cache miss (at least partially): read the rest through the cache
        // while holding a read lock on the blob state.
        let blob_guard = self.blob_state.read().await;

        let uncached_offset = logical_offset + cached as u64;
        let uncached_len = remaining - cached;
        self.cache_ref
            .read(
                &blob_guard.blob,
                self.id,
                &mut buf[cached..cached + uncached_len],
                uncached_offset,
            )
            .await
    }
520
521 fn identify_protected_regions(
532 partial_page_state: Option<&Checksum>,
533 ) -> Option<(usize, ProtectedCrc)> {
534 let crc_record = partial_page_state?;
535 let (old_len, _) = crc_record.get_crc();
536 let protected_crc = if crc_record.len1 >= crc_record.len2 {
538 ProtectedCrc::First
539 } else {
540 ProtectedCrc::Second
541 };
542 Some((old_len as usize, protected_crc))
543 }
544
    /// Converts buffered logical bytes into physical pages: each full page
    /// followed by its CRC record, plus (optionally) a zero-padded trailing
    /// partial page.
    ///
    /// Full pages are appended zero-copy as slices of the tip buffer; only the
    /// CRC records and the materialized partial page allocate. Returns the
    /// pages and the CRC record of the partial page (if one was produced),
    /// which becomes the blob's new `partial_page_state`.
    fn to_physical_pages(
        &self,
        buffer: &Buffer,
        include_partial_page: bool,
        old_crc_record: Option<&Checksum>,
    ) -> (IoBufs, Option<Checksum>) {
        let logical_page_size = self.cache_ref.page_size() as usize;
        let physical_page_size = logical_page_size + CHECKSUM_SIZE as usize;
        let pages_to_write = buffer.len() / logical_page_size;
        let mut write_buffer = IoBufs::default();
        let buffer_data = buffer.as_ref();

        if pages_to_write > 0 {
            let logical_page_size_u16 =
                u16::try_from(logical_page_size).expect("page size must fit in u16 for CRC record");

            // Build all CRC records in one pooled allocation, sliced per page
            // below.
            let mut crcs = self
                .cache_ref
                .pool()
                .alloc(CHECKSUM_SIZE as usize * pages_to_write);
            for page in 0..pages_to_write {
                let start_read_idx = page * logical_page_size;
                let end_read_idx = start_read_idx + logical_page_size;
                let logical_page = &buffer_data[start_read_idx..end_read_idx];
                let crc = Crc32::checksum(logical_page);

                // The first page may overwrite a previously-durable partial
                // page, so its record must keep the old CRC in its slot.
                let crc_record = if let (0, Some(old_crc)) = (page, old_crc_record) {
                    Self::build_crc_record_preserving_old(logical_page_size_u16, crc, old_crc)
                } else {
                    Checksum::new(logical_page_size_u16, crc)
                };
                crcs.put_slice(&crc_record.to_bytes());
            }
            let crc_blob = crcs.freeze();

            // Interleave [page 0][crc 0][page 1][crc 1]… — pages are zero-copy
            // slices of the tip buffer.
            for page in 0..pages_to_write {
                let start_read_idx = page * logical_page_size;
                let end_read_idx = start_read_idx + logical_page_size;
                write_buffer.append(buffer.slice(start_read_idx..end_read_idx));

                let crc_start = page * CHECKSUM_SIZE as usize;
                write_buffer.append(crc_blob.slice(crc_start..crc_start + CHECKSUM_SIZE as usize));
            }
        }

        if !include_partial_page {
            return (write_buffer, None);
        }

        let partial_page = &buffer_data[pages_to_write * logical_page_size..];
        if partial_page.is_empty() {
            return (write_buffer, None);
        }

        // If the buffered partial page is byte-for-byte what the last flush
        // already made durable (same length, no new full pages ahead of it),
        // skip rewriting it.
        if pages_to_write == 0 {
            if let Some(old_crc) = old_crc_record {
                let (old_len, _) = old_crc.get_crc();
                if partial_page.len() == old_len as usize {
                    return (write_buffer, None);
                }
            }
        }
        let partial_len = partial_page.len();
        let crc = Crc32::checksum(partial_page);

        // Same slot-preservation rule applies when the partial page is the
        // first physical page emitted by this flush.
        let crc_record = if let (0, Some(old_crc)) = (pages_to_write, old_crc_record) {
            Self::build_crc_record_preserving_old(partial_len as u16, crc, old_crc)
        } else {
            Checksum::new(partial_len as u16, crc)
        };

        // Materialize the partial page zero-padded out to a full physical page.
        let mut padded = self.cache_ref.pool().alloc(physical_page_size);
        padded.put_slice(partial_page);
        let zero_count = logical_page_size - partial_len;
        if zero_count > 0 {
            padded.put_bytes(0, zero_count);
        }
        padded.put_slice(&crc_record.to_bytes());
        write_buffer.append(padded.freeze());

        (write_buffer, Some(crc_record))
    }
653
654 const fn build_crc_record_preserving_old(
657 new_len: u16,
658 new_crc: u32,
659 old_crc: &Checksum,
660 ) -> Checksum {
661 let (old_len, old_crc_val) = old_crc.get_crc();
662 if old_crc.len1 >= old_crc.len2 {
664 Checksum {
666 len1: old_len,
667 crc1: old_crc_val,
668 len2: new_len,
669 crc2: new_crc,
670 }
671 } else {
672 Checksum {
674 len1: new_len,
675 crc1: new_crc,
676 len2: old_len,
677 crc2: old_crc_val,
678 }
679 }
680 }
681
    /// Flushes all buffered data (including any partial page) and returns a
    /// [`Replay`] that streams the blob's validated logical contents.
    ///
    /// `buffer_size` bounds the reader's prefetch window in bytes (at least
    /// one physical page is always prefetched).
    pub async fn replay(&self, buffer_size: NonZeroUsize) -> Result<Replay<B>, Error> {
        let logical_page_size = self.cache_ref.page_size();
        let logical_page_size_nz =
            NonZeroU16::new(logical_page_size as u16).expect("page_size is non-zero");

        // Flush everything, partial page included, so the reader sees all data.
        {
            let buf_guard = self.buffer.write().await;
            self.flush_internal(buf_guard, true).await?;
        }

        let physical_page_size = logical_page_size + CHECKSUM_SIZE;
        let prefetch_pages = buffer_size.get() / physical_page_size as usize;
        // Always prefetch at least one page, even for tiny buffer sizes.
        let prefetch_pages = prefetch_pages.max(1);
        let blob_guard = self.blob_state.read().await;

        // Derive both sizes from the full-page count plus the partial page's
        // logical length (a partial page occupies a full physical page on disk).
        let (physical_blob_size, logical_blob_size) =
            blob_guard.partial_page_state.as_ref().map_or_else(
                || {
                    let physical = physical_page_size * blob_guard.current_page;
                    let logical = logical_page_size * blob_guard.current_page;
                    (physical, logical)
                },
                |crc_record| {
                    let (partial_len, _) = crc_record.get_crc();
                    let partial_len = partial_len as u64;
                    let physical = physical_page_size * (blob_guard.current_page + 1);
                    let logical = logical_page_size * blob_guard.current_page + partial_len;
                    (physical, logical)
                },
            );

        let reader = PageReader::new(
            blob_guard.blob.clone(),
            physical_blob_size,
            logical_blob_size,
            prefetch_pages,
            logical_page_size_nz,
        );
        Ok(Replay::new(reader))
    }
733}
734
735impl<B: Blob> Append<B> {
    /// Flushes all buffered data (including any partial page) and syncs the
    /// underlying blob, making everything appended so far durable.
    pub async fn sync(&self) -> Result<(), Error> {
        let buf_guard = self.buffer.write().await;
        self.flush_internal(buf_guard, true).await?;

        // `flush_internal` consumed the buffer guard; a read lock on the blob
        // state suffices for the sync call itself.
        let blob_state = self.blob_state.read().await;
        blob_state.blob.sync().await
    }
747
    /// Resizes the blob to logical `size`.
    ///
    /// Growing appends zeros through the normal append path. Shrinking syncs
    /// first, truncates the blob to the physical page containing `size`, and
    /// re-buffers the surviving prefix of the (now partial) last page into the
    /// tip so appends resume at exactly `size`.
    pub async fn resize(&self, size: u64) -> Result<(), Error> {
        let current_size = self.size().await;

        if size > current_size {
            // Grow by appending zeros.
            let zeros_needed = (size - current_size) as usize;
            let mut zeros = self.cache_ref.pool().alloc(zeros_needed);
            zeros.put_bytes(0, zeros_needed);
            self.append(zeros.as_ref()).await?;
            return Ok(());
        }

        let logical_page_size = self.cache_ref.page_size();
        let physical_page_size = logical_page_size + CHECKSUM_SIZE;

        // Make everything durable before truncating.
        self.sync().await?;

        // Hold both locks for the whole operation so readers cannot observe a
        // half-resized state.
        let mut buf_guard = self.buffer.write().await;
        let mut blob_guard = self.blob_state.write().await;

        let full_pages = size / logical_page_size;
        let partial_bytes = size % logical_page_size;
        // A trailing partial page still occupies a full physical page on disk.
        let new_physical_size = if partial_bytes > 0 {
            (full_pages + 1) * physical_page_size
        } else {
            full_pages * physical_page_size
        };

        blob_guard.blob.resize(new_physical_size).await?;
        // NOTE(review): the partial-page CRC state is cleared even when a
        // partial page remains on disk; the next flush writes a fresh record
        // with no protected slot — confirm this is the intended
        // crash-consistency behavior after truncation.
        blob_guard.partial_page_state = None;

        blob_guard.current_page = full_pages;
        buf_guard.offset = full_pages * logical_page_size;

        if partial_bytes > 0 {
            // Re-read the truncated page and re-buffer its surviving prefix.
            let page_data =
                super::get_page_from_blob(&blob_guard.blob, full_pages, logical_page_size).await?;

            if (page_data.len() as u64) < partial_bytes {
                return Err(Error::InvalidChecksum);
            }

            buf_guard.clear();
            let over_capacity = buf_guard.append(&page_data.as_ref()[..partial_bytes as usize]);
            assert!(!over_capacity);
        } else {
            buf_guard.clear();
        }

        Ok(())
    }
834}
835
836#[cfg(test)]
837mod tests {
838 use super::*;
839 use crate::{deterministic, BufferPool, BufferPoolConfig, Runner as _, Storage as _};
840 use commonware_codec::ReadExt;
841 use commonware_macros::test_traced;
842 use commonware_utils::{NZUsize, NZU16};
843 use prometheus_client::registry::Registry;
844 use std::num::NonZeroU16;
845
    // Deliberately odd page size to exercise unaligned page boundaries.
    const PAGE_SIZE: NonZeroU16 = NZU16!(103);
    // Tip buffer sized at exactly the two-page capacity floor.
    const BUFFER_SIZE: usize = PAGE_SIZE.get() as usize * 2;
848
    /// An empty blob stays empty (zero logical and physical size) across sync
    /// and reopen.
    #[test_traced("DEBUG")]
    fn test_append_crc_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));

            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            assert_eq!(append.size().await, 0);

            // Syncing with nothing buffered must not write any bytes.
            append.sync().await.unwrap();
            drop(append);

            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);
            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            assert_eq!(append.size().await, 0);
        });
    }
882
    /// Exercises appends within a page, spanning pages, and landing exactly on
    /// a page boundary, verifying size recovery and read-back across reopens.
    #[test_traced("DEBUG")]
    fn test_append_crc_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));

            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            assert_eq!(append.size().await, 0);

            let data = vec![1, 2, 3, 4, 5];
            append.append(&data).await.unwrap();

            assert_eq!(append.size().await, 5);

            let more_data = vec![6, 7, 8, 9, 10];
            append.append(&more_data).await.unwrap();

            assert_eq!(append.size().await, 10);

            let read_buf = append.read_at(0, 5).await.unwrap().coalesce();
            assert_eq!(read_buf, &data[..]);

            let read_buf = append.read_at(5, 5).await.unwrap().coalesce();
            assert_eq!(read_buf, &more_data[..]);

            let read_buf = append.read_at(0, 10).await.unwrap().coalesce();
            assert_eq!(read_buf, &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

            append.sync().await.unwrap();
            drop(append);

            // 10 logical bytes occupy one physical page: 103 + 12 = 115.
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 115);
            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 10);

            // Append data that crosses the first page boundary (ends at 110).
            let spanning_data: Vec<u8> = (11..=110).collect();
            append.append(&spanning_data).await.unwrap();
            assert_eq!(append.size().await, 110);

            let read_buf = append.read_at(10, 100).await.unwrap().coalesce();
            assert_eq!(read_buf, &spanning_data[..]);

            let read_buf = append.read_at(0, 110).await.unwrap().coalesce();
            let expected: Vec<u8> = (1..=110).collect();
            assert_eq!(read_buf, &expected[..]);

            append.sync().await.unwrap();
            drop(append);

            // 110 bytes = 1 full page + partial -> two physical pages = 230.
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 230);
            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 110);

            // Land exactly on a page boundary: 110 + 96 = 206 = 2 * 103.
            let boundary_data: Vec<u8> = (111..=206).collect();
            assert_eq!(boundary_data.len(), 96);
            append.append(&boundary_data).await.unwrap();
            assert_eq!(append.size().await, 206);

            let read_buf = append.read_at(0, 206).await.unwrap().coalesce();
            let expected: Vec<u8> = (1..=206).collect();
            assert_eq!(read_buf, &expected[..]);

            append.sync().await.unwrap();
            drop(append);

            // Two exactly-full pages: physical size unchanged at 230.
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 230);
            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
                .await
                .unwrap();
            assert_eq!(append.size().await, 206);

            let read_buf = append.read_at(0, 206).await.unwrap().coalesce();
            assert_eq!(read_buf, &expected[..]);
        });
    }
999
    /// Syncing a tip holding exactly one full page must release its pooled
    /// backing allocation (the flush path calls `Buffer::take` on full drain).
    #[test_traced("DEBUG")]
    fn test_sync_releases_tip_pool_slot_after_full_drain() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let mut registry = Registry::default();
            // Pool deliberately sized with room for only two allocations of
            // this size class so exhaustion is observable.
            let pool = BufferPool::new(
                BufferPoolConfig::for_storage()
                    .with_pool_min_size(PAGE_SIZE.get() as usize)
                    .with_max_per_class(NZUsize!(2)),
                &mut registry,
            );
            let cache_ref = CacheRef::new(pool.clone(), PAGE_SIZE, NZUsize!(1));

            let (blob, blob_size) = context
                .open("test_partition", b"release_tip_backing")
                .await
                .unwrap();
            assert_eq!(blob_size, 0);

            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
                .await
                .unwrap();

            append
                .append(&vec![7; PAGE_SIZE.get() as usize])
                .await
                .unwrap();

            assert!(
                matches!(
                    pool.try_alloc(BUFFER_SIZE),
                    Err(crate::iobuf::PoolError::Exhausted)
                ),
                "full-page tip should occupy the remaining pooled slot before sync"
            );

            append.sync().await.unwrap();

            assert!(
                pool.try_alloc(BUFFER_SIZE).is_ok(),
                "sync should release pooled backing when no partial tail remains"
            );
        });
    }
1046
    /// `read_up_to` with `len == 0` must truncate the caller's buffer so no
    /// stale bytes leak back.
    #[test_traced("DEBUG")]
    fn test_read_up_to_zero_len_truncates_buffer() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context
                .open("test_partition", b"read_up_to_zero_len")
                .await
                .unwrap();
            assert_eq!(blob_size, 0);

            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));

            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
                .await
                .unwrap();
            append.append(&[1, 2, 3, 4]).await.unwrap();

            // Pre-filled buffer whose contents must not survive the call.
            let stale = vec![9, 8, 7, 6];
            let (buf, read) = append.read_up_to(0, 0, stale).await.unwrap();

            assert_eq!(read, 0);
            assert_eq!(buf.len(), 0, "read_up_to must truncate returned buffer");
            assert_eq!(buf.freeze().as_ref(), b"");
        });
    }
1076
    /// Decodes the CRC record from the trailing `CHECKSUM_SIZE` bytes of a raw
    /// physical page.
    fn read_crc_record_from_page(page_bytes: &[u8]) -> Checksum {
        let crc_start = page_bytes.len() - CHECKSUM_SIZE as usize;
        Checksum::read(&mut &page_bytes[crc_start..]).unwrap()
    }
1082
    // Recognizable 6-byte pattern used by the protection tests to mangle one
    // CRC slot on disk and later detect whether it was rewritten.
    const DUMMY_MARKER: [u8; 6] = [0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF];
1086
    /// Equal slot lengths: slot 0 wins the tie and is protected.
    #[test]
    fn test_identify_protected_regions_equal_lengths() {
        let record = Checksum {
            len1: 50,
            crc1: 0xAAAAAAAA,
            len2: 50,
            crc2: 0xBBBBBBBB,
        };

        let result =
            Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record));
        assert!(result.is_some());
        let (prefix_len, protected_crc) = result.unwrap();
        assert_eq!(prefix_len, 50);
        assert!(
            matches!(protected_crc, ProtectedCrc::First),
            "First CRC should be protected when lengths are equal"
        );
    }
1107
    /// `len1 > len2`: slot 0 is authoritative and protected; the prefix length
    /// is the authoritative slot's length.
    #[test]
    fn test_identify_protected_regions_len1_larger() {
        let record = Checksum {
            len1: 100,
            crc1: 0xAAAAAAAA,
            len2: 50,
            crc2: 0xBBBBBBBB,
        };

        let result =
            Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record));
        assert!(result.is_some());
        let (prefix_len, protected_crc) = result.unwrap();
        assert_eq!(prefix_len, 100);
        assert!(
            matches!(protected_crc, ProtectedCrc::First),
            "First CRC should be protected when len1 > len2"
        );
    }
1128
    /// `len2 > len1`: slot 1 is authoritative and protected.
    #[test]
    fn test_identify_protected_regions_len2_larger() {
        let record = Checksum {
            len1: 50,
            crc1: 0xAAAAAAAA,
            len2: 100,
            crc2: 0xBBBBBBBB,
        };

        let result =
            Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record));
        assert!(result.is_some());
        let (prefix_len, protected_crc) = result.unwrap();
        assert_eq!(prefix_len, 100);
        assert!(
            matches!(protected_crc, ProtectedCrc::Second),
            "Second CRC should be protected when len2 > len1"
        );
    }
1149
    /// Two full pages plus a 17-byte partial: full pages and their CRCs are
    /// emitted as zero-copy chunks (2 pages + 2 CRCs = 4), and the partial
    /// page is materialized as a single zero-padded chunk, all with valid CRCs.
    #[test_traced("DEBUG")]
    fn test_to_physical_pages_zero_copy_full_pages_and_materialized_partial() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context
                .open("test_partition", b"to_physical_pages_zero_copy")
                .await
                .unwrap();
            assert_eq!(blob_size, 0);

            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));

            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            // Non-trivial byte pattern (mod 251) so page mixups are detectable.
            let logical_page_size = PAGE_SIZE.get() as usize;
            let partial_len = 17usize;
            let data: Vec<u8> = (0..(logical_page_size * 2 + partial_len))
                .map(|i| (i % 251) as u8)
                .collect();

            let mut buffer = Buffer::new(0, data.len(), cache_ref.pool().clone());
            let over_capacity = buffer.append(&data);
            assert!(!over_capacity);

            let (physical_pages, partial_page_state) =
                append.to_physical_pages(&buffer, true, None);

            // 2 full pages -> 4 zero-copy chunks (page + CRC each), plus one
            // materialized partial-page chunk.
            assert_eq!(physical_pages.chunk_count(), 5);

            let crc_record = partial_page_state.expect("partial page state must be returned");
            let (len, _) = crc_record.get_crc();
            assert_eq!(len as usize, partial_len);

            let physical_page_size = logical_page_size + CHECKSUM_SIZE as usize;
            let coalesced = physical_pages.coalesce();
            assert_eq!(coalesced.len(), physical_page_size * 3);

            // Full-page payloads are byte-identical to the source data.
            assert_eq!(
                &coalesced.as_ref()[..logical_page_size],
                &data[..logical_page_size]
            );
            assert_eq!(
                &coalesced.as_ref()[physical_page_size..physical_page_size + logical_page_size],
                &data[logical_page_size..logical_page_size * 2],
            );

            // Partial page: payload prefix then zero padding up to the CRC.
            let partial_start = physical_page_size * 2;
            assert_eq!(
                &coalesced.as_ref()[partial_start..partial_start + partial_len],
                &data[logical_page_size * 2..],
            );
            assert!(coalesced.as_ref()
                [partial_start + partial_len..partial_start + logical_page_size]
                .iter()
                .all(|byte| *byte == 0));

            // Every physical page must carry a validating CRC record.
            assert!(Checksum::validate_page(&coalesced.as_ref()[..physical_page_size]).is_some());
            assert!(Checksum::validate_page(
                &coalesced.as_ref()[physical_page_size..physical_page_size * 2]
            )
            .is_some());
            assert!(Checksum::validate_page(
                &coalesced.as_ref()[physical_page_size * 2..physical_page_size * 3]
            )
            .is_some());
        });
    }
1243
    /// After two partial-page rewrites leave CRC slot 1 authoritative, a
    /// mangled slot 0 must be overwritten by the next partial flush while
    /// slot 1 stays byte-identical (the protected region is never rewritten).
    #[test_traced("DEBUG")]
    fn test_crc_slot1_protected() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
            // CRC slots trail the logical payload: slot 0 then slot 1, 6 bytes each.
            let slot0_offset = PAGE_SIZE.get() as u64;
            let slot1_offset = PAGE_SIZE.get() as u64 + 6;

            // First partial flush (len 10) writes slot 0.
            let (blob, _) = context.open("test_partition", b"slot1_prot").await.unwrap();
            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            // Second partial flush (len 30) makes slot 1 authoritative.
            let (blob, size) = context.open("test_partition", b"slot1_prot").await.unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(11..=30).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context.open("test_partition", b"slot1_prot").await.unwrap();
            let page = blob
                .read_at(0, physical_page_size)
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert!(
                crc.len2 > crc.len1,
                "Slot 1 should be authoritative (len2={} > len1={})",
                crc.len2,
                crc.len1
            );

            let slot1_before: Vec<u8> = blob
                .read_at(slot1_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();

            // Mangle slot 0 directly on disk; the next flush must repair it.
            blob.write_at(slot0_offset, DUMMY_MARKER.to_vec())
                .await
                .unwrap();
            blob.sync().await.unwrap();

            let slot0_mangled: Vec<u8> = blob
                .read_at(slot0_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_eq!(slot0_mangled, DUMMY_MARKER, "Mangle failed");

            // Third partial flush (len 50) must write only slot 0.
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(31..=50).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, _) = context.open("test_partition", b"slot1_prot").await.unwrap();

            let slot0_after: Vec<u8> = blob
                .read_at(slot0_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_ne!(
                slot0_after, DUMMY_MARKER,
                "Slot 0 should have been overwritten with new CRC"
            );

            let slot1_after: Vec<u8> = blob
                .read_at(slot1_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_eq!(
                slot1_before, slot1_after,
                "Slot 1 was modified! Protected region violated."
            );

            let page = blob
                .read_at(0, physical_page_size)
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert_eq!(crc.len1, 50, "Slot 0 should have len=50");
        });
    }
1369
1370 #[test_traced("DEBUG")]
1376 fn test_crc_slot0_protected() {
1377 let executor = deterministic::Runner::default();
1378 executor.start(|context: deterministic::Context| async move {
1379 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1380 let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
1381 let slot0_offset = PAGE_SIZE.get() as u64;
1382 let slot1_offset = PAGE_SIZE.get() as u64 + 6;
1383
1384 let (blob, _) = context.open("test_partition", b"slot0_prot").await.unwrap();
1386 let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
1387 .await
1388 .unwrap();
1389 append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
1390 append.sync().await.unwrap();
1391 drop(append);
1392
1393 let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
1395 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1396 .await
1397 .unwrap();
1398 append
1399 .append(&(11..=30).collect::<Vec<u8>>())
1400 .await
1401 .unwrap();
1402 append.sync().await.unwrap();
1403 drop(append);
1404
1405 let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
1407 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1408 .await
1409 .unwrap();
1410 append
1411 .append(&(31..=50).collect::<Vec<u8>>())
1412 .await
1413 .unwrap();
1414 append.sync().await.unwrap();
1415 drop(append);
1416
1417 let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
1419 let page = blob
1420 .read_at(0, physical_page_size)
1421 .await
1422 .unwrap()
1423 .coalesce();
1424 let crc = read_crc_record_from_page(page.as_ref());
1425 assert!(
1426 crc.len1 > crc.len2,
1427 "Slot 0 should be authoritative (len1={} > len2={})",
1428 crc.len1,
1429 crc.len2
1430 );
1431
1432 let slot0_before: Vec<u8> = blob
1434 .read_at(slot0_offset, 6)
1435 .await
1436 .unwrap()
1437 .coalesce()
1438 .freeze()
1439 .into();
1440
1441 blob.write_at(slot1_offset, DUMMY_MARKER.to_vec())
1443 .await
1444 .unwrap();
1445 blob.sync().await.unwrap();
1446
1447 let slot1_mangled: Vec<u8> = blob
1449 .read_at(slot1_offset, 6)
1450 .await
1451 .unwrap()
1452 .coalesce()
1453 .freeze()
1454 .into();
1455 assert_eq!(slot1_mangled, DUMMY_MARKER, "Mangle failed");
1456
1457 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1459 .await
1460 .unwrap();
1461 append
1462 .append(&(51..=70).collect::<Vec<u8>>())
1463 .await
1464 .unwrap();
1465 append.sync().await.unwrap();
1466 drop(append);
1467
1468 let (blob, _) = context.open("test_partition", b"slot0_prot").await.unwrap();
1470
1471 let slot1_after: Vec<u8> = blob
1473 .read_at(slot1_offset, 6)
1474 .await
1475 .unwrap()
1476 .coalesce()
1477 .freeze()
1478 .into();
1479 assert_ne!(
1480 slot1_after, DUMMY_MARKER,
1481 "Slot 1 should have been overwritten with new CRC"
1482 );
1483
1484 let slot0_after: Vec<u8> = blob
1486 .read_at(slot0_offset, 6)
1487 .await
1488 .unwrap()
1489 .coalesce()
1490 .freeze()
1491 .into();
1492 assert_eq!(
1493 slot0_before, slot0_after,
1494 "Slot 0 was modified! Protected region violated."
1495 );
1496
1497 let page = blob
1499 .read_at(0, physical_page_size)
1500 .await
1501 .unwrap()
1502 .coalesce();
1503 let crc = read_crc_record_from_page(page.as_ref());
1504 assert_eq!(crc.len2, 70, "Slot 1 should have len=70");
1505 });
1506 }
1507
1508 #[test_traced("DEBUG")]
1514 fn test_data_prefix_not_overwritten() {
1515 let executor = deterministic::Runner::default();
1516 executor.start(|context: deterministic::Context| async move {
1517 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1518 let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
1519
1520 let (blob, _) = context
1522 .open("test_partition", b"prefix_test")
1523 .await
1524 .unwrap();
1525 let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
1526 .await
1527 .unwrap();
1528 let data1: Vec<u8> = (1..=20).collect();
1529 append.append(&data1).await.unwrap();
1530 append.sync().await.unwrap();
1531 drop(append);
1532
1533 let (blob, size) = context
1535 .open("test_partition", b"prefix_test")
1536 .await
1537 .unwrap();
1538 assert_eq!(size, physical_page_size as u64);
1539
1540 let prefix_before: Vec<u8> = blob
1541 .read_at(0, 20)
1542 .await
1543 .unwrap()
1544 .coalesce()
1545 .freeze()
1546 .into();
1547
1548 blob.write_at(25, DUMMY_MARKER.to_vec()).await.unwrap();
1550 blob.sync().await.unwrap();
1551
1552 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1554 .await
1555 .unwrap();
1556 append
1557 .append(&(21..=40).collect::<Vec<u8>>())
1558 .await
1559 .unwrap();
1560 append.sync().await.unwrap();
1561 drop(append);
1562
1563 let (blob, _) = context
1565 .open("test_partition", b"prefix_test")
1566 .await
1567 .unwrap();
1568
1569 let prefix_after: Vec<u8> = blob
1571 .read_at(0, 20)
1572 .await
1573 .unwrap()
1574 .coalesce()
1575 .freeze()
1576 .into();
1577 assert_eq!(prefix_before, prefix_after, "Data prefix was modified!");
1578
1579 let overwritten: Vec<u8> = blob
1581 .read_at(25, 6)
1582 .await
1583 .unwrap()
1584 .coalesce()
1585 .freeze()
1586 .into();
1587 assert_eq!(
1588 overwritten,
1589 vec![26, 27, 28, 29, 30, 31],
1590 "New data should overwrite padding area"
1591 );
1592 });
1593 }
1594
1595 #[test_traced("DEBUG")]
1601 fn test_crc_slot_protection_across_page_boundary() {
1602 let executor = deterministic::Runner::default();
1603 executor.start(|context: deterministic::Context| async move {
1604 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1605 let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
1606 let slot0_offset = PAGE_SIZE.get() as u64;
1607 let slot1_offset = PAGE_SIZE.get() as u64 + 6;
1608
1609 let (blob, _) = context.open("test_partition", b"boundary").await.unwrap();
1611 let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
1612 .await
1613 .unwrap();
1614 append.append(&(1..=50).collect::<Vec<u8>>()).await.unwrap();
1615 append.sync().await.unwrap();
1616 drop(append);
1617
1618 let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
1620 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1621 .await
1622 .unwrap();
1623 append
1624 .append(&(51..=80).collect::<Vec<u8>>())
1625 .await
1626 .unwrap();
1627 append.sync().await.unwrap();
1628 drop(append);
1629
1630 let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
1632 let page = blob
1633 .read_at(0, physical_page_size)
1634 .await
1635 .unwrap()
1636 .coalesce();
1637 let crc = read_crc_record_from_page(page.as_ref());
1638 assert!(crc.len2 > crc.len1, "Slot 1 should be authoritative");
1639
1640 let slot1_before: Vec<u8> = blob
1642 .read_at(slot1_offset, 6)
1643 .await
1644 .unwrap()
1645 .coalesce()
1646 .freeze()
1647 .into();
1648
1649 blob.write_at(slot0_offset, DUMMY_MARKER.to_vec())
1651 .await
1652 .unwrap();
1653 blob.sync().await.unwrap();
1654
1655 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1657 .await
1658 .unwrap();
1659 append
1660 .append(&(81..=120).collect::<Vec<u8>>())
1661 .await
1662 .unwrap();
1663 append.sync().await.unwrap();
1664 drop(append);
1665
1666 let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
1668 assert_eq!(size, (physical_page_size * 2) as u64, "Should have 2 pages");
1669
1670 let slot0_after: Vec<u8> = blob
1672 .read_at(slot0_offset, 6)
1673 .await
1674 .unwrap()
1675 .coalesce()
1676 .freeze()
1677 .into();
1678 assert_ne!(
1679 slot0_after, DUMMY_MARKER,
1680 "Slot 0 should have full-page CRC"
1681 );
1682
1683 let slot1_after: Vec<u8> = blob
1685 .read_at(slot1_offset, 6)
1686 .await
1687 .unwrap()
1688 .coalesce()
1689 .freeze()
1690 .into();
1691 assert_eq!(
1692 slot1_before, slot1_after,
1693 "Slot 1 was modified during page boundary crossing!"
1694 );
1695
1696 let page0 = blob
1698 .read_at(0, physical_page_size)
1699 .await
1700 .unwrap()
1701 .coalesce();
1702 let crc0 = read_crc_record_from_page(page0.as_ref());
1703 assert_eq!(
1704 crc0.len1,
1705 PAGE_SIZE.get(),
1706 "Slot 0 should have full page length"
1707 );
1708
1709 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1711 .await
1712 .unwrap();
1713 assert_eq!(append.size().await, 120);
1714 let all_data: Vec<u8> = append.read_at(0, 120).await.unwrap().coalesce().into();
1715 let expected: Vec<u8> = (1..=120).collect();
1716 assert_eq!(all_data, expected);
1717 });
1718 }
1719
1720 #[test_traced("DEBUG")]
1729 fn test_crc_fallback_on_corrupted_primary() {
1730 let executor = deterministic::Runner::default();
1731 executor.start(|context: deterministic::Context| async move {
1732 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1733 let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
1734 let crc2_offset = PAGE_SIZE.get() as u64 + 8;
1736
1737 let (blob, _) = context
1739 .open("test_partition", b"crc_fallback")
1740 .await
1741 .unwrap();
1742 let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
1743 .await
1744 .unwrap();
1745 let data1: Vec<u8> = (1..=10).collect();
1746 append.append(&data1).await.unwrap();
1747 append.sync().await.unwrap();
1748 drop(append);
1749
1750 let (blob, size) = context
1752 .open("test_partition", b"crc_fallback")
1753 .await
1754 .unwrap();
1755 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1756 .await
1757 .unwrap();
1758 append
1759 .append(&(11..=30).collect::<Vec<u8>>())
1760 .await
1761 .unwrap();
1762 append.sync().await.unwrap();
1763 drop(append);
1764
1765 let (blob, size) = context
1767 .open("test_partition", b"crc_fallback")
1768 .await
1769 .unwrap();
1770 assert_eq!(size, physical_page_size as u64);
1771
1772 let page = blob
1773 .read_at(0, physical_page_size)
1774 .await
1775 .unwrap()
1776 .coalesce();
1777 let crc = read_crc_record_from_page(page.as_ref());
1778 assert!(
1779 crc.len2 > crc.len1,
1780 "Slot 1 should be authoritative (len2={} > len1={})",
1781 crc.len2,
1782 crc.len1
1783 );
1784 assert_eq!(crc.len2, 30, "Slot 1 should have len=30");
1785 assert_eq!(crc.len1, 10, "Slot 0 should have len=10");
1786
1787 let append = Append::new(blob.clone(), size, BUFFER_SIZE, cache_ref.clone())
1789 .await
1790 .unwrap();
1791 assert_eq!(append.size().await, 30);
1792 let all_data: Vec<u8> = append.read_at(0, 30).await.unwrap().coalesce().into();
1793 let expected: Vec<u8> = (1..=30).collect();
1794 assert_eq!(all_data, expected);
1795 drop(append);
1796
1797 blob.write_at(crc2_offset, vec![0xDE, 0xAD, 0xBE, 0xEF])
1800 .await
1801 .unwrap();
1802 blob.sync().await.unwrap();
1803
1804 let page = blob
1806 .read_at(0, physical_page_size)
1807 .await
1808 .unwrap()
1809 .coalesce();
1810 let crc = read_crc_record_from_page(page.as_ref());
1811 assert_eq!(crc.len2, 30, "len2 should still be 30 after corruption");
1812 assert_eq!(crc.crc2, 0xDEADBEEF, "crc2 should be our corrupted value");
1813
1814 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1816 .await
1817 .unwrap();
1818
1819 assert_eq!(
1821 append.size().await,
1822 10,
1823 "Should fall back to slot 0's 10 bytes after primary CRC corruption"
1824 );
1825
1826 let fallback_data: Vec<u8> = append.read_at(0, 10).await.unwrap().coalesce().into();
1828 assert_eq!(
1829 fallback_data, data1,
1830 "Fallback data should match original 10 bytes"
1831 );
1832
1833 let result = append.read_at(0, 11).await;
1835 assert!(result.is_err(), "Reading beyond fallback size should fail");
1836 });
1837 }
1838
1839 #[test_traced("DEBUG")]
1851 fn test_non_last_page_rejects_partial_fallback() {
1852 let executor = deterministic::Runner::default();
1853 executor.start(|context: deterministic::Context| async move {
1854 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1855 let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
1856 let page0_crc2_offset = PAGE_SIZE.get() as u64 + 8;
1858
1859 let (blob, _) = context
1861 .open("test_partition", b"non_last_page")
1862 .await
1863 .unwrap();
1864 let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
1865 .await
1866 .unwrap();
1867 append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
1868 append.sync().await.unwrap();
1869 drop(append);
1870
1871 let (blob, size) = context
1873 .open("test_partition", b"non_last_page")
1874 .await
1875 .unwrap();
1876 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1877 .await
1878 .unwrap();
1879 append
1881 .append(&(11..=PAGE_SIZE.get() as u8).collect::<Vec<u8>>())
1882 .await
1883 .unwrap();
1884 append.sync().await.unwrap();
1885 drop(append);
1886
1887 let (blob, size) = context
1889 .open("test_partition", b"non_last_page")
1890 .await
1891 .unwrap();
1892 let page = blob
1893 .read_at(0, physical_page_size)
1894 .await
1895 .unwrap()
1896 .coalesce();
1897 let crc = read_crc_record_from_page(page.as_ref());
1898 assert_eq!(crc.len1, 10, "Slot 0 should have len=10");
1899 assert_eq!(
1900 crc.len2,
1901 PAGE_SIZE.get(),
1902 "Slot 1 should have len=103 (full page)"
1903 );
1904 assert!(crc.len2 > crc.len1, "Slot 1 should be authoritative");
1905
1906 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1908 .await
1909 .unwrap();
1910 append
1912 .append(&(104..=113).collect::<Vec<u8>>())
1913 .await
1914 .unwrap();
1915 append.sync().await.unwrap();
1916 drop(append);
1917
1918 let (blob, size) = context
1920 .open("test_partition", b"non_last_page")
1921 .await
1922 .unwrap();
1923 assert_eq!(
1924 size,
1925 (physical_page_size * 2) as u64,
1926 "Should have 2 physical pages"
1927 );
1928
1929 let append = Append::new(blob.clone(), size, BUFFER_SIZE, cache_ref.clone())
1931 .await
1932 .unwrap();
1933 assert_eq!(append.size().await, 113);
1934 let all_data: Vec<u8> = append.read_at(0, 113).await.unwrap().coalesce().into();
1935 let expected: Vec<u8> = (1..=113).collect();
1936 assert_eq!(all_data, expected);
1937 drop(append);
1938
1939 blob.write_at(page0_crc2_offset, vec![0xDE, 0xAD, 0xBE, 0xEF])
1941 .await
1942 .unwrap();
1943 blob.sync().await.unwrap();
1944
1945 let page = blob
1947 .read_at(0, physical_page_size)
1948 .await
1949 .unwrap()
1950 .coalesce();
1951 let crc = read_crc_record_from_page(page.as_ref());
1952 assert_eq!(crc.len2, PAGE_SIZE.get(), "len2 should still be 103");
1953 assert_eq!(crc.crc2, 0xDEADBEEF, "crc2 should be corrupted");
1954 assert_eq!(crc.len1, 10, "Fallback slot 0 has partial length");
1956
1957 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1963 .await
1964 .unwrap();
1965
1966 assert_eq!(append.size().await, 113);
1969
1970 let result = append.read_at(0, 10).await;
1973 assert!(
1974 result.is_err(),
1975 "Reading from corrupted non-last page via Append should fail, but got: {:?}",
1976 result
1977 );
1978 drop(append);
1979
1980 let (blob, size) = context
1982 .open("test_partition", b"non_last_page")
1983 .await
1984 .unwrap();
1985 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1986 .await
1987 .unwrap();
1988 let mut replay = append.replay(NZUsize!(1024)).await.unwrap();
1989
1990 let result = replay.ensure(1).await;
1992 assert!(
1993 result.is_err(),
1994 "Reading from corrupted non-last page via Replay should fail, but got: {:?}",
1995 result
1996 );
1997 });
1998 }
1999
2000 #[test]
2001 fn test_resize_shrink_validates_crc() {
2002 let executor = deterministic::Runner::default();
2005
2006 executor.start(|context| async move {
2007 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
2008 let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
2009
2010 let (blob, size) = context
2011 .open("test_partition", b"resize_crc_test")
2012 .await
2013 .unwrap();
2014
2015 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2016 .await
2017 .unwrap();
2018
2019 let data: Vec<u8> = (0..=249).collect();
2022 append.append(&data).await.unwrap();
2023 append.sync().await.unwrap();
2024 assert_eq!(append.size().await, 250);
2025 drop(append);
2026
2027 let (blob, size) = context
2029 .open("test_partition", b"resize_crc_test")
2030 .await
2031 .unwrap();
2032 assert_eq!(size as usize, physical_page_size * 3);
2033
2034 let page1_crc_offset = (physical_page_size * 2 - CHECKSUM_SIZE as usize) as u64;
2036 blob.write_at(page1_crc_offset, vec![0xFF; CHECKSUM_SIZE as usize])
2037 .await
2038 .unwrap();
2039 blob.sync().await.unwrap();
2040
2041 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2044 .await
2045 .unwrap();
2046 assert_eq!(append.size().await, 250);
2047
2048 let result = append.resize(150).await;
2052 assert!(
2053 matches!(result, Err(crate::Error::InvalidChecksum)),
2054 "Expected InvalidChecksum when shrinking to corrupted page, got: {:?}",
2055 result
2056 );
2057 });
2058 }
2059
2060 #[test]
2061 fn test_reopen_partial_tail_append_and_resize() {
2062 let executor = deterministic::Runner::default();
2063
2064 executor.start(|context| async move {
2065 const PAGE_SIZE: NonZeroU16 = NZU16!(64);
2066 const BUFFER_SIZE: usize = 256;
2067
2068 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(4));
2069
2070 let (blob, size) = context
2071 .open("test_partition", b"partial_tail_test")
2072 .await
2073 .unwrap();
2074
2075 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2076 .await
2077 .unwrap();
2078
2079 append.append(&[1, 2, 3, 4, 5]).await.unwrap();
2081 append.sync().await.unwrap();
2082 assert_eq!(append.size().await, 5);
2083 drop(append);
2084
2085 let (blob, size) = context
2086 .open("test_partition", b"partial_tail_test")
2087 .await
2088 .unwrap();
2089
2090 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2091 .await
2092 .unwrap();
2093 assert_eq!(append.size().await, 5);
2094
2095 append.append(&[6, 7, 8]).await.unwrap();
2096 append.resize(6).await.unwrap();
2097 append.sync().await.unwrap();
2098
2099 let data: Vec<u8> = append.read_at(0, 6).await.unwrap().coalesce().into();
2100 assert_eq!(data, vec![1, 2, 3, 4, 5, 6]);
2101 });
2102 }
2103
2104 #[test]
2105 fn test_corrupted_crc_len_too_large() {
2106 let executor = deterministic::Runner::default();
2107
2108 executor.start(|context| async move {
2109 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
2110 let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
2111
2112 let (blob, size) = context
2114 .open("test_partition", b"crc_len_test")
2115 .await
2116 .unwrap();
2117
2118 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2119 .await
2120 .unwrap();
2121
2122 append.append(&[0x42; 50]).await.unwrap();
2123 append.sync().await.unwrap();
2124 drop(append);
2125
2126 let (blob, size) = context
2128 .open("test_partition", b"crc_len_test")
2129 .await
2130 .unwrap();
2131 assert_eq!(size as usize, physical_page_size);
2132
2133 let crc_offset = PAGE_SIZE.get() as u64;
2135
2136 let bad_crc_record: [u8; 12] = [
2139 0xFF, 0xFF, 0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, ];
2144 blob.write_at(crc_offset, bad_crc_record.to_vec())
2145 .await
2146 .unwrap();
2147 blob.sync().await.unwrap();
2148
2149 let result = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()).await;
2151
2152 match result {
2155 Ok(append) => {
2156 let recovered_size = append.size().await;
2158 assert_eq!(
2159 recovered_size, 0,
2160 "Corrupted page should be truncated, size should be 0"
2161 );
2162 }
2163 Err(e) => {
2164 assert!(
2166 matches!(e, crate::Error::InvalidChecksum),
2167 "Expected InvalidChecksum error, got: {:?}",
2168 e
2169 );
2170 }
2171 }
2172 });
2173 }
2174
2175 #[test]
2176 fn test_corrupted_crc_both_slots_len_too_large() {
2177 let executor = deterministic::Runner::default();
2178
2179 executor.start(|context| async move {
2180 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
2181
2182 let (blob, size) = context
2184 .open("test_partition", b"crc_both_bad")
2185 .await
2186 .unwrap();
2187
2188 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2189 .await
2190 .unwrap();
2191
2192 append.append(&[0x42; 50]).await.unwrap();
2193 append.sync().await.unwrap();
2194 drop(append);
2195
2196 let (blob, size) = context
2198 .open("test_partition", b"crc_both_bad")
2199 .await
2200 .unwrap();
2201
2202 let crc_offset = PAGE_SIZE.get() as u64;
2203
2204 let bad_crc_record: [u8; 12] = [
2206 0x01, 0x00, 0xDE, 0xAD, 0xBE, 0xEF, 0x02, 0x00, 0xCA, 0xFE, 0xBA, 0xBE, ];
2211 blob.write_at(crc_offset, bad_crc_record.to_vec())
2212 .await
2213 .unwrap();
2214 blob.sync().await.unwrap();
2215
2216 let result = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()).await;
2218
2219 match result {
2220 Ok(append) => {
2221 assert_eq!(append.size().await, 0);
2223 }
2224 Err(e) => {
2225 assert!(
2226 matches!(e, crate::Error::InvalidChecksum),
2227 "Expected InvalidChecksum, got: {:?}",
2228 e
2229 );
2230 }
2231 }
2232 });
2233 }
2234}