1use super::read::{PageReader, Replay};
29use crate::{
30 buffer::{
31 paged::{CacheRef, Checksum, CHECKSUM_SIZE, CHECKSUM_SLOT_SIZE},
32 tip::Buffer,
33 },
34 Blob, Error, IoBuf, IoBufMut, IoBufs,
35};
36use bytes::BufMut;
37use commonware_cryptography::Crc32;
38use commonware_utils::sync::{AsyncRwLock, AsyncRwLockWriteGuard};
39use futures::stream::{FuturesUnordered, StreamExt};
40use std::{
41 num::{NonZeroU16, NonZeroUsize},
42 sync::Arc,
43};
44use tracing::warn;
45
/// Which of the two CRC slots in an on-disk partial page's checksum record is authoritative and
/// therefore must be skipped when that page is rewritten.
#[derive(Copy, Clone)]
enum ProtectedCrc {
    /// Slot one (`len1`/`crc1`) is authoritative.
    First,
    /// Slot two (`len2`/`crc2`) is authoritative.
    Second,
}
52
53#[derive(Clone)]
55struct BlobState<B: Blob> {
56 blob: B,
57
58 current_page: u64,
60
61 partial_page_state: Option<Checksum>,
64
65 needs_sync: bool,
67}
68
69impl<B: Blob> BlobState<B> {
70 async fn write_at(&mut self, offset: u64, bufs: impl Into<IoBufs> + Send) -> Result<(), Error> {
72 self.blob.write_at(offset, bufs).await?;
73 self.needs_sync = true;
74 Ok(())
75 }
76
77 async fn write_at_sync(
82 &mut self,
83 offset: u64,
84 bufs: impl Into<IoBufs> + Send,
85 ) -> Result<(), Error> {
86 if self.needs_sync {
87 self.write_at(offset, bufs).await?;
88 self.sync().await
89 } else {
90 self.needs_sync = true;
93 self.blob.write_at_sync(offset, bufs).await?;
94 self.needs_sync = false;
95 Ok(())
96 }
97 }
98
99 async fn write_at_maybe_sync(
101 &mut self,
102 offset: u64,
103 bufs: impl Into<IoBufs> + Send,
104 sync: bool,
105 ) -> Result<(), Error> {
106 if sync {
107 self.write_at_sync(offset, bufs).await
108 } else {
109 self.write_at(offset, bufs).await
110 }
111 }
112
113 async fn resize(&mut self, len: u64) -> Result<(), Error> {
115 self.blob.resize(len).await?;
116 self.needs_sync = true;
117 Ok(())
118 }
119
120 async fn sync(&mut self) -> Result<(), Error> {
122 if !self.needs_sync {
123 return Ok(());
124 }
125 self.blob.sync().await?;
126 self.needs_sync = false;
127 Ok(())
128 }
129}
130
131#[derive(Clone)]
134pub struct Append<B: Blob> {
135 blob_state: Arc<AsyncRwLock<BlobState<B>>>,
137
138 id: u64,
140
141 cache_ref: CacheRef,
143
144 buffer: Arc<AsyncRwLock<Buffer>>,
147}
148
149fn capacity_with_floor(capacity: usize, page_size: u64) -> usize {
152 let floor = page_size as usize * 2;
153 if capacity < floor {
154 warn!(
155 floor,
156 "requested buffer capacity is too low, increasing it to floor"
157 );
158 floor
159 } else {
160 capacity
161 }
162}
163
164impl<B: Blob> Append<B> {
165 pub async fn new(
170 blob: B,
171 original_blob_size: u64,
172 capacity: usize,
173 cache_ref: CacheRef,
174 ) -> Result<Self, Error> {
175 let (partial_page_state, pages, invalid_data_found) =
176 Self::read_last_valid_page(&blob, original_blob_size, cache_ref.page_size()).await?;
177 if invalid_data_found {
178 let new_blob_size = pages * (cache_ref.page_size() + CHECKSUM_SIZE);
180 warn!(
181 original_blob_size,
182 new_blob_size, "truncating blob to remove invalid data"
183 );
184 blob.resize(new_blob_size).await?;
185 blob.sync().await?;
186 }
187
188 let capacity = capacity_with_floor(capacity, cache_ref.page_size());
189 let needs_sync = !invalid_data_found; let (blob_state, partial_data) = match partial_page_state {
192 Some((partial_page, crc_record)) => (
193 BlobState {
194 blob,
195 current_page: pages - 1,
196 partial_page_state: Some(crc_record),
197 needs_sync,
198 },
199 Some(partial_page),
200 ),
201 None => (
202 BlobState {
203 blob,
204 current_page: pages,
205 partial_page_state: None,
206 needs_sync,
207 },
208 None,
209 ),
210 };
211
212 let buffer = Buffer::from(
213 blob_state.current_page * cache_ref.page_size(),
214 partial_data.unwrap_or_default(),
215 capacity,
216 cache_ref.pool().clone(),
217 );
218
219 Ok(Self {
220 blob_state: Arc::new(AsyncRwLock::new(blob_state)),
221 id: cache_ref.next_id(),
222 cache_ref,
223 buffer: Arc::new(AsyncRwLock::new(buffer)),
224 })
225 }
226
227 async fn read_last_valid_page(
245 blob: &B,
246 blob_size: u64,
247 page_size: u64,
248 ) -> Result<(Option<(IoBuf, Checksum)>, u64, bool), Error> {
249 let physical_page_size = page_size + CHECKSUM_SIZE;
250 let partial_bytes = blob_size % physical_page_size;
251 let mut last_page_end = blob_size - partial_bytes;
252
253 let mut invalid_data_found = partial_bytes != 0;
256
257 while last_page_end != 0 {
258 let page_start = last_page_end - physical_page_size;
260 let buf = blob
261 .read_at(page_start, physical_page_size as usize)
262 .await?
263 .coalesce()
264 .freeze();
265
266 match Checksum::validate_page(buf.as_ref()) {
267 Some(crc_record) => {
268 let (len, _) = crc_record.get_crc();
270 let len = len as u64;
271 if len != page_size {
272 let logical_bytes = buf.slice(..len as usize);
274 return Ok((
275 Some((logical_bytes, crc_record)),
276 last_page_end / physical_page_size,
277 invalid_data_found,
278 ));
279 }
280 return Ok((None, last_page_end / physical_page_size, invalid_data_found));
282 }
283 None => {
284 last_page_end = page_start;
286 invalid_data_found = true;
287 }
288 }
289 }
290
291 Ok((None, 0, invalid_data_found))
293 }
294
295 pub async fn append(&self, buf: &[u8]) -> Result<(), Error> {
297 let mut buffer = self.buffer.write().await;
298
299 if !buffer.append(buf) {
300 return Ok(());
301 }
302
303 self.flush_internal(buffer, false, false).await?;
305 Ok(())
306 }
307
/// Writes the tip buffer's full pages, and optionally its trailing partial page, to the blob.
///
/// Full pages are drained from the buffer and inserted into the page cache before being written.
/// Any trailing partial page stays in the buffer. If the blob currently ends in a partial page,
/// that page's bytes already on disk (its first `len` bytes) and its authoritative CRC slot (as
/// chosen by `identify_protected_regions`) are not rewritten.
///
/// * `buf_guard` - write guard over the tip buffer. It is released once the blob-state write lock
///   is held.
/// * `write_partial_page` - whether to also write the trailing partial page.
/// * `sync` - whether the final write should be issued as a synced write.
///
/// Returns `Ok(true)` only when the final write was issued synced. `Ok(false)` means either
/// nothing was written or the writes were left unsynced, so a caller that needs durability must
/// still sync.
async fn flush_internal(
    &self,
    mut buf_guard: AsyncRwLockWriteGuard<'_, Buffer>,
    write_partial_page: bool,
    sync: bool,
) -> Result<bool, Error> {
    let buffer = &mut *buf_guard;

    // Snapshot the on-disk partial-page record. In the code shown here, every writer of
    // `partial_page_state` also holds the buffer write lock, which the caller holds for us.
    let old_partial_page_state = {
        let blob_state = self.blob_state.read().await;
        blob_state.partial_page_state.clone()
    };

    let (mut physical_pages, partial_page_state) = self.to_physical_pages(
        &*buffer,
        write_partial_page,
        old_partial_page_state.as_ref(),
    );

    if physical_pages.is_empty() {
        // Nothing new to write.
        return Ok(false);
    }

    let logical_page_size = self.cache_ref.page_size() as usize;
    // Only whole logical pages leave the buffer; a trailing partial page stays buffered.
    let pages_to_cache = buffer.len() / logical_page_size;
    let bytes_to_drain = pages_to_cache * logical_page_size;

    let cache_pages = if pages_to_cache > 0 {
        Some((buffer.offset, buffer.slice(..bytes_to_drain)))
    } else {
        None
    };

    if bytes_to_drain == buffer.len() && bytes_to_drain != 0 {
        // The buffer holds exactly whole pages: take all of it. NOTE(review): `take` is assumed
        // to advance `buffer.offset` past the drained bytes, as the assertion below requires.
        let _ = buffer
            .take()
            .expect("take must succeed when flush drains all buffered bytes");
    } else if bytes_to_drain != 0 {
        buffer.drop_prefix(bytes_to_drain);
        buffer.offset += bytes_to_drain as u64;
    }
    let new_offset = buffer.offset;

    if let Some((cache_offset, pages)) = cache_pages {
        // Drained full pages go into the page cache before they are written to the blob.
        let remaining = self.cache_ref.cache(self.id, pages.as_ref(), cache_offset);
        assert_eq!(remaining, 0, "cached full-page prefix must be page-aligned");
    }

    // Take the blob-state write lock BEFORE releasing the buffer lock. Readers that no longer
    // find these bytes in the buffer and must fall back to the blob take the blob-state read
    // lock, so they wait until the writes below finish.
    let mut blob_state = self.blob_state.write().await;

    drop(buf_guard);

    let physical_page_size = logical_page_size + CHECKSUM_SIZE as usize;
    let write_at_offset = blob_state.current_page * physical_page_size as u64;

    let protected_regions = Self::identify_protected_regions(old_partial_page_state.as_ref());

    // NOTE(review): in-memory state is advanced before the writes below are issued, and the
    // drained bytes are already gone from the buffer. A failed write therefore leaves this state
    // ahead of what is on disk.
    blob_state.current_page += pages_to_cache as u64;
    blob_state.partial_page_state = partial_page_state;

    assert_eq!(
        blob_state.current_page * self.cache_ref.page_size(),
        new_offset
    );

    match protected_regions {
        // Slot 1 is authoritative. Write the new data after the persisted prefix, skip slot 1,
        // then write slot 2 (which holds the new CRC) together with any following pages.
        Some((prefix_len, ProtectedCrc::First)) => {
            let has_first_write = prefix_len < logical_page_size;
            if has_first_write {
                let _ = physical_pages.split_to(prefix_len);
                let first_payload = physical_pages.split_to(logical_page_size - prefix_len);
                let has_second_write = physical_pages.len() > CHECKSUM_SLOT_SIZE;
                blob_state
                    .write_at_maybe_sync(
                        write_at_offset + prefix_len as u64,
                        first_payload,
                        sync && !has_second_write,
                    )
                    .await?;
                if !has_second_write {
                    return Ok(sync);
                }
            } else {
                let _ = physical_pages.split_to(logical_page_size);
            }

            if physical_pages.len() > CHECKSUM_SLOT_SIZE {
                // Skip the protected slot 1.
                let _ = physical_pages.split_to(CHECKSUM_SLOT_SIZE);
                blob_state
                    .write_at_maybe_sync(
                        write_at_offset + (logical_page_size + CHECKSUM_SLOT_SIZE) as u64,
                        physical_pages,
                        sync && !has_first_write,
                    )
                    .await?;
                if !has_first_write {
                    return Ok(sync);
                }
            }

            // Either two unsynced writes were issued or nothing remained to write.
            Ok(false)
        }
        // Slot 2 is authoritative. Write the new data after the persisted prefix together with
        // slot 1 (which holds the new CRC), skip slot 2, then write any following pages.
        Some((prefix_len, ProtectedCrc::Second)) => {
            let first_crc_end = logical_page_size + CHECKSUM_SLOT_SIZE;
            // Size of the protected slot 2 (the rest of the first physical page).
            let skip = physical_page_size - first_crc_end;
            let has_first_write = prefix_len < first_crc_end;
            if has_first_write {
                let _ = physical_pages.split_to(prefix_len);
                let first_payload = physical_pages.split_to(first_crc_end - prefix_len);
                let has_second_write = physical_pages.len() > skip;
                blob_state
                    .write_at_maybe_sync(
                        write_at_offset + prefix_len as u64,
                        first_payload,
                        sync && !has_second_write,
                    )
                    .await?;
                if !has_second_write {
                    return Ok(sync);
                }
            } else {
                let _ = physical_pages.split_to(first_crc_end);
            }

            if physical_pages.len() > skip {
                let _ = physical_pages.split_to(skip);
                blob_state
                    .write_at_maybe_sync(
                        write_at_offset + physical_page_size as u64,
                        physical_pages,
                        sync && !has_first_write,
                    )
                    .await?;
                if !has_first_write {
                    return Ok(sync);
                }
            }

            // Either two unsynced writes were issued or nothing remained to write.
            Ok(false)
        }
        // No partial page on disk: everything is written in a single contiguous write.
        None => {
            blob_state
                .write_at_maybe_sync(write_at_offset, physical_pages, sync)
                .await?;
            Ok(sync)
        }
    }
}
516
517 pub async fn size(&self) -> u64 {
519 let buffer = self.buffer.read().await;
520 buffer.size()
521 }
522
523 pub fn try_size(&self) -> Option<u64> {
528 let buffer = self.buffer.try_read().ok()?;
529 Some(buffer.size())
530 }
531
532 pub fn try_read_sync(&self, offset: u64, buf: &mut [u8]) -> bool {
540 if self.cache_ref.read_cached(self.id, buf, offset) == buf.len() {
541 return true;
542 }
543 let Some(end_offset) = offset.checked_add(buf.len() as u64) else {
544 return false;
545 };
546 let Ok(buffer) = self.buffer.try_read() else {
547 return false;
548 };
549 if offset < buffer.offset || end_offset > buffer.size() {
550 return false;
551 }
552 let src_start = (offset - buffer.offset) as usize;
553 buf.copy_from_slice(&buffer.as_ref()[src_start..src_start + buf.len()]);
554 true
555 }
556
557 pub async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufs, Error> {
559 let mut buf = unsafe { self.cache_ref.pool().alloc_len(len) };
562 self.read_into(buf.as_mut(), offset).await?;
563 Ok(buf.into())
564 }
565
566 pub async fn read_up_to(
575 &self,
576 logical_offset: u64,
577 len: usize,
578 bufs: impl Into<IoBufMut> + Send,
579 ) -> Result<(IoBufMut, usize), Error> {
580 let mut bufs = bufs.into();
581 if len == 0 {
582 bufs.truncate(0);
583 return Ok((bufs, 0));
584 }
585 let blob_size = self.size().await;
586 let available = (blob_size.saturating_sub(logical_offset) as usize).min(len);
587 if available == 0 {
588 return Err(Error::BlobInsufficientLength);
589 }
590 unsafe { bufs.set_len(available) };
592 self.read_into(bufs.as_mut(), logical_offset).await?;
593
594 Ok((bufs, available))
595 }
596
597 pub async fn read_many_into(
603 &self,
604 buf: &mut [u8],
605 offsets: &[u64],
606 item_size: usize,
607 ) -> Result<(), Error> {
608 assert_eq!(
609 buf.len(),
610 offsets
611 .len()
612 .checked_mul(item_size)
613 .expect("read_many_into buffer length overflow"),
614 "read_many_into requires buf.len() == offsets.len() * item_size"
615 );
616 if offsets.is_empty() {
617 return Ok(());
618 }
619
620 let last_end = offsets[offsets.len() - 1]
621 .checked_add(item_size as u64)
622 .ok_or(Error::OffsetOverflow)?;
623
624 let buffer = self.buffer.read().await;
626 if last_end > buffer.size() {
627 return Err(Error::BlobInsufficientLength);
628 }
629
630 if item_size == 0 {
636 return Ok(());
637 }
638 let mut cache_ranges: Vec<(&mut [u8], u64)> = Vec::new();
639 for (item_buf, &offset) in buf.chunks_exact_mut(item_size).zip(offsets.iter()) {
640 let end = offset + item_size as u64;
641
642 if end <= buffer.offset {
643 cache_ranges.push((item_buf, offset));
645 } else if offset >= buffer.offset {
646 let src = (offset - buffer.offset) as usize;
648 item_buf.copy_from_slice(&buffer.as_ref()[src..src + item_size]);
649 } else {
650 let prefix_len = (buffer.offset - offset) as usize;
652 item_buf[prefix_len..].copy_from_slice(&buffer.as_ref()[..item_size - prefix_len]);
653 cache_ranges.push((&mut item_buf[..prefix_len], offset));
654 }
655 }
656
657 drop(buffer);
658
659 if cache_ranges.is_empty() {
660 return Ok(());
661 }
662
663 self.cache_ref.read_cached_many(self.id, &mut cache_ranges);
666 if cache_ranges.is_empty() {
667 return Ok(());
668 }
669
670 let blob_guard = self.blob_state.read().await;
672 let mut reads = cache_ranges
673 .iter_mut()
674 .map(|(item_buf, offset)| {
675 self.cache_ref
676 .read(&blob_guard.blob, self.id, item_buf, *offset)
677 })
678 .collect::<FuturesUnordered<_>>();
679 while let Some(result) = reads.next().await {
680 result?;
681 }
682
683 Ok(())
684 }
685
686 pub async fn read_into(&self, buf: &mut [u8], logical_offset: u64) -> Result<(), Error> {
691 let end_offset = logical_offset
693 .checked_add(buf.len() as u64)
694 .ok_or(Error::OffsetOverflow)?;
695
696 let buffer = self.buffer.read().await;
698
699 if end_offset > buffer.size() {
701 return Err(Error::BlobInsufficientLength);
702 }
703
704 let remaining = if end_offset <= buffer.offset {
706 buf.len()
708 } else {
709 let overlap_start = buffer.offset.max(logical_offset);
711 let dst_start = (overlap_start - logical_offset) as usize;
712 let src_start = (overlap_start - buffer.offset) as usize;
713 let copied = buf.len() - dst_start;
714 buf[dst_start..].copy_from_slice(&buffer.as_ref()[src_start..src_start + copied]);
715 dst_start
716 };
717
718 drop(buffer);
720
721 if remaining == 0 {
722 return Ok(());
723 }
724
725 let cached = self
728 .cache_ref
729 .read_cached(self.id, &mut buf[..remaining], logical_offset);
730
731 if cached == remaining {
732 return Ok(());
734 }
735
736 let blob_guard = self.blob_state.read().await;
739
740 let uncached_offset = logical_offset + cached as u64;
742 let uncached_len = remaining - cached;
743 self.cache_ref
744 .read(
745 &blob_guard.blob,
746 self.id,
747 &mut buf[cached..cached + uncached_len],
748 uncached_offset,
749 )
750 .await
751 }
752
753 fn identify_protected_regions(
764 partial_page_state: Option<&Checksum>,
765 ) -> Option<(usize, ProtectedCrc)> {
766 let crc_record = partial_page_state?;
767 let (old_len, _) = crc_record.get_crc();
768 let protected_crc = if crc_record.len1 >= crc_record.len2 {
770 ProtectedCrc::First
771 } else {
772 ProtectedCrc::Second
773 };
774 Some((old_len as usize, protected_crc))
775 }
776
777 fn to_physical_pages(
790 &self,
791 buffer: &Buffer,
792 include_partial_page: bool,
793 old_crc_record: Option<&Checksum>,
794 ) -> (IoBufs, Option<Checksum>) {
795 let logical_page_size = self.cache_ref.page_size() as usize;
796 let physical_page_size = logical_page_size + CHECKSUM_SIZE as usize;
797 let pages_to_write = buffer.len() / logical_page_size;
798 let mut write_buffer = IoBufs::default();
799 let buffer_data = buffer.as_ref();
800
801 if pages_to_write > 0 {
802 let logical_page_size_u16 =
803 u16::try_from(logical_page_size).expect("page size must fit in u16 for CRC record");
804
805 let mut crcs = self
808 .cache_ref
809 .pool()
810 .alloc(CHECKSUM_SIZE as usize * pages_to_write);
811 for page in 0..pages_to_write {
812 let start_read_idx = page * logical_page_size;
813 let end_read_idx = start_read_idx + logical_page_size;
814 let logical_page = &buffer_data[start_read_idx..end_read_idx];
815 let crc = Crc32::checksum(logical_page);
816
817 let crc_record = if let (0, Some(old_crc)) = (page, old_crc_record) {
820 Self::build_crc_record_preserving_old(logical_page_size_u16, crc, old_crc)
821 } else {
822 Checksum::new(logical_page_size_u16, crc)
823 };
824 crcs.put_slice(&crc_record.to_bytes());
825 }
826 let crc_blob = crcs.freeze();
827
828 for page in 0..pages_to_write {
830 let start_read_idx = page * logical_page_size;
831 let end_read_idx = start_read_idx + logical_page_size;
832 write_buffer.append(buffer.slice(start_read_idx..end_read_idx));
833
834 let crc_start = page * CHECKSUM_SIZE as usize;
835 write_buffer.append(crc_blob.slice(crc_start..crc_start + CHECKSUM_SIZE as usize));
836 }
837 }
838
839 if !include_partial_page {
840 return (write_buffer, None);
841 }
842
843 let partial_page = &buffer_data[pages_to_write * logical_page_size..];
844 if partial_page.is_empty() {
845 return (write_buffer, None);
847 }
848
849 if pages_to_write == 0 {
852 if let Some(old_crc) = old_crc_record {
853 let (old_len, _) = old_crc.get_crc();
854 if partial_page.len() == old_len as usize {
855 return (write_buffer, None);
856 }
857 }
858 }
859 let partial_len = partial_page.len();
860 let crc = Crc32::checksum(partial_page);
861
862 let crc_record = if let (0, Some(old_crc)) = (pages_to_write, old_crc_record) {
865 Self::build_crc_record_preserving_old(partial_len as u16, crc, old_crc)
866 } else {
867 Checksum::new(partial_len as u16, crc)
868 };
869
870 let mut padded = self.cache_ref.pool().alloc(physical_page_size);
873 padded.put_slice(partial_page);
874 let zero_count = logical_page_size - partial_len;
875 if zero_count > 0 {
876 padded.put_bytes(0, zero_count);
877 }
878 padded.put_slice(&crc_record.to_bytes());
879 write_buffer.append(padded.freeze());
880
881 (write_buffer, Some(crc_record))
884 }
885
886 fn checksum_slot_bytes(len: u16, crc: u32) -> [u8; CHECKSUM_SLOT_SIZE] {
888 let mut bytes = [0u8; CHECKSUM_SLOT_SIZE];
889 bytes[..2].copy_from_slice(&len.to_be_bytes());
890 bytes[2..].copy_from_slice(&crc.to_be_bytes());
891 bytes
892 }
893
894 const fn build_crc_record_preserving_old(
897 new_len: u16,
898 new_crc: u32,
899 old_crc: &Checksum,
900 ) -> Checksum {
901 let (old_len, old_crc_val) = old_crc.get_crc();
902 if old_crc.len1 >= old_crc.len2 {
904 Checksum {
906 len1: old_len,
907 crc1: old_crc_val,
908 len2: new_len,
909 crc2: new_crc,
910 }
911 } else {
912 Checksum {
914 len1: new_len,
915 crc1: new_crc,
916 len2: old_len,
917 crc2: old_crc_val,
918 }
919 }
920 }
921
/// Durably rewrites the checksum record of the page at index `page` after it has been shrunk to
/// `new_len` logical bytes with checksum `new_crc`, and returns the resulting in-memory record.
///
/// The page's data bytes are not touched; only its CRC slots are written. There are three
/// separate synced writes, in this order:
/// 1. the slot that is NOT authoritative in `old_crc` receives `new_crc` with length 0;
/// 2. that slot's length is then set to `new_len`;
/// 3. the length of the previously authoritative slot is zeroed.
///
/// NOTE(review): this ordering appears intended to keep a validating record on disk if a crash
/// occurs between steps. Confirm against `Checksum::validate_page` before reordering anything.
///
/// # Errors
///
/// Returns [`Error::OffsetOverflow`] if a slot offset overflows `u64`. Also returns any write or
/// sync error.
async fn sync_partial_page_shrink(
    blob_state: &mut BlobState<B>,
    page: u64,
    logical_page_size: u64,
    new_len: u16,
    new_crc: u32,
    old_crc: &Checksum,
) -> Result<Checksum, Error> {
    let physical_page_size = logical_page_size
        .checked_add(CHECKSUM_SIZE)
        .ok_or(Error::OffsetOverflow)?;
    // Byte offset of the page's checksum record, which immediately follows its logical bytes.
    let crc_start = page
        .checked_mul(physical_page_size)
        .and_then(|start| start.checked_add(logical_page_size))
        .ok_or(Error::OffsetOverflow)?;
    // Write into the slot that is not authoritative (slot one wins when len1 >= len2).
    let (new_slot_start, old_slot_start) = if old_crc.len1 >= old_crc.len2 {
        (CHECKSUM_SLOT_SIZE, 0)
    } else {
        (0, CHECKSUM_SLOT_SIZE)
    };

    let new_slot_offset = crc_start
        .checked_add(new_slot_start as u64)
        .ok_or(Error::OffsetOverflow)?;
    // Step 1: stage the new CRC with a zero length.
    let staged_slot = Self::checksum_slot_bytes(0, new_crc);
    blob_state
        .write_at_sync(new_slot_offset, staged_slot.to_vec())
        .await?;

    // Step 2: publish the new length (the first two bytes of the slot).
    blob_state
        .write_at_sync(new_slot_offset, new_len.to_be_bytes().to_vec())
        .await?;

    let old_slot_offset = crc_start
        .checked_add(old_slot_start as u64)
        .ok_or(Error::OffsetOverflow)?;
    // Step 3: zero only the old slot's length field; its CRC bytes are left as they were.
    let len_size = std::mem::size_of::<u16>();
    blob_state
        .write_at_sync(old_slot_offset, vec![0u8; len_size])
        .await?;

    // In-memory record: the new slot holds (new_len, new_crc) and the old slot is recorded as
    // (0, 0). On disk, only the old slot's length was zeroed.
    let final_record = if new_slot_start == 0 {
        Checksum {
            len1: new_len,
            crc1: new_crc,
            len2: 0,
            crc2: 0,
        }
    } else {
        Checksum {
            len1: 0,
            crc1: 0,
            len2: new_len,
            crc2: new_crc,
        }
    };
    Ok(final_record)
}
992
993 pub async fn replay(&self, buffer_size: NonZeroUsize) -> Result<Replay<B>, Error> {
1001 let logical_page_size = self.cache_ref.page_size();
1002 let logical_page_size_nz =
1003 NonZeroU16::new(logical_page_size as u16).expect("page_size is non-zero");
1004
1005 {
1007 let buf_guard = self.buffer.write().await;
1008 self.flush_internal(buf_guard, true, false).await?;
1009 }
1010
1011 let physical_page_size = logical_page_size + CHECKSUM_SIZE;
1013 let prefetch_pages = buffer_size.get() / physical_page_size as usize;
1014 let prefetch_pages = prefetch_pages.max(1); let blob_guard = self.blob_state.read().await;
1016
1017 let (physical_blob_size, logical_blob_size) =
1019 blob_guard.partial_page_state.as_ref().map_or_else(
1020 || {
1021 let physical = physical_page_size * blob_guard.current_page;
1023 let logical = logical_page_size * blob_guard.current_page;
1024 (physical, logical)
1025 },
1026 |crc_record| {
1027 let (partial_len, _) = crc_record.get_crc();
1029 let partial_len = partial_len as u64;
1030 let physical = physical_page_size * (blob_guard.current_page + 1);
1032 let logical = logical_page_size * blob_guard.current_page + partial_len;
1034 (physical, logical)
1035 },
1036 );
1037
1038 let reader = PageReader::new(
1039 blob_guard.blob.clone(),
1040 physical_blob_size,
1041 logical_blob_size,
1042 prefetch_pages,
1043 logical_page_size_nz,
1044 );
1045 Ok(Replay::new(reader))
1046 }
1047}
1048
1049impl<B: Blob> Append<B> {
1050 pub async fn sync(&self) -> Result<(), Error> {
1056 let buf_guard = self.buffer.write().await;
1058
1059 if self.flush_internal(buf_guard, true, true).await? {
1061 return Ok(());
1062 }
1063
1064 let mut blob_state = self.blob_state.write().await;
1067 blob_state.sync().await
1068 }
1069
1070 pub async fn resize(&self, size: u64) -> Result<(), Error> {
1081 let current_size = self.size().await;
1082 if size == current_size {
1083 return Ok(());
1084 }
1085
1086 if size > current_size {
1088 let zeros_needed = (size - current_size) as usize;
1089 let mut zeros = self.cache_ref.pool().alloc(zeros_needed);
1090 zeros.put_bytes(0, zeros_needed);
1091 self.append(zeros.as_ref()).await?;
1092 return Ok(());
1093 }
1094
1095 self.shrink(size).await
1096 }
1097
1098 async fn shrink(&self, target_size: u64) -> Result<(), Error> {
1100 let logical_page_size = self.cache_ref.page_size();
1101 let physical_page_size = logical_page_size
1102 .checked_add(CHECKSUM_SIZE)
1103 .ok_or(Error::OffsetOverflow)?;
1104
1105 self.sync().await?;
1107
1108 let mut buf_guard = self.buffer.write().await;
1110 let mut blob_guard = self.blob_state.write().await;
1111
1112 let full_pages = target_size / logical_page_size;
1114 let partial_bytes = target_size % logical_page_size;
1115 let physical_pages = full_pages
1116 .checked_add(u64::from(partial_bytes > 0))
1117 .ok_or(Error::OffsetOverflow)?;
1118 let new_physical_size = physical_pages
1119 .checked_mul(physical_page_size)
1120 .ok_or(Error::OffsetOverflow)?;
1121 let tail_offset = full_pages
1122 .checked_mul(logical_page_size)
1123 .ok_or(Error::OffsetOverflow)?;
1124 let current_physical_size = if blob_guard.partial_page_state.is_some() {
1125 blob_guard
1126 .current_page
1127 .checked_add(1)
1128 .and_then(|pages| pages.checked_mul(physical_page_size))
1129 .ok_or(Error::OffsetOverflow)?
1130 } else {
1131 blob_guard
1132 .current_page
1133 .checked_mul(physical_page_size)
1134 .ok_or(Error::OffsetOverflow)?
1135 };
1136
1137 if new_physical_size != current_physical_size {
1140 blob_guard.resize(new_physical_size).await?;
1141 }
1142
1143 self.cache_ref.invalidate_from(self.id, full_pages);
1149
1150 if partial_bytes > 0 {
1151 return self
1152 .shrink_to_partial(
1153 &mut buf_guard,
1154 &mut blob_guard,
1155 full_pages,
1156 partial_bytes,
1157 logical_page_size,
1158 tail_offset,
1159 )
1160 .await;
1161 }
1162
1163 blob_guard.partial_page_state = None;
1165 blob_guard.current_page = full_pages;
1166 buf_guard.offset = tail_offset;
1167 buf_guard.clear();
1168
1169 Ok(())
1170 }
1171
1172 async fn shrink_to_partial(
1174 &self,
1175 buf_guard: &mut Buffer,
1176 blob_guard: &mut BlobState<B>,
1177 full_pages: u64,
1178 partial_bytes: u64,
1179 logical_page_size: u64,
1180 tail_offset: u64,
1181 ) -> Result<(), Error> {
1182 blob_guard.current_page = full_pages;
1185 buf_guard.offset = tail_offset;
1186
1187 let (page_data, old_crc) = super::get_page_with_checksum_from_blob(
1188 &blob_guard.blob,
1189 full_pages,
1190 logical_page_size,
1191 )
1192 .await?;
1193
1194 if (page_data.len() as u64) < partial_bytes {
1196 return Err(Error::InvalidChecksum);
1197 }
1198
1199 buf_guard.clear();
1200 let new_data = &page_data.as_ref()[..partial_bytes as usize];
1201 let over_capacity = buf_guard.append(new_data);
1202 assert!(!over_capacity);
1203
1204 let final_record = Self::sync_partial_page_shrink(
1205 blob_guard,
1206 full_pages,
1207 logical_page_size,
1208 partial_bytes as u16,
1209 Crc32::checksum(new_data),
1210 &old_crc,
1211 )
1212 .await?;
1213 blob_guard.partial_page_state = Some(final_record);
1214
1215 Ok(())
1216 }
1217}
1218
1219#[cfg(test)]
1220mod tests {
1221 use super::*;
1222 use crate::{
1223 buffer::tests::SyncTrackingBlob, deterministic, telemetry::metrics::Registry, Buf,
1224 BufferPool, BufferPoolConfig, IoBufsMut, Runner as _, Storage as _,
1225 };
1226 use commonware_codec::ReadExt;
1227 use commonware_macros::test_traced;
1228 use commonware_utils::{NZUsize, NZU16, NZU32};
1229 use std::{
1230 num::NonZeroU16,
1231 sync::{
1232 atomic::{AtomicUsize, Ordering},
1233 Arc,
1234 },
1235 };
1236
1237 const PAGE_SIZE: NonZeroU16 = NZU16!(103); const BUFFER_SIZE: usize = PAGE_SIZE.get() as usize * 2;
1239
1240 #[test_traced("DEBUG")]
1241 fn test_read_many_into_empty() {
1242 let executor = deterministic::Runner::default();
1243 executor.start(|context: deterministic::Context| async move {
1244 let (blob, blob_size) = context.open("test_partition", b"rmany").await.unwrap();
1245 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1246 let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1247 .await
1248 .unwrap();
1249
1250 append.append(&[0u8; 8]).await.unwrap();
1251 assert_eq!(append.size().await, 8);
1252
1253 let mut buf = [];
1255 append.read_many_into(&mut buf, &[], 4).await.unwrap();
1256 });
1257 }
1258
1259 #[test_traced("DEBUG")]
1260 fn test_read_many_into_all_in_tip() {
1261 let executor = deterministic::Runner::default();
1263 executor.start(|context: deterministic::Context| async move {
1264 let (blob, blob_size) = context.open("test_partition", b"rmany").await.unwrap();
1265 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1266 let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1267 .await
1268 .unwrap();
1269
1270 let data: Vec<u8> = (0..20).collect();
1271 append.append(&data).await.unwrap();
1272 assert_eq!(append.size().await, 20);
1273
1274 let offsets = [0u64, 4, 8, 12, 16];
1276 let mut buf = vec![0u8; 5 * 4];
1277 append.read_many_into(&mut buf, &offsets, 4).await.unwrap();
1278
1279 for (i, &off) in offsets.iter().enumerate() {
1280 assert_eq!(
1281 &buf[i * 4..(i + 1) * 4],
1282 &data[off as usize..off as usize + 4],
1283 );
1284 }
1285 });
1286 }
1287
1288 #[test_traced("DEBUG")]
1289 fn test_try_read_sync_all_in_tip() {
1290 let executor = deterministic::Runner::default();
1291 executor.start(|context: deterministic::Context| async move {
1292 let (blob, blob_size) = context
1293 .open("test_partition", b"try_read_sync_tip")
1294 .await
1295 .unwrap();
1296 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1297 let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1298 .await
1299 .unwrap();
1300
1301 let data: Vec<u8> = (0..20).collect();
1302 append.append(&data).await.unwrap();
1303
1304 let mut buf = vec![0u8; data.len()];
1305 assert!(append.try_read_sync(0, &mut buf));
1306 assert_eq!(buf, data);
1307 });
1308 }
1309
1310 #[test_traced("DEBUG")]
1311 fn test_read_many_into_all_from_cache() {
1312 let executor = deterministic::Runner::default();
1314 executor.start(|context: deterministic::Context| async move {
1315 let (blob, blob_size) = context.open("test_partition", b"rmany").await.unwrap();
1316 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1317 let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1318 .await
1319 .unwrap();
1320
1321 let data: Vec<u8> = (0..20).collect();
1322 append.append(&data).await.unwrap();
1323 append.sync().await.unwrap();
1324 assert_eq!(append.size().await, 20);
1325
1326 let offsets = [0u64, 8, 16];
1327 let mut buf = vec![0u8; 3 * 4];
1328 append.read_many_into(&mut buf, &offsets, 4).await.unwrap();
1329
1330 for (i, &off) in offsets.iter().enumerate() {
1331 assert_eq!(
1332 &buf[i * 4..(i + 1) * 4],
1333 &data[off as usize..off as usize + 4],
1334 );
1335 }
1336 });
1337 }
1338
1339 #[test_traced("DEBUG")]
1340 fn test_read_many_into_mixed_tip_and_cache() {
1341 let executor = deterministic::Runner::default();
1343 executor.start(|context: deterministic::Context| async move {
1344 let (blob, blob_size) = context.open("test_partition", b"rmany").await.unwrap();
1345 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1346 let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1347 .await
1348 .unwrap();
1349
1350 let first: Vec<u8> = (0..16).collect();
1351 append.append(&first).await.unwrap();
1352 append.sync().await.unwrap();
1353
1354 let second: Vec<u8> = (16..32).collect();
1355 append.append(&second).await.unwrap();
1356 assert_eq!(append.size().await, 32);
1357
1358 let offsets = [0u64, 4, 16, 24];
1360 let mut buf = vec![0u8; 4 * 4];
1361 append.read_many_into(&mut buf, &offsets, 4).await.unwrap();
1362
1363 let all: Vec<u8> = (0..32).collect();
1364 for (i, &off) in offsets.iter().enumerate() {
1365 assert_eq!(
1366 &buf[i * 4..(i + 1) * 4],
1367 &all[off as usize..off as usize + 4],
1368 );
1369 }
1370 });
1371 }
1372
1373 #[test_traced("DEBUG")]
1374 fn test_read_many_into_out_of_bounds() {
1375 let executor = deterministic::Runner::default();
1376 executor.start(|context: deterministic::Context| async move {
1377 let (blob, blob_size) = context.open("test_partition", b"rmany").await.unwrap();
1378 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1379 let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1380 .await
1381 .unwrap();
1382
1383 append.append(&[0u8; 8]).await.unwrap();
1384 assert_eq!(append.size().await, 8);
1385
1386 let mut buf = vec![0u8; 4];
1388 let err = append.read_many_into(&mut buf, &[8], 4).await.unwrap_err();
1389 assert!(matches!(err, Error::BlobInsufficientLength));
1390 });
1391 }
1392
1393 #[test_traced("DEBUG")]
1394 fn test_read_many_into_single_item() {
1395 let executor = deterministic::Runner::default();
1396 executor.start(|context: deterministic::Context| async move {
1397 let (blob, blob_size) = context.open("test_partition", b"rmany").await.unwrap();
1398 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1399 let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1400 .await
1401 .unwrap();
1402
1403 let data = vec![0xAA; 8];
1404 append.append(&data).await.unwrap();
1405 assert_eq!(append.size().await, 8);
1406
1407 let mut buf = vec![0u8; 8];
1408 append.read_many_into(&mut buf, &[0], 8).await.unwrap();
1409 assert_eq!(&buf, &data);
1410 });
1411 }
1412
/// A destination buffer whose length is not `offsets.len() * item_size` is a caller bug, so
/// `read_many_into` must panic with the documented message.
#[test]
#[should_panic(expected = "read_many_into requires buf.len() == offsets.len() * item_size")]
fn test_read_many_into_short_buffer_panics() {
    deterministic::Runner::default().start(|ctx: deterministic::Context| async move {
        let (handle, initial_len) = ctx.open("test_partition", b"rmany").await.unwrap();
        let pages = CacheRef::from_pooler(&ctx, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
        let writer = Append::new(handle, initial_len, BUFFER_SIZE, pages)
            .await
            .unwrap();

        let payload: Vec<u8> = (0..16).collect();
        writer.append(&payload).await.unwrap();

        // Two 4-byte items need 8 bytes; a buffer one byte short must trip the length check.
        let offsets = [0u64, 4];
        let mut short = vec![0u8; 2 * 4 - 1];
        writer.read_many_into(&mut short, &offsets, 4).await.unwrap();
    });
}
1432
1433 #[test_traced("DEBUG")]
1434 fn test_read_many_into_matches_read_at() {
1435 let executor = deterministic::Runner::default();
1437 executor.start(|context: deterministic::Context| async move {
1438 let (blob, blob_size) = context.open("test_partition", b"rmany").await.unwrap();
1439 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1440 let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1441 .await
1442 .unwrap();
1443
1444 let data: Vec<u8> = (0u8..=255).cycle().take(300).collect();
1446 append.append(&data).await.unwrap();
1447 append.sync().await.unwrap();
1448 let more: Vec<u8> = (0u8..50).collect();
1450 append.append(&more).await.unwrap();
1451 assert_eq!(append.size().await, 350);
1452
1453 let item_size = 10;
1454 let offsets: Vec<u64> = (0..35).map(|i| i * item_size as u64).collect();
1455 let mut batch_buf = vec![0u8; offsets.len() * item_size];
1456 append
1457 .read_many_into(&mut batch_buf, &offsets, item_size)
1458 .await
1459 .unwrap();
1460
1461 for (i, &off) in offsets.iter().enumerate() {
1463 let single = append.read_at(off, item_size).await.unwrap().coalesce();
1464 assert_eq!(
1465 &batch_buf[i * item_size..(i + 1) * item_size],
1466 single.as_ref(),
1467 "mismatch at offset {off}",
1468 );
1469 }
1470 });
1471 }
1472
1473 #[test_traced("DEBUG")]
1474 fn test_read_many_into_scattered_cache_misses() {
1475 let executor = deterministic::Runner::default();
1479 executor.start(|context: deterministic::Context| async move {
1480 let (blob, blob_size) = context.open("test_partition", b"rmany").await.unwrap();
1481 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(2));
1483 let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1484 .await
1485 .unwrap();
1486
1487 let synced: Vec<u8> = (0u8..=255)
1489 .cycle()
1490 .take(PAGE_SIZE.get() as usize * 3)
1491 .collect();
1492 append.append(&synced).await.unwrap();
1493 append.sync().await.unwrap();
1494
1495 let item_size = 10;
1498 let tip_len = PAGE_SIZE.get() as usize / 2;
1499 let tip: Vec<u8> = (100u8..=255).cycle().take(tip_len).collect();
1500 append.append(&tip).await.unwrap();
1501
1502 let _ = append.read_at(0, item_size).await.unwrap();
1504 let _ = append
1505 .read_at(PAGE_SIZE.get() as u64 * 2, item_size)
1506 .await
1507 .unwrap();
1508
1509 let straddle_off = synced.len() as u64 - (item_size as u64 / 2);
1512 let tip_off = synced.len() as u64 + item_size as u64;
1513 let offsets = [
1514 0u64, PAGE_SIZE.get() as u64, PAGE_SIZE.get() as u64 * 2, straddle_off, tip_off, ];
1520 let mut buf = vec![0u8; offsets.len() * item_size];
1521 append
1522 .read_many_into(&mut buf, &offsets, item_size)
1523 .await
1524 .unwrap();
1525
1526 let read: Vec<u8> = synced.iter().chain(tip.iter()).copied().collect();
1527 for (i, &off) in offsets.iter().enumerate() {
1528 assert_eq!(
1529 &buf[i * item_size..(i + 1) * item_size],
1530 &read[off as usize..off as usize + item_size],
1531 );
1532 }
1533 });
1534 }
1535
1536 #[test_traced("DEBUG")]
1537 fn test_append_crc_empty() {
1538 let executor = deterministic::Runner::default();
1539 executor.start(|context: deterministic::Context| async move {
1540 let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
1542 assert_eq!(blob_size, 0);
1543
1544 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1546
1547 let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
1549 .await
1550 .unwrap();
1551
1552 assert_eq!(append.size().await, 0);
1554
1555 append.sync().await.unwrap();
1557 drop(append);
1558
1559 let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
1560 assert_eq!(blob_size, 0); let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
1563 .await
1564 .unwrap();
1565
1566 assert_eq!(append.size().await, 0);
1567 });
1568 }
1569
/// Appends data across several sessions (append, sync, drop, reopen). After each reopen it
/// checks both the logical size reported by `Append` and the physical blob size on disk.
#[test_traced("DEBUG")]
fn test_append_crc_basic() {
    let executor = deterministic::Runner::default();
    executor.start(|context: deterministic::Context| async move {
        let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
        // Fresh, empty blob.
        assert_eq!(blob_size, 0);

        let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));

        let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
            .await
            .unwrap();

        assert_eq!(append.size().await, 0);

        // Two small appends; the logical size tracks each one.
        let data = vec![1, 2, 3, 4, 5];
        append.append(&data).await.unwrap();

        assert_eq!(append.size().await, 5);

        let more_data = vec![6, 7, 8, 9, 10];
        append.append(&more_data).await.unwrap();

        assert_eq!(append.size().await, 10);

        // Buffered data is readable (piecewise and as a whole) before any sync.
        let read_buf = append.read_at(0, 5).await.unwrap().coalesce();
        assert_eq!(read_buf, &data[..]);

        let read_buf = append.read_at(5, 5).await.unwrap().coalesce();
        assert_eq!(read_buf, &more_data[..]);

        let read_buf = append.read_at(0, 10).await.unwrap().coalesce();
        assert_eq!(read_buf, &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

        append.sync().await.unwrap();
        drop(append);

        // Reopen: 10 logical bytes occupy 115 bytes on disk, presumably one physical page
        // (logical page plus checksum record). TODO confirm against PAGE_SIZE/CHECKSUM_SIZE.
        let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
        assert_eq!(blob_size, 115);
        let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
            .await
            .unwrap();
        assert_eq!(append.size().await, 10);
        // Continue appending after reopen, growing the logical size to 110 bytes.
        let spanning_data: Vec<u8> = (11..=110).collect();
        append.append(&spanning_data).await.unwrap();
        assert_eq!(append.size().await, 110);

        let read_buf = append.read_at(10, 100).await.unwrap().coalesce();
        assert_eq!(read_buf, &spanning_data[..]);

        let read_buf = append.read_at(0, 110).await.unwrap().coalesce();
        let expected: Vec<u8> = (1..=110).collect();
        assert_eq!(read_buf, &expected[..]);

        append.sync().await.unwrap();
        drop(append);

        // 110 logical bytes now occupy 230 bytes on disk (twice the single-page size above).
        let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
        assert_eq!(blob_size, 230);
        let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
            .await
            .unwrap();
        assert_eq!(append.size().await, 110);

        // Fill up to 206 logical bytes; the test name suggests this ends exactly on a page
        // boundary (TODO confirm 206 == 2 * PAGE_SIZE).
        let boundary_data: Vec<u8> = (111..=206).collect();
        assert_eq!(boundary_data.len(), 96);
        append.append(&boundary_data).await.unwrap();
        assert_eq!(append.size().await, 206);

        let read_buf = append.read_at(0, 206).await.unwrap().coalesce();
        let expected: Vec<u8> = (1..=206).collect();
        assert_eq!(read_buf, &expected[..]);

        append.sync().await.unwrap();
        drop(append);

        // The physical size is unchanged at 230, and the full 206 bytes are recovered.
        let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
        assert_eq!(blob_size, 230);
        let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
            .await
            .unwrap();
        assert_eq!(append.size().await, 206);

        let read_buf = append.read_at(0, 206).await.unwrap().coalesce();
        assert_eq!(read_buf, &expected[..]);
    });
}
1686
1687 #[test_traced("DEBUG")]
1688 fn test_sync_releases_tip_pool_slot_after_full_drain() {
1689 let executor = deterministic::Runner::default();
1690 executor.start(|context: deterministic::Context| async move {
1691 let mut registry = Registry::default();
1692 let pool = BufferPool::new(
1693 BufferPoolConfig::for_storage()
1694 .with_pool_min_size(PAGE_SIZE.get() as usize)
1695 .with_max_per_class(NZU32!(2)),
1696 &mut registry,
1697 );
1698 let cache_ref = CacheRef::new(pool.clone(), PAGE_SIZE, NZUsize!(1));
1699
1700 let (blob, blob_size) = context
1701 .open("test_partition", b"release_tip_backing")
1702 .await
1703 .unwrap();
1704 assert_eq!(blob_size, 0);
1705
1706 let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1707 .await
1708 .unwrap();
1709
1710 append
1711 .append(&vec![7; PAGE_SIZE.get() as usize])
1712 .await
1713 .unwrap();
1714
1715 assert!(
1717 matches!(
1718 pool.try_alloc(BUFFER_SIZE),
1719 Err(crate::iobuf::PoolError::Exhausted)
1720 ),
1721 "full-page tip should occupy the remaining pooled slot before sync"
1722 );
1723
1724 append.sync().await.unwrap();
1725
1726 assert!(
1728 pool.try_alloc(BUFFER_SIZE).is_ok(),
1729 "sync should release pooled backing when no partial tail remains"
1730 );
1731 });
1732 }
1733
/// Checks which durability primitive `Append::sync` issues, using the counters from
/// `SyncTrackingBlob::snapshot()`: (first element unused here, writes, full syncs, range syncs).
#[test_traced("DEBUG")]
fn test_sync_uses_range_sync_for_single_flush() {
    let executor = deterministic::Runner::default();
    executor.start(|context: deterministic::Context| async move {
        let blob = SyncTrackingBlob::new();
        let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
        let append = Append::new(blob.clone(), 0, BUFFER_SIZE, cache_ref)
            .await
            .unwrap();

        // Syncing before anything is appended: one full sync, no writes.
        append.sync().await.unwrap();
        let (_, writes, full_syncs, range_syncs) = blob.snapshot();
        assert_eq!(writes, 0);
        assert_eq!(full_syncs, 1);
        assert_eq!(range_syncs, 0);

        // A single buffered flush is one write made durable by one range sync; the
        // full-sync count does not move.
        let data = b"hello world";
        append.append(data).await.unwrap();
        append.sync().await.unwrap();

        let (_, writes, full_syncs, range_syncs) = blob.snapshot();
        assert_eq!(writes, 1);
        assert_eq!(full_syncs, 1);
        assert_eq!(range_syncs, 1);

        // Syncing again with nothing new buffered issues no further writes or syncs.
        append.sync().await.unwrap();
        let (_, writes, full_syncs, range_syncs) = blob.snapshot();
        assert_eq!(writes, 1);
        assert_eq!(full_syncs, 1);
        assert_eq!(range_syncs, 1);

        // A fresh `Append` over the same blob reads back the range-synced bytes.
        let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
        let reopened = Append::new(blob.clone(), blob.size(), BUFFER_SIZE, cache_ref)
            .await
            .unwrap();
        let read = reopened.read_at(0, data.len()).await.unwrap().coalesce();
        assert_eq!(read.as_ref(), data);
    });
}
1776
1777 #[test_traced("DEBUG")]
1778 fn test_sync_failed_range_sync_does_not_mark_clean() {
1779 let executor = deterministic::Runner::default();
1780 executor.start(|context: deterministic::Context| async move {
1781 let name = b"failed_range_sync";
1782 let (blob, size) = context.open("test_partition", name).await.unwrap();
1783 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1784 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref)
1785 .await
1786 .unwrap();
1787
1788 append.append(b"abc").await.unwrap();
1790
1791 context.remove("test_partition", Some(name)).await.unwrap();
1793 assert!(append.sync().await.is_err());
1794
1795 assert!(append.sync().await.is_err());
1798 });
1799 }
1800
/// Appending more than `BUFFER_SIZE` bytes presumably forces an earlier plain (unsynced)
/// flush. The following `sync` must then use a full sync instead of a range sync.
#[test_traced("DEBUG")]
fn test_sync_uses_full_sync_after_prior_plain_flush() {
    let executor = deterministic::Runner::default();
    executor.start(|context: deterministic::Context| async move {
        let blob = SyncTrackingBlob::new();
        let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
        let append = Append::new(blob.clone(), 0, BUFFER_SIZE, cache_ref)
            .await
            .unwrap();

        // One byte more than the buffer holds.
        let data = vec![7u8; BUFFER_SIZE + 1];
        append.append(&data).await.unwrap();
        append.sync().await.unwrap();

        // Two writes reached the blob and durability came from a full sync, not a range sync.
        let (_, writes, full_syncs, range_syncs) = blob.snapshot();
        assert_eq!(writes, 2);
        assert_eq!(full_syncs, 1);
        assert_eq!(range_syncs, 0);

        // A redundant sync is a no-op.
        append.sync().await.unwrap();
        let (_, writes, full_syncs, range_syncs) = blob.snapshot();
        assert_eq!(writes, 2);
        assert_eq!(full_syncs, 1);
        assert_eq!(range_syncs, 0);

        // Extending the partial tail: two more writes, again finished by a full sync.
        append.append(b"tip").await.unwrap();
        append.sync().await.unwrap();

        let (_, writes, full_syncs, range_syncs) = blob.snapshot();
        assert_eq!(writes, 4);
        assert_eq!(full_syncs, 2);
        assert_eq!(range_syncs, 0);

        // Everything written is recoverable through a fresh `Append`.
        let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
        let reopened = Append::new(blob.clone(), blob.size(), BUFFER_SIZE, cache_ref)
            .await
            .unwrap();
        let mut expected = data;
        expected.extend_from_slice(b"tip");
        let read = reopened
            .read_at(0, expected.len())
            .await
            .unwrap()
            .coalesce();
        assert_eq!(read.as_ref(), expected.as_slice());
    });
}
1853
/// Starting a replay over buffered data leaves one write on the blob with no sync issued.
/// The next `sync` must therefore use a full sync and not a range sync.
#[test_traced("DEBUG")]
fn test_sync_uses_full_sync_after_replay_plain_flush() {
    let executor = deterministic::Runner::default();
    executor.start(|context: deterministic::Context| async move {
        let blob = SyncTrackingBlob::new();
        let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
        let append = Append::new(blob.clone(), 0, BUFFER_SIZE, cache_ref)
            .await
            .unwrap();

        append.append(b"replayed").await.unwrap();

        // The replay yields exactly the buffered bytes.
        let mut replay = append.replay(NZUsize!(1024)).await.unwrap();
        assert!(replay.ensure(b"replayed".len()).await.unwrap());
        assert_eq!(replay.remaining(), b"replayed".len());
        assert_eq!(replay.chunk(), b"replayed");

        // By now the bytes reached the blob via one write, and nothing has been synced.
        let (_, writes, full_syncs, range_syncs) = blob.snapshot();
        assert_eq!(writes, 1);
        assert_eq!(full_syncs, 0);
        assert_eq!(range_syncs, 0);

        // Sync: no additional write, and durability comes from a full sync.
        append.sync().await.unwrap();
        let (_, writes, full_syncs, range_syncs) = blob.snapshot();
        assert_eq!(writes, 1);
        assert_eq!(full_syncs, 1);
        assert_eq!(range_syncs, 0);
    });
}
1886
/// The replay-triggered write is left unsynced when the `Append` is dropped. A new `Append`
/// constructed over that blob must still issue a full sync when asked to `sync`.
#[test_traced("DEBUG")]
fn test_recreated_sync_preserves_replay_plain_flush_barrier() {
    let executor = deterministic::Runner::default();
    executor.start(|context: deterministic::Context| async move {
        let blob = SyncTrackingBlob::new();
        let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
        let append = Append::new(blob.clone(), 0, BUFFER_SIZE, cache_ref)
            .await
            .unwrap();

        append.append(b"replayed").await.unwrap();
        let mut replay = append.replay(NZUsize!(1024)).await.unwrap();
        assert!(replay.ensure(b"replayed".len()).await.unwrap());
        assert_eq!(replay.remaining(), b"replayed".len());
        assert_eq!(replay.chunk(), b"replayed");
        drop(replay);
        // Drop without ever calling `sync`.
        drop(append);

        // One write reached the blob, but nothing is durable yet.
        let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
        assert!(durable.is_empty());
        assert_eq!(writes, 1);
        assert_eq!(full_syncs, 0);
        assert_eq!(range_syncs, 0);

        // The recreated `Append` sees the (non-durable) bytes...
        let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
        let reopened = Append::new(blob.clone(), blob.size(), BUFFER_SIZE, cache_ref)
            .await
            .unwrap();
        assert_eq!(reopened.size().await, b"replayed".len() as u64);
        reopened.sync().await.unwrap();

        // ...and its sync makes the whole blob durable via a full sync, without rewriting.
        let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
        assert_eq!(durable.len(), blob.size() as usize);
        assert_eq!(writes, 1);
        assert_eq!(full_syncs, 1);
        assert_eq!(range_syncs, 0);
    });
}
1925
/// Junk bytes written past the valid data are discarded when the blob is reopened. The
/// reopen issues one extra full sync, after which `sync` has nothing left to do.
#[test_traced("DEBUG")]
fn test_recreated_sync_skips_barrier_after_invalid_truncation() {
    let executor = deterministic::Runner::default();
    executor.start(|context: deterministic::Context| async move {
        let blob = SyncTrackingBlob::new();
        let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
        let append = Append::new(blob.clone(), 0, BUFFER_SIZE, cache_ref)
            .await
            .unwrap();
        append.sync().await.unwrap();
        append.append(b"valid").await.unwrap();
        append.sync().await.unwrap();
        drop(append);

        // Append raw junk directly to the blob, bypassing `Append`.
        blob.write_at(blob.size(), b"junk").await.unwrap();

        // Reopening recovers only the valid bytes.
        let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
        let reopened = Append::new(blob.clone(), blob.size(), BUFFER_SIZE, cache_ref)
            .await
            .unwrap();
        assert_eq!(reopened.size().await, b"valid".len() as u64);

        // Two writes so far (presumably the "valid" flush plus the raw junk write), and one
        // more full sync than before the reopen (presumably issued while discarding the junk).
        let (_, writes, full_syncs, range_syncs) = blob.snapshot();
        assert_eq!(writes, 2);
        assert_eq!(full_syncs, 2);
        assert_eq!(range_syncs, 1);

        reopened.sync().await.unwrap();

        // The follow-up sync issues no further writes or syncs of either kind.
        let (_, writes, full_syncs, range_syncs) = blob.snapshot();
        assert_eq!(writes, 2);
        assert_eq!(full_syncs, 2);
        assert_eq!(range_syncs, 1);
    });
}
1961
/// Extends a partial page across several small syncs. When a flush is issued as more than
/// one write, the sync must finish with a single full sync rather than a range sync.
#[test_traced("DEBUG")]
fn test_sync_batches_split_protected_writes_with_full_sync() {
    let executor = deterministic::Runner::default();
    executor.start(|context: deterministic::Context| async move {
        let blob = SyncTrackingBlob::new();
        let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
        let append = Append::new(blob.clone(), 0, BUFFER_SIZE, cache_ref)
            .await
            .unwrap();
        append.sync().await.unwrap();

        append.append(b"abc").await.unwrap();
        append.sync().await.unwrap();

        // Growing the partial page from 3 to 5 bytes.
        append.append(b"de").await.unwrap();
        append.sync().await.unwrap();

        // That flush took two writes (3 total) and was made durable by one full sync; the
        // range-sync count still reflects only the "abc" flush.
        let (_, writes, full_syncs, range_syncs) = blob.snapshot();
        assert_eq!(writes, 3);
        assert_eq!(full_syncs, 2);
        assert_eq!(range_syncs, 1);

        // The next extension is a single write finished by a range sync.
        append.append(b"fg").await.unwrap();
        append.sync().await.unwrap();

        let (_, writes, full_syncs, range_syncs) = blob.snapshot();
        assert_eq!(writes, 4);
        assert_eq!(full_syncs, 2);
        assert_eq!(range_syncs, 2);

        // All seven bytes are recoverable through a fresh `Append`.
        let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
        let reopened = Append::new(blob.clone(), blob.size(), BUFFER_SIZE, cache_ref)
            .await
            .unwrap();
        let read = reopened.read_at(0, 7).await.unwrap().coalesce();
        assert_eq!(read.as_ref(), b"abcdefg");
    });
}
2005
2006 #[test_traced("DEBUG")]
2007 fn test_read_up_to_zero_len_truncates_buffer() {
2008 let executor = deterministic::Runner::default();
2009 executor.start(|context: deterministic::Context| async move {
2010 let (blob, blob_size) = context
2012 .open("test_partition", b"read_up_to_zero_len")
2013 .await
2014 .unwrap();
2015 assert_eq!(blob_size, 0);
2016
2017 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
2019
2020 let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
2022 .await
2023 .unwrap();
2024 append.append(&[1, 2, 3, 4]).await.unwrap();
2025
2026 let stale = vec![9, 8, 7, 6];
2028 let (buf, read) = append.read_up_to(0, 0, stale).await.unwrap();
2029
2030 assert_eq!(read, 0);
2031 assert_eq!(buf.len(), 0, "read_up_to must truncate returned buffer");
2032 assert_eq!(buf.freeze().as_ref(), b"");
2033 });
2034 }
2035
2036 fn read_crc_record_from_page(page_bytes: &[u8]) -> Checksum {
2038 let crc_start = page_bytes.len() - CHECKSUM_SIZE as usize;
2039 Checksum::read(&mut &page_bytes[crc_start..]).unwrap()
2040 }
2041
2042 #[derive(Clone)]
2044 struct PartialWriteBlob<B: Blob> {
2045 inner: B,
2046 writes: Arc<AtomicUsize>,
2047 failed_write_len: Arc<AtomicUsize>,
2048 fail_on: usize,
2049 partial_len: usize,
2050 }
2051
2052 impl<B: Blob> PartialWriteBlob<B> {
2053 fn new(inner: B, fail_on: usize, partial_len: usize) -> Self {
2054 Self {
2055 inner,
2056 writes: Arc::new(AtomicUsize::new(0)),
2057 failed_write_len: Arc::new(AtomicUsize::new(0)),
2058 fail_on,
2059 partial_len,
2060 }
2061 }
2062
2063 fn failed_write_len(&self) -> Arc<AtomicUsize> {
2064 self.failed_write_len.clone()
2065 }
2066
2067 fn write_count(&self) -> Arc<AtomicUsize> {
2068 self.writes.clone()
2069 }
2070 }
2071
2072 impl<B: Blob> crate::Blob for PartialWriteBlob<B> {
2073 async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
2074 self.inner.read_at(offset, len).await
2075 }
2076
2077 async fn read_at_buf(
2078 &self,
2079 offset: u64,
2080 len: usize,
2081 bufs: impl Into<IoBufsMut> + Send,
2082 ) -> Result<IoBufsMut, Error> {
2083 self.inner.read_at_buf(offset, len, bufs).await
2084 }
2085
2086 async fn write_at(&self, offset: u64, bufs: impl Into<IoBufs> + Send) -> Result<(), Error> {
2087 let bufs = bufs.into();
2088 let write = self.writes.fetch_add(1, Ordering::SeqCst) + 1;
2089 if write == self.fail_on {
2090 let bytes = bufs.coalesce();
2091 self.failed_write_len.store(bytes.len(), Ordering::SeqCst);
2092 let partial_len = self.partial_len.min(bytes.len());
2093 self.inner
2094 .write_at(offset, bytes.slice(..partial_len))
2095 .await?;
2096 self.inner.sync().await?;
2097 return Err(Error::Io(std::io::Error::other("injected partial write")));
2098 }
2099
2100 self.inner.write_at(offset, bufs).await
2101 }
2102
2103 async fn write_at_sync(
2104 &self,
2105 offset: u64,
2106 bufs: impl Into<IoBufs> + Send,
2107 ) -> Result<(), Error> {
2108 let bufs = bufs.into();
2109 let write = self.writes.fetch_add(1, Ordering::SeqCst) + 1;
2110 if write == self.fail_on {
2111 let bytes = bufs.coalesce();
2112 self.failed_write_len.store(bytes.len(), Ordering::SeqCst);
2113 let partial_len = self.partial_len.min(bytes.len());
2114 self.inner
2115 .write_at_sync(offset, bytes.slice(..partial_len))
2116 .await?;
2117 return Err(Error::Io(std::io::Error::other("injected partial write")));
2118 }
2119
2120 self.inner.write_at_sync(offset, bufs).await
2121 }
2122
2123 async fn resize(&self, len: u64) -> Result<(), Error> {
2124 self.inner.resize(len).await
2125 }
2126
2127 async fn sync(&self) -> Result<(), Error> {
2128 self.inner.sync().await
2129 }
2130 }
2131
/// Six-byte pattern the tests write over a CRC slot (or data padding) to simulate
/// corruption. It is two zero bytes followed by `0xDEADBEEF`; the zeros presumably land on
/// the slot's length field (TODO confirm against the slot layout).
const DUMMY_MARKER: [u8; 6] = [0, 0, 0xDE, 0xAD, 0xBE, 0xEF];
2135
/// When both CRC slots record the same length, the first slot is the one to protect, and
/// the protected data prefix is that shared length.
#[test]
fn test_identify_protected_regions_equal_lengths() {
    let record = Checksum {
        len1: 50,
        crc1: 0xAAAAAAAA,
        len2: 50,
        crc2: 0xBBBBBBBB,
    };

    // `expect` both asserts presence and unwraps, with a descriptive failure message.
    let (prefix_len, protected_crc) =
        Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record))
            .expect("a checksum record must yield a protected region");
    assert_eq!(prefix_len, 50);
    assert!(
        matches!(protected_crc, ProtectedCrc::First),
        "First CRC should be protected when lengths are equal"
    );
}
2156
/// When slot 0 records the larger length, slot 0 (the first CRC) is protected and the
/// protected data prefix is `len1`.
#[test]
fn test_identify_protected_regions_len1_larger() {
    let record = Checksum {
        len1: 100,
        crc1: 0xAAAAAAAA,
        len2: 50,
        crc2: 0xBBBBBBBB,
    };

    // `expect` both asserts presence and unwraps, with a descriptive failure message.
    let (prefix_len, protected_crc) =
        Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record))
            .expect("a checksum record must yield a protected region");
    assert_eq!(prefix_len, 100);
    assert!(
        matches!(protected_crc, ProtectedCrc::First),
        "First CRC should be protected when len1 > len2"
    );
}
2177
/// When slot 1 records the larger length, slot 1 (the second CRC) is protected and the
/// protected data prefix is `len2`.
#[test]
fn test_identify_protected_regions_len2_larger() {
    let record = Checksum {
        len1: 50,
        crc1: 0xAAAAAAAA,
        len2: 100,
        crc2: 0xBBBBBBBB,
    };

    // `expect` both asserts presence and unwraps, with a descriptive failure message.
    let (prefix_len, protected_crc) =
        Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record))
            .expect("a checksum record must yield a protected region");
    assert_eq!(prefix_len, 100);
    assert!(
        matches!(protected_crc, ProtectedCrc::Second),
        "Second CRC should be protected when len2 > len1"
    );
}
2198
2199 #[test_traced("DEBUG")]
2202 fn test_to_physical_pages_zero_copy_full_pages_and_materialized_partial() {
2203 let executor = deterministic::Runner::default();
2210 executor.start(|context: deterministic::Context| async move {
2211 let (blob, blob_size) = context
2213 .open("test_partition", b"to_physical_pages_zero_copy")
2214 .await
2215 .unwrap();
2216 assert_eq!(blob_size, 0);
2217
2218 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
2220
2221 let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
2223 .await
2224 .unwrap();
2225
2226 let logical_page_size = PAGE_SIZE.get() as usize;
2229 let partial_len = 17usize;
2230 let data: Vec<u8> = (0..(logical_page_size * 2 + partial_len))
2231 .map(|i| (i % 251) as u8)
2232 .collect();
2233
2234 let mut buffer = Buffer::new(0, data.len(), cache_ref.pool().clone());
2236 let over_capacity = buffer.append(&data);
2237 assert!(!over_capacity);
2238
2239 let (physical_pages, partial_page_state) =
2241 append.to_physical_pages(&buffer, true, None);
2242
2243 assert_eq!(physical_pages.chunk_count(), 5);
2246
2247 let crc_record = partial_page_state.expect("partial page state must be returned");
2249 let (len, _) = crc_record.get_crc();
2250 assert_eq!(len as usize, partial_len);
2251
2252 let physical_page_size = logical_page_size + CHECKSUM_SIZE as usize;
2255 let coalesced = physical_pages.coalesce();
2256 assert_eq!(coalesced.len(), physical_page_size * 3);
2257
2258 assert_eq!(
2260 &coalesced.as_ref()[..logical_page_size],
2261 &data[..logical_page_size]
2262 );
2263 assert_eq!(
2264 &coalesced.as_ref()[physical_page_size..physical_page_size + logical_page_size],
2265 &data[logical_page_size..logical_page_size * 2],
2266 );
2267
2268 let partial_start = physical_page_size * 2;
2271 assert_eq!(
2272 &coalesced.as_ref()[partial_start..partial_start + partial_len],
2273 &data[logical_page_size * 2..],
2274 );
2275 assert!(coalesced.as_ref()
2276 [partial_start + partial_len..partial_start + logical_page_size]
2277 .iter()
2278 .all(|byte| *byte == 0));
2279
2280 assert!(Checksum::validate_page(&coalesced.as_ref()[..physical_page_size]).is_some());
2282 assert!(Checksum::validate_page(
2283 &coalesced.as_ref()[physical_page_size..physical_page_size * 2]
2284 )
2285 .is_some());
2286 assert!(Checksum::validate_page(
2287 &coalesced.as_ref()[physical_page_size * 2..physical_page_size * 3]
2288 )
2289 .is_some());
2290 });
2291 }
2292
/// Scenario: slot 1 holds the authoritative CRC (len2 > len1) and slot 0 is corrupted.
/// Extending the partial page must then write the new CRC into slot 0 and leave slot 1's
/// on-disk bytes unchanged.
#[test_traced("DEBUG")]
fn test_crc_slot1_protected() {
    let executor = deterministic::Runner::default();
    executor.start(|context: deterministic::Context| async move {
        let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
        let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
        // The checksum record sits at the end of the logical page: slot 0 starts there and
        // slot 1 six bytes later.
        let slot0_offset = PAGE_SIZE.get() as u64;
        let slot1_offset = PAGE_SIZE.get() as u64 + 6;

        // Session 1: 10 bytes, synced.
        let (blob, _) = context.open("test_partition", b"slot1_prot").await.unwrap();
        let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
            .await
            .unwrap();
        append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
        append.sync().await.unwrap();
        drop(append);

        // Session 2: grow the partial page to 30 bytes.
        let (blob, size) = context.open("test_partition", b"slot1_prot").await.unwrap();
        let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
            .await
            .unwrap();
        append
            .append(&(11..=30).collect::<Vec<u8>>())
            .await
            .unwrap();
        append.sync().await.unwrap();
        drop(append);

        // Precondition: slot 1 now carries the larger length.
        let (blob, size) = context.open("test_partition", b"slot1_prot").await.unwrap();
        let page = blob
            .read_at(0, physical_page_size)
            .await
            .unwrap()
            .coalesce();
        let crc = read_crc_record_from_page(page.as_ref());
        assert!(
            crc.len2 > crc.len1,
            "Slot 1 should be authoritative (len2={} > len1={})",
            crc.len2,
            crc.len1
        );

        // Snapshot slot 1's raw bytes for the final comparison.
        let slot1_before: Vec<u8> = blob
            .read_at(slot1_offset, 6)
            .await
            .unwrap()
            .coalesce()
            .freeze()
            .into();

        // Corrupt slot 0 directly on the blob and make it durable.
        blob.write_at(slot0_offset, DUMMY_MARKER.to_vec())
            .await
            .unwrap();
        blob.sync().await.unwrap();

        let slot0_mangled: Vec<u8> = blob
            .read_at(slot0_offset, 6)
            .await
            .unwrap()
            .coalesce()
            .freeze()
            .into();
        assert_eq!(slot0_mangled, DUMMY_MARKER, "Mangle failed");

        // Session 3: reopen over the corrupted page and grow it to 50 bytes.
        let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
            .await
            .unwrap();
        append
            .append(&(31..=50).collect::<Vec<u8>>())
            .await
            .unwrap();
        append.sync().await.unwrap();
        drop(append);

        let (blob, _) = context.open("test_partition", b"slot1_prot").await.unwrap();

        // Slot 0 was rewritten (the marker is gone)...
        let slot0_after: Vec<u8> = blob
            .read_at(slot0_offset, 6)
            .await
            .unwrap()
            .coalesce()
            .freeze()
            .into();
        assert_ne!(
            slot0_after, DUMMY_MARKER,
            "Slot 0 should have been overwritten with new CRC"
        );

        // ...while slot 1 is byte-for-byte what it was before session 3.
        let slot1_after: Vec<u8> = blob
            .read_at(slot1_offset, 6)
            .await
            .unwrap()
            .coalesce()
            .freeze()
            .into();
        assert_eq!(
            slot1_before, slot1_after,
            "Slot 1 was modified! Protected region violated."
        );

        // Slot 0 now records the full 50-byte length.
        let page = blob
            .read_at(0, physical_page_size)
            .await
            .unwrap()
            .coalesce();
        let crc = read_crc_record_from_page(page.as_ref());
        assert_eq!(crc.len1, 50, "Slot 0 should have len=50");
    });
}
2418
/// Mirror of the slot-1 scenario: slot 0 holds the authoritative CRC (len1 > len2) and
/// slot 1 is corrupted. Extending the partial page must then write the new CRC into slot 1
/// and leave slot 0's on-disk bytes unchanged.
#[test_traced("DEBUG")]
fn test_crc_slot0_protected() {
    let executor = deterministic::Runner::default();
    executor.start(|context: deterministic::Context| async move {
        let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
        let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
        // The checksum record sits at the end of the logical page: slot 0 starts there and
        // slot 1 six bytes later.
        let slot0_offset = PAGE_SIZE.get() as u64;
        let slot1_offset = PAGE_SIZE.get() as u64 + 6;

        // Session 1: 10 bytes, synced.
        let (blob, _) = context.open("test_partition", b"slot0_prot").await.unwrap();
        let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
            .await
            .unwrap();
        append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
        append.sync().await.unwrap();
        drop(append);

        // Session 2: grow the partial page to 30 bytes.
        let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
        let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
            .await
            .unwrap();
        append
            .append(&(11..=30).collect::<Vec<u8>>())
            .await
            .unwrap();
        append.sync().await.unwrap();
        drop(append);

        // Session 3: grow it again to 50 bytes.
        let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
        let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
            .await
            .unwrap();
        append
            .append(&(31..=50).collect::<Vec<u8>>())
            .await
            .unwrap();
        append.sync().await.unwrap();
        drop(append);

        // Precondition: slot 0 now carries the larger length.
        let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
        let page = blob
            .read_at(0, physical_page_size)
            .await
            .unwrap()
            .coalesce();
        let crc = read_crc_record_from_page(page.as_ref());
        assert!(
            crc.len1 > crc.len2,
            "Slot 0 should be authoritative (len1={} > len2={})",
            crc.len1,
            crc.len2
        );

        // Snapshot slot 0's raw bytes for the final comparison.
        let slot0_before: Vec<u8> = blob
            .read_at(slot0_offset, 6)
            .await
            .unwrap()
            .coalesce()
            .freeze()
            .into();

        // Corrupt slot 1 directly on the blob and make it durable.
        blob.write_at(slot1_offset, DUMMY_MARKER.to_vec())
            .await
            .unwrap();
        blob.sync().await.unwrap();

        let slot1_mangled: Vec<u8> = blob
            .read_at(slot1_offset, 6)
            .await
            .unwrap()
            .coalesce()
            .freeze()
            .into();
        assert_eq!(slot1_mangled, DUMMY_MARKER, "Mangle failed");

        // Session 4: reopen over the corrupted page and grow it to 70 bytes.
        let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
            .await
            .unwrap();
        append
            .append(&(51..=70).collect::<Vec<u8>>())
            .await
            .unwrap();
        append.sync().await.unwrap();
        drop(append);

        let (blob, _) = context.open("test_partition", b"slot0_prot").await.unwrap();

        // Slot 1 was rewritten (the marker is gone)...
        let slot1_after: Vec<u8> = blob
            .read_at(slot1_offset, 6)
            .await
            .unwrap()
            .coalesce()
            .freeze()
            .into();
        assert_ne!(
            slot1_after, DUMMY_MARKER,
            "Slot 1 should have been overwritten with new CRC"
        );

        // ...while slot 0 is byte-for-byte what it was before session 4.
        let slot0_after: Vec<u8> = blob
            .read_at(slot0_offset, 6)
            .await
            .unwrap()
            .coalesce()
            .freeze()
            .into();
        assert_eq!(
            slot0_before, slot0_after,
            "Slot 0 was modified! Protected region violated."
        );

        // Slot 1 now records the full 70-byte length.
        let page = blob
            .read_at(0, physical_page_size)
            .await
            .unwrap()
            .coalesce();
        let crc = read_crc_record_from_page(page.as_ref());
        assert_eq!(crc.len2, 70, "Slot 1 should have len=70");
    });
}
2556
/// Rewriting a partial page must leave the already-durable data prefix untouched. Bytes
/// past that prefix (padding) may be overwritten by newly appended data.
#[test_traced("DEBUG")]
fn test_data_prefix_not_overwritten() {
    let executor = deterministic::Runner::default();
    executor.start(|context: deterministic::Context| async move {
        let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
        let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;

        // Session 1: 20 bytes (values 1..=20), synced.
        let (blob, _) = context
            .open("test_partition", b"prefix_test")
            .await
            .unwrap();
        let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
            .await
            .unwrap();
        let data1: Vec<u8> = (1..=20).collect();
        append.append(&data1).await.unwrap();
        append.sync().await.unwrap();
        drop(append);

        // The 20 bytes occupy exactly one physical page on disk.
        let (blob, size) = context
            .open("test_partition", b"prefix_test")
            .await
            .unwrap();
        assert_eq!(size, physical_page_size as u64);

        // Snapshot the raw 20-byte data prefix.
        let prefix_before: Vec<u8> = blob
            .read_at(0, 20)
            .await
            .unwrap()
            .coalesce()
            .freeze()
            .into();

        // Scribble over padding past the data (bytes 25..31) and make it durable.
        blob.write_at(25, DUMMY_MARKER.to_vec()).await.unwrap();
        blob.sync().await.unwrap();

        // Session 2: append values 21..=40, covering the scribbled padding.
        let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
            .await
            .unwrap();
        append
            .append(&(21..=40).collect::<Vec<u8>>())
            .await
            .unwrap();
        append.sync().await.unwrap();
        drop(append);

        let (blob, _) = context
            .open("test_partition", b"prefix_test")
            .await
            .unwrap();

        // The original prefix is byte-for-byte unchanged...
        let prefix_after: Vec<u8> = blob
            .read_at(0, 20)
            .await
            .unwrap()
            .coalesce()
            .freeze()
            .into();
        assert_eq!(prefix_before, prefix_after, "Data prefix was modified!");

        // ...and offsets 25..31 now hold the appended values (offset i holds i + 1).
        let overwritten: Vec<u8> = blob
            .read_at(25, 6)
            .await
            .unwrap()
            .coalesce()
            .freeze()
            .into();
        assert_eq!(
            overwritten,
            vec![26, 27, 28, 29, 30, 31],
            "New data should overwrite padding area"
        );
    });
}
2643
    #[test_traced("DEBUG")]
    /// When an append completes a partial page and crosses into the next page, the
    /// completed page's full-page CRC must go into the non-authoritative slot. The
    /// authoritative slot must remain byte-identical.
    ///
    /// Page 0 is written in two sessions so that slot 1 becomes authoritative. Slot 0
    /// is then overwritten with a marker. After the boundary-crossing append, slot 0
    /// must hold a full-page record and slot 1 must be unchanged.
    fn test_crc_slot_protection_across_page_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
            // Page 0's CRC record starts right after its logical bytes.
            // The two slots are 6 bytes apart.
            let slot0_offset = PAGE_SIZE.get() as u64;
            let slot1_offset = PAGE_SIZE.get() as u64 + 6;

            // Session 1: write 50 bytes into page 0.
            let (blob, _) = context.open("test_partition", b"boundary").await.unwrap();
            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append.append(&(1..=50).collect::<Vec<u8>>()).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            // Session 2: grow page 0 to 80 bytes.
            let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(51..=80).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            // Slot 1 now records the longer length and is therefore authoritative.
            let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
            let page = blob
                .read_at(0, physical_page_size)
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert!(crc.len2 > crc.len1, "Slot 1 should be authoritative");

            // Snapshot the authoritative slot.
            let slot1_before: Vec<u8> = blob
                .read_at(slot1_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();

            // Clobber the non-authoritative slot so any later write to it is visible.
            blob.write_at(slot0_offset, DUMMY_MARKER.to_vec())
                .await
                .unwrap();
            blob.sync().await.unwrap();

            // Session 3: append 40 more bytes (120 total), filling page 0 and starting page 1.
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(81..=120).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
            assert_eq!(size, (physical_page_size * 2) as u64, "Should have 2 pages");

            // Slot 0 was rewritten (it no longer holds the marker).
            let slot0_after: Vec<u8> = blob
                .read_at(slot0_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_ne!(
                slot0_after, DUMMY_MARKER,
                "Slot 0 should have full-page CRC"
            );

            // Slot 1 was not touched at all.
            let slot1_after: Vec<u8> = blob
                .read_at(slot1_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_eq!(
                slot1_before, slot1_after,
                "Slot 1 was modified during page boundary crossing!"
            );

            // The rewritten slot 0 records a full page.
            let page0 = blob
                .read_at(0, physical_page_size)
                .await
                .unwrap()
                .coalesce();
            let crc0 = read_crc_record_from_page(page0.as_ref());
            assert_eq!(
                crc0.len1,
                PAGE_SIZE.get(),
                "Slot 0 should have full page length"
            );

            // All 120 bytes read back correctly through a fresh `Append`.
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 120);
            let all_data: Vec<u8> = append.read_at(0, 120).await.unwrap().coalesce().into();
            let expected: Vec<u8> = (1..=120).collect();
            assert_eq!(all_data, expected);
        });
    }
