// commonware_runtime/utils/buffer/paged/append.rs
//! The [Append] wrapper consists of a [Blob] and a write buffer, and provides a logical view over
//! the underlying blob which has a page-oriented structure that provides integrity guarantees. The
//! wrapper also provides read caching managed by a page cache.
//!
//! # Recovery
//!
//! On `sync`, this wrapper will durably write buffered data to the underlying blob in pages. All
//! pages have a [Checksum] at the end. If no CRC record existed before for the page being written,
//! then one of the checksums will be all zero. If a checksum already existed for the page being
//! written, then the write will overwrite only the checksum with the lesser length value. Should
//! this write fail, the previously committed page state can still be recovered.
//!
//! During initialization, the wrapper will back up over any page that is not accompanied by a
//! valid CRC, treating it as the result of an incomplete write that may be invalid.
use super::read::{PageReader, Replay};
use crate::{
    buffer::{
        paged::{CacheRef, Checksum, CHECKSUM_SIZE},
        tip::Buffer,
    },
    Blob, Error, IoBuf, IoBufMut, IoBufs,
};
use bytes::BufMut;
use commonware_cryptography::Crc32;
use commonware_utils::sync::{AsyncRwLock, AsyncRwLockWriteGuard};
use std::{
    num::{NonZeroU16, NonZeroUsize},
    sync::Arc,
};
use tracing::warn;

/// Indicates which CRC slot in a page record must not be overwritten.
///
/// A physical page ends with two CRC slots so a sync can overwrite the non-authoritative slot
/// while leaving the previously committed one intact (see module-level recovery docs).
#[derive(Clone, Copy)]
enum ProtectedCrc {
    /// The first slot holds the committed record and must be preserved.
    First,
    /// The second slot holds the committed record and must be preserved.
    Second,
}

/// Describes the state of the underlying blob with respect to the buffer.
#[derive(Clone)]
struct BlobState<B: Blob> {
    /// The wrapped blob; after initialization it contains only checksum-validated pages.
    blob: B,

    /// The page where the next appended byte will be written to.
    current_page: u64,

    /// The state of the partial page in the blob. If it was written due to a sync call, then this
    /// will contain its CRC record.
    partial_page_state: Option<Checksum>,
}

/// A [Blob] wrapper that supports write-cached appending of data, with checksums for data integrity
/// and page cache managed caching.
///
/// Cloning is cheap: all mutable state is shared behind [Arc]s, so clones operate on the same
/// underlying blob and write buffer.
#[derive(Clone)]
pub struct Append<B: Blob> {
    /// The underlying blob being wrapped.
    blob_state: Arc<AsyncRwLock<BlobState<B>>>,

    /// Unique id assigned to this blob by the page cache.
    id: u64,

    /// A reference to the page cache that manages read caching for this blob.
    cache_ref: CacheRef,

    /// The write buffer containing any logical bytes following the last full page boundary in the
    /// underlying blob.
    buffer: Arc<AsyncRwLock<Buffer>>,
}

71/// Returns the capacity with a floor applied to ensure it can hold at least one full page of new
72/// data even when caching a nearly-full page of already written data.
73fn capacity_with_floor(capacity: usize, page_size: u64) -> usize {
74    let floor = page_size as usize * 2;
75    if capacity < floor {
76        warn!(
77            floor,
78            "requested buffer capacity is too low, increasing it to floor"
79        );
80        floor
81    } else {
82        capacity
83    }
84}
85
impl<B: Blob> Append<B> {
    /// Create a new [Append] wrapper of the provided `blob` that is known to have
    /// `original_blob_size` underlying physical bytes, using the provided `cache_ref` for read
    /// caching, and a write buffer with capacity `capacity`. Rewinds the blob if necessary to
    /// ensure it only contains checksum-validated data.
    pub async fn new(
        blob: B,
        original_blob_size: u64,
        capacity: usize,
        cache_ref: CacheRef,
    ) -> Result<Self, Error> {
        // Scan backwards from the end of the blob for the last page with a valid CRC record.
        let (partial_page_state, pages, invalid_data_found) =
            Self::read_last_valid_page(&blob, original_blob_size, cache_ref.page_size()).await?;
        if invalid_data_found {
            // Invalid data was detected, trim it from the blob.
            let new_blob_size = pages * (cache_ref.page_size() + CHECKSUM_SIZE);
            warn!(
                original_blob_size,
                new_blob_size, "truncating blob to remove invalid data"
            );
            blob.resize(new_blob_size).await?;
            blob.sync().await?;
        }

        let capacity = capacity_with_floor(capacity, cache_ref.page_size());

        // If the last valid page is partial, it remains the "current" page: the next appended
        // byte still belongs to it, so its recovered data seeds the write buffer below.
        let (blob_state, partial_data) = match partial_page_state {
            Some((partial_page, crc_record)) => (
                BlobState {
                    blob,
                    current_page: pages - 1,
                    partial_page_state: Some(crc_record),
                },
                Some(partial_page),
            ),
            None => (
                BlobState {
                    blob,
                    current_page: pages,
                    partial_page_state: None,
                },
                None,
            ),
        };

        // Start the write buffer at the logical offset of the current page, pre-filled with any
        // partial page data recovered above.
        let buffer = Buffer::from(
            blob_state.current_page * cache_ref.page_size(),
            partial_data.unwrap_or_default(),
            capacity,
            cache_ref.pool().clone(),
        );

        Ok(Self {
            blob_state: Arc::new(AsyncRwLock::new(blob_state)),
            id: cache_ref.next_id(),
            cache_ref,
            buffer: Arc::new(AsyncRwLock::new(buffer)),
        })
    }

