Skip to main content

commonware_runtime/utils/buffer/paged/
append.rs

1//! The [Append] wrapper consists of a [Blob] and a write buffer, and provides a logical view over
2//! the underlying blob which has a page-oriented structure that provides integrity guarantees. The
3//! wrapper also provides read caching managed by a page cache.
4//!
5//! # Recovery
6//!
7//! On `sync`, this wrapper will durably write buffered data to the underlying blob in pages. All
8//! pages have a [Checksum] at the end. If no CRC record existed before for the page being written,
9//! then one of the checksums will be all zero. If a checksum already existed for the page being
10//! written, then the write will overwrite only the checksum with the lesser length value. Should
11//! this write fail, the previously committed page state can still be recovered. Partial-page
12//! shrink makes the shorter checksum durable before invalidating the old longer checksum.
13//!
14//! During initialization, the wrapper will back up over any page that is not accompanied by a
15//! valid CRC, treating it as the result of an incomplete write that may be invalid.
16//!
17//! # Blob Semantics
18//!
19//! [Append] owns the physical page layout, read cache, and durability bookkeeping for the wrapped
20//! [Blob]. Cloned [Append] handles share that state and are safe to use concurrently. Raw [Blob]
21//! handles cloned before wrapping operate on physical bytes, including checksum records, rather
22//! than [Append]'s logical view, and they do not observe buffered data until it is flushed.
23//!
24//! Raw [Blob] handles must not be used to write, resize, or otherwise mutate the blob while an
25//! [Append] exists. Those mutations bypass the buffer and page cache, can invalidate checksum
26//! recovery, and are not covered by [Append]'s [`Blob::write_at_sync`] fast paths.
27
28use super::read::{PageReader, Replay};
29use crate::{
30    buffer::{
31        paged::{CacheRef, Checksum, CHECKSUM_SIZE, CHECKSUM_SLOT_SIZE},
32        tip::Buffer,
33    },
34    Blob, Error, IoBuf, IoBufMut, IoBufs,
35};
36use bytes::BufMut;
37use commonware_cryptography::Crc32;
38use commonware_utils::sync::{AsyncRwLock, AsyncRwLockWriteGuard};
39use futures::stream::{FuturesUnordered, StreamExt};
40use std::{
41    num::{NonZeroU16, NonZeroUsize},
42    sync::Arc,
43};
44use tracing::warn;
45
/// Indicates which CRC slot in a page record must not be overwritten.
///
/// A page's CRC record follows its logical bytes and is made up of two slots. When an existing
/// partial page is rewritten, the flush path writes around the protected slot so that the
/// previously committed state of the page stays recoverable.
#[derive(Clone, Copy)]
enum ProtectedCrc {
    /// The first slot, located immediately after the page's logical bytes.
    First,
    /// The second slot, located `CHECKSUM_SLOT_SIZE` bytes after the first.
    Second,
}
52
/// Describes the state of the underlying blob with respect to the buffer.
///
/// All mutations of the wrapped blob go through the methods of this type so that `needs_sync`
/// accurately tracks whether a full sync is still owed.
#[derive(Clone)]
struct BlobState<B: Blob> {
    /// The wrapped blob holding the physical (checksummed) pages.
    blob: B,

    /// The page where the next appended byte will be written to.
    current_page: u64,

    /// The state of the partial page in the blob. If it was written due to a sync call, then this
    /// will contain its CRC record.
    partial_page_state: Option<Checksum>,

    /// Whether prior plain writes or resizes must be made durable by a full sync.
    ///
    /// Set by `write_at` and `resize`; cleared by a successful `sync` or a successful
    /// `write_at_sync` fast path.
    needs_sync: bool,
}
68
69impl<B: Blob> BlobState<B> {
70    /// Write bytes to the underlying blob and mark them as needing sync.
71    async fn write_at(&mut self, offset: u64, bufs: impl Into<IoBufs> + Send) -> Result<(), Error> {
72        self.blob.write_at(offset, bufs).await?;
73        self.needs_sync = true;
74        Ok(())
75    }
76
77    /// Write bytes to the underlying blob and make them durable.
78    ///
79    /// Uses [`Blob::write_at_sync`] when there are no earlier unsynced
80    /// mutations. Otherwise, writes the bytes and then syncs the blob.
81    async fn write_at_sync(
82        &mut self,
83        offset: u64,
84        bufs: impl Into<IoBufs> + Send,
85    ) -> Result<(), Error> {
86        if self.needs_sync {
87            self.write_at(offset, bufs).await?;
88            self.sync().await
89        } else {
90            // If `write_at_sync` fails, a later sync must not treat the drained
91            // buffer as durable.
92            self.needs_sync = true;
93            self.blob.write_at_sync(offset, bufs).await?;
94            self.needs_sync = false;
95            Ok(())
96        }
97    }
98
99    /// Write bytes to the underlying blob, optionally making them durable.
100    async fn write_at_maybe_sync(
101        &mut self,
102        offset: u64,
103        bufs: impl Into<IoBufs> + Send,
104        sync: bool,
105    ) -> Result<(), Error> {
106        if sync {
107            self.write_at_sync(offset, bufs).await
108        } else {
109            self.write_at(offset, bufs).await
110        }
111    }
112
113    /// Resize the underlying blob and mark it as needing sync.
114    async fn resize(&mut self, len: u64) -> Result<(), Error> {
115        self.blob.resize(len).await?;
116        self.needs_sync = true;
117        Ok(())
118    }
119
120    /// Sync the underlying blob if there are unsynced mutations.
121    async fn sync(&mut self) -> Result<(), Error> {
122        if !self.needs_sync {
123            return Ok(());
124        }
125        self.blob.sync().await?;
126        self.needs_sync = false;
127        Ok(())
128    }
129}
130
/// A [Blob] wrapper that supports write-buffered appending of data, with per-page checksums for
/// data integrity and read caching managed by a page cache.
///
/// The blob state and the write buffer are held behind [Arc]s, so cloned handles share them.
#[derive(Clone)]
pub struct Append<B: Blob> {
    /// The underlying blob being wrapped, together with its durability bookkeeping.
    blob_state: Arc<AsyncRwLock<BlobState<B>>>,

    /// Unique id assigned to this blob by the page cache.
    id: u64,

    /// A reference to the page cache that manages read caching for this blob.
    cache_ref: CacheRef,