2768
    #[test_traced("DEBUG")]
    /// If the authoritative CRC slot of the only (partial) page is corrupted, opening
    /// falls back to the other slot. Only that slot's shorter, still-valid length is
    /// exposed.
    ///
    /// Page 0 is written twice (10 bytes, then 30 bytes) and then slot 1's CRC value is
    /// corrupted. Reopening must report 10 bytes, return the original 10 bytes, and
    /// refuse reads past them.
    fn test_crc_fallback_on_corrupted_primary() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
            // Offset of page 0's `crc2` field (slot 1's CRC value). The check after the
            // corruption below confirms this mapping via `read_crc_record_from_page`.
            let crc2_offset = PAGE_SIZE.get() as u64 + 8;

            // Session 1: 10 bytes.
            let (blob, _) = context
                .open("test_partition", b"crc_fallback")
                .await
                .unwrap();
            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            let data1: Vec<u8> = (1..=10).collect();
            append.append(&data1).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            // Session 2: append 20 more bytes (30 total).
            let (blob, size) = context
                .open("test_partition", b"crc_fallback")
                .await
                .unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(11..=30).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            // Still a single physical page.
            let (blob, size) = context
                .open("test_partition", b"crc_fallback")
                .await
                .unwrap();
            assert_eq!(size, physical_page_size as u64);

            // Slot 0 records the 10-byte state; slot 1 (authoritative) records 30 bytes.
            let page = blob
                .read_at(0, physical_page_size)
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert!(
                crc.len2 > crc.len1,
                "Slot 1 should be authoritative (len2={} > len1={})",
                crc.len2,
                crc.len1
            );
            assert_eq!(crc.len2, 30, "Slot 1 should have len=30");
            assert_eq!(crc.len1, 10, "Slot 0 should have len=10");

            // Before corruption all 30 bytes are readable. The blob is cloned so a
            // handle remains for the raw write below.
            let append = Append::new(blob.clone(), size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 30);
            let all_data: Vec<u8> = append.read_at(0, 30).await.unwrap().coalesce().into();
            let expected: Vec<u8> = (1..=30).collect();
            assert_eq!(all_data, expected);
            drop(append);

            // Corrupt only slot 1's CRC value; its length field is left intact.
            blob.write_at(crc2_offset, vec![0xDE, 0xAD, 0xBE, 0xEF])
                .await
                .unwrap();
            blob.sync().await.unwrap();

            let page = blob
                .read_at(0, physical_page_size)
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert_eq!(crc.len2, 30, "len2 should still be 30 after corruption");
            assert_eq!(crc.crc2, 0xDEADBEEF, "crc2 should be our corrupted value");

            // Reopen: only slot 0's 10-byte state may be exposed.
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            assert_eq!(
                append.size().await,
                10,
                "Should fall back to slot 0's 10 bytes after primary CRC corruption"
            );

            let fallback_data: Vec<u8> = append.read_at(0, 10).await.unwrap().coalesce().into();
            assert_eq!(
                fallback_data, data1,
                "Fallback data should match original 10 bytes"
            );

            // Bytes beyond the fallback length are no longer addressable.
            let result = append.read_at(0, 11).await;
            assert!(result.is_err(), "Reading beyond fallback size should fail");
        });
    }