    /// Scans backwards from the end of the blob, stopping when it finds a valid page.
    ///
    /// # Returns
    ///
    /// A tuple of `(partial_page, page_count, invalid_data_found)`:
    ///
    /// - `partial_page`: If the last valid page is partial (contains fewer than `page_size` logical
    ///   bytes), returns `Some((data, crc_record))` containing the logical data and its CRC record.
    ///   Returns `None` if the last valid page is full or if no valid pages exist.
    ///
    /// - `page_count`: The number of pages in the blob up to and including the last valid page
    ///   found (whether or not it's partial). Note that it's possible earlier pages may be invalid
    ///   since this function stops scanning when it finds one valid page.
    ///
    /// - `invalid_data_found`: `true` if there are any bytes in the blob that follow the last valid
    ///   page. Typically the blob should be resized to eliminate them since their integrity cannot
    ///   be guaranteed.
    async fn read_last_valid_page(
        blob: &B,
        blob_size: u64,
        page_size: u64,
    ) -> Result<(Option<(IoBuf, Checksum)>, u64, bool), Error> {
        let physical_page_size = page_size + CHECKSUM_SIZE;
        // `last_page_end` always sits on a physical page boundary; trailing bytes beyond it
        // cannot form a complete page.
        let partial_bytes = blob_size % physical_page_size;
        let mut last_page_end = blob_size - partial_bytes;

        // If the last physical page in the blob is truncated, it can't have a valid CRC record and
        // must be invalid.
        let mut invalid_data_found = partial_bytes != 0;

        while last_page_end != 0 {
            // Read the last page and parse its CRC record.
            let page_start = last_page_end - physical_page_size;
            let buf = blob
                .read_at(page_start, physical_page_size as usize)
                .await?
                .coalesce()
                .freeze();

            match Checksum::validate_page(buf.as_ref()) {
                Some(crc_record) => {
                    // Found a valid page. The record's authoritative length says how many logical
                    // bytes the page holds.
                    let (len, _) = crc_record.get_crc();
                    let len = len as u64;
                    if len != page_size {
                        // The page is partial (logical data doesn't fill the page).
                        let logical_bytes = buf.slice(..len as usize);
                        return Ok((
                            Some((logical_bytes, crc_record)),
                            last_page_end / physical_page_size,
                            invalid_data_found,
                        ));
                    }
                    // The page is full.
                    return Ok((None, last_page_end / physical_page_size, invalid_data_found));
                }
                None => {
                    // The page is invalid: back up one physical page and keep scanning.
                    last_page_end = page_start;
                    invalid_data_found = true;
                }
            }
        }

        // No valid page exists in the blob.
        Ok((None, 0, invalid_data_found))
    }

214    /// Append all bytes in `buf` to the tip of the blob.
215    pub async fn append(&self, buf: &[u8]) -> Result<(), Error> {
216        let mut buffer = self.buffer.write().await;
217
218        if !buffer.append(buf) {
219            return Ok(());
220        }
221
222        // Buffer is over capacity, so we need to write data to the blob.
223        self.flush_internal(buffer, false).await
224    }
225
    /// Flush all full pages from the buffer to disk, resetting the buffer to contain only the bytes
    /// in any final partial page. If `write_partial_page` is true, the partial page will be written
    /// to the blob as well along with a CRC record.
    ///
    /// Takes ownership of the buffer write guard and releases it only after the blob-state write
    /// lock has been acquired, so the buffer offset and blob state can never be observed out of
    /// sync; the blob writes themselves then proceed without blocking appends.
    async fn flush_internal(
        &self,
        mut buf_guard: AsyncRwLockWriteGuard<'_, Buffer>,
        write_partial_page: bool,
    ) -> Result<(), Error> {
        let buffer = &mut *buf_guard;

        // Cache the pages we are writing in the page cache so they remain cached for concurrent
        // reads while we flush the buffer.
        let remaining_byte_count = self
            .cache_ref
            .cache(self.id, buffer.as_ref(), buffer.offset);

        // Read the old partial page state before doing the heavy work of preparing physical pages.
        // This is safe because partial_page_state is only modified by flush_internal, and we hold
        // the buffer write lock which prevents concurrent flushes.
        let old_partial_page_state = {
            let blob_state = self.blob_state.read().await;
            blob_state.partial_page_state.clone()
        };

        // Prepare the *physical* pages corresponding to the data in the buffer.
        // Pass the old partial page state so the CRC record is constructed correctly.
        let (physical_pages, partial_page_state) = self.to_physical_pages(
            &*buffer,
            write_partial_page,
            old_partial_page_state.as_ref(),
        );

        // If there's nothing to write, return early.
        if physical_pages.is_empty() {
            return Ok(());
        }

        // Drain the provided buffer of the full pages that are now cached in the page cache and
        // will be written to the blob. If the tip is fully drained, detach its backing so empty
        // append buffers don't retain pooled storage.
        if remaining_byte_count == 0 {
            let _ = buffer
                .take()
                .expect("take must succeed when flush drains all buffered bytes");
        } else {
            let bytes_to_drain = buffer.len() - remaining_byte_count;
            buffer.drop_prefix(bytes_to_drain);
            buffer.offset += bytes_to_drain as u64;
        }
        let new_offset = buffer.offset;

        // Acquire a write lock on the blob state so nobody tries to read or modify the blob while
        // we're writing to it.
        let mut blob_state = self.blob_state.write().await;

        // Release the buffer lock to allow for concurrent reads & buffered writes while we write
        // the physical pages.
        drop(buf_guard);

        let logical_page_size = self.cache_ref.page_size() as usize;
        let physical_page_size = logical_page_size + CHECKSUM_SIZE as usize;
        let write_at_offset = blob_state.current_page * physical_page_size as u64;

        // Count only FULL pages for advancing current_page. A partial page (if included) takes
        // up a full physical page on disk, but it's not complete - the next byte still goes to
        // that same logical page.
        let total_pages_in_buffer = physical_pages.len() / physical_page_size;
        let full_pages_written = if partial_page_state.is_some() {
            total_pages_in_buffer.saturating_sub(1)
        } else {
            total_pages_in_buffer
        };

        // Identify protected regions based on the OLD partial page state
        let protected_regions = Self::identify_protected_regions(old_partial_page_state.as_ref());

        // Update state before writing. This may appear to risk data loss if writes fail,
        // but write failures are fatal per this codebase's design - callers must not use
        // the blob after any mutable method returns an error.
        blob_state.current_page += full_pages_written as u64;
        blob_state.partial_page_state = partial_page_state;

        // Make sure the buffer offset and underlying blob agree on the state of the tip.
        assert_eq!(
            blob_state.current_page * self.cache_ref.page_size(),
            new_offset
        );

        // Write the physical pages to the blob.
        // If there are protected regions in the first page, we need to write around them.
        // The slice offsets below assume each CRC slot occupies 6 bytes (u16 length + u32 CRC).
        if let Some((prefix_len, protected_crc)) = protected_regions {
            match protected_crc {
                ProtectedCrc::First => {
                    // Protected CRC is first: [page_size..page_size+6]
                    // Write 1: New data in first page [prefix_len..page_size]
                    if prefix_len < logical_page_size {
                        let payload = physical_pages.slice(prefix_len..logical_page_size);
                        blob_state
                            .blob
                            .write_at(write_at_offset + prefix_len as u64, payload)
                            .await?;
                    }
                    // Write 2: Second CRC of first page + all remaining pages [page_size+6..end]
                    let second_crc_start = logical_page_size + 6;
                    let payload = physical_pages.slice(second_crc_start..);
                    blob_state
                        .blob
                        .write_at(write_at_offset + second_crc_start as u64, payload)
                        .await?;
                }
                ProtectedCrc::Second => {
                    // Protected CRC is second: [page_size+6..page_size+12]
                    // Write 1: New data + first CRC of first page [prefix_len..page_size+6]
                    let first_crc_end = logical_page_size + 6;
                    if prefix_len < first_crc_end {
                        let payload = physical_pages.slice(prefix_len..first_crc_end);
                        blob_state
                            .blob
                            .write_at(write_at_offset + prefix_len as u64, payload)
                            .await?;
                    }
                    // Write 2: All remaining pages (if any) [physical_page_size..end]
                    if physical_pages.len() > physical_page_size {
                        let payload = physical_pages.slice(physical_page_size..);
                        blob_state
                            .blob
                            .write_at(write_at_offset + physical_page_size as u64, payload)
                            .await?;
                    }
                }
            }
        } else {
            // No protected regions, write everything in one operation
            blob_state
                .blob
                .write_at(write_at_offset, physical_pages)
                .await?;
        }

        Ok(())
    }

368    /// Returns the logical size of the blob. This accounts for both written and buffered data.
369    pub async fn size(&self) -> u64 {
370        let buffer = self.buffer.read().await;
371        buffer.size()
372    }
373
    /// Read exactly `len` immutable bytes starting at `offset`.
    ///
    /// Errors (e.g. [Error::BlobInsufficientLength] when fewer than `len` bytes exist at
    /// `offset`) are propagated from [Self::read_into].
    pub async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufs, Error> {
        // Read into a temporary contiguous buffer and copy back to preserve structure.
        // SAFETY: read_into below initializes all `len` bytes.
        let mut buf = unsafe { self.cache_ref.pool().alloc_len(len) };
        self.read_into(buf.as_mut(), offset).await?;
        Ok(buf.into())
    }

383    /// Reads up to `buf.len()` bytes starting at `logical_offset`, but only as many as are
384    /// available.
385    ///
386    /// This is useful for reading variable-length prefixes (like varints) where you want to read
387    /// up to a maximum number of bytes but the actual data might be shorter.
388    ///
389    /// Returns the buffer (truncated to actual bytes read) and the number of bytes read.
390    /// Returns an error if no bytes are available at the given offset.
391    pub async fn read_up_to(
392        &self,
393        logical_offset: u64,
394        len: usize,
395        bufs: impl Into<IoBufMut> + Send,
396    ) -> Result<(IoBufMut, usize), Error> {
397        let mut bufs = bufs.into();
398        if len == 0 {
399            bufs.truncate(0);
400            return Ok((bufs, 0));
401        }
402        let blob_size = self.size().await;
403        let available = (blob_size.saturating_sub(logical_offset) as usize).min(len);
404        if available == 0 {
405            return Err(Error::BlobInsufficientLength);
406        }
407        // SAFETY: read_into below fills all `available` bytes.
408        unsafe { bufs.set_len(available) };
409        self.read_into(bufs.as_mut(), logical_offset).await?;
410
411        Ok((bufs, available))
412    }
413
    /// Reads bytes starting at `logical_offset` into `buf`.
    ///
    /// This method allows reading directly into a mutable slice without taking ownership of the
    /// buffer or requiring a specific buffer type. Bytes are sourced from the write buffer (tip)
    /// first, then the page cache, and finally the underlying blob.
    ///
    /// # Errors
    ///
    /// Returns [Error::OffsetOverflow] if `logical_offset + buf.len()` overflows a `u64`, and
    /// [Error::BlobInsufficientLength] if the requested range extends past the logical size.
    pub async fn read_into(&self, buf: &mut [u8], logical_offset: u64) -> Result<(), Error> {
        // Ensure the read doesn't overflow.
        let end_offset = logical_offset
            .checked_add(buf.len() as u64)
            .ok_or(Error::OffsetOverflow)?;

        // Acquire a read lock on the buffer.
        let buffer = self.buffer.read().await;

        // If the data required is beyond the size of the blob, return an error.
        if end_offset > buffer.size() {
            return Err(Error::BlobInsufficientLength);
        }

        // Extract any bytes from the buffer that overlap with the requested range.
        // `remaining` is the count of leading bytes still to be read from cache/blob.
        let remaining = if end_offset <= buffer.offset {
            // No overlap with tip.
            buf.len()
        } else {
            // Overlap is always a suffix of requested range.
            let overlap_start = buffer.offset.max(logical_offset);
            let dst_start = (overlap_start - logical_offset) as usize;
            let src_start = (overlap_start - buffer.offset) as usize;
            let copied = buf.len() - dst_start;
            buf[dst_start..].copy_from_slice(&buffer.as_ref()[src_start..src_start + copied]);
            dst_start
        };

        // Release buffer lock before potential I/O.
        drop(buffer);

        if remaining == 0 {
            return Ok(());
        }

        // Fast path: try to read *only* from page cache without acquiring blob lock. This allows
        // concurrent reads even while a flush is in progress.
        let cached = self
            .cache_ref
            .read_cached(self.id, &mut buf[..remaining], logical_offset);

        if cached == remaining {
            // All bytes found in cache.
            return Ok(());
        }

        // Slow path: cache miss (partial or full), acquire blob read lock to ensure any in-flight
        // write completes before we read from the blob.
        let blob_guard = self.blob_state.read().await;

        // Read remaining bytes that were not already obtained from the earlier cache read.
        let uncached_offset = logical_offset + cached as u64;
        let uncached_len = remaining - cached;
        self.cache_ref
            .read(
                &blob_guard.blob,
                self.id,
                &mut buf[cached..cached + uncached_len],
                uncached_offset,
            )
            .await
    }

481    /// Returns the protected region info for a partial page, if any.
482    ///
483    /// # Returns
484    ///
485    /// `None` if there's no existing partial page.
486    ///
487    /// `Some((prefix_len, protected_crc))` where:
488    /// - `prefix_len`: bytes `[0..prefix_len]` were already written and can be substituted with
489    ///   zeros (skip writing)
490    /// - `protected_crc`: which CRC slot must not be overwritten
491    fn identify_protected_regions(
492        partial_page_state: Option<&Checksum>,
493    ) -> Option<(usize, ProtectedCrc)> {
494        let crc_record = partial_page_state?;
495        let (old_len, _) = crc_record.get_crc();
496        // The protected CRC is the one with the larger (authoritative) length.
497        let protected_crc = if crc_record.len1 >= crc_record.len2 {
498            ProtectedCrc::First
499        } else {
500            ProtectedCrc::Second
501        };
502        Some((old_len as usize, protected_crc))
503    }
504
    /// Prepare a buffer containing the result of converting each buffered logical page in the input
    /// into a physical page (meaning each page has a CRC record). If the last page is not yet full,
    /// it will be included only if `include_partial_page` is true.
    ///
    /// # Arguments
    ///
    /// * `buffer` - The buffer containing logical page data
    /// * `include_partial_page` - Whether to include a partial page if one exists
    /// * `old_crc_record` - The CRC record from a previously committed partial page, if any.
    ///   When present, the first page's CRC record will preserve the old CRC in its original slot
    ///   and place the new CRC in the other slot.
    ///
    /// # Returns
    ///
    /// The physical pages ready to be written, plus the CRC record written for the trailing
    /// partial page (`None` if no partial page was included).
    fn to_physical_pages(
        &self,
        buffer: &Buffer,
        include_partial_page: bool,
        old_crc_record: Option<&Checksum>,
    ) -> (IoBuf, Option<Checksum>) {
        let logical_page_size = self.cache_ref.page_size() as usize;
        let physical_page_size = logical_page_size + CHECKSUM_SIZE as usize;
        // Number of complete logical pages currently buffered.
        let pages_to_write = buffer.len() / logical_page_size;
        let max_pages_to_write = pages_to_write + if include_partial_page { 1 } else { 0 };
        let mut write_buffer = self
            .cache_ref
            .pool()
            .alloc(max_pages_to_write * physical_page_size);
        let buffer_data = buffer.as_ref();

        // For each logical page, copy over the data and then write a crc record for it.
        for page in 0..pages_to_write {
            let start_read_idx = page * logical_page_size;
            let end_read_idx = start_read_idx + logical_page_size;
            let logical_page = &buffer_data[start_read_idx..end_read_idx];
            write_buffer.put_slice(logical_page);

            let crc = Crc32::checksum(logical_page);
            let logical_page_size_u16 =
                u16::try_from(logical_page_size).expect("page size must fit in u16 for CRC record");

            // For the first page, if there's an old partial page CRC, construct the record
            // to preserve the old CRC in its original slot.
            let crc_record = if let (0, Some(old_crc)) = (page, old_crc_record) {
                Self::build_crc_record_preserving_old(logical_page_size_u16, crc, old_crc)
            } else {
                Checksum::new(logical_page_size_u16, crc)
            };
            write_buffer.put_slice(&crc_record.to_bytes());
        }

        if !include_partial_page {
            return (write_buffer.freeze(), None);
        }

        let partial_page = &buffer_data[pages_to_write * logical_page_size..];
        if partial_page.is_empty() {
            // No partial page data to write.
            return (write_buffer.freeze(), None);
        }

        // If there are no full pages and the partial page length matches what was already
        // written, there's nothing new to write.
        if pages_to_write == 0 {
            if let Some(old_crc) = old_crc_record {
                let (old_len, _) = old_crc.get_crc();
                if partial_page.len() == old_len as usize {
                    return (write_buffer.freeze(), None);
                }
            }
        }
        write_buffer.put_slice(partial_page);
        let partial_len = partial_page.len();
        let crc = Crc32::checksum(partial_page);

        // Pad with zeros to fill up to logical_page_size.
        let zero_count = logical_page_size - partial_len;
        if zero_count > 0 {
            write_buffer.put_bytes(0, zero_count);
        }

        // For partial pages: if this is the first page and there's an old CRC, preserve it.
        // Otherwise just use the new CRC in slot 0.
        let crc_record = if let (0, Some(old_crc)) = (pages_to_write, old_crc_record) {
            Self::build_crc_record_preserving_old(partial_len as u16, crc, old_crc)
        } else {
            Checksum::new(partial_len as u16, crc)
        };

        write_buffer.put_slice(&crc_record.to_bytes());

        // Return the CRC record that matches what we wrote to disk, so that future flushes
        // correctly identify which slot is protected.
        (write_buffer.freeze(), Some(crc_record))
    }

598    /// Build a CRC record that preserves the old CRC in its original slot and places
599    /// the new CRC in the other slot.
600    const fn build_crc_record_preserving_old(
601        new_len: u16,
602        new_crc: u32,
603        old_crc: &Checksum,
604    ) -> Checksum {
605        let (old_len, old_crc_val) = old_crc.get_crc();
606        // The old CRC is in the slot with the larger length value (first slot wins ties).
607        if old_crc.len1 >= old_crc.len2 {
608            // Old CRC is in slot 0, put new CRC in slot 1
609            Checksum {
610                len1: old_len,
611                crc1: old_crc_val,
612                len2: new_len,
613                crc2: new_crc,
614            }
615        } else {
616            // Old CRC is in slot 1, put new CRC in slot 0
617            Checksum {
618                len1: new_len,
619                crc1: new_crc,
620                len2: old_len,
621                crc2: old_crc_val,
622            }
623        }
624    }
625
626    /// Flushes any buffered data, then returns a [Replay] for the underlying blob.
627    ///
628    /// The returned replay can be used to sequentially read all pages from the blob while ensuring
629    /// all data passes integrity verification. CRCs are validated but not included in the output.
630    pub async fn replay(&self, buffer_size: NonZeroUsize) -> Result<Replay<B>, Error> {
631        let logical_page_size = self.cache_ref.page_size();
632        let logical_page_size_nz =
633            NonZeroU16::new(logical_page_size as u16).expect("page_size is non-zero");
634
635        // Flush any buffered data (without fsync) so the reader sees all written data.
636        {
637            let buf_guard = self.buffer.write().await;
638            self.flush_internal(buf_guard, true).await?;
639        }
640
641        // Convert buffer size (bytes) to page count
642        let physical_page_size = logical_page_size + CHECKSUM_SIZE;
643        let prefetch_pages = buffer_size.get() / physical_page_size as usize;
644        let prefetch_pages = prefetch_pages.max(1); // At least 1 page
645        let blob_guard = self.blob_state.read().await;
646
647        // Compute both physical and logical blob sizes.
648        let (physical_blob_size, logical_blob_size) =
649            blob_guard.partial_page_state.as_ref().map_or_else(
650                || {
651                    // All pages are full.
652                    let physical = physical_page_size * blob_guard.current_page;
653                    let logical = logical_page_size * blob_guard.current_page;
654                    (physical, logical)
655                },
656                |crc_record| {
657                    // There's a partial page with a checksum.
658                    let (partial_len, _) = crc_record.get_crc();
659                    let partial_len = partial_len as u64;
660                    // Physical: all pages including the partial one (which is padded to full size).
661                    let physical = physical_page_size * (blob_guard.current_page + 1);
662                    // Logical: full pages before this + partial page's actual data length.
663                    let logical = logical_page_size * blob_guard.current_page + partial_len;
664                    (physical, logical)
665                },
666            );
667
668        let reader = PageReader::new(
669            blob_guard.blob.clone(),
670            physical_blob_size,
671            logical_blob_size,
672            prefetch_pages,
673            logical_page_size_nz,
674        );
675        Ok(Replay::new(reader))
676    }
}

