
commonware_runtime/utils/buffer/paged/append.rs

//! The [Append] wrapper pairs a [Blob] with a write buffer and provides a logical view over the
//! underlying blob, which has a page-oriented structure that provides integrity guarantees. The
//! wrapper also provides read caching managed by a page cache.
//!
//! # Warning
//!
//! Writing new data to the blob can only be done through `append`. The `write_at` function is not
//! supported and will panic.
//!
//! # Immutability
//!
//! The wrapper can be created in (or converted to) an immutable state, which prevents any
//! modifications while still supporting cached reads. This can be used to reduce its memory
//! footprint and/or to prevent unintended modifications.
//!
//! # Recovery
//!
//! On `sync`, this wrapper durably writes buffered data to the underlying blob in pages. Every
//! page ends with a [Checksum] record containing two CRC slots. If no CRC record existed before
//! for the page being written, then one of the slots will be all zero. If a CRC record already
//! existed for the page being written, then the write overwrites only the slot with the lesser
//! length value, so should this write fail, the previously committed page state can still be
//! recovered. For example, if a 10-byte partial page was previously synced, extending it to 30
//! bytes writes the new CRC into the other slot, leaving the original 10-byte prefix verifiable
//! if the write is interrupted.
//!
//! During non-immutable blob initialization, the wrapper backs up over any page that is not
//! accompanied by a valid CRC, treating it as the result of an incomplete write that may be
//! invalid. Immutable blob initialization fails if any trailing data is detected that cannot be
//! validated by a CRC.
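//!
//! # Example
//!
//! A minimal usage sketch (illustrative, not a doctest). It mirrors the tests at the bottom of
//! this file and assumes a runtime `context` implementing `Storage`; the names and sizes are
//! placeholders:
//!
//! ```ignore
//! // Open (or create) an underlying blob and wrap it for appending.
//! let (blob, size) = context.open("my_partition", b"my_blob").await?;
//! let cache_ref = CacheRef::new(NZU16!(1024), NZUsize!(2048));
//! let append = Append::new(blob, size, 4096, cache_ref).await?;
//!
//! // Appends are buffered; `sync` flushes them in checksummed pages and syncs the blob.
//! append.append(b"hello world").await?;
//! append.sync().await?;
//!
//! // Reads are served from the write buffer or the page cache when possible.
//! let buf = append.read_at(0, IoBufMut::zeroed(11)).await?.coalesce();
//! assert_eq!(buf, &b"hello world"[..]);
//! ```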

use super::read::{PageReader, Replay};
use crate::{
    buffer::{
        paged::{CacheRef, Checksum, CHECKSUM_SIZE},
        tip::Buffer,
    },
    Blob, Error, IoBufMut, IoBufs, IoBufsMut, RwLock, RwLockWriteGuard,
};
use commonware_cryptography::Crc32;
use std::{
    num::{NonZeroU16, NonZeroUsize},
    sync::Arc,
};
use tracing::warn;

/// Indicates which CRC slot in a page record must not be overwritten.
#[derive(Clone, Copy)]
enum ProtectedCrc {
    First,
    Second,
}

/// Describes the state of the underlying blob with respect to the buffer.
#[derive(Clone)]
struct BlobState<B: Blob> {
    blob: B,

    /// The page to which the next appended byte will be written.
    current_page: u64,

    /// The state of the partial page in the blob. If it was written due to a sync call, then this
    /// will contain its CRC record.
    partial_page_state: Option<Checksum>,
}

/// A [Blob] wrapper that supports write-cached appending of data, with checksums for data integrity
/// and read caching managed by a page cache.
#[derive(Clone)]
pub struct Append<B: Blob> {
    /// The underlying blob being wrapped.
    blob_state: Arc<RwLock<BlobState<B>>>,

    /// Unique id assigned to this blob by the page cache.
    id: u64,

    /// A reference to the page cache that manages read caching for this blob.
    cache_ref: CacheRef,

    /// The write buffer containing any logical bytes following the last full page boundary in the
    /// underlying blob.
    buffer: Arc<RwLock<Buffer>>,
}

/// Returns the capacity with a floor applied to ensure it can hold at least one full page of new
/// data even when caching a nearly-full page of already written data.
fn capacity_with_floor(capacity: usize, page_size: u64) -> usize {
    let floor = page_size as usize * 2;
    if capacity < floor {
        warn!(
            floor,
            "requested buffer capacity is too low, increasing it to floor"
        );
        floor
    } else {
        capacity
    }
}

impl<B: Blob> Append<B> {
    /// Create a new [Append] wrapper of the provided `blob` that is known to have
    /// `original_blob_size` underlying physical bytes, using the provided `cache_ref` for read
    /// caching and a write buffer with capacity `capacity`. Rewinds the blob if necessary to
    /// ensure it only contains checksum-validated data.
    pub async fn new(
        blob: B,
        original_blob_size: u64,
        capacity: usize,
        cache_ref: CacheRef,
    ) -> Result<Self, Error> {
        let (partial_page_state, pages, invalid_data_found) =
            Self::read_last_valid_page(&blob, original_blob_size, cache_ref.page_size()).await?;
        if invalid_data_found {
            // Invalid data was detected, trim it from the blob.
            let new_blob_size = pages * (cache_ref.page_size() + CHECKSUM_SIZE);
            warn!(
                original_blob_size,
                new_blob_size, "truncating blob to remove invalid data"
            );
            blob.resize(new_blob_size).await?;
            blob.sync().await?;
        }

        let capacity = capacity_with_floor(capacity, cache_ref.page_size());

        let (blob_state, data) = match partial_page_state {
            Some((mut partial_page, crc_record)) => {
                // A partial page exists, make sure we buffer it.
                partial_page.reserve(capacity - partial_page.len());
                (
                    BlobState {
                        blob,
                        current_page: pages - 1,
                        partial_page_state: Some(crc_record),
                    },
                    partial_page,
                )
            }
            None => (
                BlobState {
                    blob,
                    current_page: pages,
                    partial_page_state: None,
                },
                Vec::with_capacity(capacity),
            ),
        };

        let buffer = Buffer {
            offset: blob_state.current_page * cache_ref.page_size(),
            data,
            capacity,
            immutable: false,
        };

        Ok(Self {
            blob_state: Arc::new(RwLock::new(blob_state)),
            id: cache_ref.next_id().await,
            cache_ref,
            buffer: Arc::new(RwLock::new(buffer)),
        })
    }

    /// Return a new [Append] wrapper of the provided `blob` that is known to have `blob_size`
    /// underlying physical bytes, using the provided `cache_ref` for read caching. The wrapper is
    /// for read-only data, and any append attempt will return an error. The provided `capacity` is
    /// used only if the blob is later turned into a mutable one. Immutable blobs are assumed
    /// consistent on disk, so any CRC verification failure results in an error without any
    /// recovery attempt.
    pub async fn new_immutable(
        blob: B,
        blob_size: u64,
        capacity: usize,
        cache_ref: CacheRef,
    ) -> Result<Self, Error> {
        let (partial_page_state, pages, invalid_data_found) =
            Self::read_last_valid_page(&blob, blob_size, cache_ref.page_size()).await?;
        if invalid_data_found {
            // Invalid data was detected, so this blob is not consistent.
            return Err(Error::InvalidChecksum);
        }

        let capacity = capacity_with_floor(capacity, cache_ref.page_size());

        let (blob_state, data) = match partial_page_state {
            Some((mut partial_page, crc_record)) => {
                // A partial page exists, so put it in the buffer.
                partial_page.shrink_to_fit();
                (
                    BlobState {
                        blob,
                        current_page: pages - 1,
                        partial_page_state: Some(crc_record),
                    },
                    partial_page,
                )
            }
            None => (
                BlobState {
                    blob,
                    current_page: pages,
                    partial_page_state: None,
                },
                vec![],
            ),
        };
        let buffer = Buffer {
            data,
            capacity,
            offset: blob_state.current_page * cache_ref.page_size(),
            immutable: true,
        };

        Ok(Self {
            blob_state: Arc::new(RwLock::new(blob_state)),
            id: cache_ref.next_id().await,
            cache_ref,
            buffer: Arc::new(RwLock::new(buffer)),
        })
    }

    /// Returns `true` if this blob is in the immutable state.
    pub async fn is_immutable(&self) -> bool {
        let buffer = self.buffer.read().await;

        buffer.immutable
    }

    /// Convert this blob to the immutable state if it's not already in it.
    ///
    /// If there is unwritten data in the buffer, it will be flushed and synced before returning.
    pub async fn to_immutable(&self) -> Result<(), Error> {
        // Flush any buffered data. When flush_internal returns, write_at has completed and data
        // has been written to the underlying blob.
        let mut buf_guard = self.buffer.write().await;
        if buf_guard.immutable {
            return Ok(());
        }
        buf_guard.immutable = true;
        self.flush_internal(buf_guard, true).await?;

        // Shrink the buffer capacity to minimum since we won't be adding to it. This requires
        // re-acquiring the write lock.
        {
            let mut buf_guard = self.buffer.write().await;
            buf_guard.data.shrink_to_fit();
        }

        // Sync the underlying blob to ensure new_immutable on restart will succeed even in the
        // event of a crash.
        let blob_state = self.blob_state.read().await;
        blob_state.blob.sync().await
    }

    /// Convert this blob to the mutable state if it's not already in it.
    pub async fn to_mutable(&self) {
        let mut buffer = self.buffer.write().await;
        if !buffer.immutable {
            return;
        }
        buffer.immutable = false;
    }

    /// Scans backwards from the end of the blob, stopping when it finds a valid page.
    ///
    /// # Returns
    ///
    /// A tuple of `(partial_page, page_count, invalid_data_found)`:
    ///
    /// - `partial_page`: If the last valid page is partial (contains fewer than `page_size` logical
    ///   bytes), returns `Some((data, crc_record))` containing the logical data and its CRC record.
    ///   Returns `None` if the last valid page is full or if no valid pages exist.
    ///
    /// - `page_count`: The number of pages in the blob up to and including the last valid page
    ///   found (whether or not it's partial). Note that it's possible earlier pages may be invalid
    ///   since this function stops scanning when it finds one valid page.
    ///
    /// - `invalid_data_found`: `true` if there are any bytes in the blob that follow the last valid
    ///   page. Typically the blob should be resized to eliminate them since their integrity cannot
    ///   be guaranteed.
    async fn read_last_valid_page(
        blob: &B,
        blob_size: u64,
        page_size: u64,
    ) -> Result<(Option<(Vec<u8>, Checksum)>, u64, bool), Error> {
        let physical_page_size = page_size + CHECKSUM_SIZE;
        let partial_bytes = blob_size % physical_page_size;
        let mut last_page_end = blob_size - partial_bytes;

        // If the last physical page in the blob is truncated, it can't have a valid CRC record and
        // must be invalid.
        let mut invalid_data_found = partial_bytes != 0;

        while last_page_end != 0 {
            // Read the last page and parse its CRC record.
            let page_start = last_page_end - physical_page_size;
            let buf = blob
                .read_at(page_start, IoBufMut::zeroed(physical_page_size as usize))
                .await?
                .coalesce()
                .freeze();

            match Checksum::validate_page(buf.as_ref()) {
                Some(crc_record) => {
                    // Found a valid page.
                    let (len, _) = crc_record.get_crc();
                    let len = len as u64;
                    if len != page_size {
                        // The page is partial (logical data doesn't fill the page).
                        let logical_bytes = buf.slice(..len as usize).into();
                        return Ok((
                            Some((logical_bytes, crc_record)),
                            last_page_end / physical_page_size,
                            invalid_data_found,
                        ));
                    }
                    // The page is full.
                    return Ok((None, last_page_end / physical_page_size, invalid_data_found));
                }
                None => {
                    // The page is invalid.
                    last_page_end = page_start;
                    invalid_data_found = true;
                }
            }
        }

        // No valid page exists in the blob.
        Ok((None, 0, invalid_data_found))
    }

    /// Append all bytes in `buf` to the tip of the blob.
    ///
    /// # Errors
    ///
    /// * `Error::ImmutableBlob` - The blob is in the immutable state.
    pub async fn append(&self, buf: &[u8]) -> Result<(), Error> {
        let mut buffer = self.buffer.write().await;
        if buffer.immutable {
            return Err(Error::ImmutableBlob);
        }

        if !buffer.append(buf) {
            return Ok(());
        }

        // Buffer is over capacity, so we need to write data to the blob.
        self.flush_internal(buffer, false).await
    }

    /// Flush all full pages from the buffer to disk, resetting the buffer to contain only the bytes
    /// in any final partial page. If `write_partial_page` is true, the partial page will be written
    /// to the blob as well along with a CRC record.
    async fn flush_internal(
        &self,
        mut buf_guard: RwLockWriteGuard<'_, Buffer>,
        write_partial_page: bool,
    ) -> Result<(), Error> {
        let buffer = &mut *buf_guard;

        // Cache the pages we are writing in the page cache so they remain cached for concurrent
        // reads while we flush the buffer.
        let remaining_byte_count = self
            .cache_ref
            .cache(self.id, &buffer.data, buffer.offset)
            .await;

        // Read the old partial page state before doing the heavy work of preparing physical pages.
        // This is safe because partial_page_state is only modified by flush_internal, and we hold
        // the buffer write lock which prevents concurrent flushes.
        let old_partial_page_state = {
            let blob_state = self.blob_state.read().await;
            blob_state.partial_page_state.clone()
        };

        // Prepare the *physical* pages corresponding to the data in the buffer.
        // Pass the old partial page state so the CRC record is constructed correctly.
        let (physical_pages, partial_page_state) = self.to_physical_pages(
            &*buffer,
            write_partial_page,
            old_partial_page_state.as_ref(),
        );

        // If there's nothing to write, return early.
        if physical_pages.is_empty() {
            return Ok(());
        }

        // Drain the provided buffer of the full pages that are now cached in the page cache and
        // will be written to the blob.
        let bytes_to_drain = buffer.data.len() - remaining_byte_count;
        buffer.data.drain(0..bytes_to_drain);
        buffer.offset += bytes_to_drain as u64;
        let new_offset = buffer.offset;

        // Acquire a write lock on the blob state so nobody tries to read or modify the blob while
        // we're writing to it.
        let mut blob_state = self.blob_state.write().await;

        // Release the buffer lock to allow for concurrent reads & buffered writes while we write
        // the physical pages.
        drop(buf_guard);

        let logical_page_size = self.cache_ref.page_size() as usize;
        let physical_page_size = logical_page_size + CHECKSUM_SIZE as usize;
        let write_at_offset = blob_state.current_page * physical_page_size as u64;

        // Count only FULL pages for advancing current_page. A partial page (if included) takes
        // up a full physical page on disk, but it's not complete - the next byte still goes to
        // that same logical page.
        let total_pages_in_buffer = physical_pages.len() / physical_page_size;
        let full_pages_written = if partial_page_state.is_some() {
            total_pages_in_buffer.saturating_sub(1)
        } else {
            total_pages_in_buffer
        };

        // Identify protected regions based on the OLD partial page state
        let protected_regions = Self::identify_protected_regions(old_partial_page_state.as_ref());

        // Update state before writing. This may appear to risk data loss if writes fail,
        // but write failures are fatal per this codebase's design - callers must not use
        // the blob after any mutable method returns an error.
        blob_state.current_page += full_pages_written as u64;
        blob_state.partial_page_state = partial_page_state;

        // Make sure the buffer offset and underlying blob agree on the state of the tip.
        assert_eq!(
            blob_state.current_page * self.cache_ref.page_size(),
            new_offset
        );

        // Write the physical pages to the blob.
        // If there are protected regions in the first page, we need to write around them.
        if let Some((prefix_len, protected_crc)) = protected_regions {
            match protected_crc {
                ProtectedCrc::First => {
                    // Protected CRC is first: [page_size..page_size+6]
                    // Write 1: New data in first page [prefix_len..page_size]
                    if prefix_len < logical_page_size {
                        blob_state
                            .blob
                            .write_at(
                                write_at_offset + prefix_len as u64,
                                physical_pages[prefix_len..logical_page_size].to_vec(),
                            )
                            .await?;
                    }
                    // Write 2: Second CRC of first page + all remaining pages [page_size+6..end]
                    let second_crc_start = logical_page_size + 6;
                    blob_state
                        .blob
                        .write_at(
                            write_at_offset + second_crc_start as u64,
                            physical_pages[second_crc_start..].to_vec(),
                        )
                        .await?;
                }
                ProtectedCrc::Second => {
                    // Protected CRC is second: [page_size+6..page_size+12]
                    // Write 1: New data + first CRC of first page [prefix_len..page_size+6]
                    let first_crc_end = logical_page_size + 6;
                    if prefix_len < first_crc_end {
                        blob_state
                            .blob
                            .write_at(
                                write_at_offset + prefix_len as u64,
                                physical_pages[prefix_len..first_crc_end].to_vec(),
                            )
                            .await?;
                    }
                    // Write 2: All remaining pages (if any) [physical_page_size..end]
                    if physical_pages.len() > physical_page_size {
                        blob_state
                            .blob
                            .write_at(
                                write_at_offset + physical_page_size as u64,
                                physical_pages[physical_page_size..].to_vec(),
                            )
                            .await?;
                    }
                }
            }
        } else {
            // No protected regions, write everything in one operation
            blob_state
                .blob
                .write_at(write_at_offset, physical_pages)
                .await?;
        }

        Ok(())
    }

    /// Returns the logical size of the blob. This accounts for both written and buffered data.
    pub async fn size(&self) -> u64 {
        let buffer = self.buffer.read().await;
        buffer.size()
    }

    /// Reads up to `buf.len()` bytes starting at `logical_offset`, but only as many as are
    /// available.
    ///
    /// This is useful for reading variable-length prefixes (like varints) where you want to read
    /// up to a maximum number of bytes but the actual data might be shorter.
    ///
    /// Returns the buffer (truncated to actual bytes read) and the number of bytes read.
    /// Returns an error if no bytes are available at the given offset.
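    ///
    /// # Example
    ///
    /// A sketch of reading a record with a variable-length prefix (illustrative, not a doctest;
    /// `decode_varint` is a hypothetical helper, not part of this module):
    ///
    /// ```ignore
    /// // Read at most 9 bytes; fewer may be returned near the end of the blob.
    /// let (buf, read) = append.read_up_to(IoBufMut::zeroed(9), offset).await?;
    /// let (len, _consumed) = decode_varint(&buf.as_ref()[..read])?;
    /// ```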
    pub async fn read_up_to(
        &self,
        buf: impl Into<IoBufMut> + Send,
        logical_offset: u64,
    ) -> Result<(IoBufMut, usize), Error> {
        let mut buf = buf.into();
        if buf.is_empty() {
            return Ok((buf, 0));
        }
        let blob_size = self.size().await;
        let available = (blob_size.saturating_sub(logical_offset) as usize).min(buf.len());
        if available == 0 {
            return Err(Error::BlobInsufficientLength);
        }
        buf.truncate(available);
        self.read_into(buf.as_mut(), logical_offset).await?;

        Ok((buf, available))
    }

    /// Reads bytes starting at `logical_offset` into `buf`.
    ///
    /// This method allows reading directly into a mutable slice without taking ownership of the
    /// buffer or requiring a specific buffer type.
    pub async fn read_into(&self, buf: &mut [u8], logical_offset: u64) -> Result<(), Error> {
        // Ensure the read doesn't overflow.
        let end_offset = logical_offset
            .checked_add(buf.len() as u64)
            .ok_or(Error::OffsetOverflow)?;

        // Acquire a read lock on the buffer.
        let buffer = self.buffer.read().await;

        // If the data required is beyond the size of the blob, return an error.
        if end_offset > buffer.size() {
            return Err(Error::BlobInsufficientLength);
        }

        // Extract any bytes from the buffer that overlap with the requested range.
        let remaining = buffer.extract(buf.as_mut(), logical_offset);

        // Release buffer lock before potential I/O.
        drop(buffer);

        if remaining == 0 {
            return Ok(());
        }

        // Fast path: try to read *only* from page cache without acquiring blob lock. This allows
        // concurrent reads even while a flush is in progress.
        let cached = self
            .cache_ref
            .read_cached(self.id, &mut buf[..remaining], logical_offset)
            .await;

        if cached == remaining {
            // All bytes found in cache.
            return Ok(());
        }

        // Slow path: cache miss (partial or full), acquire blob read lock to ensure any in-flight
        // write completes before we read from the blob.
        let blob_guard = self.blob_state.read().await;

        // Read remaining bytes that were not already obtained from the earlier cache read.
        let uncached_offset = logical_offset + cached as u64;
        let uncached_len = remaining - cached;
        self.cache_ref
            .read(
                &blob_guard.blob,
                self.id,
                &mut buf[cached..cached + uncached_len],
                uncached_offset,
            )
            .await
    }

    /// Returns the protected region info for a partial page, if any.
    ///
    /// # Returns
    ///
    /// `None` if there's no existing partial page.
    ///
    /// `Some((prefix_len, protected_crc))` where:
    /// - `prefix_len`: bytes `[0..prefix_len]` were already written and can be substituted with
    ///   zeros (skip writing)
    /// - `protected_crc`: which CRC slot must not be overwritten
    fn identify_protected_regions(
        partial_page_state: Option<&Checksum>,
    ) -> Option<(usize, ProtectedCrc)> {
        let crc_record = partial_page_state?;
        let (old_len, _) = crc_record.get_crc();
        // The protected CRC is the one with the larger (authoritative) length.
        let protected_crc = if crc_record.len1 >= crc_record.len2 {
            ProtectedCrc::First
        } else {
            ProtectedCrc::Second
        };
        Some((old_len as usize, protected_crc))
    }

    /// Prepare a buffer containing the result of converting each buffered logical page in the input
    /// into a physical page (meaning each page has a CRC record). If the last page is not yet full,
    /// it will be included only if `include_partial_page` is true.
    ///
    /// # Arguments
    ///
    /// * `buffer` - The buffer containing logical page data
    /// * `include_partial_page` - Whether to include a partial page if one exists
    /// * `old_crc_record` - The CRC record from a previously committed partial page, if any.
    ///   When present, the first page's CRC record will preserve the old CRC in its original slot
    ///   and place the new CRC in the other slot.
    fn to_physical_pages(
        &self,
        buffer: &Buffer,
        include_partial_page: bool,
        old_crc_record: Option<&Checksum>,
    ) -> (Vec<u8>, Option<Checksum>) {
        let logical_page_size = self.cache_ref.page_size() as usize;
        let physical_page_size = logical_page_size + CHECKSUM_SIZE as usize;
        let pages_to_write = buffer.data.len() / logical_page_size;
        let mut write_buffer = Vec::with_capacity(pages_to_write * physical_page_size);

        // For each logical page, copy over the data and then write a crc record for it.
        for page in 0..pages_to_write {
            let start_read_idx = page * logical_page_size;
            let end_read_idx = start_read_idx + logical_page_size;
            let logical_page = &buffer.data[start_read_idx..end_read_idx];
            write_buffer.extend_from_slice(logical_page);

            let crc = Crc32::checksum(logical_page);
            let logical_page_size_u16 =
                u16::try_from(logical_page_size).expect("page size must fit in u16 for CRC record");

            // For the first page, if there's an old partial page CRC, construct the record
            // to preserve the old CRC in its original slot.
            let crc_record = if let (0, Some(old_crc)) = (page, old_crc_record) {
                Self::build_crc_record_preserving_old(logical_page_size_u16, crc, old_crc)
            } else {
                Checksum::new(logical_page_size_u16, crc)
            };
            write_buffer.extend_from_slice(&crc_record.to_bytes());
        }

        if !include_partial_page {
            return (write_buffer, None);
        }

        let partial_page = &buffer.data[pages_to_write * logical_page_size..];
        if partial_page.is_empty() {
            // No partial page data to write.
            return (write_buffer, None);
        }

        // If there are no full pages and the partial page length matches what was already
        // written, there's nothing new to write.
        if pages_to_write == 0 {
            if let Some(old_crc) = old_crc_record {
                let (old_len, _) = old_crc.get_crc();
                if partial_page.len() == old_len as usize {
                    return (write_buffer, None);
                }
            }
        }
        write_buffer.extend_from_slice(partial_page);
        let partial_len = partial_page.len();
        let crc = Crc32::checksum(partial_page);

        // Pad with zeros to fill up to logical_page_size.
        write_buffer.resize(write_buffer.len() + (logical_page_size - partial_len), 0);

        // For partial pages: if this is the first page and there's an old CRC, preserve it.
        // Otherwise just use the new CRC in slot 0.
        let crc_record = if let (0, Some(old_crc)) = (pages_to_write, old_crc_record) {
            Self::build_crc_record_preserving_old(partial_len as u16, crc, old_crc)
        } else {
            Checksum::new(partial_len as u16, crc)
        };

        write_buffer.extend_from_slice(&crc_record.to_bytes());

        // Return the CRC record that matches what we wrote to disk, so that future flushes
        // correctly identify which slot is protected.
        (write_buffer, Some(crc_record))
    }

    /// Build a CRC record that preserves the old CRC in its original slot and places
    /// the new CRC in the other slot.
    const fn build_crc_record_preserving_old(
        new_len: u16,
        new_crc: u32,
        old_crc: &Checksum,
    ) -> Checksum {
        let (old_len, old_crc_val) = old_crc.get_crc();
        // The old CRC is in the slot with the larger length value (first slot wins ties).
        if old_crc.len1 >= old_crc.len2 {
            // Old CRC is in slot 0, put new CRC in slot 1
            Checksum {
                len1: old_len,
                crc1: old_crc_val,
                len2: new_len,
                crc2: new_crc,
            }
        } else {
            // Old CRC is in slot 1, put new CRC in slot 0
            Checksum {
                len1: new_len,
                crc1: new_crc,
                len2: old_len,
                crc2: old_crc_val,
            }
        }
    }

    /// Flushes any buffered data, then returns a [Replay] for the underlying blob.
    ///
    /// The returned replay can be used to sequentially read all pages from the blob while ensuring
    /// all data passes integrity verification. CRCs are validated but not included in the output.
    pub async fn replay(&self, buffer_size: NonZeroUsize) -> Result<Replay<B>, Error> {
        let logical_page_size = self.cache_ref.page_size();
        let logical_page_size_nz =
            NonZeroU16::new(logical_page_size as u16).expect("page_size is non-zero");

        // Flush any buffered data (without fsync) so the reader sees all written data.
        {
            let buf_guard = self.buffer.write().await;
            if !buf_guard.immutable {
                self.flush_internal(buf_guard, true).await?;
            }
        }

        let physical_page_size = logical_page_size + CHECKSUM_SIZE;

        // Convert buffer size (bytes) to page count
        let prefetch_pages = buffer_size.get() / physical_page_size as usize;
        let prefetch_pages = prefetch_pages.max(1); // At least 1 page
        let blob_guard = self.blob_state.read().await;

        // Compute both physical and logical blob sizes.
        let (physical_blob_size, logical_blob_size) =
            blob_guard.partial_page_state.as_ref().map_or_else(
                || {
                    // All pages are full.
                    let physical = physical_page_size * blob_guard.current_page;
                    let logical = logical_page_size * blob_guard.current_page;
                    (physical, logical)
                },
                |crc_record| {
                    // There's a partial page with a checksum.
                    let (partial_len, _) = crc_record.get_crc();
                    let partial_len = partial_len as u64;
                    // Physical: all pages including the partial one (which is padded to full size).
                    let physical = physical_page_size * (blob_guard.current_page + 1);
                    // Logical: full pages before this + partial page's actual data length.
                    let logical = logical_page_size * blob_guard.current_page + partial_len;
                    (physical, logical)
                },
            );

        let reader = PageReader::new(
            blob_guard.blob.clone(),
            physical_blob_size,
            logical_blob_size,
            prefetch_pages,
            logical_page_size_nz,
        );
        Ok(Replay::new(reader))
    }
}

impl<B: Blob> Blob for Append<B> {
    async fn read_at(
        &self,
        logical_offset: u64,
        buf: impl Into<IoBufsMut> + Send,
    ) -> Result<IoBufsMut, Error> {
        let buf = buf.into();
        let len = buf.len();
        match buf {
            IoBufsMut::Single(mut single) => {
                self.read_into(single.as_mut(), logical_offset).await?;
                Ok(IoBufsMut::Single(single))
            }
            IoBufsMut::Chunked(mut chunks) => {
                // Read into a temporary buffer and copy back to preserve structure
                let mut temp = vec![0u8; len];
                self.read_into(&mut temp, logical_offset).await?;
                let mut pos = 0;
                for chunk in chunks.iter_mut() {
                    let chunk_len = chunk.len();
                    chunk.as_mut().copy_from_slice(&temp[pos..pos + chunk_len]);
                    pos += chunk_len;
                }
                Ok(IoBufsMut::Chunked(chunks))
            }
        }
    }

    async fn sync(&self) -> Result<(), Error> {
        // Flush any buffered data, including any partial page. When flush_internal returns,
        // write_at has completed and data has been written to the underlying blob.
        let buf_guard = self.buffer.write().await;
        if buf_guard.immutable {
            return Ok(());
        }
        self.flush_internal(buf_guard, true).await?;

        // Sync the underlying blob. We need the blob read lock here since sync() requires access
        // to the blob, but only a read lock since we're not modifying blob state.
        let blob_state = self.blob_state.read().await;
        blob_state.blob.sync().await
    }

    /// This [Blob] trait method is unimplemented by [Append] and unconditionally panics.
    async fn write_at(&self, _offset: u64, _buf: impl Into<IoBufs> + Send) -> Result<(), Error> {
        // TODO(<https://github.com/commonwarexyz/monorepo/issues/1207>): Extend the page cache to
        // support arbitrary writes.
        unimplemented!("append-only blob type does not support write_at")
    }

    /// Resize the blob to the provided logical `size`.
    ///
    /// This truncates the blob to contain only `size` logical bytes. The physical blob size will
    /// be adjusted to include the necessary CRC records for the remaining pages.
    ///
    /// # Warning
    ///
    /// - Concurrent mutable operations (append, resize) are not supported and will cause data loss.
    /// - Concurrent readers which try to read past the new size during the resize may error.
    /// - The resize is not guaranteed durable until the next sync.
    async fn resize(&self, size: u64) -> Result<(), Error> {
        let current_size = self.size().await;

        // Handle growing by appending zero bytes.
        if size > current_size {
            let zeros_needed = (size - current_size) as usize;
            let zeros = vec![0u8; zeros_needed];
            self.append(&zeros).await?;
            return Ok(());
        }

        // Implementation note: rewinding the blob across a page boundary potentially results in
        // stale data remaining in the page cache. We don't proactively purge the data
        // within this function since it would be inaccessible anyway. Instead we ensure it is
        // always updated should the blob grow back to the point where we have new data for the same
        // page, if any old data hasn't expired naturally by then.

        let logical_page_size = self.cache_ref.page_size();
        let physical_page_size = logical_page_size + CHECKSUM_SIZE;

        // Flush any buffered data first to ensure we have a consistent state on disk.
        self.sync().await?;

        // Acquire both locks to prevent concurrent operations.
        let mut buf_guard = self.buffer.write().await;
        if buf_guard.immutable {
            return Err(Error::ImmutableBlob);
        }
        let mut blob_guard = self.blob_state.write().await;

        // Calculate the physical size needed for the new logical size.
        let full_pages = size / logical_page_size;
        let partial_bytes = size % logical_page_size;
        let new_physical_size = if partial_bytes > 0 {
            // We need full_pages + 1 physical pages to hold the partial data.
            // The partial page will be padded to full physical page size.
            (full_pages + 1) * physical_page_size
        } else {
            // No partial page needed.
            full_pages * physical_page_size
        };

        // Resize the underlying blob.
        blob_guard.blob.resize(new_physical_size).await?;
        blob_guard.partial_page_state = None;

        // Update blob state and buffer based on the desired logical size. The partial page data is
        // read with CRC validation; the validated length may exceed partial_bytes (reflecting the
        // old data length), but we only load the prefix we need. The next sync will write the
        // correct CRC for the new length.
        //
        // Note: This updates state before validation completes, which could leave state
        // inconsistent if validation fails. This is acceptable because failures from mutable
        // methods are fatal - callers must not use the blob after any error.

        blob_guard.current_page = full_pages;
        buf_guard.offset = full_pages * logical_page_size;

        if partial_bytes > 0 {
            // There's a partial page. Read its data from disk with CRC validation.
            let page_data =
                super::get_page_from_blob(&blob_guard.blob, full_pages, logical_page_size).await?;

            // Ensure the validated data covers what we need.
            if (page_data.len() as u64) < partial_bytes {
                return Err(Error::InvalidChecksum);
            }

            buf_guard.data = page_data[..partial_bytes as usize].to_vec();
        } else {
            // No partial page - all pages are full or blob is empty.
            buf_guard.data = vec![];
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{deterministic, IoBufMut, Runner as _, Storage as _};
    use commonware_codec::ReadExt;
    use commonware_macros::test_traced;
    use commonware_utils::{NZUsize, NZU16};
    use std::num::NonZeroU16;

    const PAGE_SIZE: NonZeroU16 = NZU16!(103); // janky size to ensure we test page alignment
    const BUFFER_SIZE: usize = PAGE_SIZE.get() as usize * 2;

    #[test_traced("DEBUG")]
    fn test_append_crc_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            // Open a new blob.
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            // Create a page cache reference.
            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));

            // Create an Append wrapper.
            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            // Verify initial size is 0.
            assert_eq!(append.size().await, 0);

            // Close & re-open.
            append.sync().await.unwrap();
            drop(append);

            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0); // There was no need to write a crc since there was no data.

            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            assert_eq!(append.size().await, 0);
        });
    }

    #[test_traced("DEBUG")]
    fn test_append_crc_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            // Open a new blob.
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            // Create a page cache reference.
            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));

            // Create an Append wrapper.
            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();

            // Verify initial size is 0.
            assert_eq!(append.size().await, 0);

            // Append some bytes.
            let data = vec![1, 2, 3, 4, 5];
            append.append(&data).await.unwrap();

            // Verify size reflects appended data.
            assert_eq!(append.size().await, 5);

            // Append more bytes.
            let more_data = vec![6, 7, 8, 9, 10];
            append.append(&more_data).await.unwrap();

            // Verify size is cumulative.
            assert_eq!(append.size().await, 10);

            // Read back the first chunk and verify.
            let read_buf = append
                .read_at(0, IoBufMut::zeroed(5))
                .await
                .unwrap()
                .coalesce();
            assert_eq!(read_buf, &data[..]);

            // Read back the second chunk and verify.
            let read_buf = append
                .read_at(5, IoBufMut::zeroed(5))
                .await
                .unwrap()
                .coalesce();
            assert_eq!(read_buf, &more_data[..]);

            // Read all data at once and verify.
            let read_buf = append
                .read_at(0, IoBufMut::zeroed(10))
                .await
                .unwrap()
                .coalesce();
            assert_eq!(read_buf, &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

            // Close and reopen the blob and make sure the data is still there and the trailing
            // checksum is written & stripped as expected.
            append.sync().await.unwrap();
            drop(append);

            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            // Physical page = 103 logical + 12 Checksum = 115 bytes (padded partial page)
            assert_eq!(blob_size, 115);
            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 10); // CRC should be stripped after verification

            // Append data that spans a page boundary.
            // PAGE_SIZE=103 is the logical page size. We have 10 bytes, so writing
            // 100 more bytes (total 110) will cross the page boundary at byte 103.
            let spanning_data: Vec<u8> = (11..=110).collect();
            append.append(&spanning_data).await.unwrap();
            assert_eq!(append.size().await, 110);

            // Read back data that spans the page boundary.
            let read_buf = append
                .read_at(10, IoBufMut::zeroed(100))
                .await
                .unwrap()
                .coalesce();
            assert_eq!(read_buf, &spanning_data[..]);

            // Read all 110 bytes at once.
            let read_buf = append
                .read_at(0, IoBufMut::zeroed(110))
                .await
                .unwrap()
                .coalesce();
            let expected: Vec<u8> = (1..=110).collect();
            assert_eq!(read_buf, &expected[..]);

            // Drop and re-open and make sure bytes are still there.
            append.sync().await.unwrap();
            drop(append);

            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            // 2 physical pages: 2 * 115 = 230 bytes
            assert_eq!(blob_size, 230);
            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
                .await
                .unwrap();
            assert_eq!(append.size().await, 110);

            // Append data to reach exactly a page boundary.
            // Logical page size is 103. We have 110 bytes, next boundary is 206 (103 * 2).
            // So we need 96 more bytes.
            let boundary_data: Vec<u8> = (111..=206).collect();
            assert_eq!(boundary_data.len(), 96);
            append.append(&boundary_data).await.unwrap();
            assert_eq!(append.size().await, 206);

            // Verify we can read it back.
            let read_buf = append
                .read_at(0, IoBufMut::zeroed(206))
                .await
                .unwrap()
                .coalesce();
            let expected: Vec<u8> = (1..=206).collect();
            assert_eq!(read_buf, &expected[..]);

            // Drop and re-open at the page boundary.
            append.sync().await.unwrap();
            drop(append);

            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            // Physical size should be exactly 2 pages: 115 * 2 = 230 bytes
            assert_eq!(blob_size, 230);
            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
                .await
                .unwrap();
            assert_eq!(append.size().await, 206);

            // Verify data is still readable after reopen.
            let read_buf = append
                .read_at(0, IoBufMut::zeroed(206))
                .await
                .unwrap()
                .coalesce();
            assert_eq!(read_buf, &expected[..]);
        });
    }

1105    /// Helper to read the CRC record from raw blob bytes at the end of a physical page.
1106    fn read_crc_record_from_page(page_bytes: &[u8]) -> Checksum {
1107        let crc_start = page_bytes.len() - CHECKSUM_SIZE as usize;
1108        Checksum::read(&mut &page_bytes[crc_start..]).unwrap()
1109    }
1110
1111    /// Dummy marker bytes with len=0 so the mangled slot is never authoritative.
1112    /// Format: [len_hi=0, len_lo=0, 0xDE, 0xAD, 0xBE, 0xEF]
1113    const DUMMY_MARKER: [u8; 6] = [0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF];
1114
1115    #[test]
1116    fn test_identify_protected_regions_equal_lengths() {
1117        // When lengths are equal, the first CRC should be protected (tie-breaking rule).
1118        let record = Checksum {
1119            len1: 50,
1120            crc1: 0xAAAAAAAA,
1121            len2: 50,
1122            crc2: 0xBBBBBBBB,
1123        };
1124
1125        let result =
1126            Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record));
1127        assert!(result.is_some());
1128        let (prefix_len, protected_crc) = result.unwrap();
1129        assert_eq!(prefix_len, 50);
1130        assert!(
1131            matches!(protected_crc, ProtectedCrc::First),
1132            "First CRC should be protected when lengths are equal"
1133        );
1134    }
1135
1136    #[test]
1137    fn test_identify_protected_regions_len1_larger() {
1138        // When len1 > len2, the first CRC should be protected.
1139        let record = Checksum {
1140            len1: 100,
1141            crc1: 0xAAAAAAAA,
1142            len2: 50,
1143            crc2: 0xBBBBBBBB,
1144        };
1145
1146        let result =
1147            Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record));
1148        assert!(result.is_some());
1149        let (prefix_len, protected_crc) = result.unwrap();
1150        assert_eq!(prefix_len, 100);
1151        assert!(
1152            matches!(protected_crc, ProtectedCrc::First),
1153            "First CRC should be protected when len1 > len2"
1154        );
1155    }
1156
1157    #[test]
1158    fn test_identify_protected_regions_len2_larger() {
1159        // When len2 > len1, the second CRC should be protected.
1160        let record = Checksum {
1161            len1: 50,
1162            crc1: 0xAAAAAAAA,
1163            len2: 100,
1164            crc2: 0xBBBBBBBB,
1165        };
1166
1167        let result =
1168            Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record));
1169        assert!(result.is_some());
1170        let (prefix_len, protected_crc) = result.unwrap();
1171        assert_eq!(prefix_len, 100);
1172        assert!(
1173            matches!(protected_crc, ProtectedCrc::Second),
1174            "Second CRC should be protected when len2 > len1"
1175        );
1176    }
1177
1178    /// Test that slot 1 is NOT overwritten when it's the protected slot.
1179    ///
1180    /// Strategy: After extending twice (so slot 1 becomes authoritative with larger len),
1181    /// mangle the non-authoritative slot 0. Then extend again - slot 0 should be overwritten
1182    /// with the new CRC, while slot 1 (protected) should remain untouched.
1183    #[test_traced("DEBUG")]
1184    fn test_crc_slot1_protected() {
1185        let executor = deterministic::Runner::default();
1186        executor.start(|context: deterministic::Context| async move {
1187            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1188            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
1189            let slot0_offset = PAGE_SIZE.get() as u64;
1190            let slot1_offset = PAGE_SIZE.get() as u64 + 6;
1191
1192            // === Step 1: Write 10 bytes → slot 0 authoritative (len=10) ===
1193            let (blob, _) = context.open("test_partition", b"slot1_prot").await.unwrap();
1194            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
1195                .await
1196                .unwrap();
1197            append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
1198            append.sync().await.unwrap();
1199            drop(append);
1200
1201            // === Step 2: Extend to 30 bytes → slot 1 authoritative (len=30) ===
1202            let (blob, size) = context.open("test_partition", b"slot1_prot").await.unwrap();
1203            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1204                .await
1205                .unwrap();
1206            append
1207                .append(&(11..=30).collect::<Vec<u8>>())
1208                .await
1209                .unwrap();
1210            append.sync().await.unwrap();
1211            drop(append);
1212
1213            // Verify slot 1 is now authoritative
1214            let (blob, size) = context.open("test_partition", b"slot1_prot").await.unwrap();
1215            let page = blob
1216                .read_at(0, IoBufMut::zeroed(physical_page_size))
1217                .await
1218                .unwrap()
1219                .coalesce();
1220            let crc = read_crc_record_from_page(page.as_ref());
1221            assert!(
1222                crc.len2 > crc.len1,
1223                "Slot 1 should be authoritative (len2={} > len1={})",
1224                crc.len2,
1225                crc.len1
1226            );
1227
1228            // Capture slot 1 bytes before mangling slot 0
1229            let slot1_before: Vec<u8> = blob
1230                .read_at(slot1_offset, IoBufMut::zeroed(6))
1231                .await
1232                .unwrap()
1233                .coalesce()
1234                .freeze()
1235                .into();
1236
1237            // === Step 3: Mangle slot 0 (non-authoritative) ===
1238            blob.write_at(slot0_offset, DUMMY_MARKER.to_vec())
1239                .await
1240                .unwrap();
1241            blob.sync().await.unwrap();
1242
1243            // Verify mangle worked
1244            let slot0_mangled: Vec<u8> = blob
1245                .read_at(slot0_offset, IoBufMut::zeroed(6))
1246                .await
1247                .unwrap()
1248                .coalesce()
1249                .freeze()
1250                .into();
1251            assert_eq!(slot0_mangled, DUMMY_MARKER, "Mangle failed");
1252
1253            // === Step 4: Extend to 50 bytes → new CRC goes to slot 0, slot 1 protected ===
1254            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1255                .await
1256                .unwrap();
1257            append
1258                .append(&(31..=50).collect::<Vec<u8>>())
1259                .await
1260                .unwrap();
1261            append.sync().await.unwrap();
1262            drop(append);
1263
1264            // === Step 5: Verify slot 0 was overwritten, slot 1 unchanged ===
1265            let (blob, _) = context.open("test_partition", b"slot1_prot").await.unwrap();
1266
1267            // Slot 0 should have new CRC (not our dummy marker)
1268            let slot0_after: Vec<u8> = blob
1269                .read_at(slot0_offset, IoBufMut::zeroed(6))
1270                .await
1271                .unwrap()
1272                .coalesce()
1273                .freeze()
1274                .into();
1275            assert_ne!(
1276                slot0_after, DUMMY_MARKER,
1277                "Slot 0 should have been overwritten with new CRC"
1278            );
1279
1280            // Slot 1 should be UNCHANGED (protected)
1281            let slot1_after: Vec<u8> = blob
1282                .read_at(slot1_offset, IoBufMut::zeroed(6))
1283                .await
1284                .unwrap()
1285                .coalesce()
1286                .freeze()
1287                .into();
1288            assert_eq!(
1289                slot1_before, slot1_after,
1290                "Slot 1 was modified! Protected region violated."
1291            );
1292
1293            // Verify the new CRC in slot 0 has len=50
1294            let page = blob
1295                .read_at(0, IoBufMut::zeroed(physical_page_size))
1296                .await
1297                .unwrap()
1298                .coalesce();
1299            let crc = read_crc_record_from_page(page.as_ref());
1300            assert_eq!(crc.len1, 50, "Slot 0 should have len=50");
1301        });
1302    }
1303
1304    /// Test that slot 0 is NOT overwritten when it's the protected slot.
1305    ///
1306    /// Strategy: After an initial write and two extensions (so slot 0 is authoritative again
1307    /// with the largest len), mangle the non-authoritative slot 1. Then extend again: slot 1
1308    /// should be overwritten with the new CRC, while slot 0 (protected) remains untouched.
1309    #[test_traced("DEBUG")]
1310    fn test_crc_slot0_protected() {
1311        let executor = deterministic::Runner::default();
1312        executor.start(|context: deterministic::Context| async move {
1313            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1314            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
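            // The slot offsets below follow the page's CRC record layout (see the format sketched
            // in test_corrupted_crc_len_too_large): the 12-byte record trails the page data as
            //   [ len1 (2) | crc1 (4) | len2 (2) | crc2 (4) ]
            // so slot 0 starts at offset PAGE_SIZE and slot 1 at PAGE_SIZE + 6.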
1315            let slot0_offset = PAGE_SIZE.get() as u64;
1316            let slot1_offset = PAGE_SIZE.get() as u64 + 6;
1317
1318            // === Step 1: Write 10 bytes → slot 0 authoritative (len=10) ===
1319            let (blob, _) = context.open("test_partition", b"slot0_prot").await.unwrap();
1320            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
1321                .await
1322                .unwrap();
1323            append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
1324            append.sync().await.unwrap();
1325            drop(append);
1326
1327            // === Step 2: Extend to 30 bytes → slot 1 authoritative (len=30) ===
1328            let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
1329            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1330                .await
1331                .unwrap();
1332            append
1333                .append(&(11..=30).collect::<Vec<u8>>())
1334                .await
1335                .unwrap();
1336            append.sync().await.unwrap();
1337            drop(append);
1338
1339            // === Step 3: Extend to 50 bytes → slot 0 authoritative (len=50) ===
1340            let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
1341            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1342                .await
1343                .unwrap();
1344            append
1345                .append(&(31..=50).collect::<Vec<u8>>())
1346                .await
1347                .unwrap();
1348            append.sync().await.unwrap();
1349            drop(append);
1350
1351            // Verify slot 0 is now authoritative
1352            let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
1353            let page = blob
1354                .read_at(0, IoBufMut::zeroed(physical_page_size))
1355                .await
1356                .unwrap()
1357                .coalesce();
1358            let crc = read_crc_record_from_page(page.as_ref());
1359            assert!(
1360                crc.len1 > crc.len2,
1361                "Slot 0 should be authoritative (len1={} > len2={})",
1362                crc.len1,
1363                crc.len2
1364            );
1365
1366            // Capture slot 0 bytes before mangling slot 1
1367            let slot0_before: Vec<u8> = blob
1368                .read_at(slot0_offset, IoBufMut::zeroed(6))
1369                .await
1370                .unwrap()
1371                .coalesce()
1372                .freeze()
1373                .into();
1374
1375            // === Step 4: Mangle slot 1 (non-authoritative) ===
1376            blob.write_at(slot1_offset, DUMMY_MARKER.to_vec())
1377                .await
1378                .unwrap();
1379            blob.sync().await.unwrap();
1380
1381            // Verify mangle worked
1382            let slot1_mangled: Vec<u8> = blob
1383                .read_at(slot1_offset, IoBufMut::zeroed(6))
1384                .await
1385                .unwrap()
1386                .coalesce()
1387                .freeze()
1388                .into();
1389            assert_eq!(slot1_mangled, DUMMY_MARKER, "Mangle failed");
1390
1391            // === Step 5: Extend to 70 bytes → new CRC goes to slot 1, slot 0 protected ===
1392            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1393                .await
1394                .unwrap();
1395            append
1396                .append(&(51..=70).collect::<Vec<u8>>())
1397                .await
1398                .unwrap();
1399            append.sync().await.unwrap();
1400            drop(append);
1401
1402            // === Step 6: Verify slot 1 was overwritten, slot 0 unchanged ===
1403            let (blob, _) = context.open("test_partition", b"slot0_prot").await.unwrap();
1404
1405            // Slot 1 should have new CRC (not our dummy marker)
1406            let slot1_after: Vec<u8> = blob
1407                .read_at(slot1_offset, IoBufMut::zeroed(6))
1408                .await
1409                .unwrap()
1410                .coalesce()
1411                .freeze()
1412                .into();
1413            assert_ne!(
1414                slot1_after, DUMMY_MARKER,
1415                "Slot 1 should have been overwritten with new CRC"
1416            );
1417
1418            // Slot 0 should be UNCHANGED (protected)
1419            let slot0_after: Vec<u8> = blob
1420                .read_at(slot0_offset, IoBufMut::zeroed(6))
1421                .await
1422                .unwrap()
1423                .coalesce()
1424                .freeze()
1425                .into();
1426            assert_eq!(
1427                slot0_before, slot0_after,
1428                "Slot 0 was modified! Protected region violated."
1429            );
1430
1431            // Verify the new CRC in slot 1 has len=70
1432            let page = blob
1433                .read_at(0, IoBufMut::zeroed(physical_page_size))
1434                .await
1435                .unwrap()
1436                .coalesce();
1437            let crc = read_crc_record_from_page(page.as_ref());
1438            assert_eq!(crc.len2, 70, "Slot 1 should have len=70");
1439        });
1440    }
1441
1442    /// Test that the data prefix is NOT overwritten when extending a partial page.
1443    ///
1444    /// Strategy: Write data, then mangle the padding area (between data end and CRC start).
1445    /// After extending, the original data should be unchanged but the mangled padding
1446    /// should be overwritten with new data.
1447    #[test_traced("DEBUG")]
1448    fn test_data_prefix_not_overwritten() {
1449        let executor = deterministic::Runner::default();
1450        executor.start(|context: deterministic::Context| async move {
1451            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1452            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
1453
1454            // === Step 1: Write 20 bytes ===
1455            let (blob, _) = context
1456                .open("test_partition", b"prefix_test")
1457                .await
1458                .unwrap();
1459            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
1460                .await
1461                .unwrap();
1462            let data1: Vec<u8> = (1..=20).collect();
1463            append.append(&data1).await.unwrap();
1464            append.sync().await.unwrap();
1465            drop(append);
1466
1467            // === Step 2: Capture the first 20 bytes and mangle bytes 25-30 (in padding area) ===
1468            let (blob, size) = context
1469                .open("test_partition", b"prefix_test")
1470                .await
1471                .unwrap();
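            // Only 20 logical bytes were appended, but a sync'd partial page still occupies one
            // full physical page on disk: data, padding, then the CRC record at offset PAGE_SIZE.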
1472            assert_eq!(size, physical_page_size as u64);
1473
1474            let prefix_before: Vec<u8> = blob
1475                .read_at(0, IoBufMut::zeroed(20))
1476                .await
1477                .unwrap()
1478                .coalesce()
1479                .freeze()
1480                .into();
1481
1482            // Mangle bytes 25-30 (safely in the padding area, after our 20 bytes of data)
1483            blob.write_at(25, DUMMY_MARKER.to_vec()).await.unwrap();
1484            blob.sync().await.unwrap();
1485
1486            // === Step 3: Extend to 40 bytes ===
1487            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1488                .await
1489                .unwrap();
1490            append
1491                .append(&(21..=40).collect::<Vec<u8>>())
1492                .await
1493                .unwrap();
1494            append.sync().await.unwrap();
1495            drop(append);
1496
1497            // === Step 4: Verify prefix unchanged, mangled area overwritten ===
1498            let (blob, _) = context
1499                .open("test_partition", b"prefix_test")
1500                .await
1501                .unwrap();
1502
1503            // Original 20 bytes should be unchanged
1504            let prefix_after: Vec<u8> = blob
1505                .read_at(0, IoBufMut::zeroed(20))
1506                .await
1507                .unwrap()
1508                .coalesce()
1509                .freeze()
1510                .into();
1511            assert_eq!(prefix_before, prefix_after, "Data prefix was modified!");
1512
1513            // Bytes at offset 25-30: data (21..=40) starts at offset 20, so offset 25 has value 26
1514            let overwritten: Vec<u8> = blob
1515                .read_at(25, IoBufMut::zeroed(6))
1516                .await
1517                .unwrap()
1518                .coalesce()
1519                .freeze()
1520                .into();
1521            assert_eq!(
1522                overwritten,
1523                vec![26, 27, 28, 29, 30, 31],
1524                "New data should overwrite padding area"
1525            );
1526        });
1527    }
1528
1529    /// Test CRC slot protection when extending past a page boundary.
1530    ///
1531    /// Strategy: Write a partial page, extend once (slot 1 becomes authoritative), then mangle
1532    /// the now non-authoritative slot 0 and extend past the page boundary. Verify the mangled
1533    /// slot 0 gets a new full-page CRC, slot 1 is untouched, and the second page is written correctly.
1534    #[test_traced("DEBUG")]
1535    fn test_crc_slot_protection_across_page_boundary() {
1536        let executor = deterministic::Runner::default();
1537        executor.start(|context: deterministic::Context| async move {
1538            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1539            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
1540            let slot0_offset = PAGE_SIZE.get() as u64;
1541            let slot1_offset = PAGE_SIZE.get() as u64 + 6;
1542
1543            // === Step 1: Write 50 bytes → slot 0 authoritative ===
1544            let (blob, _) = context.open("test_partition", b"boundary").await.unwrap();
1545            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
1546                .await
1547                .unwrap();
1548            append.append(&(1..=50).collect::<Vec<u8>>()).await.unwrap();
1549            append.sync().await.unwrap();
1550            drop(append);
1551
1552            // === Step 2: Extend to 80 bytes → slot 1 authoritative ===
1553            let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
1554            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1555                .await
1556                .unwrap();
1557            append
1558                .append(&(51..=80).collect::<Vec<u8>>())
1559                .await
1560                .unwrap();
1561            append.sync().await.unwrap();
1562            drop(append);
1563
1564            // Verify slot 1 is authoritative
1565            let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
1566            let page = blob
1567                .read_at(0, IoBufMut::zeroed(physical_page_size))
1568                .await
1569                .unwrap()
1570                .coalesce();
1571            let crc = read_crc_record_from_page(page.as_ref());
1572            assert!(crc.len2 > crc.len1, "Slot 1 should be authoritative");
1573
1574            // Capture slot 1 before extending past page boundary
1575            let slot1_before: Vec<u8> = blob
1576                .read_at(slot1_offset, IoBufMut::zeroed(6))
1577                .await
1578                .unwrap()
1579                .coalesce()
1580                .freeze()
1581                .into();
1582
1583            // Mangle slot 0 (non-authoritative)
1584            blob.write_at(slot0_offset, DUMMY_MARKER.to_vec())
1585                .await
1586                .unwrap();
1587            blob.sync().await.unwrap();
1588
1589            // === Step 3: Extend past page boundary (80 + 40 = 120, PAGE_SIZE=103) ===
1590            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1591                .await
1592                .unwrap();
1593            append
1594                .append(&(81..=120).collect::<Vec<u8>>())
1595                .await
1596                .unwrap();
1597            append.sync().await.unwrap();
1598            drop(append);
1599
1600            // === Step 4: Verify results ===
1601            let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
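            // 120 logical bytes = one full page (103) plus 17 bytes spilled onto page 1, so the
            // blob should now span two physical pages.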
1602            assert_eq!(size, (physical_page_size * 2) as u64, "Should have 2 pages");
1603
1604            // Slot 0 should have been overwritten with full-page CRC (not dummy marker)
1605            let slot0_after: Vec<u8> = blob
1606                .read_at(slot0_offset, IoBufMut::zeroed(6))
1607                .await
1608                .unwrap()
1609                .coalesce()
1610                .freeze()
1611                .into();
1612            assert_ne!(
1613                slot0_after, DUMMY_MARKER,
1614                "Slot 0 should have full-page CRC"
1615            );
1616
1617            // Slot 1 should be UNCHANGED (protected during boundary crossing)
1618            let slot1_after: Vec<u8> = blob
1619                .read_at(slot1_offset, IoBufMut::zeroed(6))
1620                .await
1621                .unwrap()
1622                .coalesce()
1623                .freeze()
1624                .into();
1625            assert_eq!(
1626                slot1_before, slot1_after,
1627                "Slot 1 was modified during page boundary crossing!"
1628            );
1629
1630            // Verify page 0 has correct CRC structure
1631            let page0 = blob
1632                .read_at(0, IoBufMut::zeroed(physical_page_size))
1633                .await
1634                .unwrap()
1635                .coalesce();
1636            let crc0 = read_crc_record_from_page(page0.as_ref());
1637            assert_eq!(
1638                crc0.len1,
1639                PAGE_SIZE.get(),
1640                "Slot 0 should have full page length"
1641            );
1642
1643            // Verify data integrity
1644            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1645                .await
1646                .unwrap();
1647            assert_eq!(append.size().await, 120);
1648            let all_data: Vec<u8> = append
1649                .read_at(0, IoBufMut::zeroed(120))
1650                .await
1651                .unwrap()
1652                .coalesce()
1653                .freeze()
1654                .into();
1655            let expected: Vec<u8> = (1..=120).collect();
1656            assert_eq!(all_data, expected);
1657        });
1658    }
1659
1660    /// Test that corrupting the primary CRC (but not its length) causes fallback to the previous
1661    /// partial page contents.
1662    ///
1663    /// Strategy:
1664    /// 1. Write 10 bytes → slot 0 authoritative (len=10, valid crc)
1665    /// 2. Extend to 30 bytes → slot 1 authoritative (len=30, valid crc)
1666    /// 3. Corrupt ONLY the crc2 value in slot 1 (not the length)
1667    /// 4. Re-open and verify we fall back to slot 0's 10 bytes
1668    #[test_traced("DEBUG")]
1669    fn test_crc_fallback_on_corrupted_primary() {
1670        let executor = deterministic::Runner::default();
1671        executor.start(|context: deterministic::Context| async move {
1672            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1673            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
1674            // crc2 is at offset: PAGE_SIZE + 6 (skip slot 0: len1 + crc1) + 2 (skip len2) = PAGE_SIZE + 8
1675            let crc2_offset = PAGE_SIZE.get() as u64 + 8;
1676
1677            // === Step 1: Write 10 bytes → slot 0 authoritative (len=10) ===
1678            let (blob, _) = context
1679                .open("test_partition", b"crc_fallback")
1680                .await
1681                .unwrap();
1682            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
1683                .await
1684                .unwrap();
1685            let data1: Vec<u8> = (1..=10).collect();
1686            append.append(&data1).await.unwrap();
1687            append.sync().await.unwrap();
1688            drop(append);
1689
1690            // === Step 2: Extend to 30 bytes → slot 1 authoritative (len=30) ===
1691            let (blob, size) = context
1692                .open("test_partition", b"crc_fallback")
1693                .await
1694                .unwrap();
1695            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1696                .await
1697                .unwrap();
1698            append
1699                .append(&(11..=30).collect::<Vec<u8>>())
1700                .await
1701                .unwrap();
1702            append.sync().await.unwrap();
1703            drop(append);
1704
1705            // Verify slot 1 is now authoritative and data reads correctly
1706            let (blob, size) = context
1707                .open("test_partition", b"crc_fallback")
1708                .await
1709                .unwrap();
1710            assert_eq!(size, physical_page_size as u64);
1711
1712            let page = blob
1713                .read_at(0, IoBufMut::zeroed(physical_page_size))
1714                .await
1715                .unwrap()
1716                .coalesce();
1717            let crc = read_crc_record_from_page(page.as_ref());
1718            assert!(
1719                crc.len2 > crc.len1,
1720                "Slot 1 should be authoritative (len2={} > len1={})",
1721                crc.len2,
1722                crc.len1
1723            );
1724            assert_eq!(crc.len2, 30, "Slot 1 should have len=30");
1725            assert_eq!(crc.len1, 10, "Slot 0 should have len=10");
1726
1727            // Verify we can read all 30 bytes before corruption
1728            let append = Append::new(blob.clone(), size, BUFFER_SIZE, cache_ref.clone())
1729                .await
1730                .unwrap();
1731            assert_eq!(append.size().await, 30);
1732            let all_data: Vec<u8> = append
1733                .read_at(0, IoBufMut::zeroed(30))
1734                .await
1735                .unwrap()
1736                .coalesce()
1737                .freeze()
1738                .into();
1739            let expected: Vec<u8> = (1..=30).collect();
1740            assert_eq!(all_data, expected);
1741            drop(append);
1742
1743            // === Step 3: Corrupt ONLY crc2 (not len2) ===
1744            // crc2 is 4 bytes at offset PAGE_SIZE + 8
1745            blob.write_at(crc2_offset, vec![0xDE, 0xAD, 0xBE, 0xEF])
1746                .await
1747                .unwrap();
1748            blob.sync().await.unwrap();
1749
1750            // Verify corruption: len2 should still be 30, but crc2 is now garbage
1751            let page = blob
1752                .read_at(0, IoBufMut::zeroed(physical_page_size))
1753                .await
1754                .unwrap()
1755                .coalesce();
1756            let crc = read_crc_record_from_page(page.as_ref());
1757            assert_eq!(crc.len2, 30, "len2 should still be 30 after corruption");
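            // The four bytes written above read back as 0xDEADBEEF, matching the record's
            // big-endian field encoding (see the format noted in test_corrupted_crc_len_too_large).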
1758            assert_eq!(crc.crc2, 0xDEADBEEF, "crc2 should be our corrupted value");
1759
1760            // === Step 4: Re-open and verify fallback to slot 0's 10 bytes ===
1761            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1762                .await
1763                .unwrap();
1764
1765            // Should fall back to 10 bytes (slot 0's length)
1766            assert_eq!(
1767                append.size().await,
1768                10,
1769                "Should fall back to slot 0's 10 bytes after primary CRC corruption"
1770            );
1771
1772            // Verify the data is the original 10 bytes
1773            let fallback_data: Vec<u8> = append
1774                .read_at(0, IoBufMut::zeroed(10))
1775                .await
1776                .unwrap()
1777                .coalesce()
1778                .freeze()
1779                .into();
1780            assert_eq!(
1781                fallback_data, data1,
1782                "Fallback data should match original 10 bytes"
1783            );
1784
1785            // Reading beyond 10 bytes should fail
1786            let result = append.read_at(0, IoBufMut::zeroed(11)).await;
1787            assert!(result.is_err(), "Reading beyond fallback size should fail");
1788        });
1789    }
1790
1791    /// Test that corrupting a non-last page's primary CRC fails even if fallback is valid.
1792    ///
1793    /// Non-last pages must always be full. If the primary CRC is corrupted and the fallback
1794    /// indicates a partial page, validation should fail entirely (not fall back to partial).
1795    ///
1796    /// Strategy:
1797    /// 1. Write 10 bytes → slot 0 has len=10 (partial)
1798    /// 2. Extend to full page (103 bytes) → slot 1 has len=103 (full, authoritative)
1799    /// 3. Extend past the page boundary (to 113 bytes) → page 0 is now non-last
1800    /// 4. Corrupt the primary CRC of page 0 (slot 1's crc, which has len=103)
1801    /// 5. Re-open and verify that reading from page 0 fails (fallback has len=10, not full)
1802    #[test_traced("DEBUG")]
1803    fn test_non_last_page_rejects_partial_fallback() {
1804        let executor = deterministic::Runner::default();
1805        executor.start(|context: deterministic::Context| async move {
1806            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1807            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
1808            // crc2 for page 0 is at offset: PAGE_SIZE + 8
1809            let page0_crc2_offset = PAGE_SIZE.get() as u64 + 8;
1810
1811            // === Step 1: Write 10 bytes → slot 0 has len=10 ===
1812            let (blob, _) = context
1813                .open("test_partition", b"non_last_page")
1814                .await
1815                .unwrap();
1816            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
1817                .await
1818                .unwrap();
1819            append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
1820            append.sync().await.unwrap();
1821            drop(append);
1822
1823            // === Step 2: Extend to exactly full page (103 bytes) → slot 1 has len=103 ===
1824            let (blob, size) = context
1825                .open("test_partition", b"non_last_page")
1826                .await
1827                .unwrap();
1828            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1829                .await
1830                .unwrap();
1831            // Add bytes 11 through 103 (93 more bytes)
1832            append
1833                .append(&(11..=PAGE_SIZE.get() as u8).collect::<Vec<u8>>())
1834                .await
1835                .unwrap();
1836            append.sync().await.unwrap();
1837            drop(append);
1838
1839            // Verify page 0 slot 1 is authoritative with len=103 (full page)
1840            let (blob, size) = context
1841                .open("test_partition", b"non_last_page")
1842                .await
1843                .unwrap();
1844            let page = blob
1845                .read_at(0, IoBufMut::zeroed(physical_page_size))
1846                .await
1847                .unwrap()
1848                .coalesce();
1849            let crc = read_crc_record_from_page(page.as_ref());
1850            assert_eq!(crc.len1, 10, "Slot 0 should have len=10");
1851            assert_eq!(
1852                crc.len2,
1853                PAGE_SIZE.get(),
1854                "Slot 1 should have len=103 (full page)"
1855            );
1856            assert!(crc.len2 > crc.len1, "Slot 1 should be authoritative");
1857
1858            // === Step 3: Extend past page boundary (add 10 more bytes for total of 113) ===
1859            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1860                .await
1861                .unwrap();
1862            // Add bytes 104 through 113 (10 more bytes, now on page 1)
1863            append
1864                .append(&(104..=113).collect::<Vec<u8>>())
1865                .await
1866                .unwrap();
1867            append.sync().await.unwrap();
1868            drop(append);
1869
1870            // Verify we now have 2 pages
1871            let (blob, size) = context
1872                .open("test_partition", b"non_last_page")
1873                .await
1874                .unwrap();
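            // 113 logical bytes = one full page (103) plus 10 bytes on page 1, i.e. two
            // physical pages on disk.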
1875            assert_eq!(
1876                size,
1877                (physical_page_size * 2) as u64,
1878                "Should have 2 physical pages"
1879            );
1880
1881            // Verify data is readable before corruption
1882            let append = Append::new(blob.clone(), size, BUFFER_SIZE, cache_ref.clone())
1883                .await
1884                .unwrap();
1885            assert_eq!(append.size().await, 113);
1886            let all_data: Vec<u8> = append
1887                .read_at(0, IoBufMut::zeroed(113))
1888                .await
1889                .unwrap()
1890                .coalesce()
1891                .freeze()
1892                .into();
1893            let expected: Vec<u8> = (1..=113).collect();
1894            assert_eq!(all_data, expected);
1895            drop(append);
1896
1897            // === Step 4: Corrupt page 0's primary CRC (slot 1's crc2) ===
1898            blob.write_at(page0_crc2_offset, vec![0xDE, 0xAD, 0xBE, 0xEF])
1899                .await
1900                .unwrap();
1901            blob.sync().await.unwrap();
1902
1903            // Verify corruption: page 0's slot 1 still has len=103 but bad CRC
1904            let page = blob
1905                .read_at(0, IoBufMut::zeroed(physical_page_size))
1906                .await
1907                .unwrap()
1908                .coalesce();
1909            let crc = read_crc_record_from_page(page.as_ref());
1910            assert_eq!(crc.len2, PAGE_SIZE.get(), "len2 should still be 103");
1911            assert_eq!(crc.crc2, 0xDEADBEEF, "crc2 should be corrupted");
1912            // Slot 0 fallback has len=10 (partial), which is invalid for a non-last page
1913            assert_eq!(crc.len1, 10, "Fallback slot 0 has partial length");
1914
1915            // === Step 5: Re-open and try to read from page 0 ===
1916            // The first page's primary CRC is bad, and the fallback indicates a partial page
1917            // (len=10). Since page 0 is not the last page, a partial fallback is invalid.
1920            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1921                .await
1922                .unwrap();
1923
1924            // The blob still reports 113 bytes because init only validates the last page.
1925            // But reading from page 0 should fail because the CRC fallback is partial.
1926            assert_eq!(append.size().await, 113);
1927
1928            // Try to read from page 0 - this should fail with InvalidChecksum because
1929            // the fallback CRC has len=10 (partial), which is invalid for a non-last page.
1930            let result = append.read_at(0, IoBufMut::zeroed(10)).await;
1931            assert!(
1932                result.is_err(),
1933                "Reading from corrupted non-last page via Append should fail, but got: {:?}",
1934                result
1935            );
1936            drop(append);
1937
1938            // Also verify that reading via Replay fails the same way.
1939            let (blob, size) = context
1940                .open("test_partition", b"non_last_page")
1941                .await
1942                .unwrap();
1943            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1944                .await
1945                .unwrap();
1946            let mut replay = append.replay(NZUsize!(1024)).await.unwrap();
1947
1948            // Try to fill pages - should fail on CRC validation.
1949            let result = replay.ensure(1).await;
1950            assert!(
1951                result.is_err(),
1952                "Reading from corrupted non-last page via Replay should fail, but got: {:?}",
1953                result
1954            );
1955        });
1956    }
1957
1958    #[test]
1959    fn test_resize_shrink_validates_crc() {
1960        // Verify that shrinking a blob to a partial page validates the CRC, rather than
1961        // blindly reading raw bytes which could silently load corrupted data.
1962        let executor = deterministic::Runner::default();
1963
1964        executor.start(|context| async move {
1965            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1966            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
1967
1968            let (blob, size) = context
1969                .open("test_partition", b"resize_crc_test")
1970                .await
1971                .unwrap();
1972
1973            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1974                .await
1975                .unwrap();
1976
1977            // Write data across 3 pages: page 0 (full), page 1 (full), page 2 (partial).
1978            // PAGE_SIZE = 103, so 250 bytes = 103 + 103 + 44.
1979            let data: Vec<u8> = (0..=249).collect();
1980            append.append(&data).await.unwrap();
1981            append.sync().await.unwrap();
1982            assert_eq!(append.size().await, 250);
1983            drop(append);
1984
1985            // Corrupt the CRC record of page 1 (middle page).
1986            let (blob, size) = context
1987                .open("test_partition", b"resize_crc_test")
1988                .await
1989                .unwrap();
1990            assert_eq!(size as usize, physical_page_size * 3);
1991
1992            // Page 1 CRC record is at the end of the second physical page.
1993            let page1_crc_offset = (physical_page_size * 2 - CHECKSUM_SIZE as usize) as u64;
1994            blob.write_at(page1_crc_offset, vec![0xFF; CHECKSUM_SIZE as usize])
1995                .await
1996                .unwrap();
1997            blob.sync().await.unwrap();
1998
1999            // Open the blob - Append::new() validates the LAST page (page 2), which is still valid.
2000            // So it should open successfully with size 250.
2001            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2002                .await
2003                .unwrap();
2004            assert_eq!(append.size().await, 250);
2005
2006            // Try to shrink to 150 bytes, which ends in page 1 (the corrupted page).
2007            // 150 bytes = page 0 (103 full) + page 1 (47 partial).
2008            // This should fail because page 1's CRC is corrupted.
2009            let result = append.resize(150).await;
2010            assert!(
2011                matches!(result, Err(crate::Error::InvalidChecksum)),
2012                "Expected InvalidChecksum when shrinking to corrupted page, got: {:?}",
2013                result
2014            );
2015        });
2016    }
2017
2018    #[test]
2019    fn test_immutable_blob_rejects_append_and_resize() {
2020        let executor = deterministic::Runner::default();
2021
2022        executor.start(|context| async move {
2023            const PAGE_SIZE: NonZeroU16 = NZU16!(64);
2024            const BUFFER_SIZE: usize = 256;
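            // These locals shadow the module-level PAGE_SIZE and BUFFER_SIZE for this test only.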
2025
2026            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(4));
2027
2028            let (blob, size) = context
2029                .open("test_partition", b"immutable_test")
2030                .await
2031                .unwrap();
2032
2033            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2034                .await
2035                .unwrap();
2036
2037            // Write some initial data.
2038            append.append(&[1, 2, 3, 4, 5]).await.unwrap();
2039            append.sync().await.unwrap();
2040            assert_eq!(append.size().await, 5);
2041
2042            // Convert to immutable.
2043            append.to_immutable().await.unwrap();
2044            assert!(append.is_immutable().await);
2045
2046            // Verify append() returns ImmutableBlob error.
2047            let result = append.append(&[6, 7, 8]).await;
2048            assert!(
2049                matches!(result, Err(crate::Error::ImmutableBlob)),
2050                "Expected ImmutableBlob error from append(), got: {:?}",
2051                result
2052            );
2053
2054            // Verify resize() returns ImmutableBlob error.
2055            let result = append.resize(100).await;
2056            assert!(
2057                matches!(result, Err(crate::Error::ImmutableBlob)),
2058                "Expected ImmutableBlob error from resize(), got: {:?}",
2059                result
2060            );
2061
2062            // Verify sync() returns Ok.
2063            let result = append.sync().await;
2064            assert!(
2065                result.is_ok(),
2066                "sync() on immutable blob should return Ok, got: {:?}",
2067                result
2068            );
2069
2070            // Verify data is still readable.
2071            let data: Vec<u8> = append
2072                .read_at(0, IoBufMut::zeroed(5))
2073                .await
2074                .unwrap()
2075                .coalesce()
2076                .freeze()
2077                .into();
2078            assert_eq!(data, vec![1, 2, 3, 4, 5]);
2079        });
2080    }
2081
2082    #[test]
2083    fn test_corrupted_crc_len_too_large() {
2084        let executor = deterministic::Runner::default();
2085
2086        executor.start(|context| async move {
2087            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
2088            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
2089
2090            // Step 1: Create blob with valid data
2091            let (blob, size) = context
2092                .open("test_partition", b"crc_len_test")
2093                .await
2094                .unwrap();
2095
2096            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2097                .await
2098                .unwrap();
2099
2100            append.append(&[0x42; 50]).await.unwrap();
2101            append.sync().await.unwrap();
2102            drop(append);
2103
2104            // Step 2: Corrupt the CRC record to have len > page_size
2105            let (blob, size) = context
2106                .open("test_partition", b"crc_len_test")
2107                .await
2108                .unwrap();
2109            assert_eq!(size as usize, physical_page_size);
2110
2111            // CRC record is at the end of the physical page
2112            let crc_offset = PAGE_SIZE.get() as u64;
2113
2114            // Create a CRC record with len1 = 0xFFFF (65535), which is >> page_size (103)
2115            // Format: [len1_hi, len1_lo, crc1 (4 bytes), len2_hi, len2_lo, crc2 (4 bytes)]
2116            let bad_crc_record: [u8; 12] = [
2117                0xFF, 0xFF, // len1 = 65535 (way too large)
2118                0xDE, 0xAD, 0xBE, 0xEF, // crc1 (garbage)
2119                0x00, 0x00, // len2 = 0
2120                0x00, 0x00, 0x00, 0x00, // crc2 = 0
2121            ];
2122            blob.write_at(crc_offset, bad_crc_record.to_vec())
2123                .await
2124                .unwrap();
2125            blob.sync().await.unwrap();
2126
2127            // Step 3: Try to open the blob - should NOT panic, should return error or handle gracefully
2128            let result = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()).await;
2129
2130            // Either returns InvalidChecksum error OR truncates the corrupted data
2131            // (both are acceptable behaviors - panicking is NOT acceptable)
2132            match result {
2133                Ok(append) => {
2134                    // If it opens successfully, the corrupted page should have been truncated
2135                    let recovered_size = append.size().await;
2136                    assert_eq!(
2137                        recovered_size, 0,
2138                        "Corrupted page should be truncated, size should be 0"
2139                    );
2140                }
2141                Err(e) => {
2142                    // An error is also acceptable (immutable-blob initialization, for example,
                    // fails on unvalidatable trailing data)
2143                    assert!(
2144                        matches!(e, crate::Error::InvalidChecksum),
2145                        "Expected InvalidChecksum error, got: {:?}",
2146                        e
2147                    );
2148                }
2149            }
2150        });
2151    }
2152
2153    #[test]
2154    fn test_corrupted_crc_both_slots_len_too_large() {
2155        let executor = deterministic::Runner::default();
2156
2157        executor.start(|context| async move {
2158            let cache_ref = CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
2159
2160            // Step 1: Create blob with valid data
2161            let (blob, size) = context
2162                .open("test_partition", b"crc_both_bad")
2163                .await
2164                .unwrap();
2165
2166            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2167                .await
2168                .unwrap();
2169
2170            append.append(&[0x42; 50]).await.unwrap();
2171            append.sync().await.unwrap();
2172            drop(append);
2173
2174            // Step 2: Corrupt BOTH CRC slots to have len > page_size
2175            let (blob, size) = context
2176                .open("test_partition", b"crc_both_bad")
2177                .await
2178                .unwrap();
2179
2180            let crc_offset = PAGE_SIZE.get() as u64;
2181
2182            // Both slots have len > page_size
2183            let bad_crc_record: [u8; 12] = [
2184                0x01, 0x00, // len1 = 256 (> 103)
2185                0xDE, 0xAD, 0xBE, 0xEF, // crc1 (garbage)
2186                0x02, 0x00, // len2 = 512 (> 103)
2187                0xCA, 0xFE, 0xBA, 0xBE, // crc2 (garbage)
2188            ];
2189            blob.write_at(crc_offset, bad_crc_record.to_vec())
2190                .await
2191                .unwrap();
2192            blob.sync().await.unwrap();
2193
2194            // Step 3: Try to open - should NOT panic
2195            let result = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()).await;
2196
2197            match result {
2198                Ok(append) => {
2199                    // Corrupted page truncated
2200                    assert_eq!(append.size().await, 0);
2201                }
2202                Err(e) => {
2203                    assert!(
2204                        matches!(e, crate::Error::InvalidChecksum),
2205                        "Expected InvalidChecksum, got: {:?}",
2206                        e
2207                    );
2208                }
2209            }
2210        });
2211    }
2212}