2887
2888 #[test_traced("DEBUG")]
2900 fn test_non_last_page_rejects_partial_fallback() {
2901 let executor = deterministic::Runner::default();
2902 executor.start(|context: deterministic::Context| async move {
2903 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
2904 let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
2905 let page0_crc2_offset = PAGE_SIZE.get() as u64 + 8;
2907
2908 let (blob, _) = context
2910 .open("test_partition", b"non_last_page")
2911 .await
2912 .unwrap();
2913 let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
2914 .await
2915 .unwrap();
2916 append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
2917 append.sync().await.unwrap();
2918 drop(append);
2919
2920 let (blob, size) = context
2922 .open("test_partition", b"non_last_page")
2923 .await
2924 .unwrap();
2925 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2926 .await
2927 .unwrap();
2928 append
2930 .append(&(11..=PAGE_SIZE.get() as u8).collect::<Vec<u8>>())
2931 .await
2932 .unwrap();
2933 append.sync().await.unwrap();
2934 drop(append);
2935
2936 let (blob, size) = context
2938 .open("test_partition", b"non_last_page")
2939 .await
2940 .unwrap();
2941 let page = blob
2942 .read_at(0, physical_page_size)
2943 .await
2944 .unwrap()
2945 .coalesce();
2946 let crc = read_crc_record_from_page(page.as_ref());
2947 assert_eq!(crc.len1, 10, "Slot 0 should have len=10");
2948 assert_eq!(
2949 crc.len2,
2950 PAGE_SIZE.get(),
2951 "Slot 1 should have len=103 (full page)"
2952 );
2953 assert!(crc.len2 > crc.len1, "Slot 1 should be authoritative");
2954
2955 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2957 .await
2958 .unwrap();
2959 append
2961 .append(&(104..=113).collect::<Vec<u8>>())
2962 .await
2963 .unwrap();
2964 append.sync().await.unwrap();
2965 drop(append);
2966
2967 let (blob, size) = context
2969 .open("test_partition", b"non_last_page")
2970 .await
2971 .unwrap();
2972 assert_eq!(
2973 size,
2974 (physical_page_size * 2) as u64,
2975 "Should have 2 physical pages"
2976 );
2977
2978 let append = Append::new(blob.clone(), size, BUFFER_SIZE, cache_ref.clone())
2980 .await
2981 .unwrap();
2982 assert_eq!(append.size().await, 113);
2983 let all_data: Vec<u8> = append.read_at(0, 113).await.unwrap().coalesce().into();
2984 let expected: Vec<u8> = (1..=113).collect();
2985 assert_eq!(all_data, expected);
2986 drop(append);
2987
2988 blob.write_at(page0_crc2_offset, vec![0xDE, 0xAD, 0xBE, 0xEF])
2990 .await
2991 .unwrap();
2992 blob.sync().await.unwrap();
2993
2994 let page = blob
2996 .read_at(0, physical_page_size)
2997 .await
2998 .unwrap()
2999 .coalesce();
3000 let crc = read_crc_record_from_page(page.as_ref());
3001 assert_eq!(crc.len2, PAGE_SIZE.get(), "len2 should still be 103");
3002 assert_eq!(crc.crc2, 0xDEADBEEF, "crc2 should be corrupted");
3003 assert_eq!(crc.len1, 10, "Fallback slot 0 has partial length");
3005
3006 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
3012 .await
3013 .unwrap();
3014
3015 assert_eq!(append.size().await, 113);
3018
3019 let result = append.read_at(0, 10).await;
3022 assert!(
3023 result.is_err(),
3024 "Reading from corrupted non-last page via Append should fail, but got: {:?}",
3025 result
3026 );
3027 drop(append);
3028
3029 let (blob, size) = context
3031 .open("test_partition", b"non_last_page")
3032 .await
3033 .unwrap();
3034 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
3035 .await
3036 .unwrap();
3037 let mut replay = append.replay(NZUsize!(1024)).await.unwrap();
3038
3039 let result = replay.ensure(1).await;
3041 assert!(
3042 result.is_err(),
3043 "Reading from corrupted non-last page via Replay should fail, but got: {:?}",
3044 result
3045 );
3046 });
3047 }
3048
    #[test]
    /// Shrinking so that a previously interior page becomes the new tail must
    /// validate that page's CRC. If the CRC is corrupt, `resize` must fail with
    /// `InvalidChecksum`.
    fn test_resize_shrink_validates_crc() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;

            let (blob, size) = context
                .open("test_partition", b"resize_crc_test")
                .await
                .unwrap();

            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            // 250 bytes of data; the size assertion after reopening shows this spans 3 physical pages.
            let data: Vec<u8> = (0..=249).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();
            assert_eq!(append.size().await, 250);
            drop(append);

            let (blob, size) = context
                .open("test_partition", b"resize_crc_test")
                .await
                .unwrap();
            assert_eq!(size as usize, physical_page_size * 3);

            // Page 1's CRC record occupies the last CHECKSUM_SIZE bytes of the second
            // physical page. Overwrite the whole record with 0xFF.
            let page1_crc_offset = (physical_page_size * 2 - CHECKSUM_SIZE as usize) as u64;
            blob.write_at(page1_crc_offset, vec![0xFF; CHECKSUM_SIZE as usize])
                .await
                .unwrap();
            blob.sync().await.unwrap();

            // Opening still reports all 250 bytes; the corrupted page is not the tail.
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 250);

            // Shrinking to 150 targets the corrupted page (assumes 150 lands inside
            // page 1 for this test's PAGE_SIZE), so its CRC must be checked.
            let result = append.resize(150).await;
            assert!(
                matches!(result, Err(crate::Error::InvalidChecksum)),
                "Expected InvalidChecksum when shrinking to corrupted page, got: {:?}",
                result
            );
        });
    }