impl<B: Blob> Append<B> {
680    pub async fn sync(&self) -> Result<(), Error> {
681        // Flush any buffered data, including any partial page. When flush_internal returns,
682        // write_at has completed and data has been written to the underlying blob.
683        let buf_guard = self.buffer.write().await;
684        self.flush_internal(buf_guard, true).await?;
685
686        // Sync the underlying blob. We need the blob read lock here since sync() requires access
687        // to the blob, but only a read lock since we're not modifying blob state.
688        let blob_state = self.blob_state.read().await;
689        blob_state.blob.sync().await
690    }
691
692    /// Resize the blob to the provided logical `size`.
693    ///
694    /// This truncates the blob to contain only `size` logical bytes. The physical blob size will
695    /// be adjusted to include the necessary CRC records for the remaining pages.
696    ///
697    /// # Warning
698    ///
699    /// - Concurrent mutable operations (append, resize) are not supported and will cause data loss.
700    /// - Concurrent readers which try to read past the new size during the resize may error.
701    /// - The resize is not guaranteed durable until the next sync.
702    pub async fn resize(&self, size: u64) -> Result<(), Error> {
703        let current_size = self.size().await;
704
705        // Handle growing by appending zero bytes.
706        if size > current_size {
707            let zeros_needed = (size - current_size) as usize;
708            let mut zeros = self.cache_ref.pool().alloc(zeros_needed);
709            zeros.put_bytes(0, zeros_needed);
710            self.append(zeros.as_ref()).await?;
711            return Ok(());
712        }
713
714        // Implementation note: rewinding the blob across a page boundary potentially results in
715        // stale data remaining in the page cache. We don't proactively purge the data
716        // within this function since it would be inaccessible anyway. Instead we ensure it is
717        // always updated should the blob grow back to the point where we have new data for the same
718        // page, if any old data hasn't expired naturally by then.
719
720        let logical_page_size = self.cache_ref.page_size();
721        let physical_page_size = logical_page_size + CHECKSUM_SIZE;
722
723        // Flush any buffered data first to ensure we have a consistent state on disk.
724        self.sync().await?;
725
726        // Acquire both locks to prevent concurrent operations.
727        let mut buf_guard = self.buffer.write().await;
728        let mut blob_guard = self.blob_state.write().await;
729
730        // Calculate the physical size needed for the new logical size.
731        let full_pages = size / logical_page_size;
732        let partial_bytes = size % logical_page_size;
733        let new_physical_size = if partial_bytes > 0 {
734            // We need full_pages + 1 physical pages to hold the partial data.
735            // The partial page will be padded to full physical page size.
736            (full_pages + 1) * physical_page_size
737        } else {
738            // No partial page needed.
739            full_pages * physical_page_size
740        };
741
742        // Resize the underlying blob.
743        blob_guard.blob.resize(new_physical_size).await?;
744        blob_guard.partial_page_state = None;
745
746        // Update blob state and buffer based on the desired logical size. The partial page data is
747        // read with CRC validation; the validated length may exceed partial_bytes (reflecting the
748        // old data length), but we only load the prefix we need. The next sync will write the
749        // correct CRC for the new length.
750        //
751        // Note: This updates state before validation completes, which could leave state
752        // inconsistent if validation fails. This is acceptable because failures from mutable
753        // methods are fatal - callers must not use the blob after any error.
754
755        blob_guard.current_page = full_pages;
756        buf_guard.offset = full_pages * logical_page_size;
757
758        if partial_bytes > 0 {
759            // There's a partial page. Read its data from disk with CRC validation.
760            let page_data =
761                super::get_page_from_blob(&blob_guard.blob, full_pages, logical_page_size).await?;
762
763            // Ensure the validated data covers what we need.
764            if (page_data.len() as u64) < partial_bytes {
765                return Err(Error::InvalidChecksum);
766            }
767
768            buf_guard.clear();
769            let over_capacity = buf_guard.append(&page_data.as_ref()[..partial_bytes as usize]);
770            assert!(!over_capacity);
771        } else {
772            // No partial page - all pages are full or blob is empty.
773            buf_guard.clear();
774        }
775
776        Ok(())
777    }
778}
779
780#[cfg(test)]
781mod tests {
782    use super::*;
783    use crate::{deterministic, BufferPool, BufferPoolConfig, Runner as _, Storage as _};
784    use commonware_codec::ReadExt;
785    use commonware_macros::test_traced;
786    use commonware_utils::{NZUsize, NZU16};
787    use prometheus_client::registry::Registry;
788    use std::num::NonZeroU16;
789
    // Deliberately odd page size so the tests exercise unaligned page boundaries.
    const PAGE_SIZE: NonZeroU16 = NZU16!(103); // janky size to ensure we test page alignment
    // Write buffer sized to hold exactly two logical pages.
    const BUFFER_SIZE: usize = PAGE_SIZE.get() as usize * 2;
792
793    #[test_traced("DEBUG")]
794    fn test_append_crc_empty() {
795        let executor = deterministic::Runner::default();
796        executor.start(|context: deterministic::Context| async move {
797            // Open a new blob.
798            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
799            assert_eq!(blob_size, 0);
800
801            // Create a page cache reference.
802            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
803
804            // Create an Append wrapper.
805            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
806                .await
807                .unwrap();
808
809            // Verify initial size is 0.
810            assert_eq!(append.size().await, 0);
811
812            // Close & re-open.
813            append.sync().await.unwrap();
814            drop(append);
815
816            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
817            assert_eq!(blob_size, 0); // There was no need to write a crc since there was no data.
818
819            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
820                .await
821                .unwrap();
822
823            assert_eq!(append.size().await, 0);
824        });
825    }
826
    /// End-to-end exercise of append/read/sync/reopen: a partial page, a write spanning a page
    /// boundary, and a write landing exactly on a page boundary.
    #[test_traced("DEBUG")]
    fn test_append_crc_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            // Open a new blob.
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            // Create a page cache reference.
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));

            // Create an Append wrapper.
            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            // Verify initial size is 0.
            assert_eq!(append.size().await, 0);

            // Append some bytes.
            let data = vec![1, 2, 3, 4, 5];
            append.append(&data).await.unwrap();

            // Verify size reflects appended data.
            assert_eq!(append.size().await, 5);

            // Append more bytes.
            let more_data = vec![6, 7, 8, 9, 10];
            append.append(&more_data).await.unwrap();

            // Verify size is cumulative.
            assert_eq!(append.size().await, 10);

            // Read back the first chunk and verify.
            let read_buf = append.read_at(0, 5).await.unwrap().coalesce();
            assert_eq!(read_buf, &data[..]);

            // Read back the second chunk and verify.
            let read_buf = append.read_at(5, 5).await.unwrap().coalesce();
            assert_eq!(read_buf, &more_data[..]);

            // Read all data at once and verify.
            let read_buf = append.read_at(0, 10).await.unwrap().coalesce();
            assert_eq!(read_buf, &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

            // Close and reopen the blob and make sure the data is still there and the trailing
            // checksum is written & stripped as expected.
            append.sync().await.unwrap();
            drop(append);

            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            // Physical page = 103 logical + 12 Checksum = 115 bytes (padded partial page)
            assert_eq!(blob_size, 115);
            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 10); // CRC should be stripped after verification

            // Append data that spans a page boundary.
            // PAGE_SIZE=103 is the logical page size. We have 10 bytes, so writing
            // 100 more bytes (total 110) will cross the page boundary at byte 103.
            let spanning_data: Vec<u8> = (11..=110).collect();
            append.append(&spanning_data).await.unwrap();
            assert_eq!(append.size().await, 110);

            // Read back data that spans the page boundary.
            let read_buf = append.read_at(10, 100).await.unwrap().coalesce();
            assert_eq!(read_buf, &spanning_data[..]);

            // Read all 110 bytes at once.
            let read_buf = append.read_at(0, 110).await.unwrap().coalesce();
            let expected: Vec<u8> = (1..=110).collect();
            assert_eq!(read_buf, &expected[..]);

            // Drop and re-open and make sure bytes are still there.
            append.sync().await.unwrap();
            drop(append);

            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            // 2 physical pages: 2 * 115 = 230 bytes
            assert_eq!(blob_size, 230);
            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 110);

            // Append data to reach exactly a page boundary.
            // Logical page size is 103. We have 110 bytes, next boundary is 206 (103 * 2).
            // So we need 96 more bytes.
            let boundary_data: Vec<u8> = (111..=206).collect();
            assert_eq!(boundary_data.len(), 96);
            append.append(&boundary_data).await.unwrap();
            assert_eq!(append.size().await, 206);

            // Verify we can read it back.
            let read_buf = append.read_at(0, 206).await.unwrap().coalesce();
            let expected: Vec<u8> = (1..=206).collect();
            assert_eq!(read_buf, &expected[..]);

            // Drop and re-open at the page boundary.
            append.sync().await.unwrap();
            drop(append);

            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            // Physical size should be exactly 2 pages: 115 * 2 = 230 bytes
            assert_eq!(blob_size, 230);
            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
                .await
                .unwrap();
            assert_eq!(append.size().await, 206);

            // Verify data is still readable after reopen.
            let read_buf = append.read_at(0, 206).await.unwrap().coalesce();
            assert_eq!(read_buf, &expected[..]);
        });
    }