    /// The write buffer containing any logical bytes following the last full page boundary in the
    /// underlying blob.
    buffer: Arc<AsyncRwLock<Buffer>>,
}
148
149/// Returns the capacity with a floor applied to ensure it can hold at least one full page of new
150/// data even when caching a nearly-full page of already written data.
151fn capacity_with_floor(capacity: usize, page_size: u64) -> usize {
152    let floor = page_size as usize * 2;
153    if capacity < floor {
154        warn!(
155            floor,
156            "requested buffer capacity is too low, increasing it to floor"
157        );
158        floor
159    } else {
160        capacity
161    }
162}
163
164impl<B: Blob> Append<B> {
165    /// Create a new [Append] wrapper of the provided `blob` that is known to have `blob_size`
166    /// underlying physical bytes, using the provided `cache_ref` for read caching, and a write
167    /// buffer with capacity `capacity`. Rewinds the blob if necessary to ensure it only contains
168    /// checksum-validated data.
169    pub async fn new(
170        blob: B,
171        original_blob_size: u64,
172        capacity: usize,
173        cache_ref: CacheRef,
174    ) -> Result<Self, Error> {
175        let (partial_page_state, pages, invalid_data_found) =
176            Self::read_last_valid_page(&blob, original_blob_size, cache_ref.page_size()).await?;
177        if invalid_data_found {
178            // Invalid data was detected, trim it from the blob.
179            let new_blob_size = pages * (cache_ref.page_size() + CHECKSUM_SIZE);
180            warn!(
181                original_blob_size,
182                new_blob_size, "truncating blob to remove invalid data"
183            );
184            blob.resize(new_blob_size).await?;
185            blob.sync().await?;
186        }
187
188        let capacity = capacity_with_floor(capacity, cache_ref.page_size());
189        let needs_sync = !invalid_data_found; // ensure pending writes on the wrapped blob are synced
190
191        let (blob_state, partial_data) = match partial_page_state {
192            Some((partial_page, crc_record)) => (
193                BlobState {
194                    blob,
195                    current_page: pages - 1,
196                    partial_page_state: Some(crc_record),
197                    needs_sync,
198                },
199                Some(partial_page),
200            ),
201            None => (
202                BlobState {
203                    blob,
204                    current_page: pages,
205                    partial_page_state: None,
206                    needs_sync,
207                },
208                None,
209            ),
210        };
211
212        let buffer = Buffer::from(
213            blob_state.current_page * cache_ref.page_size(),
214            partial_data.unwrap_or_default(),
215            capacity,
216            cache_ref.pool().clone(),
217        );
218
219        Ok(Self {
220            blob_state: Arc::new(AsyncRwLock::new(blob_state)),
221            id: cache_ref.next_id(),
222            cache_ref,
223            buffer: Arc::new(AsyncRwLock::new(buffer)),
224        })
225    }
226
227    /// Scans backwards from the end of the blob, stopping when it finds a valid page.
228    ///
229    /// # Returns
230    ///
231    /// A tuple of `(partial_page, page_count, invalid_data_found)`:
232    ///
233    /// - `partial_page`: If the last valid page is partial (contains fewer than `page_size` logical
234    ///   bytes), returns `Some((data, crc_record))` containing the logical data and its CRC record.
235    ///   Returns `None` if the last valid page is full or if no valid pages exist.
236    ///
237    /// - `page_count`: The number of pages in the blob up to and including the last valid page
238    ///   found (whether or not it's partial). Note that it's possible earlier pages may be invalid
239    ///   since this function stops scanning when it finds one valid page.
240    ///
241    /// - `invalid_data_found`: `true` if there are any bytes in the blob that follow the last valid
242    ///   page. Typically the blob should be resized to eliminate them since their integrity cannot
243    ///   be guaranteed.
244    async fn read_last_valid_page(
245        blob: &B,
246        blob_size: u64,
247        page_size: u64,
248    ) -> Result<(Option<(IoBuf, Checksum)>, u64, bool), Error> {
249        let physical_page_size = page_size + CHECKSUM_SIZE;
250        let partial_bytes = blob_size % physical_page_size;
251        let mut last_page_end = blob_size - partial_bytes;
252
253        // If the last physical page in the blob is truncated, it can't have a valid CRC record and
254        // must be invalid.
255        let mut invalid_data_found = partial_bytes != 0;
256
257        while last_page_end != 0 {
258            // Read the last page and parse its CRC record.
259            let page_start = last_page_end - physical_page_size;
260            let buf = blob
261                .read_at(page_start, physical_page_size as usize)
262                .await?
263                .coalesce()
264                .freeze();
265
266            match Checksum::validate_page(buf.as_ref()) {
267                Some(crc_record) => {
268                    // Found a valid page.
269                    let (len, _) = crc_record.get_crc();
270                    let len = len as u64;
271                    if len != page_size {
272                        // The page is partial (logical data doesn't fill the page).
273                        let logical_bytes = buf.slice(..len as usize);
274                        return Ok((
275                            Some((logical_bytes, crc_record)),
276                            last_page_end / physical_page_size,
277                            invalid_data_found,
278                        ));
279                    }
280                    // The page is full.
281                    return Ok((None, last_page_end / physical_page_size, invalid_data_found));
282                }
283                None => {
284                    // The page is invalid.
285                    last_page_end = page_start;
286                    invalid_data_found = true;
287                }
288            }
289        }
290
291        // No valid page exists in the blob.
292        Ok((None, 0, invalid_data_found))
293    }
294
295    /// Append all bytes in `buf` to the tip of the blob.
296    pub async fn append(&self, buf: &[u8]) -> Result<(), Error> {
297        let mut buffer = self.buffer.write().await;
298
299        if !buffer.append(buf) {
300            return Ok(());
301        }
302
303        // Buffer is over capacity, so we need to write data to the blob.
304        self.flush_internal(buffer, false, false).await?;
305        Ok(())
306    }
307
    /// Flush all full pages from the buffer to disk, resetting the buffer to contain only the bytes
    /// in any final partial page.
    ///
    /// If `write_partial_page` is true, the partial page will be written to the blob as well along
    /// with a CRC record.
    ///
    /// If `sync` is true and the flush emits a single write, that write is made durable
    /// immediately: with [`Blob::write_at_sync`] when there are no earlier unsynced mutations, or
    /// by writing it and syncing the blob when there are. Flushes split around a protected CRC use
    /// plain writes so the caller can make them durable with one sync.
    ///
    /// # Serialization
    ///
    /// This method reads `partial_page_state` from `blob_state` under a read lock, then later
    /// acquires `blob_state` as a write lock to commit the new state. This is safe because the
    /// caller always holds the buffer write lock (`buf_guard`), and all paths into `flush_internal`
    /// require that lock, so concurrent flushes are impossible.
    ///
    /// # Returns
    ///
    /// `Ok(true)` if the flush made its writes durable, so no additional sync is needed. This only
    /// happens when `sync` is true and exactly one write was emitted. `Ok(false)` is returned in
    /// every other successful case: nothing needed writing, `sync` was false, or the flush was
    /// split into two plain writes.
    async fn flush_internal(
        &self,
        mut buf_guard: AsyncRwLockWriteGuard<'_, Buffer>,
        write_partial_page: bool,
        sync: bool,
    ) -> Result<bool, Error> {
        let buffer = &mut *buf_guard;

        // Read the old partial page state before doing the heavy work of preparing physical pages.
        // This is safe because partial_page_state is only modified by flush_internal, and we hold
        // the buffer write lock which prevents concurrent flushes.
        let old_partial_page_state = {
            let blob_state = self.blob_state.read().await;
            blob_state.partial_page_state.clone()
        };

        // Prepare the *physical* pages corresponding to the data in the buffer.
        // Pass the old partial page state so the CRC record is constructed correctly.
        let (mut physical_pages, partial_page_state) = self.to_physical_pages(
            &*buffer,
            write_partial_page,
            old_partial_page_state.as_ref(),
        );

        // If there's nothing to write, return early. Nothing was made durable, hence `false`.
        if physical_pages.is_empty() {
            return Ok(false);
        }

        // Split buffered bytes into full logical pages to hand off now, leaving any trailing
        // partial page in tip for continued buffering.
        let logical_page_size = self.cache_ref.page_size() as usize;
        let pages_to_cache = buffer.len() / logical_page_size;
        let bytes_to_drain = pages_to_cache * logical_page_size;

        // Remember the logical start offset and page bytes for caching of flushed full pages.
        let cache_pages = if pages_to_cache > 0 {
            Some((buffer.offset, buffer.slice(..bytes_to_drain)))
        } else {
            None
        };

        // Drain full pages from the buffered logical data. If the tip is fully drained, detach its
        // backing so empty append buffers don't retain pooled storage.
        if bytes_to_drain == buffer.len() && bytes_to_drain != 0 {
            let _ = buffer
                .take()
                .expect("take must succeed when flush drains all buffered bytes");
        } else if bytes_to_drain != 0 {
            buffer.drop_prefix(bytes_to_drain);
            buffer.offset += bytes_to_drain as u64;
        }
        let new_offset = buffer.offset;

        // Cache full pages before releasing the tip lock so reads don't observe stale persisted
        // bytes during the handoff from tip to cache.
        if let Some((cache_offset, pages)) = cache_pages {
            let remaining = self.cache_ref.cache(self.id, pages.as_ref(), cache_offset);
            assert_eq!(remaining, 0, "cached full-page prefix must be page-aligned");
        }

        // Acquire a write lock on the blob state so nobody tries to read or modify the blob while
        // we're writing to it. This happens before the buffer lock is released so there is no
        // window in which neither lock is held.
        let mut blob_state = self.blob_state.write().await;

        // Release the buffer lock to allow for concurrent reads & buffered writes while we write
        // the physical pages.
        drop(buf_guard);

        let physical_page_size = logical_page_size + CHECKSUM_SIZE as usize;
        let write_at_offset = blob_state.current_page * physical_page_size as u64;

        // Identify protected regions based on the OLD partial page state.
        let protected_regions = Self::identify_protected_regions(old_partial_page_state.as_ref());

        // Update state before writing. This may appear to risk data loss if writes fail,
        // but write failures are fatal per this codebase's design - callers must not use
        // the blob after any mutable method returns an error.
        blob_state.current_page += pages_to_cache as u64;
        blob_state.partial_page_state = partial_page_state;

        // Make sure the buffer offset and underlying blob agree on the state of the tip.
        assert_eq!(
            blob_state.current_page * self.cache_ref.page_size(),
            new_offset
        );

        // Write the physical pages to the blob.
        // If there are protected regions in the first page, we need to write around them.
        match protected_regions {
            Some((prefix_len, ProtectedCrc::First)) => {
                // Protected CRC is first: [page_size..page_size+CHECKSUM_SLOT_SIZE].
                //
                // If only one of these writes is emitted, it can be made durable here. If
                // both are emitted, keep them plain so one later sync covers both.
                //
                // Write 1: new data in first page [prefix_len..page_size].
                let has_first_write = prefix_len < logical_page_size;
                if has_first_write {
                    // Discard the already-written prefix, then carve out the new logical bytes.
                    let _ = physical_pages.split_to(prefix_len);
                    let first_payload = physical_pages.split_to(logical_page_size - prefix_len);
                    let has_second_write = physical_pages.len() > CHECKSUM_SLOT_SIZE;
                    blob_state
                        .write_at_maybe_sync(
                            write_at_offset + prefix_len as u64,
                            first_payload,
                            sync && !has_second_write,
                        )
                        .await?;
                    if !has_second_write {
                        return Ok(sync);
                    }
                } else {
                    // Skip the protected first page bytes when they are fully covered.
                    let _ = physical_pages.split_to(logical_page_size);
                }

                // Write 2: second CRC of first page + all remaining pages
                // [page_size+CHECKSUM_SLOT_SIZE..end].
                if physical_pages.len() > CHECKSUM_SLOT_SIZE {
                    // Drop the bytes that line up with the protected first CRC slot.
                    let _ = physical_pages.split_to(CHECKSUM_SLOT_SIZE);
                    blob_state
                        .write_at_maybe_sync(
                            write_at_offset + (logical_page_size + CHECKSUM_SLOT_SIZE) as u64,
                            physical_pages,
                            sync && !has_first_write,
                        )
                        .await?;
                    if !has_first_write {
                        return Ok(sync);
                    }
                }

                // Either both writes were emitted as plain writes, or none was emitted at all.
                Ok(false)
            }
            Some((prefix_len, ProtectedCrc::Second)) => {
                // Protected CRC is second: [page_size+CHECKSUM_SLOT_SIZE..physical_page_size].
                //
                // If only one of these writes is emitted, it can be made durable here. If
                // both are emitted, keep them plain so one later sync covers both.
                //
                // Write 1: new data + first CRC of first page
                // [prefix_len..page_size+CHECKSUM_SLOT_SIZE].
                let first_crc_end = logical_page_size + CHECKSUM_SLOT_SIZE;
                // Number of bytes between the end of the first CRC slot and the end of the
                // physical page, i.e. the span occupied by the protected second slot.
                let skip = physical_page_size - first_crc_end;
                let has_first_write = prefix_len < first_crc_end;
                if has_first_write {
                    // Discard the already-written prefix, then carve out data + first CRC slot.
                    let _ = physical_pages.split_to(prefix_len);
                    let first_payload = physical_pages.split_to(first_crc_end - prefix_len);
                    let has_second_write = physical_pages.len() > skip;
                    blob_state
                        .write_at_maybe_sync(
                            write_at_offset + prefix_len as u64,
                            first_payload,
                            sync && !has_second_write,
                        )
                        .await?;
                    if !has_second_write {
                        return Ok(sync);
                    }
                } else {
                    // Skip the fully protected first segment when no bytes from it need update.
                    let _ = physical_pages.split_to(first_crc_end);
                }

                // Write 2: all remaining pages (if any) [physical_page_size..end].
                if physical_pages.len() > skip {
                    // Drop the bytes that line up with the protected second CRC slot.
                    let _ = physical_pages.split_to(skip);
                    blob_state
                        .write_at_maybe_sync(
                            write_at_offset + physical_page_size as u64,
                            physical_pages,
                            sync && !has_first_write,
                        )
                        .await?;
                    if !has_first_write {
                        return Ok(sync);
                    }
                }

                // Either both writes were emitted as plain writes, or none was emitted at all.
                Ok(false)
            }
            None => {
                // No protected regions, write everything in one operation
                blob_state
                    .write_at_maybe_sync(write_at_offset, physical_pages, sync)
                    .await?;
                Ok(sync)
            }
        }
    }
516
517    /// Returns the logical size of the blob. This accounts for both written and buffered data.
518    pub async fn size(&self) -> u64 {
519        let buffer = self.buffer.read().await;
520        buffer.size()
521    }
522
523    /// Returns the logical size of the blob if it can be observed without waiting.
524    ///
525    /// This is useful for opportunistic fast paths that should fall back rather than contend with
526    /// concurrent writers.
527    pub fn try_size(&self) -> Option<u64> {
528        let buffer = self.buffer.try_read().ok()?;
529        Some(buffer.size())
530    }
531
532    /// Read into `buf` if it can be done synchronously (e.g. without I/O), returning `false` otherwise.
533    ///
534    /// Returns `true` only if all `buf.len()` bytes were satisfied. The caller must have
535    /// already validated that `offset + buf.len()` is within the blob's logical size.
536    ///
537    /// The page cache is consulted first to minimize the risk of writer starvation from a
538    /// burst of buffer reads (which jump ahead of queued writers on the buffer lock).
539    pub fn try_read_sync(&self, offset: u64, buf: &mut [u8]) -> bool {
540        if self.cache_ref.read_cached(self.id, buf, offset) == buf.len() {
541            return true;
542        }
543        let Some(end_offset) = offset.checked_add(buf.len() as u64) else {
544            return false;
545        };
546        let Ok(buffer) = self.buffer.try_read() else {
547            return false;
548        };
549        if offset < buffer.offset || end_offset > buffer.size() {
550            return false;
551        }
552        let src_start = (offset - buffer.offset) as usize;
553        buf.copy_from_slice(&buffer.as_ref()[src_start..src_start + buf.len()]);
554        true
555    }
556
557    /// Read exactly `len` immutable bytes starting at `offset`.
558    pub async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufs, Error> {
559        // Read into a temporary contiguous buffer and copy back to preserve structure.
560        // SAFETY: read_into below initializes all `len` bytes.
561        let mut buf = unsafe { self.cache_ref.pool().alloc_len(len) };
562        self.read_into(buf.as_mut(), offset).await?;
563        Ok(buf.into())
564    }
565
566    /// Reads up to `buf.len()` bytes starting at `logical_offset`, but only as many as are
567    /// available.
568    ///
569    /// This is useful for reading variable-length prefixes (like varints) where you want to read
570    /// up to a maximum number of bytes but the actual data might be shorter.
571    ///
572    /// Returns the buffer (truncated to actual bytes read) and the number of bytes read.
573    /// Returns an error if no bytes are available at the given offset.
574    pub async fn read_up_to(
575        &self,
576        logical_offset: u64,
577        len: usize,
578        bufs: impl Into<IoBufMut> + Send,
579    ) -> Result<(IoBufMut, usize), Error> {
580        let mut bufs = bufs.into();
581        if len == 0 {
582            bufs.truncate(0);
583            return Ok((bufs, 0));
584        }
585        let blob_size = self.size().await;
586        let available = (blob_size.saturating_sub(logical_offset) as usize).min(len);
587        if available == 0 {
588            return Err(Error::BlobInsufficientLength);
589        }
590        // SAFETY: read_into below fills all `available` bytes.
591        unsafe { bufs.set_len(available) };
592        self.read_into(bufs.as_mut(), logical_offset).await?;
593
594        Ok((bufs, available))
595    }
596
597    /// Read multiple fixed-size items at sorted byte offsets into a contiguous caller buffer.
598    ///
599    /// `buf` must be exactly `offsets.len() * item_size` bytes. All offsets must be sorted,
600    /// non-overlapping, and within bounds. This amortizes lock acquisition and avoids
601    /// per-item buffer allocation compared to calling [`read_at`](Self::read_at) in a loop.
602    pub async fn read_many_into(
603        &self,
604        buf: &mut [u8],
605        offsets: &[u64],
606        item_size: usize,
607    ) -> Result<(), Error> {
608        assert_eq!(
609            buf.len(),
610            offsets
611                .len()
612                .checked_mul(item_size)
613                .expect("read_many_into buffer length overflow"),
614            "read_many_into requires buf.len() == offsets.len() * item_size"
615        );
616        if offsets.is_empty() {
617            return Ok(());
618        }
619
620        let last_end = offsets[offsets.len() - 1]
621            .checked_add(item_size as u64)
622            .ok_or(Error::OffsetOverflow)?;
623
624        // Acquire the buffer lock once for all items.
625        let buffer = self.buffer.read().await;
626        if last_end > buffer.size() {
627            return Err(Error::BlobInsufficientLength);
628        }
629
630        // Iterate over fixed-size output slots and copy items that overlap with the
631        // tip buffer directly into place. Items fully or partially below the tip
632        // need cache/blob reads and are recorded as (slice, offset) pairs.
633        // `chunks_exact_mut` yields disjoint per-item slots, so we never have to
634        // reborrow the parent buffer while cache/blob destinations remain live.
635        if item_size == 0 {
636            return Ok(());
637        }
638        let mut cache_ranges: Vec<(&mut [u8], u64)> = Vec::new();
639        for (item_buf, &offset) in buf.chunks_exact_mut(item_size).zip(offsets.iter()) {
640            let end = offset + item_size as u64;
641
642            if end <= buffer.offset {
643                // Entirely below tip -- needs cache read.
644                cache_ranges.push((item_buf, offset));
645            } else if offset >= buffer.offset {
646                // Entirely in tip buffer.
647                let src = (offset - buffer.offset) as usize;
648                item_buf.copy_from_slice(&buffer.as_ref()[src..src + item_size]);
649            } else {
650                // Straddles tip boundary: copy suffix from tip, record prefix for cache.
651                let prefix_len = (buffer.offset - offset) as usize;
652                item_buf[prefix_len..].copy_from_slice(&buffer.as_ref()[..item_size - prefix_len]);
653                cache_ranges.push((&mut item_buf[..prefix_len], offset));
654            }
655        }
656
657        drop(buffer);
658
659        if cache_ranges.is_empty() {
660            return Ok(());
661        }
662
663        // Fast path: try page cache for all ranges in a single lock acquisition.
664        // Fully-cached ranges are removed from cache_ranges; only misses remain.
665        self.cache_ref.read_cached_many(self.id, &mut cache_ranges);
666        if cache_ranges.is_empty() {
667            return Ok(());
668        }
669
670        // Slow path: read only the ranges that had cache misses, concurrently.
671        let blob_guard = self.blob_state.read().await;
672        let mut reads = cache_ranges
673            .iter_mut()
674            .map(|(item_buf, offset)| {
675                self.cache_ref
676                    .read(&blob_guard.blob, self.id, item_buf, *offset)
677            })
678            .collect::<FuturesUnordered<_>>();
679        while let Some(result) = reads.next().await {
680            result?;
681        }
682
683        Ok(())
684    }
685
686    /// Reads bytes starting at `logical_offset` into `buf`.
687    ///
688    /// This method allows reading directly into a mutable slice without taking ownership of the
689    /// buffer or requiring a specific buffer type.
690    pub async fn read_into(&self, buf: &mut [u8], logical_offset: u64) -> Result<(), Error> {
691        // Ensure the read doesn't overflow.
692        let end_offset = logical_offset
693            .checked_add(buf.len() as u64)
694            .ok_or(Error::OffsetOverflow)?;
695
696        // Acquire a read lock on the buffer.
697        let buffer = self.buffer.read().await;
698
699        // If the data required is beyond the size of the blob, return an error.
700        if end_offset > buffer.size() {
701            return Err(Error::BlobInsufficientLength);
702        }
703
704        // Extract any bytes from the buffer that overlap with the requested range.
705        let remaining = if end_offset <= buffer.offset {
706            // No overlap with tip.
707            buf.len()
708        } else {
709            // Overlap is always a suffix of requested range.
710            let overlap_start = buffer.offset.max(logical_offset);
711            let dst_start = (overlap_start - logical_offset) as usize;
712            let src_start = (overlap_start - buffer.offset) as usize;
713            let copied = buf.len() - dst_start;
714            buf[dst_start..].copy_from_slice(&buffer.as_ref()[src_start..src_start + copied]);
715            dst_start
716        };
717
718        // Release buffer lock before potential I/O.
719        drop(buffer);
720
721        if remaining == 0 {
722            return Ok(());
723        }
724
725        // Fast path: try to read *only* from page cache without acquiring blob lock. This allows
726        // concurrent reads even while a flush is in progress.
727        let cached = self
728            .cache_ref
729            .read_cached(self.id, &mut buf[..remaining], logical_offset);
730
731        if cached == remaining {
732            // All bytes found in cache.
733            return Ok(());
734        }
735
736        // Slow path: cache miss (partial or full), acquire blob read lock to ensure any in-flight
737        // write completes before we read from the blob.
738        let blob_guard = self.blob_state.read().await;
739
740        // Read remaining bytes that were not already obtained from the earlier cache read.
741        let uncached_offset = logical_offset + cached as u64;
742        let uncached_len = remaining - cached;
743        self.cache_ref
744            .read(
745                &blob_guard.blob,
746                self.id,
747                &mut buf[cached..cached + uncached_len],
748                uncached_offset,
749            )
750            .await
751    }
752
753    /// Returns the protected region info for a partial page, if any.
754    ///
755    /// # Returns
756    ///
757    /// `None` if there's no existing partial page.
758    ///
759    /// `Some((prefix_len, protected_crc))` where:
760    /// - `prefix_len`: bytes `[0..prefix_len]` were already written and can be substituted with
761    ///   zeros (skip writing)
762    /// - `protected_crc`: which CRC slot must not be overwritten
763    fn identify_protected_regions(
764        partial_page_state: Option<&Checksum>,
765    ) -> Option<(usize, ProtectedCrc)> {
766        let crc_record = partial_page_state?;
767        let (old_len, _) = crc_record.get_crc();
768        // The protected CRC is the one with the larger (authoritative) length.
769        let protected_crc = if crc_record.len1 >= crc_record.len2 {
770            ProtectedCrc::First
771        } else {
772            ProtectedCrc::Second
773        };
774        Some((old_len as usize, protected_crc))
775    }
776
777    /// Prepare physical-page writes from buffered logical bytes.
778    ///
779    /// Each physical page contains one logical page plus CRC record. If the last page is not yet
780    /// full, it will be included only if `include_partial_page` is true.
781    ///
782    /// # Arguments
783    ///
784    /// * `buffer` - The buffer containing logical page data
785    /// * `include_partial_page` - Whether to include a partial page if one exists
786    /// * `old_crc_record` - The CRC record from a previously committed partial page, if any.
787    ///   When present, the first page's CRC record will preserve the old CRC in its original slot
788    ///   and place the new CRC in the other slot.
789    fn to_physical_pages(
790        &self,
791        buffer: &Buffer,
792        include_partial_page: bool,
793        old_crc_record: Option<&Checksum>,
794    ) -> (IoBufs, Option<Checksum>) {
795        let logical_page_size = self.cache_ref.page_size() as usize;
796        let physical_page_size = logical_page_size + CHECKSUM_SIZE as usize;
797        let pages_to_write = buffer.len() / logical_page_size;
798        let mut write_buffer = IoBufs::default();
799        let buffer_data = buffer.as_ref();
800
801        if pages_to_write > 0 {
802            let logical_page_size_u16 =
803                u16::try_from(logical_page_size).expect("page size must fit in u16 for CRC record");
804
805            // Build CRC bytes for full pages once. Full-page payload bytes are appended below as
806            // slices from tip, so we avoid copying logical payload here.
807            let mut crcs = self
808                .cache_ref
809                .pool()
810                .alloc(CHECKSUM_SIZE as usize * pages_to_write);
811            for page in 0..pages_to_write {
812                let start_read_idx = page * logical_page_size;
813                let end_read_idx = start_read_idx + logical_page_size;
814                let logical_page = &buffer_data[start_read_idx..end_read_idx];
815                let crc = Crc32::checksum(logical_page);
816
817                // For the first page, if there's an old partial page CRC, construct the record
818                // to preserve the old CRC in its original slot.
819                let crc_record = if let (0, Some(old_crc)) = (page, old_crc_record) {
820                    Self::build_crc_record_preserving_old(logical_page_size_u16, crc, old_crc)
821                } else {
822                    Checksum::new(logical_page_size_u16, crc)
823                };
824                crcs.put_slice(&crc_record.to_bytes());
825            }
826            let crc_blob = crcs.freeze();
827
828            // Physical full-page layout is [logical_page_bytes, crc_record_bytes].
829            for page in 0..pages_to_write {
830                let start_read_idx = page * logical_page_size;
831                let end_read_idx = start_read_idx + logical_page_size;
832                write_buffer.append(buffer.slice(start_read_idx..end_read_idx));
833
834                let crc_start = page * CHECKSUM_SIZE as usize;
835                write_buffer.append(crc_blob.slice(crc_start..crc_start + CHECKSUM_SIZE as usize));
836            }
837        }
838
839        if !include_partial_page {
840            return (write_buffer, None);
841        }
842
843        let partial_page = &buffer_data[pages_to_write * logical_page_size..];
844        if partial_page.is_empty() {
845            // No partial page data to write.
846            return (write_buffer, None);
847        }
848
849        // If there are no full pages and the partial page length matches what was already
850        // written, there's nothing new to write.
851        if pages_to_write == 0 {
852            if let Some(old_crc) = old_crc_record {
853                let (old_len, _) = old_crc.get_crc();
854                if partial_page.len() == old_len as usize {
855                    return (write_buffer, None);
856                }
857            }
858        }
859        let partial_len = partial_page.len();
860        let crc = Crc32::checksum(partial_page);
861
862        // For partial pages: if this is the first page and there's an old CRC, preserve it.
863        // Otherwise just use the new CRC in slot 0.
864        let crc_record = if let (0, Some(old_crc)) = (pages_to_write, old_crc_record) {
865            Self::build_crc_record_preserving_old(partial_len as u16, crc, old_crc)
866        } else {
867            Checksum::new(partial_len as u16, crc)
868        };
869
870        // A persisted partial page still occupies one full physical page:
871        // [partial logical bytes, zero padding, crc record].
872        let mut padded = self.cache_ref.pool().alloc(physical_page_size);
873        padded.put_slice(partial_page);
874        let zero_count = logical_page_size - partial_len;
875        if zero_count > 0 {
876            padded.put_bytes(0, zero_count);
877        }
878        padded.put_slice(&crc_record.to_bytes());
879        write_buffer.append(padded.freeze());
880
881        // Return the CRC record that matches what we wrote to disk, so that future flushes
882        // correctly identify which slot is protected.
883        (write_buffer, Some(crc_record))
884    }
885
886    /// Encode one checksum slot as `[len: u16][crc: u32]`, matching `Checksum::write`.
887    fn checksum_slot_bytes(len: u16, crc: u32) -> [u8; CHECKSUM_SLOT_SIZE] {
888        let mut bytes = [0u8; CHECKSUM_SLOT_SIZE];
889        bytes[..2].copy_from_slice(&len.to_be_bytes());
890        bytes[2..].copy_from_slice(&crc.to_be_bytes());
891        bytes
892    }
893
894    /// Build a CRC record that preserves the old CRC in its original slot and places
895    /// the new CRC in the other slot.
896    const fn build_crc_record_preserving_old(
897        new_len: u16,
898        new_crc: u32,
899        old_crc: &Checksum,
900    ) -> Checksum {
901        let (old_len, old_crc_val) = old_crc.get_crc();
902        // The old CRC is in the slot with the larger length value (first slot wins ties).
903        if old_crc.len1 >= old_crc.len2 {
904            // Old CRC is in slot 0, put new CRC in slot 1
905            Checksum {
906                len1: old_len,
907                crc1: old_crc_val,
908                len2: new_len,
909                crc2: new_crc,
910            }
911        } else {
912            // Old CRC is in slot 1, put new CRC in slot 0
913            Checksum {
914                len1: new_len,
915                crc1: new_crc,
916                len2: old_len,
917                crc2: old_crc_val,
918            }
919        }
920    }
921
922    /// Durably rewrite a committed page to a shorter partial length.
923    async fn sync_partial_page_shrink(
924        blob_state: &mut BlobState<B>,
925        page: u64,
926        logical_page_size: u64,
927        new_len: u16,
928        new_crc: u32,
929        old_crc: &Checksum,
930    ) -> Result<Checksum, Error> {
931        // Recovery chooses the valid slot with the larger length. While shrinking, the new
932        // checksum must be made durable without becoming authoritative until the old longer slot
933        // can be disabled. The sequence below therefore lets recovery observe either the old page
934        // or the new shorter page, but not a footer where both slots were damaged by one torn write.
935        let physical_page_size = logical_page_size
936            .checked_add(CHECKSUM_SIZE)
937            .ok_or(Error::OffsetOverflow)?;
938        let crc_start = page
939            .checked_mul(physical_page_size)
940            .and_then(|start| start.checked_add(logical_page_size))
941            .ok_or(Error::OffsetOverflow)?;
942        let (new_slot_start, old_slot_start) = if old_crc.len1 >= old_crc.len2 {
943            (CHECKSUM_SLOT_SIZE, 0)
944        } else {
945            (0, CHECKSUM_SLOT_SIZE)
946        };
947
948        // Stage the new slot with a 0 length and the shrunken page CRC. A crash here leaves the
949        // old slot as the only non-zero valid slot.
950        let new_slot_offset = crc_start
951            .checked_add(new_slot_start as u64)
952            .ok_or(Error::OffsetOverflow)?;
953        let staged_slot = Self::checksum_slot_bytes(0, new_crc);
954        blob_state
955            .write_at_sync(new_slot_offset, staged_slot.to_vec())
956            .await?;
957
958        // Publish the new shrunken length. If a crash happens before the old slot is invalidated,
959        // both slots may be valid, but recovery still chooses the old longer length.
960        blob_state
961            .write_at_sync(new_slot_offset, new_len.to_be_bytes().to_vec())
962            .await?;
963
964        // Clear only the old slot's length bytes. Rewriting the whole footer here could tear across
965        // both slots and lose the already-durable shorter checksum. Once this lands, length 0 is
966        // never authoritative, so the shrunken slot wins.
967        let old_slot_offset = crc_start
968            .checked_add(old_slot_start as u64)
969            .ok_or(Error::OffsetOverflow)?;
970        let len_size = std::mem::size_of::<u16>();
971        blob_state
972            .write_at_sync(old_slot_offset, vec![0u8; len_size])
973            .await?;
974
975        let final_record = if new_slot_start == 0 {
976            Checksum {
977                len1: new_len,
978                crc1: new_crc,
979                len2: 0,
980                crc2: 0,
981            }
982        } else {
983            Checksum {
984                len1: 0,
985                crc1: 0,
986                len2: new_len,
987                crc2: new_crc,
988            }
989        };
990        Ok(final_record)
991    }
992
993    /// Flushes any buffered data, then returns a [Replay] for the underlying blob.
994    ///
995    /// The returned replay can be used to sequentially read all pages from the blob while ensuring
996    /// all data passes integrity verification. CRCs are validated but not included in the output.
997    ///
998    /// This is not a durable operation. Buffered data may be plainly written so the replay can
999    /// read it, but callers must still use [`sync`](Self::sync) if that data must survive a crash.
1000    pub async fn replay(&self, buffer_size: NonZeroUsize) -> Result<Replay<B>, Error> {
1001        let logical_page_size = self.cache_ref.page_size();
1002        let logical_page_size_nz =
1003            NonZeroU16::new(logical_page_size as u16).expect("page_size is non-zero");
1004
1005        // Flush any buffered data (without fsync) so the reader sees all written data.
1006        {
1007            let buf_guard = self.buffer.write().await;
1008            self.flush_internal(buf_guard, true, false).await?;
1009        }
1010
1011        // Convert buffer size (bytes) to page count
1012        let physical_page_size = logical_page_size + CHECKSUM_SIZE;
1013        let prefetch_pages = buffer_size.get() / physical_page_size as usize;
1014        let prefetch_pages = prefetch_pages.max(1); // At least 1 page
1015        let blob_guard = self.blob_state.read().await;
1016
1017        // Compute both physical and logical blob sizes.
1018        let (physical_blob_size, logical_blob_size) =
1019            blob_guard.partial_page_state.as_ref().map_or_else(
1020                || {
1021                    // All pages are full.
1022                    let physical = physical_page_size * blob_guard.current_page;
1023                    let logical = logical_page_size * blob_guard.current_page;
1024                    (physical, logical)
1025                },
1026                |crc_record| {
1027                    // There's a partial page with a checksum.
1028                    let (partial_len, _) = crc_record.get_crc();
1029                    let partial_len = partial_len as u64;
1030                    // Physical: all pages including the partial one (which is padded to full size).
1031                    let physical = physical_page_size * (blob_guard.current_page + 1);
1032                    // Logical: full pages before this + partial page's actual data length.
1033                    let logical = logical_page_size * blob_guard.current_page + partial_len;
1034                    (physical, logical)
1035                },
1036            );
1037
1038        let reader = PageReader::new(
1039            blob_guard.blob.clone(),
1040            physical_blob_size,
1041            logical_blob_size,
1042            prefetch_pages,
1043            logical_page_size_nz,
1044        );
1045        Ok(Replay::new(reader))
1046    }
1047}
1048
1049impl<B: Blob> Append<B> {
1050    /// Flushes buffered data and makes all pending mutations durable.
1051    ///
1052    /// A single physical write can be persisted with [`Blob::write_at_sync`]. If there
1053    /// are earlier unsynced mutations, or if the flush emits multiple physical writes,
1054    /// durability is completed with [`Blob::sync`].
1055    pub async fn sync(&self) -> Result<(), Error> {
1056        // Flush any buffered data, including any partial page.
1057        let buf_guard = self.buffer.write().await;
1058
1059        // A single emitted write can be made durable directly during the flush.
1060        if self.flush_internal(buf_guard, true, true).await? {
1061            return Ok(());
1062        }
1063
1064        // Otherwise, the flush either had no bytes to write or used plain writes. Sync only if a
1065        // durability barrier is still pending.
1066        let mut blob_state = self.blob_state.write().await;
1067        blob_state.sync().await
1068    }
1069
1070    /// Resize the blob to the provided logical `size`.
1071    ///
1072    /// This truncates the blob to contain only `size` logical bytes. The physical blob size will
1073    /// be adjusted to include the necessary CRC records for the remaining pages.
1074    ///
1075    /// # Warning
1076    ///
1077    /// - Concurrent mutable operations (append, resize) are not supported and will cause data loss.
1078    /// - Concurrent readers which try to read past the new size during the resize may error.
1079    /// - The resize is not guaranteed durable until the next sync.
1080    pub async fn resize(&self, size: u64) -> Result<(), Error> {
1081        let current_size = self.size().await;
1082        if size == current_size {
1083            return Ok(());
1084        }
1085
1086        // Handle growing by appending zero bytes.
1087        if size > current_size {
1088            let zeros_needed = (size - current_size) as usize;
1089            let mut zeros = self.cache_ref.pool().alloc(zeros_needed);
1090            zeros.put_bytes(0, zeros_needed);
1091            self.append(zeros.as_ref()).await?;
1092            return Ok(());
1093        }
1094
1095        self.shrink(size).await
1096    }
1097
1098    /// Coordinate the locking and dispatch logic for shrinking the blob.
1099    async fn shrink(&self, target_size: u64) -> Result<(), Error> {
1100        let logical_page_size = self.cache_ref.page_size();
1101        let physical_page_size = logical_page_size
1102            .checked_add(CHECKSUM_SIZE)
1103            .ok_or(Error::OffsetOverflow)?;
1104
1105        // Flush any buffered data first to ensure we have a consistent state on disk.
1106        self.sync().await?;
1107
1108        // Acquire both locks to prevent concurrent operations.
1109        let mut buf_guard = self.buffer.write().await;
1110        let mut blob_guard = self.blob_state.write().await;
1111
1112        // Calculate the physical size needed for the new logical size.
1113        let full_pages = target_size / logical_page_size;
1114        let partial_bytes = target_size % logical_page_size;
1115        let physical_pages = full_pages
1116            .checked_add(u64::from(partial_bytes > 0))
1117            .ok_or(Error::OffsetOverflow)?;
1118        let new_physical_size = physical_pages
1119            .checked_mul(physical_page_size)
1120            .ok_or(Error::OffsetOverflow)?;
1121        let tail_offset = full_pages
1122            .checked_mul(logical_page_size)
1123            .ok_or(Error::OffsetOverflow)?;
1124        let current_physical_size = if blob_guard.partial_page_state.is_some() {
1125            blob_guard
1126                .current_page
1127                .checked_add(1)
1128                .and_then(|pages| pages.checked_mul(physical_page_size))
1129                .ok_or(Error::OffsetOverflow)?
1130        } else {
1131            blob_guard
1132                .current_page
1133                .checked_mul(physical_page_size)
1134                .ok_or(Error::OffsetOverflow)?
1135        };
1136
1137        // A logical shrink can leave the physical page count unchanged. Only real physical
1138        // resizes need to create a pending sync.
1139        if new_physical_size != current_physical_size {
1140            blob_guard.resize(new_physical_size).await?;
1141        }
1142
1143        // Evict cached pages at or beyond the new full-page boundary. The page at
1144        // `full_pages` (if partial) is now owned by the tip buffer, and anything above is
1145        // beyond the new logical size. Leaving their pre-resize contents in the cache
1146        // lets `try_read_sync` (which bypasses the tip buffer) observe stale bytes once
1147        // the tip is repopulated.
1148        self.cache_ref.invalidate_from(self.id, full_pages);
1149
1150        if partial_bytes > 0 {
1151            return self
1152                .shrink_to_partial(
1153                    &mut buf_guard,
1154                    &mut blob_guard,
1155                    full_pages,
1156                    partial_bytes,
1157                    logical_page_size,
1158                    tail_offset,
1159                )
1160                .await;
1161        }
1162
1163        // Shrink the blob to a page boundary, which requires no CRC-slot rewrite.
1164        blob_guard.partial_page_state = None;
1165        blob_guard.current_page = full_pages;
1166        buf_guard.offset = tail_offset;
1167        buf_guard.clear();
1168
1169        Ok(())
1170    }
1171
1172    /// Perform a shrink to a partial page tip and make the shorter CRC slot authoritative.
1173    async fn shrink_to_partial(
1174        &self,
1175        buf_guard: &mut Buffer,
1176        blob_guard: &mut BlobState<B>,
1177        full_pages: u64,
1178        partial_bytes: u64,
1179        logical_page_size: u64,
1180        tail_offset: u64,
1181    ) -> Result<(), Error> {
1182        // Update blob state and buffer based on the desired logical size. The page data is
1183        // read with CRC validation, then durably rewritten below with a shorter CRC.
1184        blob_guard.current_page = full_pages;
1185        buf_guard.offset = tail_offset;
1186
1187        let (page_data, old_crc) = super::get_page_with_checksum_from_blob(
1188            &blob_guard.blob,
1189            full_pages,
1190            logical_page_size,
1191        )
1192        .await?;
1193
1194        // Ensure the validated data covers what we need.
1195        if (page_data.len() as u64) < partial_bytes {
1196            return Err(Error::InvalidChecksum);
1197        }
1198
1199        buf_guard.clear();
1200        let new_data = &page_data.as_ref()[..partial_bytes as usize];
1201        let over_capacity = buf_guard.append(new_data);
1202        assert!(!over_capacity);
1203
1204        let final_record = Self::sync_partial_page_shrink(
1205            blob_guard,
1206            full_pages,
1207            logical_page_size,
1208            partial_bytes as u16,
1209            Crc32::checksum(new_data),
1210            &old_crc,
1211        )
1212        .await?;
1213        blob_guard.partial_page_state = Some(final_record);
1214
1215        Ok(())
1216    }
1217}
1218
1219#[cfg(test)]
1220mod tests {
1221    use super::*;
1222    use crate::{
1223        buffer::tests::SyncTrackingBlob, deterministic, telemetry::metrics::Registry, Buf,
1224        BufferPool, BufferPoolConfig, IoBufsMut, Runner as _, Storage as _,
1225    };
1226    use commonware_codec::ReadExt;
1227    use commonware_macros::test_traced;
1228    use commonware_utils::{NZUsize, NZU16, NZU32};
1229    use std::{
1230        num::NonZeroU16,
1231        sync::{
1232            atomic::{AtomicUsize, Ordering},
1233            Arc,
1234        },
1235    };
1236
1237    const PAGE_SIZE: NonZeroU16 = NZU16!(103); // janky size to ensure we test page alignment
1238    const BUFFER_SIZE: usize = PAGE_SIZE.get() as usize * 2;
1239
1240    #[test_traced("DEBUG")]
1241    fn test_read_many_into_empty() {
1242        let executor = deterministic::Runner::default();
1243        executor.start(|context: deterministic::Context| async move {
1244            let (blob, blob_size) = context.open("test_partition", b"rmany").await.unwrap();
1245            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1246            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1247                .await
1248                .unwrap();
1249
1250            append.append(&[0u8; 8]).await.unwrap();
1251            assert_eq!(append.size().await, 8);
1252
1253            // Empty offsets should succeed immediately.
1254            let mut buf = [];
1255            append.read_many_into(&mut buf, &[], 4).await.unwrap();
1256        });
1257    }
1258
1259    #[test_traced("DEBUG")]
1260    fn test_read_many_into_all_in_tip() {
1261        // All items reside in the unflushed tip buffer.
1262        let executor = deterministic::Runner::default();
1263        executor.start(|context: deterministic::Context| async move {
1264            let (blob, blob_size) = context.open("test_partition", b"rmany").await.unwrap();
1265            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1266            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1267                .await
1268                .unwrap();
1269
1270            let data: Vec<u8> = (0..20).collect();
1271            append.append(&data).await.unwrap();
1272            assert_eq!(append.size().await, 20);
1273
1274            // Read 4-byte items at offsets 0, 4, 8, 12, 16.
1275            let offsets = [0u64, 4, 8, 12, 16];
1276            let mut buf = vec![0u8; 5 * 4];
1277            append.read_many_into(&mut buf, &offsets, 4).await.unwrap();
1278
1279            for (i, &off) in offsets.iter().enumerate() {
1280                assert_eq!(
1281                    &buf[i * 4..(i + 1) * 4],
1282                    &data[off as usize..off as usize + 4],
1283                );
1284            }
1285        });
1286    }
1287
1288    #[test_traced("DEBUG")]
1289    fn test_try_read_sync_all_in_tip() {
1290        let executor = deterministic::Runner::default();
1291        executor.start(|context: deterministic::Context| async move {
1292            let (blob, blob_size) = context
1293                .open("test_partition", b"try_read_sync_tip")
1294                .await
1295                .unwrap();
1296            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1297            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1298                .await
1299                .unwrap();
1300
1301            let data: Vec<u8> = (0..20).collect();
1302            append.append(&data).await.unwrap();
1303
1304            let mut buf = vec![0u8; data.len()];
1305            assert!(append.try_read_sync(0, &mut buf));
1306            assert_eq!(buf, data);
1307        });
1308    }
1309
1310    #[test_traced("DEBUG")]
1311    fn test_read_many_into_all_from_cache() {
1312        // Sync data to disk so tip buffer is empty; reads go through page cache / blob.
1313        let executor = deterministic::Runner::default();
1314        executor.start(|context: deterministic::Context| async move {
1315            let (blob, blob_size) = context.open("test_partition", b"rmany").await.unwrap();
1316            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1317            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1318                .await
1319                .unwrap();
1320
1321            let data: Vec<u8> = (0..20).collect();
1322            append.append(&data).await.unwrap();
1323            append.sync().await.unwrap();
1324            assert_eq!(append.size().await, 20);
1325
1326            let offsets = [0u64, 8, 16];
1327            let mut buf = vec![0u8; 3 * 4];
1328            append.read_many_into(&mut buf, &offsets, 4).await.unwrap();
1329
1330            for (i, &off) in offsets.iter().enumerate() {
1331                assert_eq!(
1332                    &buf[i * 4..(i + 1) * 4],
1333                    &data[off as usize..off as usize + 4],
1334                );
1335            }
1336        });
1337    }
1338
1339    #[test_traced("DEBUG")]
1340    fn test_read_many_into_mixed_tip_and_cache() {
1341        // First chunk synced to disk, second chunk still in tip buffer.
1342        let executor = deterministic::Runner::default();
1343        executor.start(|context: deterministic::Context| async move {
1344            let (blob, blob_size) = context.open("test_partition", b"rmany").await.unwrap();
1345            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1346            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1347                .await
1348                .unwrap();
1349
1350            let first: Vec<u8> = (0..16).collect();
1351            append.append(&first).await.unwrap();
1352            append.sync().await.unwrap();
1353
1354            let second: Vec<u8> = (16..32).collect();
1355            append.append(&second).await.unwrap();
1356            assert_eq!(append.size().await, 32);
1357
1358            // Offsets span both synced and unsynced regions.
1359            let offsets = [0u64, 4, 16, 24];
1360            let mut buf = vec![0u8; 4 * 4];
1361            append.read_many_into(&mut buf, &offsets, 4).await.unwrap();
1362
1363            let all: Vec<u8> = (0..32).collect();
1364            for (i, &off) in offsets.iter().enumerate() {
1365                assert_eq!(
1366                    &buf[i * 4..(i + 1) * 4],
1367                    &all[off as usize..off as usize + 4],
1368                );
1369            }
1370        });
1371    }
1372
1373    #[test_traced("DEBUG")]
1374    fn test_read_many_into_out_of_bounds() {
1375        let executor = deterministic::Runner::default();
1376        executor.start(|context: deterministic::Context| async move {
1377            let (blob, blob_size) = context.open("test_partition", b"rmany").await.unwrap();
1378            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1379            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1380                .await
1381                .unwrap();
1382
1383            append.append(&[0u8; 8]).await.unwrap();
1384            assert_eq!(append.size().await, 8);
1385
1386            // Last offset's end (8 + 4 = 12) exceeds size (8).
1387            let mut buf = vec![0u8; 4];
1388            let err = append.read_many_into(&mut buf, &[8], 4).await.unwrap_err();
1389            assert!(matches!(err, Error::BlobInsufficientLength));
1390        });
1391    }
1392
1393    #[test_traced("DEBUG")]
1394    fn test_read_many_into_single_item() {
1395        let executor = deterministic::Runner::default();
1396        executor.start(|context: deterministic::Context| async move {
1397            let (blob, blob_size) = context.open("test_partition", b"rmany").await.unwrap();
1398            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1399            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1400                .await
1401                .unwrap();
1402
1403            let data = vec![0xAA; 8];
1404            append.append(&data).await.unwrap();
1405            assert_eq!(append.size().await, 8);
1406
1407            let mut buf = vec![0u8; 8];
1408            append.read_many_into(&mut buf, &[0], 8).await.unwrap();
1409            assert_eq!(&buf, &data);
1410        });
1411    }
1412
1413    #[test]
1414    #[should_panic(expected = "read_many_into requires buf.len() == offsets.len() * item_size")]
1415    fn test_read_many_into_short_buffer_panics() {
1416        let executor = deterministic::Runner::default();
1417        executor.start(|context: deterministic::Context| async move {
1418            let (blob, blob_size) = context.open("test_partition", b"rmany").await.unwrap();
1419            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1420            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1421                .await
1422                .unwrap();
1423
1424            let data: Vec<u8> = (0..16).collect();
1425            append.append(&data).await.unwrap();
1426
1427            let offsets = [0u64, 4];
1428            let mut buf = vec![0u8; 7];
1429            append.read_many_into(&mut buf, &offsets, 4).await.unwrap();
1430        });
1431    }
1432
1433    #[test_traced("DEBUG")]
1434    fn test_read_many_into_matches_read_at() {
1435        // Verify read_many_into returns the same bytes as individual read_at calls.
1436        let executor = deterministic::Runner::default();
1437        executor.start(|context: deterministic::Context| async move {
1438            let (blob, blob_size) = context.open("test_partition", b"rmany").await.unwrap();
1439            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1440            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1441                .await
1442                .unwrap();
1443
1444            // Write enough data to span multiple pages (PAGE_SIZE=103).
1445            let data: Vec<u8> = (0u8..=255).cycle().take(300).collect();
1446            append.append(&data).await.unwrap();
1447            append.sync().await.unwrap();
1448            // Add more in tip buffer.
1449            let more: Vec<u8> = (0u8..50).collect();
1450            append.append(&more).await.unwrap();
1451            assert_eq!(append.size().await, 350);
1452
1453            let item_size = 10;
1454            let offsets: Vec<u64> = (0..35).map(|i| i * item_size as u64).collect();
1455            let mut batch_buf = vec![0u8; offsets.len() * item_size];
1456            append
1457                .read_many_into(&mut batch_buf, &offsets, item_size)
1458                .await
1459                .unwrap();
1460
1461            // Compare each item against individual read_at.
1462            for (i, &off) in offsets.iter().enumerate() {
1463                let single = append.read_at(off, item_size).await.unwrap().coalesce();
1464                assert_eq!(
1465                    &batch_buf[i * item_size..(i + 1) * item_size],
1466                    single.as_ref(),
1467                    "mismatch at offset {off}",
1468                );
1469            }
1470        });
1471    }
1472
1473    #[test_traced("DEBUG")]
1474    fn test_read_many_into_scattered_cache_misses() {
1475        // Exercises all three source paths in a single read_many_into call:
1476        // tip buffer, page cache hit, and page cache miss (blob I/O).
1477        // The tip holds a partial page so one item straddles the tip boundary.
1478        let executor = deterministic::Runner::default();
1479        executor.start(|context: deterministic::Context| async move {
1480            let (blob, blob_size) = context.open("test_partition", b"rmany").await.unwrap();
1481            // Small cache: only 2 pages, so we can force eviction.
1482            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(2));
1483            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1484                .await
1485                .unwrap();
1486
1487            // Write 3 pages of data and sync to disk.
1488            let synced: Vec<u8> = (0u8..=255)
1489                .cycle()
1490                .take(PAGE_SIZE.get() as usize * 3)
1491                .collect();
1492            append.append(&synced).await.unwrap();
1493            append.sync().await.unwrap();
1494
1495            // Write a partial page that stays in the tip buffer. The item_size
1496            // is chosen so the last item straddles the synced/tip boundary.
1497            let item_size = 10;
1498            let tip_len = PAGE_SIZE.get() as usize / 2;
1499            let tip: Vec<u8> = (100u8..=255).cycle().take(tip_len).collect();
1500            append.append(&tip).await.unwrap();
1501
1502            // Prime pages 0 and 2 into cache, leaving page 1 uncached.
1503            let _ = append.read_at(0, item_size).await.unwrap();
1504            let _ = append
1505                .read_at(PAGE_SIZE.get() as u64 * 2, item_size)
1506                .await
1507                .unwrap();
1508
1509            // Offset that straddles the synced/tip boundary: starts in the last
1510            // synced page, ends in the tip buffer.
1511            let straddle_off = synced.len() as u64 - (item_size as u64 / 2);
1512            let tip_off = synced.len() as u64 + item_size as u64;
1513            let offsets = [
1514                0u64,                       // page 0 (cached)
1515                PAGE_SIZE.get() as u64,     // page 1 (not cached - blob I/O)
1516                PAGE_SIZE.get() as u64 * 2, // page 2 (cached)
1517                straddle_off,               // straddles synced/tip boundary
1518                tip_off,                    // entirely in tip buffer
1519            ];
1520            let mut buf = vec![0u8; offsets.len() * item_size];
1521            append
1522                .read_many_into(&mut buf, &offsets, item_size)
1523                .await
1524                .unwrap();
1525
1526            let read: Vec<u8> = synced.iter().chain(tip.iter()).copied().collect();
1527            for (i, &off) in offsets.iter().enumerate() {
1528                assert_eq!(
1529                    &buf[i * item_size..(i + 1) * item_size],
1530                    &read[off as usize..off as usize + item_size],
1531                );
1532            }
1533        });
1534    }
1535
1536    #[test_traced("DEBUG")]
1537    fn test_append_crc_empty() {
1538        let executor = deterministic::Runner::default();
1539        executor.start(|context: deterministic::Context| async move {
1540            // Open a new blob.
1541            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
1542            assert_eq!(blob_size, 0);
1543
1544            // Create a page cache reference.
1545            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1546
1547            // Create an Append wrapper.
1548            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
1549                .await
1550                .unwrap();
1551
1552            // Verify initial size is 0.
1553            assert_eq!(append.size().await, 0);
1554
1555            // Close & re-open.
1556            append.sync().await.unwrap();
1557            drop(append);
1558
1559            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
1560            assert_eq!(blob_size, 0); // There was no need to write a crc since there was no data.
1561
1562            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
1563                .await
1564                .unwrap();
1565
1566            assert_eq!(append.size().await, 0);
1567        });
1568    }
1569
1570    #[test_traced("DEBUG")]
1571    fn test_append_crc_basic() {
1572        let executor = deterministic::Runner::default();
1573        executor.start(|context: deterministic::Context| async move {
1574            // Open a new blob.
1575            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
1576            assert_eq!(blob_size, 0);
1577
1578            // Create a page cache reference.
1579            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1580
1581            // Create an Append wrapper.
1582            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
1583                .await
1584                .unwrap();
1585
1586            // Verify initial size is 0.
1587            assert_eq!(append.size().await, 0);
1588
1589            // Append some bytes.
1590            let data = vec![1, 2, 3, 4, 5];
1591            append.append(&data).await.unwrap();
1592
1593            // Verify size reflects appended data.
1594            assert_eq!(append.size().await, 5);
1595
1596            // Append more bytes.
1597            let more_data = vec![6, 7, 8, 9, 10];
1598            append.append(&more_data).await.unwrap();
1599
1600            // Verify size is cumulative.
1601            assert_eq!(append.size().await, 10);
1602
1603            // Read back the first chunk and verify.
1604            let read_buf = append.read_at(0, 5).await.unwrap().coalesce();
1605            assert_eq!(read_buf, &data[..]);
1606
1607            // Read back the second chunk and verify.
1608            let read_buf = append.read_at(5, 5).await.unwrap().coalesce();
1609            assert_eq!(read_buf, &more_data[..]);
1610
1611            // Read all data at once and verify.
1612            let read_buf = append.read_at(0, 10).await.unwrap().coalesce();
1613            assert_eq!(read_buf, &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1614
1615            // Close and reopen the blob and make sure the data is still there and the trailing
1616            // checksum is written & stripped as expected.
1617            append.sync().await.unwrap();
1618            drop(append);
1619
1620            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
1621            // Physical page = 103 logical + 12 Checksum = 115 bytes (padded partial page)
1622            assert_eq!(blob_size, 115);
1623            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
1624                .await
1625                .unwrap();
1626            assert_eq!(append.size().await, 10); // CRC should be stripped after verification
1627
1628            // Append data that spans a page boundary.
1629            // PAGE_SIZE=103 is the logical page size. We have 10 bytes, so writing
1630            // 100 more bytes (total 110) will cross the page boundary at byte 103.
1631            let spanning_data: Vec<u8> = (11..=110).collect();
1632            append.append(&spanning_data).await.unwrap();
1633            assert_eq!(append.size().await, 110);
1634
1635            // Read back data that spans the page boundary.
1636            let read_buf = append.read_at(10, 100).await.unwrap().coalesce();
1637            assert_eq!(read_buf, &spanning_data[..]);
1638
1639            // Read all 110 bytes at once.
1640            let read_buf = append.read_at(0, 110).await.unwrap().coalesce();
1641            let expected: Vec<u8> = (1..=110).collect();
1642            assert_eq!(read_buf, &expected[..]);
1643
1644            // Drop and re-open and make sure bytes are still there.
1645            append.sync().await.unwrap();
1646            drop(append);
1647
1648            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
1649            // 2 physical pages: 2 * 115 = 230 bytes
1650            assert_eq!(blob_size, 230);
1651            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
1652                .await
1653                .unwrap();
1654            assert_eq!(append.size().await, 110);
1655
1656            // Append data to reach exactly a page boundary.
1657            // Logical page size is 103. We have 110 bytes, next boundary is 206 (103 * 2).
1658            // So we need 96 more bytes.
1659            let boundary_data: Vec<u8> = (111..=206).collect();
1660            assert_eq!(boundary_data.len(), 96);
1661            append.append(&boundary_data).await.unwrap();
1662            assert_eq!(append.size().await, 206);
1663
1664            // Verify we can read it back.
1665            let read_buf = append.read_at(0, 206).await.unwrap().coalesce();
1666            let expected: Vec<u8> = (1..=206).collect();
1667            assert_eq!(read_buf, &expected[..]);
1668
1669            // Drop and re-open at the page boundary.
1670            append.sync().await.unwrap();
1671            drop(append);
1672
1673            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
1674            // Physical size should be exactly 2 pages: 115 * 2 = 230 bytes
1675            assert_eq!(blob_size, 230);
1676            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1677                .await
1678                .unwrap();
1679            assert_eq!(append.size().await, 206);
1680
1681            // Verify data is still readable after reopen.
1682            let read_buf = append.read_at(0, 206).await.unwrap().coalesce();
1683            assert_eq!(read_buf, &expected[..]);
1684        });
1685    }
1686
1687    #[test_traced("DEBUG")]
1688    fn test_sync_releases_tip_pool_slot_after_full_drain() {
1689        let executor = deterministic::Runner::default();
1690        executor.start(|context: deterministic::Context| async move {
1691            let mut registry = Registry::default();
1692            let pool = BufferPool::new(
1693                BufferPoolConfig::for_storage()
1694                    .with_pool_min_size(PAGE_SIZE.get() as usize)
1695                    .with_max_per_class(NZU32!(2)),
1696                &mut registry,
1697            );
1698            let cache_ref = CacheRef::new(pool.clone(), PAGE_SIZE, NZUsize!(1));
1699
1700            let (blob, blob_size) = context
1701                .open("test_partition", b"release_tip_backing")
1702                .await
1703                .unwrap();
1704            assert_eq!(blob_size, 0);
1705
1706            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1707                .await
1708                .unwrap();
1709
1710            append
1711                .append(&vec![7; PAGE_SIZE.get() as usize])
1712                .await
1713                .unwrap();
1714
1715            // One pooled slot backs the page cache and one backs the mutable tip.
1716            assert!(
1717                matches!(
1718                    pool.try_alloc(BUFFER_SIZE),
1719                    Err(crate::iobuf::PoolError::Exhausted)
1720                ),
1721                "full-page tip should occupy the remaining pooled slot before sync"
1722            );
1723
1724            append.sync().await.unwrap();
1725
1726            // After a full drain, the tip should no longer pin that slot.
1727            assert!(
1728                pool.try_alloc(BUFFER_SIZE).is_ok(),
1729                "sync should release pooled backing when no partial tail remains"
1730            );
1731        });
1732    }
1733
1734    #[test_traced("DEBUG")]
1735    fn test_sync_uses_range_sync_for_single_flush() {
1736        let executor = deterministic::Runner::default();
1737        executor.start(|context: deterministic::Context| async move {
1738            let blob = SyncTrackingBlob::new();
1739            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1740            let append = Append::new(blob.clone(), 0, BUFFER_SIZE, cache_ref)
1741                .await
1742                .unwrap();
1743
1744            // A newly wrapped blob preserves one full barrier before range sync is used.
1745            append.sync().await.unwrap();
1746            let (_, writes, full_syncs, range_syncs) = blob.snapshot();
1747            assert_eq!(writes, 0);
1748            assert_eq!(full_syncs, 1);
1749            assert_eq!(range_syncs, 0);
1750
1751            // A single buffered write with no remaining dirty state can be made durable directly.
1752            let data = b"hello world";
1753            append.append(data).await.unwrap();
1754            append.sync().await.unwrap();
1755
1756            let (_, writes, full_syncs, range_syncs) = blob.snapshot();
1757            assert_eq!(writes, 1);
1758            assert_eq!(full_syncs, 1);
1759            assert_eq!(range_syncs, 1);
1760
1761            // With no new writes and no pending full-sync barrier, sync has no work left.
1762            append.sync().await.unwrap();
1763            let (_, writes, full_syncs, range_syncs) = blob.snapshot();
1764            assert_eq!(writes, 1);
1765            assert_eq!(full_syncs, 1);
1766            assert_eq!(range_syncs, 1);
1767
1768            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1769            let reopened = Append::new(blob.clone(), blob.size(), BUFFER_SIZE, cache_ref)
1770                .await
1771                .unwrap();
1772            let read = reopened.read_at(0, data.len()).await.unwrap().coalesce();
1773            assert_eq!(read.as_ref(), data);
1774        });
1775    }
1776
1777    #[test_traced("DEBUG")]
1778    fn test_sync_failed_range_sync_does_not_mark_clean() {
1779        let executor = deterministic::Runner::default();
1780        executor.start(|context: deterministic::Context| async move {
1781            let name = b"failed_range_sync";
1782            let (blob, size) = context.open("test_partition", name).await.unwrap();
1783            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1784            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref)
1785                .await
1786                .unwrap();
1787
1788            // Keep the write buffered so sync attempts the clean `write_at_sync` path.
1789            append.append(b"abc").await.unwrap();
1790
1791            // Removing the blob makes the range-sync flush fail.
1792            context.remove("test_partition", Some(name)).await.unwrap();
1793            assert!(append.sync().await.is_err());
1794
1795            // The failed `write_at_sync` must leave a pending full-sync barrier, so a
1796            // later sync cannot report success.
1797            assert!(append.sync().await.is_err());
1798        });
1799    }
1800
1801    #[test_traced("DEBUG")]
1802    fn test_sync_uses_full_sync_after_prior_plain_flush() {
1803        let executor = deterministic::Runner::default();
1804        executor.start(|context: deterministic::Context| async move {
1805            let blob = SyncTrackingBlob::new();
1806            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1807            let append = Append::new(blob.clone(), 0, BUFFER_SIZE, cache_ref)
1808                .await
1809                .unwrap();
1810
1811            // This append overflows the buffer, so a plain flush happens before sync writes the
1812            // remaining tip.
1813            let data = vec![7u8; BUFFER_SIZE + 1];
1814            append.append(&data).await.unwrap();
1815            append.sync().await.unwrap();
1816
1817            let (_, writes, full_syncs, range_syncs) = blob.snapshot();
1818            assert_eq!(writes, 2);
1819            assert_eq!(full_syncs, 1);
1820            assert_eq!(range_syncs, 0);
1821
1822            // With no new work, sync should not issue another durability operation.
1823            append.sync().await.unwrap();
1824            let (_, writes, full_syncs, range_syncs) = blob.snapshot();
1825            assert_eq!(writes, 2);
1826            assert_eq!(full_syncs, 1);
1827            assert_eq!(range_syncs, 0);
1828
1829            // The next sync still needs a full barrier because the append path flushed the full
1830            // page before the final partial tip.
1831            append.append(b"tip").await.unwrap();
1832            append.sync().await.unwrap();
1833
1834            let (_, writes, full_syncs, range_syncs) = blob.snapshot();
1835            assert_eq!(writes, 4);
1836            assert_eq!(full_syncs, 2);
1837            assert_eq!(range_syncs, 0);
1838
1839            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1840            let reopened = Append::new(blob.clone(), blob.size(), BUFFER_SIZE, cache_ref)
1841                .await
1842                .unwrap();
1843            let mut expected = data;
1844            expected.extend_from_slice(b"tip");
1845            let read = reopened
1846                .read_at(0, expected.len())
1847                .await
1848                .unwrap()
1849                .coalesce();
1850            assert_eq!(read.as_ref(), expected.as_slice());
1851        });
1852    }
1853
1854    #[test_traced("DEBUG")]
1855    fn test_sync_uses_full_sync_after_replay_plain_flush() {
1856        let executor = deterministic::Runner::default();
1857        executor.start(|context: deterministic::Context| async move {
1858            let blob = SyncTrackingBlob::new();
1859            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1860            let append = Append::new(blob.clone(), 0, BUFFER_SIZE, cache_ref)
1861                .await
1862                .unwrap();
1863
1864            // Keep data buffered so replay has to flush it without syncing.
1865            append.append(b"replayed").await.unwrap();
1866
1867            // Replay flushes buffered data for reading, but does not make that write durable.
1868            let mut replay = append.replay(NZUsize!(1024)).await.unwrap();
1869            assert!(replay.ensure(b"replayed".len()).await.unwrap());
1870            assert_eq!(replay.remaining(), b"replayed".len());
1871            assert_eq!(replay.chunk(), b"replayed");
1872
1873            let (_, writes, full_syncs, range_syncs) = blob.snapshot();
1874            assert_eq!(writes, 1);
1875            assert_eq!(full_syncs, 0);
1876            assert_eq!(range_syncs, 0);
1877
1878            // A later sync must use a full barrier for the plain replay flush.
1879            append.sync().await.unwrap();
1880            let (_, writes, full_syncs, range_syncs) = blob.snapshot();
1881            assert_eq!(writes, 1);
1882            assert_eq!(full_syncs, 1);
1883            assert_eq!(range_syncs, 0);
1884        });
1885    }
1886
1887    #[test_traced("DEBUG")]
1888    fn test_recreated_sync_preserves_replay_plain_flush_barrier() {
1889        let executor = deterministic::Runner::default();
1890        executor.start(|context: deterministic::Context| async move {
1891            let blob = SyncTrackingBlob::new();
1892            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1893            let append = Append::new(blob.clone(), 0, BUFFER_SIZE, cache_ref)
1894                .await
1895                .unwrap();
1896
1897            append.append(b"replayed").await.unwrap();
1898            let mut replay = append.replay(NZUsize!(1024)).await.unwrap();
1899            assert!(replay.ensure(b"replayed".len()).await.unwrap());
1900            assert_eq!(replay.remaining(), b"replayed".len());
1901            assert_eq!(replay.chunk(), b"replayed");
1902            drop(replay);
1903            drop(append);
1904
1905            let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
1906            assert!(durable.is_empty());
1907            assert_eq!(writes, 1);
1908            assert_eq!(full_syncs, 0);
1909            assert_eq!(range_syncs, 0);
1910
1911            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1912            let reopened = Append::new(blob.clone(), blob.size(), BUFFER_SIZE, cache_ref)
1913                .await
1914                .unwrap();
1915            assert_eq!(reopened.size().await, b"replayed".len() as u64);
1916            reopened.sync().await.unwrap();
1917
1918            let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
1919            assert_eq!(durable.len(), blob.size() as usize);
1920            assert_eq!(writes, 1);
1921            assert_eq!(full_syncs, 1);
1922            assert_eq!(range_syncs, 0);
1923        });
1924    }
1925
1926    #[test_traced("DEBUG")]
1927    fn test_recreated_sync_skips_barrier_after_invalid_truncation() {
1928        let executor = deterministic::Runner::default();
1929        executor.start(|context: deterministic::Context| async move {
1930            let blob = SyncTrackingBlob::new();
1931            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1932            let append = Append::new(blob.clone(), 0, BUFFER_SIZE, cache_ref)
1933                .await
1934                .unwrap();
1935            append.sync().await.unwrap();
1936            append.append(b"valid").await.unwrap();
1937            append.sync().await.unwrap();
1938            drop(append);
1939
1940            blob.write_at(blob.size(), b"junk").await.unwrap();
1941
1942            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1943            let reopened = Append::new(blob.clone(), blob.size(), BUFFER_SIZE, cache_ref)
1944                .await
1945                .unwrap();
1946            assert_eq!(reopened.size().await, b"valid".len() as u64);
1947
1948            let (_, writes, full_syncs, range_syncs) = blob.snapshot();
1949            assert_eq!(writes, 2);
1950            assert_eq!(full_syncs, 2);
1951            assert_eq!(range_syncs, 1);
1952
1953            reopened.sync().await.unwrap();
1954
1955            let (_, writes, full_syncs, range_syncs) = blob.snapshot();
1956            assert_eq!(writes, 2);
1957            assert_eq!(full_syncs, 2);
1958            assert_eq!(range_syncs, 1);
1959        });
1960    }
1961
1962    #[test_traced("DEBUG")]
1963    fn test_sync_batches_split_protected_writes_with_full_sync() {
1964        let executor = deterministic::Runner::default();
1965        executor.start(|context: deterministic::Context| async move {
1966            let blob = SyncTrackingBlob::new();
1967            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1968            let append = Append::new(blob.clone(), 0, BUFFER_SIZE, cache_ref)
1969                .await
1970                .unwrap();
1971            append.sync().await.unwrap();
1972
1973            // Establish a persisted partial page with one authoritative CRC slot.
1974            append.append(b"abc").await.unwrap();
1975            append.sync().await.unwrap();
1976
1977            // Extending that partial page must write around the protected slot, so the two emitted
1978            // writes are batched behind one full sync.
1979            append.append(b"de").await.unwrap();
1980            append.sync().await.unwrap();
1981
1982            let (_, writes, full_syncs, range_syncs) = blob.snapshot();
1983            assert_eq!(writes, 3);
1984            assert_eq!(full_syncs, 2);
1985            assert_eq!(range_syncs, 1);
1986
1987            // On the next extension, the protected slot is the second CRC, so only the prefix
1988            // write is needed.
1989            append.append(b"fg").await.unwrap();
1990            append.sync().await.unwrap();
1991
1992            let (_, writes, full_syncs, range_syncs) = blob.snapshot();
1993            assert_eq!(writes, 4);
1994            assert_eq!(full_syncs, 2);
1995            assert_eq!(range_syncs, 2);
1996
1997            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1998            let reopened = Append::new(blob.clone(), blob.size(), BUFFER_SIZE, cache_ref)
1999                .await
2000                .unwrap();
2001            let read = reopened.read_at(0, 7).await.unwrap().coalesce();
2002            assert_eq!(read.as_ref(), b"abcdefg");
2003        });
2004    }
2005
2006    #[test_traced("DEBUG")]
2007    fn test_read_up_to_zero_len_truncates_buffer() {
2008        let executor = deterministic::Runner::default();
2009        executor.start(|context: deterministic::Context| async move {
2010            // Open a new blob.
2011            let (blob, blob_size) = context
2012                .open("test_partition", b"read_up_to_zero_len")
2013                .await
2014                .unwrap();
2015            assert_eq!(blob_size, 0);
2016
2017            // Create a page cache reference.
2018            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
2019
2020            // Create an Append wrapper and write some data.
2021            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
2022                .await
2023                .unwrap();
2024            append.append(&[1, 2, 3, 4]).await.unwrap();
2025
2026            // Request a zero-length read with a reused, non-empty buffer.
2027            let stale = vec![9, 8, 7, 6];
2028            let (buf, read) = append.read_up_to(0, 0, stale).await.unwrap();
2029
2030            assert_eq!(read, 0);
2031            assert_eq!(buf.len(), 0, "read_up_to must truncate returned buffer");
2032            assert_eq!(buf.freeze().as_ref(), b"");
2033        });
2034    }
2035
2036    /// Helper to read the CRC record from raw blob bytes at the end of a physical page.
2037    fn read_crc_record_from_page(page_bytes: &[u8]) -> Checksum {
2038        let crc_start = page_bytes.len() - CHECKSUM_SIZE as usize;
2039        Checksum::read(&mut &page_bytes[crc_start..]).unwrap()
2040    }
2041
2042    /// Blob wrapper that turns one write into a durable partial write followed by an error.
2043    #[derive(Clone)]
2044    struct PartialWriteBlob<B: Blob> {
2045        inner: B,
2046        writes: Arc<AtomicUsize>,
2047        failed_write_len: Arc<AtomicUsize>,
2048        fail_on: usize,
2049        partial_len: usize,
2050    }
2051
2052    impl<B: Blob> PartialWriteBlob<B> {
2053        fn new(inner: B, fail_on: usize, partial_len: usize) -> Self {
2054            Self {
2055                inner,
2056                writes: Arc::new(AtomicUsize::new(0)),
2057                failed_write_len: Arc::new(AtomicUsize::new(0)),
2058                fail_on,
2059                partial_len,
2060            }
2061        }
2062
2063        fn failed_write_len(&self) -> Arc<AtomicUsize> {
2064            self.failed_write_len.clone()
2065        }
2066
2067        fn write_count(&self) -> Arc<AtomicUsize> {
2068            self.writes.clone()
2069        }
2070    }
2071
2072    impl<B: Blob> crate::Blob for PartialWriteBlob<B> {
2073        async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
2074            self.inner.read_at(offset, len).await
2075        }
2076
2077        async fn read_at_buf(
2078            &self,
2079            offset: u64,
2080            len: usize,
2081            bufs: impl Into<IoBufsMut> + Send,
2082        ) -> Result<IoBufsMut, Error> {
2083            self.inner.read_at_buf(offset, len, bufs).await
2084        }
2085
2086        async fn write_at(&self, offset: u64, bufs: impl Into<IoBufs> + Send) -> Result<(), Error> {
2087            let bufs = bufs.into();
2088            let write = self.writes.fetch_add(1, Ordering::SeqCst) + 1;
2089            if write == self.fail_on {
2090                let bytes = bufs.coalesce();
2091                self.failed_write_len.store(bytes.len(), Ordering::SeqCst);
2092                let partial_len = self.partial_len.min(bytes.len());
2093                self.inner
2094                    .write_at(offset, bytes.slice(..partial_len))
2095                    .await?;
2096                self.inner.sync().await?;
2097                return Err(Error::Io(std::io::Error::other("injected partial write")));
2098            }
2099
2100            self.inner.write_at(offset, bufs).await
2101        }
2102
2103        async fn write_at_sync(
2104            &self,
2105            offset: u64,
2106            bufs: impl Into<IoBufs> + Send,
2107        ) -> Result<(), Error> {
2108            let bufs = bufs.into();
2109            let write = self.writes.fetch_add(1, Ordering::SeqCst) + 1;
2110            if write == self.fail_on {
2111                let bytes = bufs.coalesce();
2112                self.failed_write_len.store(bytes.len(), Ordering::SeqCst);
2113                let partial_len = self.partial_len.min(bytes.len());
2114                self.inner
2115                    .write_at_sync(offset, bytes.slice(..partial_len))
2116                    .await?;
2117                return Err(Error::Io(std::io::Error::other("injected partial write")));
2118            }
2119
2120            self.inner.write_at_sync(offset, bufs).await
2121        }
2122
2123        async fn resize(&self, len: u64) -> Result<(), Error> {
2124            self.inner.resize(len).await
2125        }
2126
2127        async fn sync(&self) -> Result<(), Error> {
2128            self.inner.sync().await
2129        }
2130    }
2131
/// Placeholder slot contents used to mangle a CRC slot in tests. The two leading length
/// bytes are zero, so the mangled slot is never authoritative.
/// Layout: [len_hi=0, len_lo=0, 0xDE, 0xAD, 0xBE, 0xEF]
const DUMMY_MARKER: [u8; 6] = [0, 0, 0xDE, 0xAD, 0xBE, 0xEF];
2135
2136    #[test]
2137    fn test_identify_protected_regions_equal_lengths() {
2138        // When lengths are equal, the first CRC should be protected (tie-breaking rule).
2139        let record = Checksum {
2140            len1: 50,
2141            crc1: 0xAAAAAAAA,
2142            len2: 50,
2143            crc2: 0xBBBBBBBB,
2144        };
2145
2146        let result =
2147            Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record));
2148        assert!(result.is_some());
2149        let (prefix_len, protected_crc) = result.unwrap();
2150        assert_eq!(prefix_len, 50);
2151        assert!(
2152            matches!(protected_crc, ProtectedCrc::First),
2153            "First CRC should be protected when lengths are equal"
2154        );
2155    }
2156
2157    #[test]
2158    fn test_identify_protected_regions_len1_larger() {
2159        // When len1 > len2, the first CRC should be protected.
2160        let record = Checksum {
2161            len1: 100,
2162            crc1: 0xAAAAAAAA,
2163            len2: 50,
2164            crc2: 0xBBBBBBBB,
2165        };
2166
2167        let result =
2168            Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record));
2169        assert!(result.is_some());
2170        let (prefix_len, protected_crc) = result.unwrap();
2171        assert_eq!(prefix_len, 100);
2172        assert!(
2173            matches!(protected_crc, ProtectedCrc::First),
2174            "First CRC should be protected when len1 > len2"
2175        );
2176    }
2177
2178    #[test]
2179    fn test_identify_protected_regions_len2_larger() {
2180        // When len2 > len1, the second CRC should be protected.
2181        let record = Checksum {
2182            len1: 50,
2183            crc1: 0xAAAAAAAA,
2184            len2: 100,
2185            crc2: 0xBBBBBBBB,
2186        };
2187
2188        let result =
2189            Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record));
2190        assert!(result.is_some());
2191        let (prefix_len, protected_crc) = result.unwrap();
2192        assert_eq!(prefix_len, 100);
2193        assert!(
2194            matches!(protected_crc, ProtectedCrc::Second),
2195            "Second CRC should be protected when len2 > len1"
2196        );
2197    }
2198
2199    /// Test that `to_physical_pages` emits full pages zero-copy while still materializing the
2200    /// trailing partial page into one padded physical page.
2201    #[test_traced("DEBUG")]
2202    fn test_to_physical_pages_zero_copy_full_pages_and_materialized_partial() {
2203        // Build a tip buffer containing two full logical pages plus a trailing partial
2204        // page, convert it with `to_physical_pages`, then verify:
2205        // - the result is chunked rather than one contiguous buffer for the full-page portion
2206        // - the logical payload bytes for the first two pages are preserved in order
2207        // - the partial page is padded with zeros up to one full logical page
2208        // - all three resulting physical pages validate their CRC records
2209        let executor = deterministic::Runner::default();
2210        executor.start(|context: deterministic::Context| async move {
2211            // Open a new blob.
2212            let (blob, blob_size) = context
2213                .open("test_partition", b"to_physical_pages_zero_copy")
2214                .await
2215                .unwrap();
2216            assert_eq!(blob_size, 0);
2217
2218            // Create a page cache reference.
2219            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
2220
2221            // Create an Append wrapper.
2222            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
2223                .await
2224                .unwrap();
2225
2226            // Build logical data with exactly two full pages followed by one trailing partial page.
2227            // This lets us verify that only the partial page is materialized.
2228            let logical_page_size = PAGE_SIZE.get() as usize;
2229            let partial_len = 17usize;
2230            let data: Vec<u8> = (0..(logical_page_size * 2 + partial_len))
2231                .map(|i| (i % 251) as u8)
2232                .collect();
2233
2234            // Seed a tip buffer with the logical bytes exactly as flush_internal would see them.
2235            let mut buffer = Buffer::new(0, data.len(), cache_ref.pool().clone());
2236            let over_capacity = buffer.append(&data);
2237            assert!(!over_capacity);
2238
2239            // Convert buffered logical bytes into physical-page writes.
2240            let (physical_pages, partial_page_state) =
2241                append.to_physical_pages(&buffer, true, None);
2242
2243            // Two full pages should each contribute a logical slice and a CRC slice, and the
2244            // trailing partial page should contribute one materialized padded physical page.
2245            assert_eq!(physical_pages.chunk_count(), 5);
2246
2247            // The returned partial-page CRC state must describe the exact trailing logical length.
2248            let crc_record = partial_page_state.expect("partial page state must be returned");
2249            let (len, _) = crc_record.get_crc();
2250            assert_eq!(len as usize, partial_len);
2251
2252            // Coalesce for easier content inspection. The assembled bytes should still form three
2253            // full physical pages on disk.
2254            let physical_page_size = logical_page_size + CHECKSUM_SIZE as usize;
2255            let coalesced = physical_pages.coalesce();
2256            assert_eq!(coalesced.len(), physical_page_size * 3);
2257
2258            // The first two physical pages must preserve the two full logical pages verbatim.
2259            assert_eq!(
2260                &coalesced.as_ref()[..logical_page_size],
2261                &data[..logical_page_size]
2262            );
2263            assert_eq!(
2264                &coalesced.as_ref()[physical_page_size..physical_page_size + logical_page_size],
2265                &data[logical_page_size..logical_page_size * 2],
2266            );
2267
2268            // The trailing partial page must contain the remaining logical bytes followed by zero
2269            // padding up to one full logical page.
2270            let partial_start = physical_page_size * 2;
2271            assert_eq!(
2272                &coalesced.as_ref()[partial_start..partial_start + partial_len],
2273                &data[logical_page_size * 2..],
2274            );
2275            assert!(coalesced.as_ref()
2276                [partial_start + partial_len..partial_start + logical_page_size]
2277                .iter()
2278                .all(|byte| *byte == 0));
2279
2280            // Each assembled physical page must carry a valid CRC record.
2281            assert!(Checksum::validate_page(&coalesced.as_ref()[..physical_page_size]).is_some());
2282            assert!(Checksum::validate_page(
2283                &coalesced.as_ref()[physical_page_size..physical_page_size * 2]
2284            )
2285            .is_some());
2286            assert!(Checksum::validate_page(
2287                &coalesced.as_ref()[physical_page_size * 2..physical_page_size * 3]
2288            )
2289            .is_some());
2290        });
2291    }
2292
2293    /// Test that slot 1 is NOT overwritten when it's the protected slot.
2294    ///
2295    /// Strategy: After extending twice (so slot 1 becomes authoritative with larger len),
2296    /// mangle the non-authoritative slot 0. Then extend again - slot 0 should be overwritten
2297    /// with the new CRC, while slot 1 (protected) should remain untouched.
2298    #[test_traced("DEBUG")]
2299    fn test_crc_slot1_protected() {
2300        let executor = deterministic::Runner::default();
2301        executor.start(|context: deterministic::Context| async move {
2302            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
2303            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
2304            let slot0_offset = PAGE_SIZE.get() as u64;
2305            let slot1_offset = PAGE_SIZE.get() as u64 + 6;
2306
2307            // === Step 1: Write 10 bytes → slot 0 authoritative (len=10) ===
2308            let (blob, _) = context.open("test_partition", b"slot1_prot").await.unwrap();
2309            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
2310                .await
2311                .unwrap();
2312            append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
2313            append.sync().await.unwrap();
2314            drop(append);
2315
2316            // === Step 2: Extend to 30 bytes → slot 1 authoritative (len=30) ===
2317            let (blob, size) = context.open("test_partition", b"slot1_prot").await.unwrap();
2318            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2319                .await
2320                .unwrap();
2321            append
2322                .append(&(11..=30).collect::<Vec<u8>>())
2323                .await
2324                .unwrap();
2325            append.sync().await.unwrap();
2326            drop(append);
2327
2328            // Verify slot 1 is now authoritative
2329            let (blob, size) = context.open("test_partition", b"slot1_prot").await.unwrap();
2330            let page = blob
2331                .read_at(0, physical_page_size)
2332                .await
2333                .unwrap()
2334                .coalesce();
2335            let crc = read_crc_record_from_page(page.as_ref());
2336            assert!(
2337                crc.len2 > crc.len1,
2338                "Slot 1 should be authoritative (len2={} > len1={})",
2339                crc.len2,
2340                crc.len1
2341            );
2342
2343            // Capture slot 1 bytes before mangling slot 0
2344            let slot1_before: Vec<u8> = blob
2345                .read_at(slot1_offset, 6)
2346                .await
2347                .unwrap()
2348                .coalesce()
2349                .freeze()
2350                .into();
2351
2352            // === Step 3: Mangle slot 0 (non-authoritative) ===
2353            blob.write_at(slot0_offset, DUMMY_MARKER.to_vec())
2354                .await
2355                .unwrap();
2356            blob.sync().await.unwrap();
2357
2358            // Verify mangle worked
2359            let slot0_mangled: Vec<u8> = blob
2360                .read_at(slot0_offset, 6)
2361                .await
2362                .unwrap()
2363                .coalesce()
2364                .freeze()
2365                .into();
2366            assert_eq!(slot0_mangled, DUMMY_MARKER, "Mangle failed");
2367
2368            // === Step 4: Extend to 50 bytes → new CRC goes to slot 0, slot 1 protected ===
2369            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2370                .await
2371                .unwrap();
2372            append
2373                .append(&(31..=50).collect::<Vec<u8>>())
2374                .await
2375                .unwrap();
2376            append.sync().await.unwrap();
2377            drop(append);
2378
2379            // === Step 5: Verify slot 0 was overwritten, slot 1 unchanged ===
2380            let (blob, _) = context.open("test_partition", b"slot1_prot").await.unwrap();
2381
2382            // Slot 0 should have new CRC (not our dummy marker)
2383            let slot0_after: Vec<u8> = blob
2384                .read_at(slot0_offset, 6)
2385                .await
2386                .unwrap()
2387                .coalesce()
2388                .freeze()
2389                .into();
2390            assert_ne!(
2391                slot0_after, DUMMY_MARKER,
2392                "Slot 0 should have been overwritten with new CRC"
2393            );
2394
2395            // Slot 1 should be UNCHANGED (protected)
2396            let slot1_after: Vec<u8> = blob
2397                .read_at(slot1_offset, 6)
2398                .await
2399                .unwrap()
2400                .coalesce()
2401                .freeze()
2402                .into();
2403            assert_eq!(
2404                slot1_before, slot1_after,
2405                "Slot 1 was modified! Protected region violated."
2406            );
2407
2408            // Verify the new CRC in slot 0 has len=50
2409            let page = blob
2410                .read_at(0, physical_page_size)
2411                .await
2412                .unwrap()
2413                .coalesce();
2414            let crc = read_crc_record_from_page(page.as_ref());
2415            assert_eq!(crc.len1, 50, "Slot 0 should have len=50");
2416        });
2417    }
2418
2419    /// Test that slot 0 is NOT overwritten when it's the protected slot.
2420    ///
2421    /// Strategy: After extending three times (slot 0 becomes authoritative again with largest len),
2422    /// mangle the non-authoritative slot 1. Then extend again - slot 1 should be overwritten
2423    /// with the new CRC, while slot 0 (protected) should remain untouched.
2424    #[test_traced("DEBUG")]
2425    fn test_crc_slot0_protected() {
2426        let executor = deterministic::Runner::default();
2427        executor.start(|context: deterministic::Context| async move {
2428            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
2429            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
2430            let slot0_offset = PAGE_SIZE.get() as u64;
2431            let slot1_offset = PAGE_SIZE.get() as u64 + 6;
2432
2433            // === Step 1: Write 10 bytes → slot 0 authoritative (len=10) ===
2434            let (blob, _) = context.open("test_partition", b"slot0_prot").await.unwrap();
2435            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
2436                .await
2437                .unwrap();
2438            append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
2439            append.sync().await.unwrap();
2440            drop(append);
2441
2442            // === Step 2: Extend to 30 bytes → slot 1 authoritative (len=30) ===
2443            let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
2444            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2445                .await
2446                .unwrap();
2447            append
2448                .append(&(11..=30).collect::<Vec<u8>>())
2449                .await
2450                .unwrap();
2451            append.sync().await.unwrap();
2452            drop(append);
2453
2454            // === Step 3: Extend to 50 bytes → slot 0 authoritative (len=50) ===
2455            let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
2456            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2457                .await
2458                .unwrap();
2459            append
2460                .append(&(31..=50).collect::<Vec<u8>>())
2461                .await
2462                .unwrap();
2463            append.sync().await.unwrap();
2464            drop(append);
2465
2466            // Verify slot 0 is now authoritative
2467            let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
2468            let page = blob
2469                .read_at(0, physical_page_size)
2470                .await
2471                .unwrap()
2472                .coalesce();
2473            let crc = read_crc_record_from_page(page.as_ref());
2474            assert!(
2475                crc.len1 > crc.len2,
2476                "Slot 0 should be authoritative (len1={} > len2={})",
2477                crc.len1,
2478                crc.len2
2479            );
2480
2481            // Capture slot 0 bytes before mangling slot 1
2482            let slot0_before: Vec<u8> = blob
2483                .read_at(slot0_offset, 6)
2484                .await
2485                .unwrap()
2486                .coalesce()
2487                .freeze()
2488                .into();
2489
2490            // === Step 4: Mangle slot 1 (non-authoritative) ===
2491            blob.write_at(slot1_offset, DUMMY_MARKER.to_vec())
2492                .await
2493                .unwrap();
2494            blob.sync().await.unwrap();
2495
2496            // Verify mangle worked
2497            let slot1_mangled: Vec<u8> = blob
2498                .read_at(slot1_offset, 6)
2499                .await
2500                .unwrap()
2501                .coalesce()
2502                .freeze()
2503                .into();
2504            assert_eq!(slot1_mangled, DUMMY_MARKER, "Mangle failed");
2505
2506            // === Step 5: Extend to 70 bytes → new CRC goes to slot 1, slot 0 protected ===
2507            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2508                .await
2509                .unwrap();
2510            append
2511                .append(&(51..=70).collect::<Vec<u8>>())
2512                .await
2513                .unwrap();
2514            append.sync().await.unwrap();
2515            drop(append);
2516
2517            // === Step 6: Verify slot 1 was overwritten, slot 0 unchanged ===
2518            let (blob, _) = context.open("test_partition", b"slot0_prot").await.unwrap();
2519
2520            // Slot 1 should have new CRC (not our dummy marker)
2521            let slot1_after: Vec<u8> = blob
2522                .read_at(slot1_offset, 6)
2523                .await
2524                .unwrap()
2525                .coalesce()
2526                .freeze()
2527                .into();
2528            assert_ne!(
2529                slot1_after, DUMMY_MARKER,
2530                "Slot 1 should have been overwritten with new CRC"
2531            );
2532
2533            // Slot 0 should be UNCHANGED (protected)
2534            let slot0_after: Vec<u8> = blob
2535                .read_at(slot0_offset, 6)
2536                .await
2537                .unwrap()
2538                .coalesce()
2539                .freeze()
2540                .into();
2541            assert_eq!(
2542                slot0_before, slot0_after,
2543                "Slot 0 was modified! Protected region violated."
2544            );
2545
2546            // Verify the new CRC in slot 1 has len=70
2547            let page = blob
2548                .read_at(0, physical_page_size)
2549                .await
2550                .unwrap()
2551                .coalesce();
2552            let crc = read_crc_record_from_page(page.as_ref());
2553            assert_eq!(crc.len2, 70, "Slot 1 should have len=70");
2554        });
2555    }
2556
2557    /// Test that the data prefix is NOT overwritten when extending a partial page.
2558    ///
2559    /// Strategy: Write data, then mangle the padding area (between data end and CRC start).
2560    /// After extending, the original data should be unchanged but the mangled padding
2561    /// should be overwritten with new data.
2562    #[test_traced("DEBUG")]
2563    fn test_data_prefix_not_overwritten() {
2564        let executor = deterministic::Runner::default();
2565        executor.start(|context: deterministic::Context| async move {
2566            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
2567            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
2568
2569            // === Step 1: Write 20 bytes ===
2570            let (blob, _) = context
2571                .open("test_partition", b"prefix_test")
2572                .await
2573                .unwrap();
2574            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
2575                .await
2576                .unwrap();
2577            let data1: Vec<u8> = (1..=20).collect();
2578            append.append(&data1).await.unwrap();
2579            append.sync().await.unwrap();
2580            drop(append);
2581
2582            // === Step 2: Capture the first 20 bytes and mangle bytes 25-30 (in padding area) ===
2583            let (blob, size) = context
2584                .open("test_partition", b"prefix_test")
2585                .await
2586                .unwrap();
2587            assert_eq!(size, physical_page_size as u64);
2588
2589            let prefix_before: Vec<u8> = blob
2590                .read_at(0, 20)
2591                .await
2592                .unwrap()
2593                .coalesce()
2594                .freeze()
2595                .into();
2596
2597            // Mangle bytes 25-30 (safely in the padding area, after our 20 bytes of data)
2598            blob.write_at(25, DUMMY_MARKER.to_vec()).await.unwrap();
2599            blob.sync().await.unwrap();
2600
2601            // === Step 3: Extend to 40 bytes ===
2602            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2603                .await
2604                .unwrap();
2605            append
2606                .append(&(21..=40).collect::<Vec<u8>>())
2607                .await
2608                .unwrap();
2609            append.sync().await.unwrap();
2610            drop(append);
2611
2612            // === Step 4: Verify prefix unchanged, mangled area overwritten ===
2613            let (blob, _) = context
2614                .open("test_partition", b"prefix_test")
2615                .await
2616                .unwrap();
2617
2618            // Original 20 bytes should be unchanged
2619            let prefix_after: Vec<u8> = blob
2620                .read_at(0, 20)
2621                .await
2622                .unwrap()
2623                .coalesce()
2624                .freeze()
2625                .into();
2626            assert_eq!(prefix_before, prefix_after, "Data prefix was modified!");
2627
2628            // Bytes at offset 25-30: data (21..=40) starts at offset 20, so offset 25 has value 26
2629            let overwritten: Vec<u8> = blob
2630                .read_at(25, 6)
2631                .await
2632                .unwrap()
2633                .coalesce()
2634                .freeze()
2635                .into();
2636            assert_eq!(
2637                overwritten,
2638                vec![26, 27, 28, 29, 30, 31],
2639                "New data should overwrite padding area"
2640            );
2641        });
2642    }
2643
2644    /// Test CRC slot protection when extending past a page boundary.
2645    ///
2646    /// Strategy: Write partial page, mangle slot 0 (non-authoritative after we do first extend),
2647    /// then extend past page boundary. Verify slot 0 gets new full-page CRC while
2648    /// the mangled marker is overwritten, and second page is written correctly.
2649    #[test_traced("DEBUG")]
2650    fn test_crc_slot_protection_across_page_boundary() {
2651        let executor = deterministic::Runner::default();
2652        executor.start(|context: deterministic::Context| async move {
2653            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
2654            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
2655            let slot0_offset = PAGE_SIZE.get() as u64;
2656            let slot1_offset = PAGE_SIZE.get() as u64 + 6;
2657
2658            // === Step 1: Write 50 bytes → slot 0 authoritative ===
2659            let (blob, _) = context.open("test_partition", b"boundary").await.unwrap();
2660            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
2661                .await
2662                .unwrap();
2663            append.append(&(1..=50).collect::<Vec<u8>>()).await.unwrap();
2664            append.sync().await.unwrap();
2665            drop(append);
2666
2667            // === Step 2: Extend to 80 bytes → slot 1 authoritative ===
2668            let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
2669            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2670                .await
2671                .unwrap();
2672            append
2673                .append(&(51..=80).collect::<Vec<u8>>())
2674                .await
2675                .unwrap();
2676            append.sync().await.unwrap();
2677            drop(append);
2678
2679            // Verify slot 1 is authoritative
2680            let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
2681            let page = blob
2682                .read_at(0, physical_page_size)
2683                .await
2684                .unwrap()
2685                .coalesce();
2686            let crc = read_crc_record_from_page(page.as_ref());
2687            assert!(crc.len2 > crc.len1, "Slot 1 should be authoritative");
2688
2689            // Capture slot 1 before extending past page boundary
2690            let slot1_before: Vec<u8> = blob
2691                .read_at(slot1_offset, 6)
2692                .await
2693                .unwrap()
2694                .coalesce()
2695                .freeze()
2696                .into();
2697
2698            // Mangle slot 0 (non-authoritative)
2699            blob.write_at(slot0_offset, DUMMY_MARKER.to_vec())
2700                .await
2701                .unwrap();
2702            blob.sync().await.unwrap();
2703
2704            // === Step 3: Extend past page boundary (80 + 40 = 120, PAGE_SIZE=103) ===
2705            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2706                .await
2707                .unwrap();
2708            append
2709                .append(&(81..=120).collect::<Vec<u8>>())
2710                .await
2711                .unwrap();
2712            append.sync().await.unwrap();
2713            drop(append);
2714
2715            // === Step 4: Verify results ===
2716            let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
2717            assert_eq!(size, (physical_page_size * 2) as u64, "Should have 2 pages");
2718
2719            // Slot 0 should have been overwritten with full-page CRC (not dummy marker)
2720            let slot0_after: Vec<u8> = blob
2721                .read_at(slot0_offset, 6)
2722                .await
2723                .unwrap()
2724                .coalesce()
2725                .freeze()
2726                .into();
2727            assert_ne!(
2728                slot0_after, DUMMY_MARKER,
2729                "Slot 0 should have full-page CRC"
2730            );
2731
2732            // Slot 1 should be UNCHANGED (protected during boundary crossing)
2733            let slot1_after: Vec<u8> = blob
2734                .read_at(slot1_offset, 6)
2735                .await
2736                .unwrap()
2737                .coalesce()
2738                .freeze()
2739                .into();
2740            assert_eq!(
2741                slot1_before, slot1_after,
2742                "Slot 1 was modified during page boundary crossing!"
2743            );
2744
2745            // Verify page 0 has correct CRC structure
2746            let page0 = blob
2747                .read_at(0, physical_page_size)
2748                .await
2749                .unwrap()
2750                .coalesce();
2751            let crc0 = read_crc_record_from_page(page0.as_ref());
2752            assert_eq!(
2753                crc0.len1,
2754                PAGE_SIZE.get(),
2755                "Slot 0 should have full page length"
2756            );
2757
2758            // Verify data integrity
2759            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2760                .await
2761                .unwrap();
2762            assert_eq!(append.size().await, 120);
2763            let all_data: Vec<u8> = append.read_at(0, 120).await.unwrap().coalesce().into();
2764            let expected: Vec<u8> = (1..=120).collect();
2765            assert_eq!(all_data, expected);
2766        });
2767    }
2768
2769    /// Test that corrupting the primary CRC (but not its length) causes fallback to the previous
2770    /// partial page contents.
2771    ///
2772    /// Strategy:
2773    /// 1. Write 10 bytes → slot 0 authoritative (len=10, valid crc)
2774    /// 2. Extend to 30 bytes → slot 1 authoritative (len=30, valid crc)
2775    /// 3. Corrupt ONLY the crc2 value in slot 1 (not the length)
2776    /// 4. Re-open and verify we fall back to slot 0's 10 bytes
2777    #[test_traced("DEBUG")]
2778    fn test_crc_fallback_on_corrupted_primary() {
2779        let executor = deterministic::Runner::default();
2780        executor.start(|context: deterministic::Context| async move {
2781            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
2782            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
2783            // crc2 is at offset: PAGE_SIZE + 6 (for len2) + 2 (skip len2 bytes) = PAGE_SIZE + 8
2784            let crc2_offset = PAGE_SIZE.get() as u64 + 8;
2785
2786            // === Step 1: Write 10 bytes → slot 0 authoritative (len=10) ===
2787            let (blob, _) = context
2788                .open("test_partition", b"crc_fallback")
2789                .await
2790                .unwrap();
2791            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
2792                .await
2793                .unwrap();
2794            let data1: Vec<u8> = (1..=10).collect();
2795            append.append(&data1).await.unwrap();
2796            append.sync().await.unwrap();
2797            drop(append);
2798
2799            // === Step 2: Extend to 30 bytes → slot 1 authoritative (len=30) ===
2800            let (blob, size) = context
2801                .open("test_partition", b"crc_fallback")
2802                .await
2803                .unwrap();
2804            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2805                .await
2806                .unwrap();
2807            append
2808                .append(&(11..=30).collect::<Vec<u8>>())
2809                .await
2810                .unwrap();
2811            append.sync().await.unwrap();
2812            drop(append);
2813
2814            // Verify slot 1 is now authoritative and data reads correctly
2815            let (blob, size) = context
2816                .open("test_partition", b"crc_fallback")
2817                .await
2818                .unwrap();
2819            assert_eq!(size, physical_page_size as u64);
2820
2821            let page = blob
2822                .read_at(0, physical_page_size)
2823                .await
2824                .unwrap()
2825                .coalesce();
2826            let crc = read_crc_record_from_page(page.as_ref());
2827            assert!(
2828                crc.len2 > crc.len1,
2829                "Slot 1 should be authoritative (len2={} > len1={})",
2830                crc.len2,
2831                crc.len1
2832            );
2833            assert_eq!(crc.len2, 30, "Slot 1 should have len=30");
2834            assert_eq!(crc.len1, 10, "Slot 0 should have len=10");
2835
2836            // Verify we can read all 30 bytes before corruption
2837            let append = Append::new(blob.clone(), size, BUFFER_SIZE, cache_ref.clone())
2838                .await
2839                .unwrap();
2840            assert_eq!(append.size().await, 30);
2841            let all_data: Vec<u8> = append.read_at(0, 30).await.unwrap().coalesce().into();
2842            let expected: Vec<u8> = (1..=30).collect();
2843            assert_eq!(all_data, expected);
2844            drop(append);
2845
2846            // === Step 3: Corrupt ONLY crc2 (not len2) ===
2847            // crc2 is 4 bytes at offset PAGE_SIZE + 8
2848            blob.write_at(crc2_offset, vec![0xDE, 0xAD, 0xBE, 0xEF])
2849                .await
2850                .unwrap();
2851            blob.sync().await.unwrap();
2852
2853            // Verify corruption: len2 should still be 30, but crc2 is now garbage
2854            let page = blob
2855                .read_at(0, physical_page_size)
2856                .await
2857                .unwrap()
2858                .coalesce();
2859            let crc = read_crc_record_from_page(page.as_ref());
2860            assert_eq!(crc.len2, 30, "len2 should still be 30 after corruption");
2861            assert_eq!(crc.crc2, 0xDEADBEEF, "crc2 should be our corrupted value");
2862
2863            // === Step 4: Re-open and verify fallback to slot 0's 10 bytes ===
2864            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2865                .await
2866                .unwrap();
2867
2868            // Should fall back to 10 bytes (slot 0's length)
2869            assert_eq!(
2870                append.size().await,
2871                10,
2872                "Should fall back to slot 0's 10 bytes after primary CRC corruption"
2873            );
2874
2875            // Verify the data is the original 10 bytes
2876            let fallback_data: Vec<u8> = append.read_at(0, 10).await.unwrap().coalesce().into();
2877            assert_eq!(
2878                fallback_data, data1,
2879                "Fallback data should match original 10 bytes"
2880            );
2881
2882            // Reading beyond 10 bytes should fail
2883            let result = append.read_at(0, 11).await;
2884            assert!(result.is_err(), "Reading beyond fallback size should fail");
2885        });
2886    }
2887
2888    /// Test that corrupting a non-last page's primary CRC fails even if fallback is valid.
2889    ///
2890    /// Non-last pages must always be full. If the primary CRC is corrupted and the fallback
2891    /// indicates a partial page, validation should fail entirely (not fall back to partial).
2892    ///
2893    /// Strategy:
2894    /// 1. Write 10 bytes → slot 0 has len=10 (partial)
2895    /// 2. Extend to full page (103 bytes) → slot 1 has len=103 (full, authoritative)
2896    /// 3. Extend past page boundary (e.g., 110 bytes) → page 0 is now non-last
2897    /// 4. Corrupt the primary CRC of page 0 (slot 1's crc, which has len=103)
2898    /// 5. Re-open and verify that reading from page 0 fails (fallback has len=10, not full)
2899    #[test_traced("DEBUG")]
2900    fn test_non_last_page_rejects_partial_fallback() {
2901        let executor = deterministic::Runner::default();
2902        executor.start(|context: deterministic::Context| async move {
2903            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
2904            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
2905            // crc2 for page 0 is at offset: PAGE_SIZE + 8
2906            let page0_crc2_offset = PAGE_SIZE.get() as u64 + 8;
2907
2908            // === Step 1: Write 10 bytes → slot 0 has len=10 ===
2909            let (blob, _) = context
2910                .open("test_partition", b"non_last_page")
2911                .await
2912                .unwrap();
2913            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
2914                .await
2915                .unwrap();
2916            append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
2917            append.sync().await.unwrap();
2918            drop(append);
2919
2920            // === Step 2: Extend to exactly full page (103 bytes) → slot 1 has len=103 ===
2921            let (blob, size) = context
2922                .open("test_partition", b"non_last_page")
2923                .await
2924                .unwrap();
2925            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2926                .await
2927                .unwrap();
2928            // Add bytes 11 through 103 (93 more bytes)
2929            append
2930                .append(&(11..=PAGE_SIZE.get() as u8).collect::<Vec<u8>>())
2931                .await
2932                .unwrap();
2933            append.sync().await.unwrap();
2934            drop(append);
2935
2936            // Verify page 0 slot 1 is authoritative with len=103 (full page)
2937            let (blob, size) = context
2938                .open("test_partition", b"non_last_page")
2939                .await
2940                .unwrap();
2941            let page = blob
2942                .read_at(0, physical_page_size)
2943                .await
2944                .unwrap()
2945                .coalesce();
2946            let crc = read_crc_record_from_page(page.as_ref());
2947            assert_eq!(crc.len1, 10, "Slot 0 should have len=10");
2948            assert_eq!(
2949                crc.len2,
2950                PAGE_SIZE.get(),
2951                "Slot 1 should have len=103 (full page)"
2952            );
2953            assert!(crc.len2 > crc.len1, "Slot 1 should be authoritative");
2954
2955            // === Step 3: Extend past page boundary (add 10 more bytes for total of 113) ===
2956            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2957                .await
2958                .unwrap();
2959            // Add bytes 104 through 113 (10 more bytes, now on page 1)
2960            append
2961                .append(&(104..=113).collect::<Vec<u8>>())
2962                .await
2963                .unwrap();
2964            append.sync().await.unwrap();
2965            drop(append);
2966
2967            // Verify we now have 2 pages
2968            let (blob, size) = context
2969                .open("test_partition", b"non_last_page")
2970                .await
2971                .unwrap();
2972            assert_eq!(
2973                size,
2974                (physical_page_size * 2) as u64,
2975                "Should have 2 physical pages"
2976            );
2977
2978            // Verify data is readable before corruption
2979            let append = Append::new(blob.clone(), size, BUFFER_SIZE, cache_ref.clone())
2980                .await
2981                .unwrap();
2982            assert_eq!(append.size().await, 113);
2983            let all_data: Vec<u8> = append.read_at(0, 113).await.unwrap().coalesce().into();
2984            let expected: Vec<u8> = (1..=113).collect();
2985            assert_eq!(all_data, expected);
2986            drop(append);
2987
2988            // === Step 4: Corrupt page 0's primary CRC (slot 1's crc2) ===
2989            blob.write_at(page0_crc2_offset, vec![0xDE, 0xAD, 0xBE, 0xEF])
2990                .await
2991                .unwrap();
2992            blob.sync().await.unwrap();
2993
2994            // Verify corruption: page 0's slot 1 still has len=103 but bad CRC
2995            let page = blob
2996                .read_at(0, physical_page_size)
2997                .await
2998                .unwrap()
2999                .coalesce();
3000            let crc = read_crc_record_from_page(page.as_ref());
3001            assert_eq!(crc.len2, PAGE_SIZE.get(), "len2 should still be 103");
3002            assert_eq!(crc.crc2, 0xDEADBEEF, "crc2 should be corrupted");
3003            // Slot 0 fallback has len=10 (partial), which is invalid for non-last page
3004            assert_eq!(crc.len1, 10, "Fallback slot 0 has partial length");
3005
3006            // === Step 5: Re-open and try to read from page 0 ===
3007            // The first page's primary CRC is bad, and fallback indicates partial (len=10).
3008            // Since page 0 is not the last page, a partial fallback is invalid.
3009            // Reading from page 0 should fail because the fallback CRC indicates a partial
3010            // page, which is not allowed for non-last pages.
3011            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
3012                .await
3013                .unwrap();
3014
3015            // The blob still reports 113 bytes because init only validates the last page.
3016            // But reading from page 0 should fail because the CRC fallback is partial.
3017            assert_eq!(append.size().await, 113);
3018
3019            // Try to read from page 0 - this should fail with InvalidChecksum because
3020            // the fallback CRC has len=10 (partial), which is invalid for a non-last page.
3021            let result = append.read_at(0, 10).await;
3022            assert!(
3023                result.is_err(),
3024                "Reading from corrupted non-last page via Append should fail, but got: {:?}",
3025                result
3026            );
3027            drop(append);
3028
3029            // Also verify that reading via Replay fails the same way.
3030            let (blob, size) = context
3031                .open("test_partition", b"non_last_page")
3032                .await
3033                .unwrap();
3034            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
3035                .await
3036                .unwrap();
3037            let mut replay = append.replay(NZUsize!(1024)).await.unwrap();
3038
3039            // Try to fill pages - should fail on CRC validation.
3040            let result = replay.ensure(1).await;
3041            assert!(
3042                result.is_err(),
3043                "Reading from corrupted non-last page via Replay should fail, but got: {:?}",
3044                result
3045            );
3046        });
3047    }
3048
/// Shrinking to a size that ends inside a page must validate that page's CRC rather than
/// blindly loading raw bytes that could be corrupted.
///
/// The test damages the checksum record of a middle page, confirms the blob still opens at
/// its full size, and then expects `resize` onto the damaged page to return
/// `InvalidChecksum`.
#[test]
fn test_resize_shrink_validates_crc() {
    let runner = deterministic::Runner::default();

    runner.start(|ctx| async move {
        let cache = CacheRef::from_pooler(&ctx, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
        let checksum_len = CHECKSUM_SIZE as usize;
        let physical_page_size = PAGE_SIZE.get() as usize + checksum_len;

        let (blob, size) = ctx
            .open("test_partition", b"resize_crc_test")
            .await
            .unwrap();
        let writer = Append::new(blob, size, BUFFER_SIZE, cache.clone())
            .await
            .unwrap();

        // 250 bytes span three pages: two full pages and a partial third one
        // (103 + 103 + 44 with PAGE_SIZE = 103).
        let payload: Vec<u8> = (0..=249).collect();
        writer.append(&payload).await.unwrap();
        writer.sync().await.unwrap();
        assert_eq!(writer.size().await, 250);
        drop(writer);

        // Re-open the raw blob so the middle page's checksum record can be damaged.
        let (blob, size) = ctx
            .open("test_partition", b"resize_crc_test")
            .await
            .unwrap();
        assert_eq!(size as usize, physical_page_size * 3);

        // Page 1's checksum record is the tail of the second physical page.
        let page1_crc_offset = (physical_page_size * 2 - checksum_len) as u64;
        blob.write_at(page1_crc_offset, vec![0xFF; checksum_len])
            .await
            .unwrap();
        blob.sync().await.unwrap();

        // Opening validates only the last page (page 2), which is intact, so the blob
        // still reports all 250 bytes.
        let writer = Append::new(blob, size, BUFFER_SIZE, cache.clone())
            .await
            .unwrap();
        assert_eq!(writer.size().await, 250);

        // 150 bytes = all of page 0 (103) + 47 bytes of page 1, i.e. the new end falls
        // inside the corrupted page, so the shrink has to be refused.
        let result = writer.resize(150).await;
        assert!(
            matches!(result, Err(crate::Error::InvalidChecksum)),
            "Expected InvalidChecksum when shrinking to corrupted page, got: {:?}",
            result
        );
    });
}
3108
/// Regression test: a shrink that crosses a page boundary must drop cached pages covering
/// the truncated region.
///
/// Before the fix, `try_read_sync` (which bypasses the tip buffer) could return pre-resize
/// bytes at offsets that had since been reclaimed by new appends.
#[test]
fn test_resize_invalidates_cache() {
    let runner = deterministic::Runner::default();
    runner.start(|ctx: deterministic::Context| async move {
        let cache = CacheRef::from_pooler(&ctx, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
        let (blob, blob_size) = ctx
            .open("test_partition", b"resize_invalidates_cache")
            .await
            .unwrap();
        let writer = Append::new(blob, blob_size, BUFFER_SIZE, cache)
            .await
            .unwrap();

        // Fill and sync one whole page so it ends up in the page cache. The 0xAA pattern
        // makes a stale read easy to recognise.
        let stale_page = vec![0xAAu8; PAGE_SIZE.get() as usize];
        writer.append(&stale_page).await.unwrap();
        writer.sync().await.unwrap();

        // The cache-only fast path must be able to serve page 0 at this point.
        let mut warmup = vec![0u8; 16];
        assert!(writer.try_read_sync(0, &mut warmup));
        assert_eq!(warmup, vec![0xAAu8; 16]);

        // Rewind to empty (crossing the page boundary), then write a different pattern.
        writer.resize(0).await.unwrap();
        let new_bytes = vec![0xBBu8; 16];
        writer.append(&new_bytes).await.unwrap();

        // Acceptable outcomes: a cache miss, or a hit returning the new pattern. Returning
        // 0xAA would mean the stale page survived the resize.
        let mut probe = vec![0u8; 16];
        let hit = writer.try_read_sync(0, &mut probe);
        assert!(
            !hit || probe == new_bytes,
            "try_read_sync served stale pre-resize bytes: {probe:?}"
        );
    });
}
3152
/// Resizing to the current size must succeed and leave both the size and the content
/// untouched.
#[test]
fn test_resize_same_size_is_noop() {
    let runner = deterministic::Runner::default();
    runner.start(|ctx: deterministic::Context| async move {
        let cache = CacheRef::from_pooler(&ctx, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
        let (blob, blob_size) = ctx
            .open("test_partition", b"resize_same_size")
            .await
            .unwrap();
        let writer = Append::new(blob, blob_size, BUFFER_SIZE, cache)
            .await
            .unwrap();

        writer.append(b"hello world").await.unwrap();
        assert_eq!(writer.size().await, 11);

        // Ask for the size the blob already has.
        writer.resize(11).await.unwrap();
        assert_eq!(writer.size().await, 11);

        // The bytes written before the resize must still read back unchanged.
        let contents = writer.read_at(0, 11).await.unwrap().coalesce();
        assert_eq!(contents.as_ref(), b"hello world");
    });
}
3178
/// A shrink that stays within one partial page must persist: after dropping and re-opening
/// the blob, the shorter size and the surviving prefix are what come back.
#[test]
fn test_resize_same_page_shrink_reopens_at_shorter_size() {
    let runner = deterministic::Runner::default();

    runner.start(|ctx| async move {
        let cache = CacheRef::from_pooler(&ctx, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
        let data: Vec<u8> = (0..50).collect();

        let (blob, size) = ctx
            .open("test_partition", b"same_page_shrink")
            .await
            .unwrap();
        let writer = Append::new(blob, size, BUFFER_SIZE, cache.clone())
            .await
            .unwrap();

        // A single sync leaves the authoritative CRC in the first slot. The interrupted
        // variants of this test cover the opposite slot orientation.
        writer.append(&data).await.unwrap();
        writer.sync().await.unwrap();

        // Shrink from 50 to 45 bytes, then let go of the handle.
        writer.resize(45).await.unwrap();
        drop(writer);

        // A fresh handle must come up at the shorter size with the prefix intact.
        let (blob, size) = ctx
            .open("test_partition", b"same_page_shrink")
            .await
            .unwrap();
        let reopened = Append::new(blob, size, BUFFER_SIZE, cache).await.unwrap();
        assert_eq!(reopened.size().await, 45);
        let read = reopened.read_at(0, 45).await.unwrap().coalesce();
        assert_eq!(read.as_ref(), &data[..45]);
    });
}
3215
/// If the first write of a same-page shrink is torn, the previously committed page state
/// must still be recoverable: re-opening yields the original 50 bytes.
///
/// Two syncs (40 bytes, then 50) set up the page before the faulty blob is wrapped around
/// it. NOTE(review): `PartialWriteBlob::new(blob, 1, 3)` presumably tears write #1 after 3
/// bytes — confirm against the helper's definition.
#[test]
fn test_resize_same_page_shrink_survives_interrupted_crc_stage() {
    let runner = deterministic::Runner::default();

    runner.start(|ctx| async move {
        let cache = CacheRef::from_pooler(&ctx, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
        let data: Vec<u8> = (0..50).collect();

        // Build the page in two synced steps: 40 bytes first, then the remaining 10.
        let (blob, size) = ctx
            .open("test_partition", b"same_page_shrink_interrupted")
            .await
            .unwrap();
        let writer = Append::new(blob, size, BUFFER_SIZE, cache.clone())
            .await
            .unwrap();
        writer.append(&data[..40]).await.unwrap();
        writer.sync().await.unwrap();
        writer.append(&data[40..]).await.unwrap();
        writer.sync().await.unwrap();
        drop(writer);

        // Re-open through a blob wrapper that injects a partial write.
        let (blob, size) = ctx
            .open("test_partition", b"same_page_shrink_interrupted")
            .await
            .unwrap();
        let flaky = PartialWriteBlob::new(blob, 1, 3);
        let writes_seen = flaky.write_count();
        let torn_write_len = flaky.failed_write_len();
        let writer = Append::new(flaky, size, BUFFER_SIZE, cache.clone())
            .await
            .unwrap();

        // The shrink must surface the injected failure.
        let outcome = writer.resize(45).await;
        assert!(outcome.is_err(), "phase-1 partial write should fail");
        // Exactly one write was attempted, and it was one checksum slot long.
        assert_eq!(writes_seen.load(Ordering::SeqCst), 1);
        assert_eq!(torn_write_len.load(Ordering::SeqCst), CHECKSUM_SLOT_SIZE);
        drop(writer);

        // Recovery: the pre-shrink state (all 50 bytes) is what a clean re-open sees.
        let (blob, size) = ctx
            .open("test_partition", b"same_page_shrink_interrupted")
            .await
            .unwrap();
        let reopened = Append::new(blob, size, BUFFER_SIZE, cache).await.unwrap();
        assert_eq!(reopened.size().await, 50);
        let read = reopened.read_at(0, 50).await.unwrap().coalesce();
        assert_eq!(read.as_ref(), &data);
    });
}
3268
/// If the second write of a same-page shrink (a 2-byte write, per the assertions below) is
/// torn, re-opening must still yield the original 300 bytes.
///
/// Uses a 600-byte page so lengths above 255 fit on a single page. NOTE(review):
/// `PartialWriteBlob::new(blob, 2, 1)` presumably tears write #2 after 1 byte — confirm
/// against the helper's definition.
#[test]
fn test_resize_same_page_shrink_survives_interrupted_len_stage() {
    let runner = deterministic::Runner::default();

    runner.start(|ctx| async move {
        const WIDE_PAGE_SIZE: NonZeroU16 = NZU16!(600);
        const WIDE_BUFFER_SIZE: usize = 1_200;

        let cache = CacheRef::from_pooler(&ctx, WIDE_PAGE_SIZE, NZUsize!(WIDE_BUFFER_SIZE));
        let data: Vec<u8> = (0..300).map(|i| (i % 251) as u8).collect();

        // Build the page in two synced steps: 255 bytes first, then up to 300.
        let (blob, size) = ctx
            .open("test_partition", b"same_page_shrink_len_stage")
            .await
            .unwrap();
        let writer = Append::new(blob, size, WIDE_BUFFER_SIZE, cache.clone())
            .await
            .unwrap();
        writer.append(&data[..255]).await.unwrap();
        writer.sync().await.unwrap();
        writer.append(&data[255..]).await.unwrap();
        writer.sync().await.unwrap();
        drop(writer);

        // Re-open through a blob wrapper that injects a partial write.
        let (blob, size) = ctx
            .open("test_partition", b"same_page_shrink_len_stage")
            .await
            .unwrap();
        let flaky = PartialWriteBlob::new(blob, 2, 1);
        let writes_seen = flaky.write_count();
        let torn_write_len = flaky.failed_write_len();
        let writer = Append::new(flaky, size, WIDE_BUFFER_SIZE, cache.clone())
            .await
            .unwrap();

        // The shrink must surface the injected failure.
        let outcome = writer.resize(257).await;
        assert!(outcome.is_err(), "length-stage partial write should fail");
        // Two writes were attempted; the one that failed was 2 bytes long.
        assert_eq!(writes_seen.load(Ordering::SeqCst), 2);
        assert_eq!(torn_write_len.load(Ordering::SeqCst), 2);
        drop(writer);

        // Recovery: a clean re-open still sees all 300 original bytes.
        let (blob, size) = ctx
            .open("test_partition", b"same_page_shrink_len_stage")
            .await
            .unwrap();
        let reopened = Append::new(blob, size, WIDE_BUFFER_SIZE, cache)
            .await
            .unwrap();
        assert_eq!(reopened.size().await, 300);
        let read = reopened.read_at(0, 300).await.unwrap().coalesce();
        assert_eq!(read.as_ref(), &data);
    });
}
3325
/// A torn first-phase shrink write must not destroy a fallback slot that was just validated.
///
/// The page is synced three times (48, 50 and 52 bytes), then the checksum slot at offset
/// PAGE_SIZE is overwritten with `DUMMY_MARKER` through the raw blob handle. The following
/// `resize` fails on the injected partial write, and a clean re-open must come back at 50
/// bytes. NOTE(review): `PartialWriteBlob::new(.., 5, 3)` presumably tears write #5 after 3
/// bytes — confirm against the helper's definition.
#[test]
fn test_resize_same_page_shrink_preserves_validated_fallback_slot() {
    let runner = deterministic::Runner::default();

    runner.start(|ctx| async move {
        let cache = CacheRef::from_pooler(&ctx, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
        let data: Vec<u8> = (0..52).collect();

        let (blob, size) = ctx
            .open("test_partition", b"same_page_shrink_fallback_slot")
            .await
            .unwrap();
        // Keep the raw handle around: it is used further down to corrupt a slot directly.
        let flaky = PartialWriteBlob::new(blob.clone(), 5, 3);
        let writes_seen = flaky.write_count();
        let torn_write_len = flaky.failed_write_len();
        let writer = Append::new(flaky, size, BUFFER_SIZE, cache.clone())
            .await
            .unwrap();

        // Three synced growth steps; the running write count is pinned after each one.
        writer.append(&data[..48]).await.unwrap();
        writer.sync().await.unwrap();
        assert_eq!(writes_seen.load(Ordering::SeqCst), 1);

        writer.append(&data[48..50]).await.unwrap();
        writer.sync().await.unwrap();
        assert_eq!(writes_seen.load(Ordering::SeqCst), 3);

        writer.append(&data[50..]).await.unwrap();
        writer.sync().await.unwrap();
        assert_eq!(writes_seen.load(Ordering::SeqCst), 4);

        // Damage the newer authoritative slot; the older slot still covers the shrink
        // target. `resize()` syncs the live buffer first, which writes a valid fallback slot
        // but leaves the cached footer stale. The torn phase-1 shrink write that follows
        // must leave that validated fallback slot alone.
        let slot0_offset = PAGE_SIZE.get() as u64;
        blob.write_at(slot0_offset, DUMMY_MARKER.to_vec())
            .await
            .unwrap();
        blob.sync().await.unwrap();

        // The shrink must surface the injected failure on the fifth write.
        let outcome = writer.resize(45).await;
        assert!(outcome.is_err(), "phase-1 partial write should fail");
        assert_eq!(writes_seen.load(Ordering::SeqCst), 5);
        assert_eq!(torn_write_len.load(Ordering::SeqCst), CHECKSUM_SLOT_SIZE);
        drop(writer);

        // Recovery: the 50-byte state recorded in the surviving slot is what comes back.
        let (blob, size) = ctx
            .open("test_partition", b"same_page_shrink_fallback_slot")
            .await
            .unwrap();
        let reopened = Append::new(blob, size, BUFFER_SIZE, cache).await.unwrap();
        assert_eq!(reopened.size().await, 50);
        let read = reopened.read_at(0, 50).await.unwrap().coalesce();
        assert_eq!(read.as_ref(), &data[..50]);
    });
}
3386
/// Shrinking so that a previously full page becomes partial must persist: re-opening yields
/// the shorter size and the surviving prefix.
#[test]
fn test_resize_full_page_to_partial_reopens_at_shorter_size() {
    let runner = deterministic::Runner::default();

    runner.start(|ctx| async move {
        let cache = CacheRef::from_pooler(&ctx, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
        let page_size = PAGE_SIZE.get() as u64;
        // New end of data: 45 bytes into the second page.
        let target = page_size + 45;
        // Two full pages of patterned data.
        let data: Vec<u8> = (0..page_size * 2).map(|i| (i % 251) as u8).collect();

        let (blob, size) = ctx
            .open("test_partition", b"full_page_to_partial")
            .await
            .unwrap();
        let writer = Append::new(blob, size, BUFFER_SIZE, cache.clone())
            .await
            .unwrap();
        writer.append(&data).await.unwrap();
        writer.sync().await.unwrap();

        writer.resize(target).await.unwrap();
        drop(writer);

        // A fresh handle must come up at `target` with the prefix intact.
        let (blob, size) = ctx
            .open("test_partition", b"full_page_to_partial")
            .await
            .unwrap();
        let reopened = Append::new(blob, size, BUFFER_SIZE, cache).await.unwrap();
        assert_eq!(reopened.size().await, target);
        let read = reopened
            .read_at(0, target as usize)
            .await
            .unwrap()
            .coalesce();
        assert_eq!(read.as_ref(), &data[..target as usize]);
    });
}
3422
/// A torn first write while shrinking a full page to a partial one must leave the blob
/// recoverable: a clean re-open reports two full pages of intact data.
///
/// NOTE(review): `PartialWriteBlob::new(blob, 1, 3)` presumably tears write #1 after 3
/// bytes — confirm against the helper's definition.
#[test]
fn test_resize_full_page_to_partial_survives_interrupted_crc_stage() {
    let runner = deterministic::Runner::default();

    runner.start(|ctx| async move {
        let cache = CacheRef::from_pooler(&ctx, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
        let page_size = PAGE_SIZE.get() as u64;
        // New end of data: 45 bytes into the second page.
        let target = page_size + 45;
        // Three full pages of patterned data.
        let data: Vec<u8> = (0..page_size * 3).map(|i| (i % 251) as u8).collect();

        let (blob, size) = ctx
            .open("test_partition", b"full_page_to_partial_interrupted")
            .await
            .unwrap();
        let writer = Append::new(blob, size, BUFFER_SIZE, cache.clone())
            .await
            .unwrap();
        writer.append(&data).await.unwrap();
        writer.sync().await.unwrap();
        drop(writer);

        // Re-open through a blob wrapper that injects a partial write.
        let (blob, size) = ctx
            .open("test_partition", b"full_page_to_partial_interrupted")
            .await
            .unwrap();
        let flaky = PartialWriteBlob::new(blob, 1, 3);
        let writes_seen = flaky.write_count();
        let torn_write_len = flaky.failed_write_len();
        let writer = Append::new(flaky, size, BUFFER_SIZE, cache.clone())
            .await
            .unwrap();

        // The shrink must surface the injected failure.
        let outcome = writer.resize(target).await;
        assert!(outcome.is_err(), "phase-1 partial write should fail");
        // Exactly one write was attempted, and it was one checksum slot long.
        assert_eq!(writes_seen.load(Ordering::SeqCst), 1);
        assert_eq!(torn_write_len.load(Ordering::SeqCst), CHECKSUM_SLOT_SIZE);
        drop(writer);

        // Recovery: the re-opened blob holds exactly two full pages of the original data.
        let (blob, size) = ctx
            .open("test_partition", b"full_page_to_partial_interrupted")
            .await
            .unwrap();
        let reopened = Append::new(blob, size, BUFFER_SIZE, cache).await.unwrap();
        let recovered_len = page_size * 2;
        assert_eq!(reopened.size().await, recovered_len);
        let read = reopened
            .read_at(0, recovered_len as usize)
            .await
            .unwrap()
            .coalesce();
        assert_eq!(read.as_ref(), &data[..recovered_len as usize]);
    });
}
3479
/// If the write that invalidates the old, longer slot's length is torn, the shrink must
/// still win: a clean re-open reports the new 40-byte size.
///
/// Uses a 600-byte page so that the old length (300) exceeds 255 and a one-byte tear alters
/// the decoded length. NOTE(review): `PartialWriteBlob::new(blob, 3, 1)` presumably tears
/// write #3 after 1 byte — confirm against the helper's definition.
#[test]
fn test_resize_same_page_shrink_survives_interrupted_length_invalidation() {
    let runner = deterministic::Runner::default();

    runner.start(|ctx| async move {
        const WIDE_PAGE_SIZE: NonZeroU16 = NZU16!(600);
        const WIDE_BUFFER_SIZE: usize = 1_200;

        let cache = CacheRef::from_pooler(&ctx, WIDE_PAGE_SIZE, NZUsize!(WIDE_BUFFER_SIZE));
        let data: Vec<u8> = (0..300).map(|i| (i % 251) as u8).collect();

        let (blob, size) = ctx
            .open(
                "test_partition",
                b"same_page_shrink_interrupted_len_invalidation",
            )
            .await
            .unwrap();
        let writer = Append::new(blob, size, WIDE_BUFFER_SIZE, cache.clone())
            .await
            .unwrap();
        // Two syncs leave the old authoritative CRC in slot 1, so the shorter CRC gets
        // staged in slot 0. With an old length above 255, tearing one byte of the length
        // field changes the value it decodes to.
        writer.append(&data[..255]).await.unwrap();
        writer.sync().await.unwrap();
        writer.append(&data[255..]).await.unwrap();
        writer.sync().await.unwrap();
        drop(writer);

        // Re-open through a blob wrapper that injects a partial write.
        let (blob, size) = ctx
            .open(
                "test_partition",
                b"same_page_shrink_interrupted_len_invalidation",
            )
            .await
            .unwrap();
        let flaky = PartialWriteBlob::new(blob, 3, 1);
        let writes_seen = flaky.write_count();
        let torn_write_len = flaky.failed_write_len();
        let writer = Append::new(flaky, size, WIDE_BUFFER_SIZE, cache.clone())
            .await
            .unwrap();

        // The shrink must surface the injected failure.
        let outcome = writer.resize(40).await;
        assert!(outcome.is_err(), "old-slot length invalidation should fail");
        // Three writes were attempted; the one that failed was a u16-sized length field.
        assert_eq!(writes_seen.load(Ordering::SeqCst), 3);
        assert_eq!(
            torn_write_len.load(Ordering::SeqCst),
            std::mem::size_of::<u16>()
        );
        drop(writer);

        // Recovery: the shorter size is what a clean re-open reports.
        let (blob, size) = ctx
            .open(
                "test_partition",
                b"same_page_shrink_interrupted_len_invalidation",
            )
            .await
            .unwrap();
        let reopened = Append::new(blob, size, WIDE_BUFFER_SIZE, cache)
            .await
            .unwrap();
        assert_eq!(reopened.size().await, 40);
        let read = reopened.read_at(0, 40).await.unwrap().coalesce();
        assert_eq!(read.as_ref(), &data[..40]);
    });
}
3550
3551    #[test_traced("DEBUG")]
3552    fn test_resize_partial_shrink_without_physical_resize_uses_range_sync() {
3553        let executor = deterministic::Runner::default();
3554        executor.start(|context: deterministic::Context| async move {
3555            let blob = SyncTrackingBlob::new();
3556            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
3557            let append = Append::new(blob.clone(), 0, BUFFER_SIZE, cache_ref)
3558                .await
3559                .unwrap();
3560            append.sync().await.unwrap();
3561
3562            let data = vec![5u8; PAGE_SIZE.get() as usize];
3563            append.append(&data).await.unwrap();
3564            append.sync().await.unwrap();
3565
3566            // Shrinking within the same physical page only rewrites CRC metadata.
3567            append.resize(50).await.unwrap();
3568            append.sync().await.unwrap();
3569
3570            let (_, writes, full_syncs, range_syncs) = blob.snapshot();
3571            assert_eq!(writes, 4);
3572            assert_eq!(full_syncs, 1);
3573            assert_eq!(range_syncs, 4);
3574        });
3575    }
3576
3577    #[test_traced("DEBUG")]
3578    fn test_resize_partial_shrink_with_physical_resize_clears_full_sync_requirement() {
3579        let executor = deterministic::Runner::default();
3580        executor.start(|context: deterministic::Context| async move {
3581            let blob = SyncTrackingBlob::new();
3582            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
3583            let append = Append::new(blob.clone(), 0, BUFFER_SIZE, cache_ref)
3584                .await
3585                .unwrap();
3586            append.sync().await.unwrap();
3587
3588            let data = vec![9u8; PAGE_SIZE.get() as usize * 2];
3589            append.append(&data).await.unwrap();
3590            append.sync().await.unwrap();
3591
3592            // Shrinking from two physical pages to one partial page must also make the resize
3593            // durable.
3594            append.resize(50).await.unwrap();
3595            append.sync().await.unwrap();
3596
3597            let (_, writes, full_syncs, range_syncs) = blob.snapshot();
3598            assert_eq!(writes, 4);
3599            assert_eq!(full_syncs, 2);
3600            assert_eq!(range_syncs, 3);
3601
3602            // Once the resize barrier is cleared, the next single flush can use range sync again.
3603            append.append(b"x").await.unwrap();
3604            append.sync().await.unwrap();
3605
3606            let (_, writes, full_syncs, range_syncs) = blob.snapshot();
3607            assert_eq!(writes, 5);
3608            assert_eq!(full_syncs, 2);
3609            assert_eq!(range_syncs, 4);
3610
3611            let mut expected = data[..50].to_vec();
3612            expected.push(b'x');
3613            let read = append.read_at(0, expected.len()).await.unwrap().coalesce();
3614            assert_eq!(read.as_ref(), expected.as_slice());
3615        });
3616    }
3617
3618    #[test_traced("DEBUG")]
3619    fn test_resize_page_boundary_shrink_uses_full_sync() {
3620        let executor = deterministic::Runner::default();
3621        executor.start(|context: deterministic::Context| async move {
3622            let blob = SyncTrackingBlob::new();
3623            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
3624            let append = Append::new(blob.clone(), 0, BUFFER_SIZE, cache_ref)
3625                .await
3626                .unwrap();
3627            append.sync().await.unwrap();
3628
3629            // Start with two durable full pages. After clearing the wrapper barrier, the data sync
3630            // can persist them with one range-sync write.
3631            let page_size = PAGE_SIZE.get() as usize;
3632            let data = vec![11u8; page_size * 2];
3633            append.append(&data).await.unwrap();
3634            append.sync().await.unwrap();
3635
3636            // Shrinking to a page boundary resizes the blob but does not rewrite CRC metadata.
3637            append.resize(PAGE_SIZE.get() as u64).await.unwrap();
3638            append.sync().await.unwrap();
3639
3640            // Only the resize needs a full sync, no additional writes are emitted by the shrink.
3641            let (_, writes, full_syncs, range_syncs) = blob.snapshot();
3642            assert_eq!(writes, 1);
3643            assert_eq!(full_syncs, 2);
3644            assert_eq!(range_syncs, 1);
3645
3646            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
3647            let reopened = Append::new(blob.clone(), blob.size(), BUFFER_SIZE, cache_ref)
3648                .await
3649                .unwrap();
3650            assert_eq!(reopened.size().await, PAGE_SIZE.get() as u64);
3651            let read = reopened.read_at(0, page_size).await.unwrap().coalesce();
3652            assert_eq!(read.as_ref(), &data[..page_size]);
3653        });
3654    }
3655
/// Reopens a blob that ends in a 5-byte partial page, appends three bytes, resizes to 6, and
/// verifies the six bytes that remain readable.
#[test]
fn test_reopen_partial_tail_append_and_resize() {
    deterministic::Runner::default().start(|context| async move {
        const PAGE_SIZE: NonZeroU16 = NZU16!(64);
        const BUFFER_SIZE: usize = 256;

        let page_cache = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(4));

        // First session: persist a short partial tail.
        let (blob, size) = context
            .open("test_partition", b"partial_tail_test")
            .await
            .unwrap();
        let first = Append::new(blob, size, BUFFER_SIZE, page_cache.clone())
            .await
            .unwrap();
        first.append(&[1, 2, 3, 4, 5]).await.unwrap();
        first.sync().await.unwrap();
        assert_eq!(first.size().await, 5);
        drop(first);

        // Second session: the partial tail must be visible again after reopening.
        let (blob, size) = context
            .open("test_partition", b"partial_tail_test")
            .await
            .unwrap();
        let second = Append::new(blob, size, BUFFER_SIZE, page_cache.clone())
            .await
            .unwrap();
        assert_eq!(second.size().await, 5);

        // Extend past the old tail, then cut back so only one new byte survives.
        second.append(&[6, 7, 8]).await.unwrap();
        second.resize(6).await.unwrap();
        second.sync().await.unwrap();

        let contents: Vec<u8> = second.read_at(0, 6).await.unwrap().coalesce().into();
        assert_eq!(contents, [1u8, 2, 3, 4, 5, 6]);
    });
}
3699
/// Overwrites a page's CRC record so the first slot claims a length of 0xFFFF, then reopens
/// the blob. Opening must not panic: it either yields an empty `Append` or fails with
/// `InvalidChecksum`.
#[test]
fn test_corrupted_crc_len_too_large() {
    deterministic::Runner::default().start(|context| async move {
        let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));

        // Step 1: write 50 valid bytes and make them durable.
        let (blob, size) = context
            .open("test_partition", b"crc_len_test")
            .await
            .unwrap();
        let writer = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
            .await
            .unwrap();
        writer.append(&[0x42; 50]).await.unwrap();
        writer.sync().await.unwrap();
        drop(writer);

        // Step 2: reopen the raw blob and replace the CRC record with an oversized length.
        let (blob, size) = context
            .open("test_partition", b"crc_len_test")
            .await
            .unwrap();
        // The blob must hold exactly one physical page (logical page plus checksum record).
        assert_eq!(
            size as usize,
            PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize
        );

        // The CRC record occupies the end of the physical page, right after the logical bytes.
        let crc_offset = PAGE_SIZE.get() as u64;

        // Layout: [len1_hi, len1_lo, crc1 (4 bytes), len2_hi, len2_lo, crc2 (4 bytes)].
        // len1 is set to 0xFFFF (65535), far beyond the page size (103).
        let bad_crc_record: [u8; 12] = [
            0xFF, 0xFF, // len1 = 65535 (way too large)
            0xDE, 0xAD, 0xBE, 0xEF, // crc1 (garbage)
            0x00, 0x00, // len2 = 0
            0x00, 0x00, 0x00, 0x00, // crc2 = 0
        ];
        blob.write_at(crc_offset, bad_crc_record.to_vec())
            .await
            .unwrap();
        blob.sync().await.unwrap();

        // Step 3: open the corrupted blob. Truncating the bad page and reporting
        // InvalidChecksum are both acceptable outcomes; panicking is not.
        match Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()).await {
            Err(err) => assert!(
                matches!(err, crate::Error::InvalidChecksum),
                "Expected InvalidChecksum error, got: {:?}",
                err
            ),
            Ok(recovered) => assert_eq!(
                recovered.size().await,
                0,
                "Corrupted page should be truncated, size should be 0"
            ),
        }
    });
}
3770
/// Overwrites a page's CRC record so both slots claim lengths larger than the page, then
/// reopens the blob. Opening must not panic: it either yields an empty `Append` or fails with
/// `InvalidChecksum`.
#[test]
fn test_corrupted_crc_both_slots_len_too_large() {
    deterministic::Runner::default().start(|context| async move {
        let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));

        // Step 1: write 50 valid bytes and make them durable.
        let (blob, size) = context
            .open("test_partition", b"crc_both_bad")
            .await
            .unwrap();
        let writer = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
            .await
            .unwrap();
        writer.append(&[0x42; 50]).await.unwrap();
        writer.sync().await.unwrap();
        drop(writer);

        // Step 2: reopen the raw blob and corrupt BOTH CRC slots with oversized lengths.
        let (blob, size) = context
            .open("test_partition", b"crc_both_bad")
            .await
            .unwrap();

        // The CRC record starts right after the logical page bytes.
        let crc_offset = PAGE_SIZE.get() as u64;

        // Neither slot carries a length that fits in the page.
        let bad_crc_record: [u8; 12] = [
            0x01, 0x00, // len1 = 256 (> 103)
            0xDE, 0xAD, 0xBE, 0xEF, // crc1 (garbage)
            0x02, 0x00, // len2 = 512 (> 103)
            0xCA, 0xFE, 0xBA, 0xBE, // crc2 (garbage)
        ];
        blob.write_at(crc_offset, bad_crc_record.to_vec())
            .await
            .unwrap();
        blob.sync().await.unwrap();

        // Step 3: opening must not panic; accept a truncated page or InvalidChecksum.
        match Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()).await {
            Err(err) => assert!(
                matches!(err, crate::Error::InvalidChecksum),
                "Expected InvalidChecksum, got: {:?}",
                err
            ),
            // The corrupted page was truncated away.
            Ok(recovered) => assert_eq!(recovered.size().await, 0),
        }
    });
}
3830}