3108
3109 #[test]
3110 fn test_resize_invalidates_cache() {
3111 let executor = deterministic::Runner::default();
3115 executor.start(|context: deterministic::Context| async move {
3116 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
3117 let (blob, blob_size) = context
3118 .open("test_partition", b"resize_invalidates_cache")
3119 .await
3120 .unwrap();
3121 let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
3122 .await
3123 .unwrap();
3124
3125 let page_size = PAGE_SIZE.get() as usize;
3128 let old_bytes = vec![0xAAu8; page_size];
3129 append.append(&old_bytes).await.unwrap();
3130 append.sync().await.unwrap();
3131
3132 let mut probe = vec![0u8; 16];
3134 assert!(append.try_read_sync(0, &mut probe));
3135 assert_eq!(probe, vec![0xAAu8; 16]);
3136
3137 append.resize(0).await.unwrap();
3139 let new_bytes = vec![0xBBu8; 16];
3140 append.append(&new_bytes).await.unwrap();
3141
3142 let mut probe = vec![0u8; 16];
3145 let hit = append.try_read_sync(0, &mut probe);
3146 assert!(
3147 !hit || probe == new_bytes,
3148 "try_read_sync served stale pre-resize bytes: {probe:?}"
3149 );
3150 });
3151 }
3152
3153 #[test]
3154 fn test_resize_same_size_is_noop() {
3155 let executor = deterministic::Runner::default();
3156 executor.start(|context: deterministic::Context| async move {
3157 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
3158 let (blob, blob_size) = context
3159 .open("test_partition", b"resize_same_size")
3160 .await
3161 .unwrap();
3162 let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
3163 .await
3164 .unwrap();
3165
3166 append.append(b"hello world").await.unwrap();
3167 assert_eq!(append.size().await, 11);
3168
3169 append.resize(11).await.unwrap();
3171 assert_eq!(append.size().await, 11);
3172
3173 let read = append.read_at(0, 11).await.unwrap().coalesce();
3175 assert_eq!(read.as_ref(), b"hello world");
3176 });
3177 }
3178
3179 #[test]
3180 fn test_resize_same_page_shrink_reopens_at_shorter_size() {
3181 let executor = deterministic::Runner::default();
3182
3183 executor.start(|context| async move {
3184 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
3185 let data: Vec<u8> = (0..50).collect();
3186
3187 let (blob, size) = context
3188 .open("test_partition", b"same_page_shrink")
3189 .await
3190 .unwrap();
3191 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
3192 .await
3193 .unwrap();
3194
3195 append.append(&data).await.unwrap();
3198 append.sync().await.unwrap();
3199
3200 append.resize(45).await.unwrap();
3201 drop(append);
3202
3203 let (blob, size) = context
3204 .open("test_partition", b"same_page_shrink")
3205 .await
3206 .unwrap();
3207 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref)
3208 .await
3209 .unwrap();
3210 assert_eq!(append.size().await, 45);
3211 let read = append.read_at(0, 45).await.unwrap().coalesce();
3212 assert_eq!(read.as_ref(), &data[..45]);
3213 });
3214 }
3215
    #[test]
    /// A same-page shrink whose first write (a CRC slot) is torn must leave the
    /// previous 50-byte state fully recoverable on reopen.
    fn test_resize_same_page_shrink_survives_interrupted_crc_stage() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let data: Vec<u8> = (0..50).collect();

            // Build a 50-byte partial page from two separately synced writes (40 + 10).
            let (blob, size) = context
                .open("test_partition", b"same_page_shrink_interrupted")
                .await
                .unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append.append(&data[..40]).await.unwrap();
            append.sync().await.unwrap();
            append.append(&data[40..]).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            // Reopen through the fault-injecting wrapper.
            // NOTE(review): `PartialWriteBlob::new(blob, 1, 3)` presumably fails the 1st
            // write after persisting only 3 bytes; confirm against its definition.
            let (blob, size) = context
                .open("test_partition", b"same_page_shrink_interrupted")
                .await
                .unwrap();
            let faulty_blob = PartialWriteBlob::new(blob, 1, 3);
            let write_count = faulty_blob.write_count();
            let failed_write_len = faulty_blob.failed_write_len();
            let append = Append::new(faulty_blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            // The shrink fails on its very first write, which is one CRC slot long.
            assert!(
                append.resize(45).await.is_err(),
                "phase-1 partial write should fail"
            );
            assert_eq!(write_count.load(Ordering::SeqCst), 1);
            assert_eq!(failed_write_len.load(Ordering::SeqCst), CHECKSUM_SLOT_SIZE);
            drop(append);

            // Reopen without fault injection: the original 50 bytes must be intact.
            let (blob, size) = context
                .open("test_partition", b"same_page_shrink_interrupted")
                .await
                .unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref)
                .await
                .unwrap();
            assert_eq!(append.size().await, 50);
            let read = append.read_at(0, 50).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), &data);
        });
    }