943
944    #[test_traced("DEBUG")]
945    fn test_sync_releases_tip_pool_slot_after_full_drain() {
946        let executor = deterministic::Runner::default();
947        executor.start(|context: deterministic::Context| async move {
948            let mut registry = Registry::default();
949            let pool = BufferPool::new(
950                BufferPoolConfig::for_storage().with_max_per_class(NZUsize!(2)),
951                &mut registry,
952            );
953            let cache_ref = CacheRef::new(pool.clone(), PAGE_SIZE, NZUsize!(1));
954
955            let (blob, blob_size) = context
956                .open("test_partition", b"release_tip_backing")
957                .await
958                .unwrap();
959            assert_eq!(blob_size, 0);
960
961            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
962                .await
963                .unwrap();
964
965            append
966                .append(&vec![7; PAGE_SIZE.get() as usize])
967                .await
968                .unwrap();
969
970            // One pooled slot backs the page cache and one backs the mutable tip.
971            assert!(
972                matches!(
973                    pool.try_alloc(BUFFER_SIZE),
974                    Err(crate::iobuf::PoolError::Exhausted)
975                ),
976                "full-page tip should occupy the remaining pooled slot before sync"
977            );
978
979            append.sync().await.unwrap();
980
981            // After a full drain, the tip should no longer pin that slot.
982            assert!(
983                pool.try_alloc(BUFFER_SIZE).is_ok(),
984                "sync should release pooled backing when no partial tail remains"
985            );
986        });
987    }
988
989    #[test_traced("DEBUG")]
990    fn test_read_up_to_zero_len_truncates_buffer() {
991        let executor = deterministic::Runner::default();
992        executor.start(|context: deterministic::Context| async move {
993            // Open a new blob.
994            let (blob, blob_size) = context
995                .open("test_partition", b"read_up_to_zero_len")
996                .await
997                .unwrap();
998            assert_eq!(blob_size, 0);
999
1000            // Create a page cache reference.
1001            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1002
1003            // Create an Append wrapper and write some data.
1004            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1005                .await
1006                .unwrap();
1007            append.append(&[1, 2, 3, 4]).await.unwrap();
1008
1009            // Request a zero-length read with a reused, non-empty buffer.
1010            let stale = vec![9, 8, 7, 6];
1011            let (buf, read) = append.read_up_to(0, 0, stale).await.unwrap();
1012
1013            assert_eq!(read, 0);
1014            assert_eq!(buf.len(), 0, "read_up_to must truncate returned buffer");
1015            assert_eq!(buf.freeze().as_ref(), b"");
1016        });
1017    }
1018
1019    /// Helper to read the CRC record from raw blob bytes at the end of a physical page.
1020    fn read_crc_record_from_page(page_bytes: &[u8]) -> Checksum {
1021        let crc_start = page_bytes.len() - CHECKSUM_SIZE as usize;
1022        Checksum::read(&mut &page_bytes[crc_start..]).unwrap()
1023    }
1024
    /// Dummy marker bytes with len=0 so the mangled slot is never authoritative.
    /// Format: [len_hi=0, len_lo=0, 0xDE, 0xAD, 0xBE, 0xEF]
    /// (2-byte big-endian length followed by 4 recognizable filler bytes in place of the CRC.)
    const DUMMY_MARKER: [u8; 6] = [0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF];