3268
    #[test]
    /// A same-page shrink interrupted at its second write (a 2-byte write, per the
    /// assertions below) must leave the previous 300-byte state recoverable on reopen.
    fn test_resize_same_page_shrink_survives_interrupted_len_stage() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Pages large enough that all 300 bytes stay within a single partial page.
            const LARGE_PAGE_SIZE: NonZeroU16 = NZU16!(600);
            const LARGE_BUFFER_SIZE: usize = 1_200;

            let cache_ref =
                CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(LARGE_BUFFER_SIZE));
            let data: Vec<u8> = (0..300).map(|i| (i % 251) as u8).collect();

            // Build the page from two separately synced writes (255 + 45 bytes).
            let (blob, size) = context
                .open("test_partition", b"same_page_shrink_len_stage")
                .await
                .unwrap();
            let append = Append::new(blob, size, LARGE_BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append.append(&data[..255]).await.unwrap();
            append.sync().await.unwrap();
            append.append(&data[255..]).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            // NOTE(review): `PartialWriteBlob::new(blob, 2, 1)` presumably fails the 2nd
            // write after persisting 1 byte; confirm against its definition.
            let (blob, size) = context
                .open("test_partition", b"same_page_shrink_len_stage")
                .await
                .unwrap();
            let faulty_blob = PartialWriteBlob::new(blob, 2, 1);
            let write_count = faulty_blob.write_count();
            let failed_write_len = faulty_blob.failed_write_len();
            let append = Append::new(faulty_blob, size, LARGE_BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            // The shrink fails on its second write, which is 2 bytes long.
            assert!(
                append.resize(257).await.is_err(),
                "length-stage partial write should fail"
            );
            assert_eq!(write_count.load(Ordering::SeqCst), 2);
            assert_eq!(failed_write_len.load(Ordering::SeqCst), 2);
            drop(append);

            // Reopen without fault injection: the full 300-byte state is recovered.
            let (blob, size) = context
                .open("test_partition", b"same_page_shrink_len_stage")
                .await
                .unwrap();
            let append = Append::new(blob, size, LARGE_BUFFER_SIZE, cache_ref)
                .await
                .unwrap();
            assert_eq!(append.size().await, 300);
            let read = append.read_at(0, 300).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), &data);
        });
    }