1028
1029    #[test]
1030    fn test_identify_protected_regions_equal_lengths() {
1031        // When lengths are equal, the first CRC should be protected (tie-breaking rule).
1032        let record = Checksum {
1033            len1: 50,
1034            crc1: 0xAAAAAAAA,
1035            len2: 50,
1036            crc2: 0xBBBBBBBB,
1037        };
1038
1039        let result =
1040            Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record));
1041        assert!(result.is_some());
1042        let (prefix_len, protected_crc) = result.unwrap();
1043        assert_eq!(prefix_len, 50);
1044        assert!(
1045            matches!(protected_crc, ProtectedCrc::First),
1046            "First CRC should be protected when lengths are equal"
1047        );
1048    }
1049
1050    #[test]
1051    fn test_identify_protected_regions_len1_larger() {
1052        // When len1 > len2, the first CRC should be protected.
1053        let record = Checksum {
1054            len1: 100,
1055            crc1: 0xAAAAAAAA,
1056            len2: 50,
1057            crc2: 0xBBBBBBBB,
1058        };
1059
1060        let result =
1061            Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record));
1062        assert!(result.is_some());
1063        let (prefix_len, protected_crc) = result.unwrap();
1064        assert_eq!(prefix_len, 100);
1065        assert!(
1066            matches!(protected_crc, ProtectedCrc::First),
1067            "First CRC should be protected when len1 > len2"
1068        );
1069    }
1070
1071    #[test]
1072    fn test_identify_protected_regions_len2_larger() {
1073        // When len2 > len1, the second CRC should be protected.
1074        let record = Checksum {
1075            len1: 50,
1076            crc1: 0xAAAAAAAA,
1077            len2: 100,
1078            crc2: 0xBBBBBBBB,
1079        };
1080
1081        let result =
1082            Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record));
1083        assert!(result.is_some());
1084        let (prefix_len, protected_crc) = result.unwrap();
1085        assert_eq!(prefix_len, 100);
1086        assert!(
1087            matches!(protected_crc, ProtectedCrc::Second),
1088            "Second CRC should be protected when len2 > len1"
1089        );
1090    }
1091
    /// Test that slot 1 is NOT overwritten when it's the protected slot.
    ///
    /// Strategy: After extending twice (so slot 1 becomes authoritative with larger len),
    /// mangle the non-authoritative slot 0. Then extend again - slot 0 should be overwritten
    /// with the new CRC, while slot 1 (protected) should remain untouched.
    #[test_traced("DEBUG")]
    fn test_crc_slot1_protected() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
            // The CRC record sits immediately after the logical page bytes. Each slot is
            // 6 bytes (2-byte length + 4-byte CRC, per DUMMY_MARKER's format), so slot 1
            // begins 6 bytes after slot 0.
            let slot0_offset = PAGE_SIZE.get() as u64;
            let slot1_offset = PAGE_SIZE.get() as u64 + 6;

            // === Step 1: Write 10 bytes → slot 0 authoritative (len=10) ===
            let (blob, _) = context.open("test_partition", b"slot1_prot").await.unwrap();
            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            // === Step 2: Extend to 30 bytes → slot 1 authoritative (len=30) ===
            let (blob, size) = context.open("test_partition", b"slot1_prot").await.unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(11..=30).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            // Verify slot 1 is now authoritative
            let (blob, size) = context.open("test_partition", b"slot1_prot").await.unwrap();
            let page = blob
                .read_at(0, physical_page_size)
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert!(
                crc.len2 > crc.len1,
                "Slot 1 should be authoritative (len2={} > len1={})",
                crc.len2,
                crc.len1
            );

            // Capture slot 1 bytes before mangling slot 0
            let slot1_before: Vec<u8> = blob
                .read_at(slot1_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();

            // === Step 3: Mangle slot 0 (non-authoritative) ===
            // Written directly to the raw blob, bypassing the Append wrapper.
            blob.write_at(slot0_offset, DUMMY_MARKER.to_vec())
                .await
                .unwrap();
            blob.sync().await.unwrap();

            // Verify mangle worked
            let slot0_mangled: Vec<u8> = blob
                .read_at(slot0_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_eq!(slot0_mangled, DUMMY_MARKER, "Mangle failed");

            // === Step 4: Extend to 50 bytes → new CRC goes to slot 0, slot 1 protected ===
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(31..=50).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            // === Step 5: Verify slot 0 was overwritten, slot 1 unchanged ===
            let (blob, _) = context.open("test_partition", b"slot1_prot").await.unwrap();

            // Slot 0 should have new CRC (not our dummy marker)
            let slot0_after: Vec<u8> = blob
                .read_at(slot0_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_ne!(
                slot0_after, DUMMY_MARKER,
                "Slot 0 should have been overwritten with new CRC"
            );

            // Slot 1 should be UNCHANGED (protected)
            let slot1_after: Vec<u8> = blob
                .read_at(slot1_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_eq!(
                slot1_before, slot1_after,
                "Slot 1 was modified! Protected region violated."
            );

            // Verify the new CRC in slot 0 has len=50
            let page = blob
                .read_at(0, physical_page_size)
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert_eq!(crc.len1, 50, "Slot 0 should have len=50");
        });
    }
1217
    /// Test that slot 0 is NOT overwritten when it's the protected slot.
    ///
    /// Strategy: After extending three times (slot 0 becomes authoritative again with largest len),
    /// mangle the non-authoritative slot 1. Then extend again - slot 1 should be overwritten
    /// with the new CRC, while slot 0 (protected) should remain untouched.
    #[test_traced("DEBUG")]
    fn test_crc_slot0_protected() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
            // Slot offsets within the trailing CRC record: slot 0 starts right after the
            // logical page bytes; each slot is 6 bytes (2-byte length + 4-byte CRC).
            let slot0_offset = PAGE_SIZE.get() as u64;
            let slot1_offset = PAGE_SIZE.get() as u64 + 6;

            // === Step 1: Write 10 bytes → slot 0 authoritative (len=10) ===
            let (blob, _) = context.open("test_partition", b"slot0_prot").await.unwrap();
            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            // === Step 2: Extend to 30 bytes → slot 1 authoritative (len=30) ===
            let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(11..=30).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            // === Step 3: Extend to 50 bytes → slot 0 authoritative (len=50) ===
            let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(31..=50).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            // Verify slot 0 is now authoritative
            let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
            let page = blob
                .read_at(0, physical_page_size)
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert!(
                crc.len1 > crc.len2,
                "Slot 0 should be authoritative (len1={} > len2={})",
                crc.len1,
                crc.len2
            );

            // Capture slot 0 bytes before mangling slot 1
            let slot0_before: Vec<u8> = blob
                .read_at(slot0_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();

            // === Step 4: Mangle slot 1 (non-authoritative) ===
            // Written directly to the raw blob, bypassing the Append wrapper.
            blob.write_at(slot1_offset, DUMMY_MARKER.to_vec())
                .await
                .unwrap();
            blob.sync().await.unwrap();

            // Verify mangle worked
            let slot1_mangled: Vec<u8> = blob
                .read_at(slot1_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_eq!(slot1_mangled, DUMMY_MARKER, "Mangle failed");

            // === Step 5: Extend to 70 bytes → new CRC goes to slot 1, slot 0 protected ===
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(51..=70).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            // === Step 6: Verify slot 1 was overwritten, slot 0 unchanged ===
            let (blob, _) = context.open("test_partition", b"slot0_prot").await.unwrap();

            // Slot 1 should have new CRC (not our dummy marker)
            let slot1_after: Vec<u8> = blob
                .read_at(slot1_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_ne!(
                slot1_after, DUMMY_MARKER,
                "Slot 1 should have been overwritten with new CRC"
            );

            // Slot 0 should be UNCHANGED (protected)
            let slot0_after: Vec<u8> = blob
                .read_at(slot0_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_eq!(
                slot0_before, slot0_after,
                "Slot 0 was modified! Protected region violated."
            );

            // Verify the new CRC in slot 1 has len=70
            let page = blob
                .read_at(0, physical_page_size)
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert_eq!(crc.len2, 70, "Slot 1 should have len=70");
        });
    }
1355
    /// Test that the data prefix is NOT overwritten when extending a partial page.
    ///
    /// Strategy: Write data, then mangle the padding area (between data end and CRC start).
    /// After extending, the original data should be unchanged but the mangled padding
    /// should be overwritten with new data.
    #[test_traced("DEBUG")]
    fn test_data_prefix_not_overwritten() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;

            // === Step 1: Write 20 bytes ===
            let (blob, _) = context
                .open("test_partition", b"prefix_test")
                .await
                .unwrap();
            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            let data1: Vec<u8> = (1..=20).collect();
            append.append(&data1).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            // === Step 2: Capture the first 20 bytes and mangle bytes 25-30 (in padding area) ===
            let (blob, size) = context
                .open("test_partition", b"prefix_test")
                .await
                .unwrap();
            // A single padded physical page was written for the 20-byte partial page.
            assert_eq!(size, physical_page_size as u64);

            let prefix_before: Vec<u8> = blob
                .read_at(0, 20)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();

            // Mangle bytes 25-30 (safely in the padding area, after our 20 bytes of data)
            blob.write_at(25, DUMMY_MARKER.to_vec()).await.unwrap();
            blob.sync().await.unwrap();

            // === Step 3: Extend to 40 bytes ===
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(21..=40).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            // === Step 4: Verify prefix unchanged, mangled area overwritten ===
            let (blob, _) = context
                .open("test_partition", b"prefix_test")
                .await
                .unwrap();

            // Original 20 bytes should be unchanged
            let prefix_after: Vec<u8> = blob
                .read_at(0, 20)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_eq!(prefix_before, prefix_after, "Data prefix was modified!");

            // Bytes at offset 25-30: data (21..=40) starts at offset 20, so offset 25 has value 26
            let overwritten: Vec<u8> = blob
                .read_at(25, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_eq!(
                overwritten,
                vec![26, 27, 28, 29, 30, 31],
                "New data should overwrite padding area"
            );
        });
    }