3325
    #[test]
    /// A same-page shrink that fails on its first write must not destroy the CRC slot
    /// that still validates. Reopening must recover the 50-byte state.
    ///
    /// Three synced appends (48, +2, +2 bytes) are issued through a fault injector, and
    /// then slot 0 is corrupted behind the writer's back. After the interrupted
    /// `resize(45)`, the assertions pin the outcome: exactly the first 50 bytes are
    /// recovered.
    fn test_resize_same_page_shrink_preserves_validated_fallback_slot() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let data: Vec<u8> = (0..52).collect();

            // Keep an un-wrapped clone of the blob for the out-of-band corruption below.
            // NOTE(review): `PartialWriteBlob::new(_, 5, 3)` presumably fails the 5th
            // write after persisting 3 bytes; confirm against its definition.
            let (blob, size) = context
                .open("test_partition", b"same_page_shrink_fallback_slot")
                .await
                .unwrap();
            let faulty_blob = PartialWriteBlob::new(blob.clone(), 5, 3);
            let write_count = faulty_blob.write_count();
            let failed_write_len = faulty_blob.failed_write_len();
            let append = Append::new(faulty_blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            // The cumulative write counts pin how many blob writes each sync issued.
            append.append(&data[..48]).await.unwrap();
            append.sync().await.unwrap();
            assert_eq!(write_count.load(Ordering::SeqCst), 1);

            append.append(&data[48..50]).await.unwrap();
            append.sync().await.unwrap();
            assert_eq!(write_count.load(Ordering::SeqCst), 3);

            append.append(&data[50..]).await.unwrap();
            append.sync().await.unwrap();
            assert_eq!(write_count.load(Ordering::SeqCst), 4);

            // Corrupt slot 0 through the raw handle. This bypasses the fault injector, so
            // it is not counted in `write_count`.
            let slot0_offset = PAGE_SIZE.get() as u64;
            blob.write_at(slot0_offset, DUMMY_MARKER.to_vec())
                .await
                .unwrap();
            blob.sync().await.unwrap();

            // The shrink fails on its first write, which is one CRC slot long.
            assert!(
                append.resize(45).await.is_err(),
                "phase-1 partial write should fail"
            );
            assert_eq!(write_count.load(Ordering::SeqCst), 5);
            assert_eq!(failed_write_len.load(Ordering::SeqCst), CHECKSUM_SLOT_SIZE);
            drop(append);

            // Reopen without fault injection: the 50-byte state must be recoverable.
            let (blob, size) = context
                .open("test_partition", b"same_page_shrink_fallback_slot")
                .await
                .unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref)
                .await
                .unwrap();
            assert_eq!(append.size().await, 50);
            let read = append.read_at(0, 50).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), &data[..50]);
        });
    }