1442
    /// Test CRC slot protection when extending past a page boundary.
    ///
    /// Strategy: Write a partial page, then extend it so slot 1 becomes authoritative. Mangle
    /// slot 0 (now non-authoritative), then extend past the page boundary. Verify slot 0 is
    /// rewritten with a new full-page CRC (clobbering the mangled marker), slot 1 is left
    /// untouched during the crossing, and the second page is written correctly.
    #[test_traced("DEBUG")]
    fn test_crc_slot_protection_across_page_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            // A physical page is the logical page followed by its checksum record.
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
            // Slot 0's (len, crc) pair begins immediately after the logical page bytes.
            let slot0_offset = PAGE_SIZE.get() as u64;
            // Slot 1 follows slot 0's 2-byte length + 4-byte CRC.
            let slot1_offset = PAGE_SIZE.get() as u64 + 6;

            // === Step 1: Write 50 bytes → slot 0 authoritative ===
            let (blob, _) = context.open("test_partition", b"boundary").await.unwrap();
            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append.append(&(1..=50).collect::<Vec<u8>>()).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            // === Step 2: Extend to 80 bytes → slot 1 authoritative ===
            let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(51..=80).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            // Verify slot 1 is authoritative (the authoritative slot is the one with the
            // greater length value).
            let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
            let page = blob
                .read_at(0, physical_page_size)
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert!(crc.len2 > crc.len1, "Slot 1 should be authoritative");

            // Capture slot 1's raw bytes so we can later prove the boundary crossing
            // never touched them.
            let slot1_before: Vec<u8> = blob
                .read_at(slot1_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();

            // Mangle slot 0 (non-authoritative): any later valid contents must come from
            // a deliberate rewrite, not leftovers.
            blob.write_at(slot0_offset, DUMMY_MARKER.to_vec())
                .await
                .unwrap();
            blob.sync().await.unwrap();

            // === Step 3: Extend past page boundary (80 + 40 = 120, PAGE_SIZE=103) ===
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(81..=120).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            // === Step 4: Verify results ===
            let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
            assert_eq!(size, (physical_page_size * 2) as u64, "Should have 2 pages");

            // Slot 0 should have been overwritten with full-page CRC (not dummy marker)
            let slot0_after: Vec<u8> = blob
                .read_at(slot0_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_ne!(
                slot0_after, DUMMY_MARKER,
                "Slot 0 should have full-page CRC"
            );

            // Slot 1 should be UNCHANGED (protected during boundary crossing)
            let slot1_after: Vec<u8> = blob
                .read_at(slot1_offset, 6)
                .await
                .unwrap()
                .coalesce()
                .freeze()
                .into();
            assert_eq!(
                slot1_before, slot1_after,
                "Slot 1 was modified during page boundary crossing!"
            );

            // Verify page 0's record now describes a full page in slot 0.
            let page0 = blob
                .read_at(0, physical_page_size)
                .await
                .unwrap()
                .coalesce();
            let crc0 = read_crc_record_from_page(page0.as_ref());
            assert_eq!(
                crc0.len1,
                PAGE_SIZE.get(),
                "Slot 0 should have full page length"
            );

            // Verify all 120 logical bytes survive a fresh open.
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 120);
            let all_data: Vec<u8> = append.read_at(0, 120).await.unwrap().coalesce().into();
            let expected: Vec<u8> = (1..=120).collect();
            assert_eq!(all_data, expected);
        });
    }