3386
3387 #[test]
3388 fn test_resize_full_page_to_partial_reopens_at_shorter_size() {
3389 let executor = deterministic::Runner::default();
3390
3391 executor.start(|context| async move {
3392 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
3393 let page_size = PAGE_SIZE.get() as u64;
3394 let target = page_size + 45;
3395 let data: Vec<u8> = (0..page_size * 2).map(|i| (i % 251) as u8).collect();
3396
3397 let (blob, size) = context
3398 .open("test_partition", b"full_page_to_partial")
3399 .await
3400 .unwrap();
3401 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
3402 .await
3403 .unwrap();
3404 append.append(&data).await.unwrap();
3405 append.sync().await.unwrap();
3406
3407 append.resize(target).await.unwrap();
3408 drop(append);
3409
3410 let (blob, size) = context
3411 .open("test_partition", b"full_page_to_partial")
3412 .await
3413 .unwrap();
3414 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref)
3415 .await
3416 .unwrap();
3417 assert_eq!(append.size().await, target);
3418 let read = append.read_at(0, target as usize).await.unwrap().coalesce();
3419 assert_eq!(read.as_ref(), &data[..target as usize]);
3420 });
3421 }
3422
    #[test]
    /// A shrink from three full pages to a partial second page, interrupted at its first
    /// write (a CRC slot), must still leave a consistent, readable state on reopen.
    /// The assertions pin that state: exactly two full pages of the original data.
    fn test_resize_full_page_to_partial_survives_interrupted_crc_stage() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let page_size = PAGE_SIZE.get() as u64;
            // One full page plus 45 bytes into the next page.
            let target = page_size + 45;
            let data: Vec<u8> = (0..page_size * 3).map(|i| (i % 251) as u8).collect();

            // Write three full pages and persist them.
            let (blob, size) = context
                .open("test_partition", b"full_page_to_partial_interrupted")
                .await
                .unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            // NOTE(review): `PartialWriteBlob::new(blob, 1, 3)` presumably fails the 1st
            // write after persisting only 3 bytes; confirm against its definition.
            let (blob, size) = context
                .open("test_partition", b"full_page_to_partial_interrupted")
                .await
                .unwrap();
            let faulty_blob = PartialWriteBlob::new(blob, 1, 3);
            let write_count = faulty_blob.write_count();
            let failed_write_len = faulty_blob.failed_write_len();
            let append = Append::new(faulty_blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            // The shrink fails on its very first write, which is one CRC slot long.
            assert!(
                append.resize(target).await.is_err(),
                "phase-1 partial write should fail"
            );
            assert_eq!(write_count.load(Ordering::SeqCst), 1);
            assert_eq!(failed_write_len.load(Ordering::SeqCst), CHECKSUM_SLOT_SIZE);
            drop(append);

            // Reopen without fault injection: two full pages of the original data remain.
            let (blob, size) = context
                .open("test_partition", b"full_page_to_partial_interrupted")
                .await
                .unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref)
                .await
                .unwrap();
            assert_eq!(append.size().await, page_size * 2);
            let read = append
                .read_at(0, (page_size * 2) as usize)
                .await
                .unwrap()
                .coalesce();
            assert_eq!(read.as_ref(), &data[..(page_size * 2) as usize]);
        });
    }