1567
    /// Test that corrupting the primary CRC (but not its length) causes fallback to the previous
    /// partial page contents.
    ///
    /// Strategy:
    /// 1. Write 10 bytes → slot 0 authoritative (len=10, valid crc)
    /// 2. Extend to 30 bytes → slot 1 authoritative (len=30, valid crc)
    /// 3. Corrupt ONLY the crc2 value in slot 1 (not the length)
    /// 4. Re-open and verify we fall back to slot 0's 10 bytes
    #[test_traced("DEBUG")]
    fn test_crc_fallback_on_corrupted_primary() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            // A physical page is the logical page followed by its checksum record.
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
            // Record layout is [len1 (2), crc1 (4), len2 (2), crc2 (4)], so crc2 sits
            // 8 bytes into the record: PAGE_SIZE + 2 + 4 + 2 = PAGE_SIZE + 8.
            let crc2_offset = PAGE_SIZE.get() as u64 + 8;

            // === Step 1: Write 10 bytes → slot 0 authoritative (len=10) ===
            let (blob, _) = context
                .open("test_partition", b"crc_fallback")
                .await
                .unwrap();
            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            let data1: Vec<u8> = (1..=10).collect();
            append.append(&data1).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            // === Step 2: Extend to 30 bytes → slot 1 authoritative (len=30) ===
            let (blob, size) = context
                .open("test_partition", b"crc_fallback")
                .await
                .unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append
                .append(&(11..=30).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            // Verify slot 1 is now authoritative and data reads correctly
            let (blob, size) = context
                .open("test_partition", b"crc_fallback")
                .await
                .unwrap();
            assert_eq!(size, physical_page_size as u64);

            let page = blob
                .read_at(0, physical_page_size)
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert!(
                crc.len2 > crc.len1,
                "Slot 1 should be authoritative (len2={} > len1={})",
                crc.len2,
                crc.len1
            );
            assert_eq!(crc.len2, 30, "Slot 1 should have len=30");
            assert_eq!(crc.len1, 10, "Slot 0 should have len=10");

            // Sanity check: all 30 bytes must be readable before we corrupt anything.
            let append = Append::new(blob.clone(), size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 30);
            let all_data: Vec<u8> = append.read_at(0, 30).await.unwrap().coalesce().into();
            let expected: Vec<u8> = (1..=30).collect();
            assert_eq!(all_data, expected);
            drop(append);

            // === Step 3: Corrupt ONLY crc2 (not len2) ===
            // crc2 is 4 bytes at offset PAGE_SIZE + 8
            blob.write_at(crc2_offset, vec![0xDE, 0xAD, 0xBE, 0xEF])
                .await
                .unwrap();
            blob.sync().await.unwrap();

            // Verify corruption: len2 should still be 30, but crc2 is now garbage
            let page = blob
                .read_at(0, physical_page_size)
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert_eq!(crc.len2, 30, "len2 should still be 30 after corruption");
            assert_eq!(crc.crc2, 0xDEADBEEF, "crc2 should be our corrupted value");

            // === Step 4: Re-open and verify fallback to slot 0's 10 bytes ===
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            // Should fall back to 10 bytes (slot 0's length)
            assert_eq!(
                append.size().await,
                10,
                "Should fall back to slot 0's 10 bytes after primary CRC corruption"
            );

            // Verify the data is the original 10 bytes
            let fallback_data: Vec<u8> = append.read_at(0, 10).await.unwrap().coalesce().into();
            assert_eq!(
                fallback_data, data1,
                "Fallback data should match original 10 bytes"
            );

            // Bytes 11..=30 were logically lost by the fallback, so reads past 10 must error.
            let result = append.read_at(0, 11).await;
            assert!(result.is_err(), "Reading beyond fallback size should fail");
        });
    }
1686
    /// Test that corrupting a non-last page's primary CRC fails even if fallback is valid.
    ///
    /// Non-last pages must always be full. If the primary CRC is corrupted and the fallback
    /// indicates a partial page, validation should fail entirely (not fall back to partial).
    ///
    /// Strategy:
    /// 1. Write 10 bytes → slot 0 has len=10 (partial)
    /// 2. Extend to full page (103 bytes) → slot 1 has len=103 (full, authoritative)
    /// 3. Extend past page boundary (to 113 bytes) → page 0 is now non-last
    /// 4. Corrupt the primary CRC of page 0 (slot 1's crc, which has len=103)
    /// 5. Re-open and verify that reading from page 0 fails (fallback has len=10, not full)
    #[test_traced("DEBUG")]
    fn test_non_last_page_rejects_partial_fallback() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
            // A physical page is the logical page followed by its checksum record.
            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
            // crc2 for page 0 sits 8 bytes into its record (after len1, crc1, len2):
            // PAGE_SIZE + 8.
            let page0_crc2_offset = PAGE_SIZE.get() as u64 + 8;

            // === Step 1: Write 10 bytes → slot 0 has len=10 ===
            let (blob, _) = context
                .open("test_partition", b"non_last_page")
                .await
                .unwrap();
            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
            append.sync().await.unwrap();
            drop(append);

            // === Step 2: Extend to exactly full page (103 bytes) → slot 1 has len=103 ===
            let (blob, size) = context
                .open("test_partition", b"non_last_page")
                .await
                .unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            // Add bytes 11 through 103 (93 more bytes)
            append
                .append(&(11..=PAGE_SIZE.get() as u8).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            // Verify page 0 slot 1 is authoritative with len=103 (full page)
            let (blob, size) = context
                .open("test_partition", b"non_last_page")
                .await
                .unwrap();
            let page = blob
                .read_at(0, physical_page_size)
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert_eq!(crc.len1, 10, "Slot 0 should have len=10");
            assert_eq!(
                crc.len2,
                PAGE_SIZE.get(),
                "Slot 1 should have len=103 (full page)"
            );
            assert!(crc.len2 > crc.len1, "Slot 1 should be authoritative");

            // === Step 3: Extend past page boundary (add 10 more bytes for total of 113) ===
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            // Add bytes 104 through 113 (10 more bytes, now on page 1)
            append
                .append(&(104..=113).collect::<Vec<u8>>())
                .await
                .unwrap();
            append.sync().await.unwrap();
            drop(append);

            // Verify we now have 2 pages
            let (blob, size) = context
                .open("test_partition", b"non_last_page")
                .await
                .unwrap();
            assert_eq!(
                size,
                (physical_page_size * 2) as u64,
                "Should have 2 physical pages"
            );

            // Sanity check: all 113 bytes are readable before corruption.
            let append = Append::new(blob.clone(), size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 113);
            let all_data: Vec<u8> = append.read_at(0, 113).await.unwrap().coalesce().into();
            let expected: Vec<u8> = (1..=113).collect();
            assert_eq!(all_data, expected);
            drop(append);

            // === Step 4: Corrupt page 0's primary CRC (slot 1's crc2) ===
            blob.write_at(page0_crc2_offset, vec![0xDE, 0xAD, 0xBE, 0xEF])
                .await
                .unwrap();
            blob.sync().await.unwrap();

            // Verify corruption: page 0's slot 1 still has len=103 but bad CRC
            let page = blob
                .read_at(0, physical_page_size)
                .await
                .unwrap()
                .coalesce();
            let crc = read_crc_record_from_page(page.as_ref());
            assert_eq!(crc.len2, PAGE_SIZE.get(), "len2 should still be 103");
            assert_eq!(crc.crc2, 0xDEADBEEF, "crc2 should be corrupted");
            // Slot 0 fallback has len=10 (partial), which is invalid for non-last page
            assert_eq!(crc.len1, 10, "Fallback slot 0 has partial length");

            // === Step 5: Re-open and try to read from page 0 ===
            // The first page's primary CRC is bad, and fallback indicates partial (len=10).
            // Since page 0 is not the last page, a partial fallback is invalid.
            // Reading from page 0 should fail because the fallback CRC indicates a partial
            // page, which is not allowed for non-last pages.
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            // The blob still reports 113 bytes because init only validates the last page.
            // But reading from page 0 should fail because the CRC fallback is partial.
            assert_eq!(append.size().await, 113);

            // Try to read from page 0 - this should fail with InvalidChecksum because
            // the fallback CRC has len=10 (partial), which is invalid for a non-last page.
            let result = append.read_at(0, 10).await;
            assert!(
                result.is_err(),
                "Reading from corrupted non-last page via Append should fail, but got: {:?}",
                result
            );
            drop(append);

            // Also verify that reading via Replay fails the same way.
            let (blob, size) = context
                .open("test_partition", b"non_last_page")
                .await
                .unwrap();
            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            let mut replay = append.replay(NZUsize!(1024)).await.unwrap();

            // Try to fill pages - should fail on CRC validation.
            let result = replay.ensure(1).await;
            assert!(
                result.is_err(),
                "Reading from corrupted non-last page via Replay should fail, but got: {:?}",
                result
            );
        });
    }