3479
    #[test]
    /// A same-page shrink whose third write (the old slot's length invalidation, per
    /// the assertion message) is torn must still leave the new 40-byte state durable.
    fn test_resize_same_page_shrink_survives_interrupted_length_invalidation() {
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Pages large enough that all 300 bytes stay within a single partial page.
            const LARGE_PAGE_SIZE: NonZeroU16 = NZU16!(600);
            const LARGE_BUFFER_SIZE: usize = 1_200;

            let cache_ref =
                CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(LARGE_BUFFER_SIZE));
            let data: Vec<u8> = (0..300).map(|i| (i % 251) as u8).collect();

            // Build the page from two separately synced writes (255 + 45 bytes).
            let (blob, size) = context
                .open(
                    "test_partition",
                    b"same_page_shrink_interrupted_len_invalidation",
                )
                .await
                .unwrap();
            let append = Append::new(blob, size, LARGE_BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append.append(&data[..255]).await.unwrap();
            append.sync().await.unwrap();
            append.append(&data[255..]).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            // NOTE(review): `PartialWriteBlob::new(blob, 3, 1)` presumably fails the 3rd
            // write after persisting 1 byte; confirm against its definition.
            let (blob, size) = context
                .open(
                    "test_partition",
                    b"same_page_shrink_interrupted_len_invalidation",
                )
                .await
                .unwrap();
            let faulty_blob = PartialWriteBlob::new(blob, 3, 1);
            let write_count = faulty_blob.write_count();
            let failed_write_len = faulty_blob.failed_write_len();
            let append = Append::new(faulty_blob, size, LARGE_BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            // The first two writes succeed; the third (a `u16` length field) fails.
            assert!(
                append.resize(40).await.is_err(),
                "old-slot length invalidation should fail"
            );
            assert_eq!(write_count.load(Ordering::SeqCst), 3);
            assert_eq!(
                failed_write_len.load(Ordering::SeqCst),
                std::mem::size_of::<u16>()
            );
            drop(append);

            // Reopen without fault injection: the shrink to 40 bytes is already durable.
            let (blob, size) = context
                .open(
                    "test_partition",
                    b"same_page_shrink_interrupted_len_invalidation",
                )
                .await
                .unwrap();
            let append = Append::new(blob, size, LARGE_BUFFER_SIZE, cache_ref)
                .await
                .unwrap();
            assert_eq!(append.size().await, 40);
            let read = append.read_at(0, 40).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), &data[..40]);
        });
    }
3550
    #[test_traced("DEBUG")]
    /// Shrinking a single full page down to a partial page (50 bytes) does not change
    /// the blob's physical page count. The test pins the exact I/O counters this path
    /// produces, with one full sync in total.
    fn test_resize_partial_shrink_without_physical_resize_uses_range_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            // Test blob that records write / full-sync / range-sync counts, read back via
            // `snapshot()`. A clone is kept here for inspection.
            let blob = SyncTrackingBlob::new();
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let append = Append::new(blob.clone(), 0, BUFFER_SIZE, cache_ref)
                .await
                .unwrap();
            append.sync().await.unwrap();

            // Exactly one full page.
            let data = vec![5u8; PAGE_SIZE.get() as usize];
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();

            // Shrink within the same physical page.
            append.resize(50).await.unwrap();
            append.sync().await.unwrap();

            // Counters are cumulative over the whole test (initial sync, the full-page
            // append, and the shrink).
            let (_, writes, full_syncs, range_syncs) = blob.snapshot();
            assert_eq!(writes, 4);
            assert_eq!(full_syncs, 1);
            assert_eq!(range_syncs, 4);
        });
    }
3576
    #[test_traced("DEBUG")]
    /// Shrinking from two full pages to a 50-byte partial page also shrinks the blob
    /// physically, and the test pins that this costs one extra full sync. Once that sync
    /// completes, the next append + sync must not issue another full sync (only a range
    /// sync).
    fn test_resize_partial_shrink_with_physical_resize_clears_full_sync_requirement() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            // Test blob that records write / full-sync / range-sync counts, read back via
            // `snapshot()`. A clone is kept here for inspection.
            let blob = SyncTrackingBlob::new();
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let append = Append::new(blob.clone(), 0, BUFFER_SIZE, cache_ref)
                .await
                .unwrap();
            append.sync().await.unwrap();

            // Two full pages.
            let data = vec![9u8; PAGE_SIZE.get() as usize * 2];
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();

            // Shrink into the first page; the second physical page goes away.
            append.resize(50).await.unwrap();
            append.sync().await.unwrap();

            // Cumulative counters after the shrink.
            let (_, writes, full_syncs, range_syncs) = blob.snapshot();
            assert_eq!(writes, 4);
            assert_eq!(full_syncs, 2);
            assert_eq!(range_syncs, 3);

            // A follow-up append: one more write and one more range sync, no full sync.
            append.append(b"x").await.unwrap();
            append.sync().await.unwrap();

            let (_, writes, full_syncs, range_syncs) = blob.snapshot();
            assert_eq!(writes, 5);
            assert_eq!(full_syncs, 2);
            assert_eq!(range_syncs, 4);

            // Contents: the first 50 original bytes followed by the appended byte.
            let mut expected = data[..50].to_vec();
            expected.push(b'x');
            let read = append.read_at(0, expected.len()).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), expected.as_slice());
        });
    }
3617
    #[test_traced("DEBUG")]
    /// Shrinking from two full pages exactly to the first page boundary leaves no
    /// partial tail. The test pins the I/O counters for this path (two full syncs in
    /// total) and checks that a fresh reopen sees exactly one full page.
    fn test_resize_page_boundary_shrink_uses_full_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            // Test blob that records write / full-sync / range-sync counts, read back via
            // `snapshot()`. A clone is kept here for inspection and reopening.
            let blob = SyncTrackingBlob::new();
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let append = Append::new(blob.clone(), 0, BUFFER_SIZE, cache_ref)
                .await
                .unwrap();
            append.sync().await.unwrap();

            // Two full pages.
            let page_size = PAGE_SIZE.get() as usize;
            let data = vec![11u8; page_size * 2];
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();

            // Shrink to exactly one page.
            append.resize(PAGE_SIZE.get() as u64).await.unwrap();
            append.sync().await.unwrap();

            // Cumulative counters: the shrink itself adds no writes.
            let (_, writes, full_syncs, range_syncs) = blob.snapshot();
            assert_eq!(writes, 1);
            assert_eq!(full_syncs, 2);
            assert_eq!(range_syncs, 1);

            // Reopen with a fresh cache so the check reads from the blob, not cached pages.
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let reopened = Append::new(blob.clone(), blob.size(), BUFFER_SIZE, cache_ref)
                .await
                .unwrap();
            assert_eq!(reopened.size().await, PAGE_SIZE.get() as u64);
            let read = reopened.read_at(0, page_size).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), &data[..page_size]);
        });
    }
3655
3656 #[test]
3657 fn test_reopen_partial_tail_append_and_resize() {
3658 let executor = deterministic::Runner::default();
3659
3660 executor.start(|context| async move {
3661 const PAGE_SIZE: NonZeroU16 = NZU16!(64);
3662 const BUFFER_SIZE: usize = 256;
3663
3664 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(4));
3665
3666 let (blob, size) = context
3667 .open("test_partition", b"partial_tail_test")
3668 .await
3669 .unwrap();
3670
3671 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
3672 .await
3673 .unwrap();
3674
3675 append.append(&[1, 2, 3, 4, 5]).await.unwrap();
3677 append.sync().await.unwrap();
3678 assert_eq!(append.size().await, 5);
3679 drop(append);
3680
3681 let (blob, size) = context
3682 .open("test_partition", b"partial_tail_test")
3683 .await
3684 .unwrap();
3685
3686 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
3687 .await
3688 .unwrap();
3689 assert_eq!(append.size().await, 5);
3690
3691 append.append(&[6, 7, 8]).await.unwrap();
3692 append.resize(6).await.unwrap();
3693 append.sync().await.unwrap();
3694
3695 let data: Vec<u8> = append.read_at(0, 6).await.unwrap().coalesce().into();
3696 assert_eq!(data, vec![1, 2, 3, 4, 5, 6]);
3697 });
3698 }
3699
3700 #[test]
3701 fn test_corrupted_crc_len_too_large() {
3702 let executor = deterministic::Runner::default();
3703
3704 executor.start(|context| async move {
3705 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
3706 let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
3707
3708 let (blob, size) = context
3710 .open("test_partition", b"crc_len_test")
3711 .await
3712 .unwrap();
3713
3714 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
3715 .await
3716 .unwrap();
3717
3718 append.append(&[0x42; 50]).await.unwrap();
3719 append.sync().await.unwrap();
3720 drop(append);
3721
3722 let (blob, size) = context
3724 .open("test_partition", b"crc_len_test")
3725 .await
3726 .unwrap();
3727 assert_eq!(size as usize, physical_page_size);
3728
3729 let crc_offset = PAGE_SIZE.get() as u64;
3731
3732 let bad_crc_record: [u8; 12] = [
3735 0xFF, 0xFF, 0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, ];
3740 blob.write_at(crc_offset, bad_crc_record.to_vec())
3741 .await
3742 .unwrap();
3743 blob.sync().await.unwrap();
3744
3745 let result = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()).await;
3747
3748 match result {
3751 Ok(append) => {
3752 let recovered_size = append.size().await;
3754 assert_eq!(
3755 recovered_size, 0,
3756 "Corrupted page should be truncated, size should be 0"
3757 );
3758 }
3759 Err(e) => {
3760 assert!(
3762 matches!(e, crate::Error::InvalidChecksum),
3763 "Expected InvalidChecksum error, got: {:?}",
3764 e
3765 );
3766 }
3767 }
3768 });
3769 }
3770
3771 #[test]
3772 fn test_corrupted_crc_both_slots_len_too_large() {
3773 let executor = deterministic::Runner::default();
3774
3775 executor.start(|context| async move {
3776 let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
3777
3778 let (blob, size) = context
3780 .open("test_partition", b"crc_both_bad")
3781 .await
3782 .unwrap();
3783
3784 let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
3785 .await
3786 .unwrap();
3787
3788 append.append(&[0x42; 50]).await.unwrap();
3789 append.sync().await.unwrap();
3790 drop(append);
3791
3792 let (blob, size) = context
3794 .open("test_partition", b"crc_both_bad")
3795 .await
3796 .unwrap();
3797
3798 let crc_offset = PAGE_SIZE.get() as u64;
3799
3800 let bad_crc_record: [u8; 12] = [
3802 0x01, 0x00, 0xDE, 0xAD, 0xBE, 0xEF, 0x02, 0x00, 0xCA, 0xFE, 0xBA, 0xBE, ];
3807 blob.write_at(crc_offset, bad_crc_record.to_vec())
3808 .await
3809 .unwrap();
3810 blob.sync().await.unwrap();
3811
3812 let result = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()).await;
3814
3815 match result {
3816 Ok(append) => {
3817 assert_eq!(append.size().await, 0);
3819 }
3820 Err(e) => {
3821 assert!(
3822 matches!(e, crate::Error::InvalidChecksum),
3823 "Expected InvalidChecksum, got: {:?}",
3824 e
3825 );
3826 }
3827 }
3828 });
3829 }
3830}