1847
1848    #[test]
1849    fn test_resize_shrink_validates_crc() {
1850        // Verify that shrinking a blob to a partial page validates the CRC, rather than
1851        // blindly reading raw bytes which could silently load corrupted data.
1852        let executor = deterministic::Runner::default();
1853
1854        executor.start(|context| async move {
1855            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1856            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
1857
1858            let (blob, size) = context
1859                .open("test_partition", b"resize_crc_test")
1860                .await
1861                .unwrap();
1862
1863            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1864                .await
1865                .unwrap();
1866
1867            // Write data across 3 pages: page 0 (full), page 1 (full), page 2 (partial).
1868            // PAGE_SIZE = 103, so 250 bytes = 103 + 103 + 44.
1869            let data: Vec<u8> = (0..=249).collect();
1870            append.append(&data).await.unwrap();
1871            append.sync().await.unwrap();
1872            assert_eq!(append.size().await, 250);
1873            drop(append);
1874
1875            // Corrupt the CRC record of page 1 (middle page).
1876            let (blob, size) = context
1877                .open("test_partition", b"resize_crc_test")
1878                .await
1879                .unwrap();
1880            assert_eq!(size as usize, physical_page_size * 3);
1881
1882            // Page 1 CRC record is at the end of the second physical page.
1883            let page1_crc_offset = (physical_page_size * 2 - CHECKSUM_SIZE as usize) as u64;
1884            blob.write_at(page1_crc_offset, vec![0xFF; CHECKSUM_SIZE as usize])
1885                .await
1886                .unwrap();
1887            blob.sync().await.unwrap();
1888
1889            // Open the blob - Append::new() validates the LAST page (page 2), which is still valid.
1890            // So it should open successfully with size 250.
1891            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1892                .await
1893                .unwrap();
1894            assert_eq!(append.size().await, 250);
1895
1896            // Try to shrink to 150 bytes, which ends in page 1 (the corrupted page).
1897            // 150 bytes = page 0 (103 full) + page 1 (47 partial).
1898            // This should fail because page 1's CRC is corrupted.
1899            let result = append.resize(150).await;
1900            assert!(
1901                matches!(result, Err(crate::Error::InvalidChecksum)),
1902                "Expected InvalidChecksum when shrinking to corrupted page, got: {:?}",
1903                result
1904            );
1905        });
1906    }
1907
1908    #[test]
1909    fn test_reopen_partial_tail_append_and_resize() {
1910        let executor = deterministic::Runner::default();
1911
1912        executor.start(|context| async move {
1913            const PAGE_SIZE: NonZeroU16 = NZU16!(64);
1914            const BUFFER_SIZE: usize = 256;
1915
1916            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(4));
1917
1918            let (blob, size) = context
1919                .open("test_partition", b"partial_tail_test")
1920                .await
1921                .unwrap();
1922
1923            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1924                .await
1925                .unwrap();
1926
1927            // Write some initial data.
1928            append.append(&[1, 2, 3, 4, 5]).await.unwrap();
1929            append.sync().await.unwrap();
1930            assert_eq!(append.size().await, 5);
1931            drop(append);
1932
1933            let (blob, size) = context
1934                .open("test_partition", b"partial_tail_test")
1935                .await
1936                .unwrap();
1937
1938            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1939                .await
1940                .unwrap();
1941            assert_eq!(append.size().await, 5);
1942
1943            append.append(&[6, 7, 8]).await.unwrap();
1944            append.resize(6).await.unwrap();
1945            append.sync().await.unwrap();
1946
1947            let data: Vec<u8> = append.read_at(0, 6).await.unwrap().coalesce().into();
1948            assert_eq!(data, vec![1, 2, 3, 4, 5, 6]);
1949        });
1950    }
1951
1952    #[test]
1953    fn test_corrupted_crc_len_too_large() {
1954        let executor = deterministic::Runner::default();
1955
1956        executor.start(|context| async move {
1957            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1958            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
1959
1960            // Step 1: Create blob with valid data
1961            let (blob, size) = context
1962                .open("test_partition", b"crc_len_test")
1963                .await
1964                .unwrap();
1965
1966            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1967                .await
1968                .unwrap();
1969
1970            append.append(&[0x42; 50]).await.unwrap();
1971            append.sync().await.unwrap();
1972            drop(append);
1973
1974            // Step 2: Corrupt the CRC record to have len > page_size
1975            let (blob, size) = context
1976                .open("test_partition", b"crc_len_test")
1977                .await
1978                .unwrap();
1979            assert_eq!(size as usize, physical_page_size);
1980
1981            // CRC record is at the end of the physical page
1982            let crc_offset = PAGE_SIZE.get() as u64;
1983
1984            // Create a CRC record with len1 = 0xFFFF (65535), which is >> page_size (103)
1985            // Format: [len1_hi, len1_lo, crc1 (4 bytes), len2_hi, len2_lo, crc2 (4 bytes)]
1986            let bad_crc_record: [u8; 12] = [
1987                0xFF, 0xFF, // len1 = 65535 (way too large)
1988                0xDE, 0xAD, 0xBE, 0xEF, // crc1 (garbage)
1989                0x00, 0x00, // len2 = 0
1990                0x00, 0x00, 0x00, 0x00, // crc2 = 0
1991            ];
1992            blob.write_at(crc_offset, bad_crc_record.to_vec())
1993                .await
1994                .unwrap();
1995            blob.sync().await.unwrap();
1996
1997            // Step 3: Try to open the blob - should NOT panic, should return error or handle gracefully
1998            let result = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()).await;
1999
2000            // Either returns InvalidChecksum error OR truncates the corrupted data
2001            // (both are acceptable behaviors - panicking is NOT acceptable)
2002            match result {
2003                Ok(append) => {
2004                    // If it opens successfully, the corrupted page should have been truncated
2005                    let recovered_size = append.size().await;
2006                    assert_eq!(
2007                        recovered_size, 0,
2008                        "Corrupted page should be truncated, size should be 0"
2009                    );
2010                }
2011                Err(e) => {
2012                    // Error is also acceptable
2013                    assert!(
2014                        matches!(e, crate::Error::InvalidChecksum),
2015                        "Expected InvalidChecksum error, got: {:?}",
2016                        e
2017                    );
2018                }
2019            }
2020        });
2021    }
2022
2023    #[test]
2024    fn test_corrupted_crc_both_slots_len_too_large() {
2025        let executor = deterministic::Runner::default();
2026
2027        executor.start(|context| async move {
2028            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
2029
2030            // Step 1: Create blob with valid data
2031            let (blob, size) = context
2032                .open("test_partition", b"crc_both_bad")
2033                .await
2034                .unwrap();
2035
2036            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2037                .await
2038                .unwrap();
2039
2040            append.append(&[0x42; 50]).await.unwrap();
2041            append.sync().await.unwrap();
2042            drop(append);
2043
2044            // Step 2: Corrupt BOTH CRC slots to have len > page_size
2045            let (blob, size) = context
2046                .open("test_partition", b"crc_both_bad")
2047                .await
2048                .unwrap();
2049
2050            let crc_offset = PAGE_SIZE.get() as u64;
2051
2052            // Both slots have len > page_size
2053            let bad_crc_record: [u8; 12] = [
2054                0x01, 0x00, // len1 = 256 (> 103)
2055                0xDE, 0xAD, 0xBE, 0xEF, // crc1 (garbage)
2056                0x02, 0x00, // len2 = 512 (> 103)
2057                0xCA, 0xFE, 0xBA, 0xBE, // crc2 (garbage)
2058            ];
2059            blob.write_at(crc_offset, bad_crc_record.to_vec())
2060                .await
2061                .unwrap();
2062            blob.sync().await.unwrap();
2063
2064            // Step 3: Try to open - should NOT panic
2065            let result = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()).await;
2066
2067            match result {
2068                Ok(append) => {
2069                    // Corrupted page truncated
2070                    assert_eq!(append.size().await, 0);
2071                }
2072                Err(e) => {
2073                    assert!(
2074                        matches!(e, crate::Error::InvalidChecksum),
2075                        "Expected InvalidChecksum, got: {:?}",
2076                        e
2077                    );
2078                }
2079            }
2080        });
2081    }
2082}