commonware_runtime/utils/buffer/pool/
append.rs

1//! The [Append] wrapper consists of a [Blob] and a write buffer, and provides a logical view over
2//! the underlying blob which has a page-oriented structure that provides integrity guarantees. The
3//! wrapper also provides read caching managed by a buffer pool.
4//!
5//! # Warning
6//!
7//! Writing new data to the blob can only be done through `append`. The `write` function is not
8//! supported and will panic.
9//!
10//! # Immutability
11//!
12//! The wrapper can be created in (or converted to) an immutable state, which will prevent any
13//! modifications while still supporting cached reads. This can be used to reduce its memory
14//! footprint and/or to prevent unintended modifications.
15//!
16//! # Recovery
17//!
18//! On `sync`, this wrapper will durably write buffered data to the underlying blob in pages. All
19//! pages have a [Checksum] at the end. If no CRC record existed before for the page being written,
20//! then one of the checksums will be all zero. If a checksum already existed for the page being
21//! written, then the write will overwrite only the checksum with the lesser length value. Should
22//! this write fail, the previously committed page state can still be recovered.
23//!
24//! During non-immutable blob initialization, the wrapper will back up over any page that is not
25//! accompanied by a valid CRC, treating it as the result of an incomplete write that may be
26//! invalid. Immutable blob initialization will fail if any trailing data is detected that cannot be
27//! validated by a CRC.
28
29use super::read::{PageReader, Replay};
30use crate::{
31    buffer::{
32        pool::{Checksum, PoolRef, CHECKSUM_SIZE},
33        tip::Buffer,
34    },
35    Blob, Error, RwLock, RwLockWriteGuard,
36};
37use commonware_cryptography::Crc32;
38use commonware_utils::StableBuf;
39use std::{
40    num::{NonZeroU16, NonZeroUsize},
41    sync::Arc,
42};
43use tracing::warn;
44
/// Names the slot of a page's CRC record that a write must leave untouched.
#[derive(Clone, Copy)]
enum ProtectedCrc {
    /// The first slot of the CRC record is protected.
    First,
    /// The second slot of the CRC record is protected.
    Second,
}
51
52/// Describes the state of the underlying blob with respect to the buffer.
53#[derive(Clone)]
54struct BlobState<B: Blob> {
55    blob: B,
56
57    /// The page where the next appended byte will be written to.
58    current_page: u64,
59
60    /// The state of the partial page in the blob. If it was written due to a sync call, then this
61    /// will contain its CRC record.
62    partial_page_state: Option<Checksum>,
63}
64
65/// A [Blob] wrapper that supports write-cached appending of data, with checksums for data integrity
66/// and buffer pool managed caching.
67#[derive(Clone)]
68pub struct Append<B: Blob> {
69    /// The underlying blob being wrapped.
70    blob_state: Arc<RwLock<BlobState<B>>>,
71
72    /// Unique id assigned to this blob by the buffer pool.
73    id: u64,
74
75    /// A reference to the buffer pool that manages read caching for this blob.
76    pool_ref: PoolRef,
77
78    /// The write buffer containing any logical bytes following the last full page boundary in the
79    /// underlying blob.
80    buffer: Arc<RwLock<Buffer>>,
81}
82
83/// Returns the capacity with a floor applied to ensure it can hold at least one full page of new
84/// data even when caching a nearly-full page of already written data.
85fn capacity_with_floor(capacity: usize, page_size: u64) -> usize {
86    let floor = page_size as usize * 2;
87    if capacity < floor {
88        warn!(
89            floor,
90            "requested buffer capacity is too low, increasing it to floor"
91        );
92        floor
93    } else {
94        capacity
95    }
96}
97
98impl<B: Blob> Append<B> {
99    /// Create a new [Append] wrapper of the provided `blob` that is known to have `blob_size`
100    /// underlying physical bytes, using the provided `pool` for read caching, and a write buffer
101    /// with capacity `capacity`. Rewinds the blob if necessary to ensure it only contains
102    /// checksum-validated data.
103    pub async fn new(
104        blob: B,
105        original_blob_size: u64,
106        capacity: usize,
107        pool_ref: PoolRef,
108    ) -> Result<Self, Error> {
109        let (partial_page_state, pages, invalid_data_found) =
110            Self::read_last_valid_page(&blob, original_blob_size, pool_ref.page_size()).await?;
111        if invalid_data_found {
112            // Invalid data was detected, trim it from the blob.
113            let new_blob_size = pages * (pool_ref.page_size() + CHECKSUM_SIZE);
114            warn!(
115                original_blob_size,
116                new_blob_size, "truncating blob to remove invalid data"
117            );
118            blob.resize(new_blob_size).await?;
119            blob.sync().await?;
120        }
121
122        let capacity = capacity_with_floor(capacity, pool_ref.page_size());
123
124        let (blob_state, data) = match partial_page_state {
125            Some((mut partial_page, crc_record)) => {
126                // A partial page exists, make sure we buffer it.
127                partial_page.reserve(capacity - partial_page.len());
128                (
129                    BlobState {
130                        blob,
131                        current_page: pages - 1,
132                        partial_page_state: Some(crc_record),
133                    },
134                    partial_page,
135                )
136            }
137            None => (
138                BlobState {
139                    blob,
140                    current_page: pages,
141                    partial_page_state: None,
142                },
143                Vec::with_capacity(capacity),
144            ),
145        };
146
147        let buffer = Buffer {
148            offset: blob_state.current_page * pool_ref.page_size(),
149            data,
150            capacity,
151            immutable: false,
152        };
153
154        Ok(Self {
155            blob_state: Arc::new(RwLock::new(blob_state)),
156            id: pool_ref.next_id().await,
157            pool_ref,
158            buffer: Arc::new(RwLock::new(buffer)),
159        })
160    }
161
162    /// Return a new [Append] wrapper of the provided `blob` that is known to have `blob_size`
163    /// underlying physical bytes, using the provided `pool` for read caching. The wrapper is for
164    /// read-only data, and any append attempts will return error. The provided `capacity` is used
165    /// only if the blob is later turned into a mutable one. Immutable blobs are assumed consistent
166    /// on disk, so any CRC verification failure results in an error without any recovery attempt.
167    pub async fn new_immutable(
168        blob: B,
169        blob_size: u64,
170        capacity: usize,
171        pool_ref: PoolRef,
172    ) -> Result<Self, Error> {
173        let (partial_page_state, pages, invalid_data_found) =
174            Self::read_last_valid_page(&blob, blob_size, pool_ref.page_size()).await?;
175        if invalid_data_found {
176            // Invalid data was detected, so this blob is not consistent.
177            return Err(Error::InvalidChecksum);
178        }
179
180        let capacity = capacity_with_floor(capacity, pool_ref.page_size());
181
182        let (blob_state, data) = match partial_page_state {
183            Some((mut partial_page, crc_record)) => {
184                // A partial page exists, so put it in the buffer.
185                partial_page.shrink_to_fit();
186                (
187                    BlobState {
188                        blob,
189                        current_page: pages - 1,
190                        partial_page_state: Some(crc_record),
191                    },
192                    partial_page,
193                )
194            }
195            None => (
196                BlobState {
197                    blob,
198                    current_page: pages,
199                    partial_page_state: None,
200                },
201                vec![],
202            ),
203        };
204        let buffer = Buffer {
205            data,
206            capacity,
207            offset: blob_state.current_page * pool_ref.page_size(),
208            immutable: true,
209        };
210
211        Ok(Self {
212            blob_state: Arc::new(RwLock::new(blob_state)),
213            id: pool_ref.next_id().await,
214            pool_ref,
215            buffer: Arc::new(RwLock::new(buffer)),
216        })
217    }
218
219    /// Returns `true` if this blob is in the immutable state.
220    pub async fn is_immutable(&self) -> bool {
221        let buffer = self.buffer.read().await;
222
223        buffer.immutable
224    }
225
226    /// Convert this blob to the immutable state if it's not already in it.
227    ///
228    /// If there is unwritten data in the buffer, it will be flushed and synced before returning.
229    pub async fn to_immutable(&self) -> Result<(), Error> {
230        // Flush any buffered data. When flush_internal returns, write_at has completed and data
231        // has been written to the underlying blob.
232        let mut buf_guard = self.buffer.write().await;
233        if buf_guard.immutable {
234            return Ok(());
235        }
236        buf_guard.immutable = true;
237        self.flush_internal(buf_guard, true).await?;
238
239        // Shrink the buffer capacity to minimum since we won't be adding to it. This requires
240        // re-acquiring the write lock.
241        {
242            let mut buf_guard = self.buffer.write().await;
243            buf_guard.data.shrink_to_fit();
244        }
245
246        // Sync the underlying blob to ensure new_immutable on restart will succeed even in the
247        // event of a crash.
248        let blob_state = self.blob_state.read().await;
249        blob_state.blob.sync().await
250    }
251
252    /// Convert this blob to the mutable state if it's not already in it.
253    pub async fn to_mutable(&self) {
254        let mut buffer = self.buffer.write().await;
255        if !buffer.immutable {
256            return;
257        }
258        buffer.immutable = false;
259    }
260
261    /// Scans backwards from the end of the blob, stopping when it finds a valid page.
262    ///
263    /// # Returns
264    ///
265    /// A tuple of `(partial_page, page_count, invalid_data_found)`:
266    ///
267    /// - `partial_page`: If the last valid page is partial (contains fewer than `page_size` logical
268    ///   bytes), returns `Some((data, crc_record))` containing the logical data and its CRC record.
269    ///   Returns `None` if the last valid page is full or if no valid pages exist.
270    ///
271    /// - `page_count`: The number of pages in the blob up to and including the last valid page
272    ///   found (whether or not it's partial). Note that it's possible earlier pages may be invalid
273    ///   since this function stops scanning when it finds one valid page.
274    ///
275    /// - `invalid_data_found`: `true` if there are any bytes in the blob that follow the last valid
276    ///   page. Typically the blob should be resized to eliminate them since their integrity cannot
277    ///   be guaranteed.
278    async fn read_last_valid_page(
279        blob: &B,
280        blob_size: u64,
281        page_size: u64,
282    ) -> Result<(Option<(Vec<u8>, Checksum)>, u64, bool), Error> {
283        let physical_page_size = page_size + CHECKSUM_SIZE;
284        let partial_bytes = blob_size % physical_page_size;
285        let mut last_page_end = blob_size - partial_bytes;
286
287        // If the last physical page in the blob is truncated, it can't have a valid CRC record and
288        // must be invalid.
289        let mut invalid_data_found = partial_bytes != 0;
290
291        while last_page_end != 0 {
292            // Read the last page and parse its CRC record.
293            let page_start = last_page_end - physical_page_size;
294            let buf = vec![0; physical_page_size as usize];
295            let buf = blob.read_at(buf, page_start).await?;
296
297            match Checksum::validate_page(buf.as_ref()) {
298                Some(crc_record) => {
299                    // Found a valid page.
300                    let (len, _) = crc_record.get_crc();
301                    let len = len as u64;
302                    if len != page_size {
303                        // The page is partial (logical data doesn't fill the page).
304                        let buf: Vec<u8> = buf.into();
305                        let logical_bytes = buf[..(len as usize)].to_vec();
306                        return Ok((
307                            Some((logical_bytes, crc_record)),
308                            last_page_end / physical_page_size,
309                            invalid_data_found,
310                        ));
311                    }
312                    // The page is full.
313                    return Ok((None, last_page_end / physical_page_size, invalid_data_found));
314                }
315                None => {
316                    // The page is invalid.
317                    last_page_end = page_start;
318                    invalid_data_found = true;
319                }
320            }
321        }
322
323        // No valid page exists in the blob.
324        Ok((None, 0, invalid_data_found))
325    }
326
327    /// Append all bytes in `buf` to the tip of the blob.
328    ///
329    /// # Errors
330    ///
331    /// * `Error::ImmutableBlob` - The blob is in the immutable state.
332    pub async fn append(&self, buf: &[u8]) -> Result<(), Error> {
333        let mut buffer = self.buffer.write().await;
334        if buffer.immutable {
335            return Err(Error::ImmutableBlob);
336        }
337
338        if !buffer.append(buf) {
339            return Ok(());
340        }
341
342        // Buffer is over capacity, so we need to write data to the blob.
343        self.flush_internal(buffer, false).await
344    }
345
346    /// Flush all full pages from the buffer to disk, resetting the buffer to contain only the bytes
347    /// in any final partial page. If `write_partial_page` is true, the partial page will be written
348    /// to the blob as well along with a CRC record.
349    async fn flush_internal(
350        &self,
351        mut buf_guard: RwLockWriteGuard<'_, Buffer>,
352        write_partial_page: bool,
353    ) -> Result<(), Error> {
354        let buffer = &mut *buf_guard;
355
356        // Cache the pages we are writing in the buffer pool so they remain cached for concurrent
357        // reads while we flush the buffer.
358        let remaining_byte_count = self
359            .pool_ref
360            .cache(self.id, &buffer.data, buffer.offset)
361            .await;
362
363        // Read the old partial page state before doing the heavy work of preparing physical pages.
364        // This is safe because partial_page_state is only modified by flush_internal, and we hold
365        // the buffer write lock which prevents concurrent flushes.
366        let old_partial_page_state = {
367            let blob_state = self.blob_state.read().await;
368            blob_state.partial_page_state.clone()
369        };
370
371        // Prepare the *physical* pages corresponding to the data in the buffer.
372        // Pass the old partial page state so the CRC record is constructed correctly.
373        let (physical_pages, partial_page_state) = self.to_physical_pages(
374            &*buffer,
375            write_partial_page,
376            old_partial_page_state.as_ref(),
377        );
378
379        // If there's nothing to write, return early.
380        if physical_pages.is_empty() {
381            return Ok(());
382        }
383
384        // Drain the provided buffer of the full pages that are now cached in the buffer pool and
385        // will be written to the blob.
386        let bytes_to_drain = buffer.data.len() - remaining_byte_count;
387        buffer.data.drain(0..bytes_to_drain);
388        buffer.offset += bytes_to_drain as u64;
389        let new_offset = buffer.offset;
390
391        // Acquire a write lock on the blob state so nobody tries to read or modify the blob while
392        // we're writing to it.
393        let mut blob_state = self.blob_state.write().await;
394
395        // Release the buffer lock to allow for concurrent reads & buffered writes while we write
396        // the physical pages.
397        drop(buf_guard);
398
399        let logical_page_size = self.pool_ref.page_size() as usize;
400        let physical_page_size = logical_page_size + CHECKSUM_SIZE as usize;
401        let write_at_offset = blob_state.current_page * physical_page_size as u64;
402
403        // Count only FULL pages for advancing current_page. A partial page (if included) takes
404        // up a full physical page on disk, but it's not complete - the next byte still goes to
405        // that same logical page.
406        let total_pages_in_buffer = physical_pages.len() / physical_page_size;
407        let full_pages_written = if partial_page_state.is_some() {
408            total_pages_in_buffer.saturating_sub(1)
409        } else {
410            total_pages_in_buffer
411        };
412
413        // Identify protected regions based on the OLD partial page state
414        let protected_regions = Self::identify_protected_regions(old_partial_page_state.as_ref());
415
416        // Update state before writing. This may appear to risk data loss if writes fail,
417        // but write failures are fatal per this codebase's design - callers must not use
418        // the blob after any mutable method returns an error.
419        blob_state.current_page += full_pages_written as u64;
420        blob_state.partial_page_state = partial_page_state;
421
422        // Make sure the buffer offset and underlying blob agree on the state of the tip.
423        assert_eq!(
424            blob_state.current_page * self.pool_ref.page_size(),
425            new_offset
426        );
427
428        // Write the physical pages to the blob.
429        // If there are protected regions in the first page, we need to write around them.
430        if let Some((prefix_len, protected_crc)) = protected_regions {
431            match protected_crc {
432                ProtectedCrc::First => {
433                    // Protected CRC is first: [page_size..page_size+6]
434                    // Write 1: New data in first page [prefix_len..page_size]
435                    if prefix_len < logical_page_size {
436                        blob_state
437                            .blob
438                            .write_at(
439                                physical_pages[prefix_len..logical_page_size].to_vec(),
440                                write_at_offset + prefix_len as u64,
441                            )
442                            .await?;
443                    }
444                    // Write 2: Second CRC of first page + all remaining pages [page_size+6..end]
445                    let second_crc_start = logical_page_size + 6;
446                    blob_state
447                        .blob
448                        .write_at(
449                            physical_pages[second_crc_start..].to_vec(),
450                            write_at_offset + second_crc_start as u64,
451                        )
452                        .await?;
453                }
454                ProtectedCrc::Second => {
455                    // Protected CRC is second: [page_size+6..page_size+12]
456                    // Write 1: New data + first CRC of first page [prefix_len..page_size+6]
457                    let first_crc_end = logical_page_size + 6;
458                    if prefix_len < first_crc_end {
459                        blob_state
460                            .blob
461                            .write_at(
462                                physical_pages[prefix_len..first_crc_end].to_vec(),
463                                write_at_offset + prefix_len as u64,
464                            )
465                            .await?;
466                    }
467                    // Write 2: All remaining pages (if any) [physical_page_size..end]
468                    if physical_pages.len() > physical_page_size {
469                        blob_state
470                            .blob
471                            .write_at(
472                                physical_pages[physical_page_size..].to_vec(),
473                                write_at_offset + physical_page_size as u64,
474                            )
475                            .await?;
476                    }
477                }
478            }
479        } else {
480            // No protected regions, write everything in one operation
481            blob_state
482                .blob
483                .write_at(physical_pages, write_at_offset)
484                .await?;
485        }
486
487        Ok(())
488    }
489
490    /// Returns the logical size of the blob. This accounts for both written and buffered data.
491    pub async fn size(&self) -> u64 {
492        let buffer = self.buffer.read().await;
493        buffer.size()
494    }
495
496    /// Reads up to `buf.len()` bytes starting at `logical_offset`, but only as many as are
497    /// available.
498    ///
499    /// This is useful for reading variable-length prefixes (like varints) where you want to read
500    /// up to a maximum number of bytes but the actual data might be shorter.
501    ///
502    /// Returns the number of bytes actually read into the buffer. Returns an error if no bytes
503    /// are available at the given offset.
504    pub async fn read_up_to(
505        &self,
506        buf: impl Into<StableBuf> + Send,
507        logical_offset: u64,
508    ) -> Result<(StableBuf, usize), Error> {
509        let mut buf = buf.into();
510        if buf.is_empty() {
511            return Ok((buf, 0));
512        }
513        let blob_size = self.size().await;
514        let available = (blob_size.saturating_sub(logical_offset) as usize).min(buf.len());
515        if available == 0 {
516            return Err(Error::BlobInsufficientLength);
517        }
518        if buf.len() > available {
519            buf.truncate(available);
520        }
521        self.read_into(buf.as_mut(), logical_offset).await?;
522
523        Ok((buf, available))
524    }
525
526    /// Reads bytes starting at `logical_offset` into `buf`.
527    ///
528    /// This method allows reading directly into a mutable slice without taking ownership of the
529    /// buffer or requiring a specific buffer type.
530    pub async fn read_into(&self, buf: &mut [u8], logical_offset: u64) -> Result<(), Error> {
531        // Ensure the read doesn't overflow.
532        let end_offset = logical_offset
533            .checked_add(buf.len() as u64)
534            .ok_or(Error::OffsetOverflow)?;
535
536        // Acquire a read lock on the buffer.
537        let buffer = self.buffer.read().await;
538
539        // If the data required is beyond the size of the blob, return an error.
540        if end_offset > buffer.size() {
541            return Err(Error::BlobInsufficientLength);
542        }
543
544        // Extract any bytes from the buffer that overlap with the requested range.
545        let remaining = buffer.extract(buf.as_mut(), logical_offset);
546
547        // Release buffer lock before potential I/O.
548        drop(buffer);
549
550        if remaining == 0 {
551            return Ok(());
552        }
553
554        // Fast path: try to read *only* from pool cache without acquiring blob lock. This allows
555        // concurrent reads even while a flush is in progress.
556        let cached = self
557            .pool_ref
558            .read_cached(self.id, &mut buf[..remaining], logical_offset)
559            .await;
560
561        if cached == remaining {
562            // All bytes found in cache.
563            return Ok(());
564        }
565
566        // Slow path: cache miss (partial or full), acquire blob read lock to ensure any in-flight
567        // write completes before we read from the blob.
568        let blob_guard = self.blob_state.read().await;
569
570        // Read remaining bytes that were not already obtained from the earlier cache read.
571        let uncached_offset = logical_offset + cached as u64;
572        let uncached_len = remaining - cached;
573        self.pool_ref
574            .read(
575                &blob_guard.blob,
576                self.id,
577                &mut buf[cached..cached + uncached_len],
578                uncached_offset,
579            )
580            .await
581    }
582
583    /// Returns the protected region info for a partial page, if any.
584    ///
585    /// # Returns
586    ///
587    /// `None` if there's no existing partial page.
588    ///
589    /// `Some((prefix_len, protected_crc))` where:
590    /// - `prefix_len`: bytes `[0..prefix_len]` were already written and can be substituted with
591    ///   zeros (skip writing)
592    /// - `protected_crc`: which CRC slot must not be overwritten
593    fn identify_protected_regions(
594        partial_page_state: Option<&Checksum>,
595    ) -> Option<(usize, ProtectedCrc)> {
596        let crc_record = partial_page_state?;
597        let (old_len, _) = crc_record.get_crc();
598        // The protected CRC is the one with the larger (authoritative) length.
599        let protected_crc = if crc_record.len1 >= crc_record.len2 {
600            ProtectedCrc::First
601        } else {
602            ProtectedCrc::Second
603        };
604        Some((old_len as usize, protected_crc))
605    }
606
607    /// Prepare a buffer containing the result of converting each buffered logical page in the input
608    /// into a physical page (meaning each page has a CRC record). If the last page is not yet full,
609    /// it will be included only if `include_partial_page` is true.
610    ///
611    /// # Arguments
612    ///
613    /// * `buffer` - The buffer containing logical page data
614    /// * `include_partial_page` - Whether to include a partial page if one exists
615    /// * `old_crc_record` - The CRC record from a previously committed partial page, if any.
616    ///   When present, the first page's CRC record will preserve the old CRC in its original slot
617    ///   and place the new CRC in the other slot.
618    fn to_physical_pages(
619        &self,
620        buffer: &Buffer,
621        include_partial_page: bool,
622        old_crc_record: Option<&Checksum>,
623    ) -> (Vec<u8>, Option<Checksum>) {
624        let logical_page_size = self.pool_ref.page_size() as usize;
625        let physical_page_size = logical_page_size + CHECKSUM_SIZE as usize;
626        let pages_to_write = buffer.data.len() / logical_page_size;
627        let mut write_buffer = Vec::with_capacity(pages_to_write * physical_page_size);
628
629        // For each logical page, copy over the data and then write a crc record for it.
630        for page in 0..pages_to_write {
631            let start_read_idx = page * logical_page_size;
632            let end_read_idx = start_read_idx + logical_page_size;
633            let logical_page = &buffer.data[start_read_idx..end_read_idx];
634            write_buffer.extend_from_slice(logical_page);
635
636            let crc = Crc32::checksum(logical_page);
637            let logical_page_size_u16 =
638                u16::try_from(logical_page_size).expect("page size must fit in u16 for CRC record");
639
640            // For the first page, if there's an old partial page CRC, construct the record
641            // to preserve the old CRC in its original slot.
642            let crc_record = if let (0, Some(old_crc)) = (page, old_crc_record) {
643                Self::build_crc_record_preserving_old(logical_page_size_u16, crc, old_crc)
644            } else {
645                Checksum::new(logical_page_size_u16, crc)
646            };
647            write_buffer.extend_from_slice(&crc_record.to_bytes());
648        }
649
650        if !include_partial_page {
651            return (write_buffer, None);
652        }
653
654        let partial_page = &buffer.data[pages_to_write * logical_page_size..];
655        if partial_page.is_empty() {
656            // No partial page data to write.
657            return (write_buffer, None);
658        }
659
660        // If there are no full pages and the partial page length matches what was already
661        // written, there's nothing new to write.
662        if pages_to_write == 0 {
663            if let Some(old_crc) = old_crc_record {
664                let (old_len, _) = old_crc.get_crc();
665                if partial_page.len() == old_len as usize {
666                    return (write_buffer, None);
667                }
668            }
669        }
670        write_buffer.extend_from_slice(partial_page);
671        let partial_len = partial_page.len();
672        let crc = Crc32::checksum(partial_page);
673
674        // Pad with zeros to fill up to logical_page_size.
675        write_buffer.resize(write_buffer.len() + (logical_page_size - partial_len), 0);
676
677        // For partial pages: if this is the first page and there's an old CRC, preserve it.
678        // Otherwise just use the new CRC in slot 0.
679        let crc_record = if let (0, Some(old_crc)) = (pages_to_write, old_crc_record) {
680            Self::build_crc_record_preserving_old(partial_len as u16, crc, old_crc)
681        } else {
682            Checksum::new(partial_len as u16, crc)
683        };
684
685        write_buffer.extend_from_slice(&crc_record.to_bytes());
686
687        // Return the CRC record that matches what we wrote to disk, so that future flushes
688        // correctly identify which slot is protected.
689        (write_buffer, Some(crc_record))
690    }
691
692    /// Build a CRC record that preserves the old CRC in its original slot and places
693    /// the new CRC in the other slot.
694    const fn build_crc_record_preserving_old(
695        new_len: u16,
696        new_crc: u32,
697        old_crc: &Checksum,
698    ) -> Checksum {
699        let (old_len, old_crc_val) = old_crc.get_crc();
700        // The old CRC is in the slot with the larger length value (first slot wins ties).
701        if old_crc.len1 >= old_crc.len2 {
702            // Old CRC is in slot 0, put new CRC in slot 1
703            Checksum {
704                len1: old_len,
705                crc1: old_crc_val,
706                len2: new_len,
707                crc2: new_crc,
708            }
709        } else {
710            // Old CRC is in slot 1, put new CRC in slot 0
711            Checksum {
712                len1: new_len,
713                crc1: new_crc,
714                len2: old_len,
715                crc2: old_crc_val,
716            }
717        }
718    }
719
720    /// Flushes any buffered data, then returns a [Replay] for the underlying blob.
721    ///
722    /// The returned replay can be used to sequentially read all pages from the blob while ensuring
723    /// all data passes integrity verification. CRCs are validated but not included in the output.
724    pub async fn replay(&self, buffer_size: NonZeroUsize) -> Result<Replay<B>, Error> {
725        let logical_page_size = self.pool_ref.page_size();
726        let logical_page_size_nz =
727            NonZeroU16::new(logical_page_size as u16).expect("page_size is non-zero");
728
729        // Flush any buffered data (without fsync) so the reader sees all written data.
730        {
731            let buf_guard = self.buffer.write().await;
732            if !buf_guard.immutable {
733                self.flush_internal(buf_guard, true).await?;
734            }
735        }
736
737        let physical_page_size = logical_page_size + CHECKSUM_SIZE;
738
739        // Convert buffer size (bytes) to page count
740        let prefetch_pages = buffer_size.get() / physical_page_size as usize;
741        let prefetch_pages = prefetch_pages.max(1); // At least 1 page
742        let blob_guard = self.blob_state.read().await;
743
744        // Compute both physical and logical blob sizes.
745        let (physical_blob_size, logical_blob_size) =
746            blob_guard.partial_page_state.as_ref().map_or_else(
747                || {
748                    // All pages are full.
749                    let physical = physical_page_size * blob_guard.current_page;
750                    let logical = logical_page_size * blob_guard.current_page;
751                    (physical, logical)
752                },
753                |crc_record| {
754                    // There's a partial page with a checksum.
755                    let (partial_len, _) = crc_record.get_crc();
756                    let partial_len = partial_len as u64;
757                    // Physical: all pages including the partial one (which is padded to full size).
758                    let physical = physical_page_size * (blob_guard.current_page + 1);
759                    // Logical: full pages before this + partial page's actual data length.
760                    let logical = logical_page_size * blob_guard.current_page + partial_len;
761                    (physical, logical)
762                },
763            );
764
765        let reader = PageReader::new(
766            blob_guard.blob.clone(),
767            physical_blob_size,
768            logical_blob_size,
769            prefetch_pages,
770            logical_page_size_nz,
771        );
772        Ok(Replay::new(reader))
773    }
774}
775
776impl<B: Blob> Blob for Append<B> {
777    async fn read_at(
778        &self,
779        buf: impl Into<StableBuf> + Send,
780        logical_offset: u64,
781    ) -> Result<StableBuf, Error> {
782        let mut buf = buf.into();
783        self.read_into(buf.as_mut(), logical_offset).await?;
784        Ok(buf)
785    }
786
787    async fn sync(&self) -> Result<(), Error> {
788        // Flush any buffered data, including any partial page. When flush_internal returns,
789        // write_at has completed and data has been written to the underlying blob.
790        let buf_guard = self.buffer.write().await;
791        if buf_guard.immutable {
792            return Ok(());
793        }
794        self.flush_internal(buf_guard, true).await?;
795
796        // Sync the underlying blob. We need the blob read lock here since sync() requires access
797        // to the blob, but only a read lock since we're not modifying blob state.
798        let blob_state = self.blob_state.read().await;
799        blob_state.blob.sync().await
800    }
801
802    /// This [Blob] trait method is unimplemented by [Append] and unconditionally panics.
803    async fn write_at(&self, _buf: impl Into<StableBuf> + Send, _offset: u64) -> Result<(), Error> {
804        // TODO(<https://github.com/commonwarexyz/monorepo/issues/1207>): Extend the buffer pool to
805        // support arbitrary writes.
806        unimplemented!("append-only blob type does not support write_at")
807    }
808
809    /// Resize the blob to the provided logical `size`.
810    ///
811    /// This truncates the blob to contain only `size` logical bytes. The physical blob size will
812    /// be adjusted to include the necessary CRC records for the remaining pages.
813    ///
814    /// # Warning
815    ///
816    /// - Concurrent mutable operations (append, resize) are not supported and will cause data loss.
817    /// - Concurrent readers which try to read past the new size during the resize may error.
818    /// - The resize is not guaranteed durable until the next sync.
819    async fn resize(&self, size: u64) -> Result<(), Error> {
820        let current_size = self.size().await;
821
822        // Handle growing by appending zero bytes.
823        if size > current_size {
824            let zeros_needed = (size - current_size) as usize;
825            let zeros = vec![0u8; zeros_needed];
826            self.append(&zeros).await?;
827            return Ok(());
828        }
829
830        // Implementation note: rewinding the blob across a page boundary potentially results in
831        // stale data remaining in the buffer pool's cache. We don't proactively purge the data
832        // within this function since it would be inaccessible anyway. Instead we ensure it is
833        // always updated should the blob grow back to the point where we have new data for the same
834        // page, if any old data hasn't expired naturally by then.
835
836        let logical_page_size = self.pool_ref.page_size();
837        let physical_page_size = logical_page_size + CHECKSUM_SIZE;
838
839        // Flush any buffered data first to ensure we have a consistent state on disk.
840        self.sync().await?;
841
842        // Acquire both locks to prevent concurrent operations.
843        let mut buf_guard = self.buffer.write().await;
844        if buf_guard.immutable {
845            return Err(Error::ImmutableBlob);
846        }
847        let mut blob_guard = self.blob_state.write().await;
848
849        // Calculate the physical size needed for the new logical size.
850        let full_pages = size / logical_page_size;
851        let partial_bytes = size % logical_page_size;
852        let new_physical_size = if partial_bytes > 0 {
853            // We need full_pages + 1 physical pages to hold the partial data.
854            // The partial page will be padded to full physical page size.
855            (full_pages + 1) * physical_page_size
856        } else {
857            // No partial page needed.
858            full_pages * physical_page_size
859        };
860
861        // Resize the underlying blob.
862        blob_guard.blob.resize(new_physical_size).await?;
863        blob_guard.partial_page_state = None;
864
865        // Update blob state and buffer based on the desired logical size. The partial page data is
866        // read with CRC validation; the validated length may exceed partial_bytes (reflecting the
867        // old data length), but we only load the prefix we need. The next sync will write the
868        // correct CRC for the new length.
869        //
870        // Note: This updates state before validation completes, which could leave state
871        // inconsistent if validation fails. This is acceptable because failures from mutable
872        // methods are fatal - callers must not use the blob after any error.
873
874        blob_guard.current_page = full_pages;
875        buf_guard.offset = full_pages * logical_page_size;
876
877        if partial_bytes > 0 {
878            // There's a partial page. Read its data from disk with CRC validation.
879            let page_data =
880                super::get_page_from_blob(&blob_guard.blob, full_pages, logical_page_size).await?;
881
882            // Ensure the validated data covers what we need.
883            if (page_data.len() as u64) < partial_bytes {
884                return Err(Error::InvalidChecksum);
885            }
886
887            buf_guard.data = page_data.as_ref()[..partial_bytes as usize].to_vec();
888        } else {
889            // No partial page - all pages are full or blob is empty.
890            buf_guard.data = vec![];
891        }
892
893        Ok(())
894    }
895}
896
897#[cfg(test)]
898mod tests {
899    use super::*;
900    use crate::{deterministic, Runner as _, Storage as _};
901    use commonware_codec::ReadExt;
902    use commonware_macros::test_traced;
903    use commonware_utils::{NZUsize, NZU16};
904    use std::num::NonZeroU16;
905
    // Logical page size used by the tests in this module.
    const PAGE_SIZE: NonZeroU16 = NZU16!(103); // janky size to ensure we test page alignment
    // Size passed to both `PoolRef::new` and `Append::new` in the tests: twice the logical page
    // size, in bytes.
    const BUFFER_SIZE: usize = PAGE_SIZE.get() as usize * 2;
908
909    #[test_traced("DEBUG")]
910    fn test_append_crc_empty() {
911        let executor = deterministic::Runner::default();
912        executor.start(|context: deterministic::Context| async move {
913            // Open a new blob.
914            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
915            assert_eq!(blob_size, 0);
916
917            // Create a buffer pool reference.
918            let pool_ref = PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
919
920            // Create an Append wrapper.
921            let append = Append::new(blob, blob_size, BUFFER_SIZE, pool_ref.clone())
922                .await
923                .unwrap();
924
925            // Verify initial size is 0.
926            assert_eq!(append.size().await, 0);
927
928            // Close & re-open.
929            append.sync().await.unwrap();
930            drop(append);
931
932            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
933            assert_eq!(blob_size, 0); // There was no need to write a crc since there was no data.
934
935            let append = Append::new(blob, blob_size, BUFFER_SIZE, pool_ref.clone())
936                .await
937                .unwrap();
938
939            assert_eq!(append.size().await, 0);
940        });
941    }
942
943    #[test_traced("DEBUG")]
944    fn test_append_crc_basic() {
945        let executor = deterministic::Runner::default();
946        executor.start(|context: deterministic::Context| async move {
947            // Open a new blob.
948            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
949            assert_eq!(blob_size, 0);
950
951            // Create a buffer pool reference.
952            let pool_ref = PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
953
954            // Create an Append wrapper.
955            let append = Append::new(blob, blob_size, BUFFER_SIZE, pool_ref.clone())
956                .await
957                .unwrap();
958
959            // Verify initial size is 0.
960            assert_eq!(append.size().await, 0);
961
962            // Append some bytes.
963            let data = vec![1, 2, 3, 4, 5];
964            append.append(&data).await.unwrap();
965
966            // Verify size reflects appended data.
967            assert_eq!(append.size().await, 5);
968
969            // Append more bytes.
970            let more_data = vec![6, 7, 8, 9, 10];
971            append.append(&more_data).await.unwrap();
972
973            // Verify size is cumulative.
974            assert_eq!(append.size().await, 10);
975
976            // Read back the first chunk and verify.
977            let read_buf = vec![0u8; 5];
978            let read_buf = append.read_at(read_buf, 0).await.unwrap();
979            assert_eq!(read_buf.as_ref(), &data[..]);
980
981            // Read back the second chunk and verify.
982            let read_buf = vec![0u8; 5];
983            let read_buf = append.read_at(read_buf, 5).await.unwrap();
984            assert_eq!(read_buf.as_ref(), &more_data[..]);
985
986            // Read all data at once and verify.
987            let read_buf = vec![0u8; 10];
988            let read_buf = append.read_at(read_buf, 0).await.unwrap();
989            assert_eq!(read_buf.as_ref(), &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
990
991            // Close and reopen the blob and make sure the data is still there and the trailing
992            // checksum is written & stripped as expected.
993            append.sync().await.unwrap();
994            drop(append);
995
996            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
997            // Physical page = 103 logical + 12 Checksum = 115 bytes (padded partial page)
998            assert_eq!(blob_size, 115);
999            let append = Append::new(blob, blob_size, BUFFER_SIZE, pool_ref.clone())
1000                .await
1001                .unwrap();
1002            assert_eq!(append.size().await, 10); // CRC should be stripped after verification
1003
1004            // Append data that spans a page boundary.
1005            // PAGE_SIZE=103 is the logical page size. We have 10 bytes, so writing
1006            // 100 more bytes (total 110) will cross the page boundary at byte 103.
1007            let spanning_data: Vec<u8> = (11..=110).collect();
1008            append.append(&spanning_data).await.unwrap();
1009            assert_eq!(append.size().await, 110);
1010
1011            // Read back data that spans the page boundary.
1012            let read_buf = vec![0u8; 100];
1013            let read_buf = append.read_at(read_buf, 10).await.unwrap();
1014            assert_eq!(read_buf.as_ref(), &spanning_data[..]);
1015
1016            // Read all 110 bytes at once.
1017            let read_buf = vec![0u8; 110];
1018            let read_buf = append.read_at(read_buf, 0).await.unwrap();
1019            let expected: Vec<u8> = (1..=110).collect();
1020            assert_eq!(read_buf.as_ref(), &expected[..]);
1021
1022            // Drop and re-open and make sure bytes are still there.
1023            append.sync().await.unwrap();
1024            drop(append);
1025
1026            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
1027            // 2 physical pages: 2 * 115 = 230 bytes
1028            assert_eq!(blob_size, 230);
1029            let append = Append::new(blob, blob_size, BUFFER_SIZE, pool_ref.clone())
1030                .await
1031                .unwrap();
1032            assert_eq!(append.size().await, 110);
1033
1034            // Append data to reach exactly a page boundary.
1035            // Logical page size is 103. We have 110 bytes, next boundary is 206 (103 * 2).
1036            // So we need 96 more bytes.
1037            let boundary_data: Vec<u8> = (111..=206).collect();
1038            assert_eq!(boundary_data.len(), 96);
1039            append.append(&boundary_data).await.unwrap();
1040            assert_eq!(append.size().await, 206);
1041
1042            // Verify we can read it back.
1043            let read_buf = vec![0u8; 206];
1044            let read_buf = append.read_at(read_buf, 0).await.unwrap();
1045            let expected: Vec<u8> = (1..=206).collect();
1046            assert_eq!(read_buf.as_ref(), &expected[..]);
1047
1048            // Drop and re-open at the page boundary.
1049            append.sync().await.unwrap();
1050            drop(append);
1051
1052            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
1053            // Physical size should be exactly 2 pages: 115 * 2 = 230 bytes
1054            assert_eq!(blob_size, 230);
1055            let append = Append::new(blob, blob_size, BUFFER_SIZE, pool_ref)
1056                .await
1057                .unwrap();
1058            assert_eq!(append.size().await, 206);
1059
1060            // Verify data is still readable after reopen.
1061            let read_buf = vec![0u8; 206];
1062            let read_buf = append.read_at(read_buf, 0).await.unwrap();
1063            assert_eq!(read_buf.as_ref(), &expected[..]);
1064        });
1065    }
1066
1067    /// Helper to read the CRC record from raw blob bytes at the end of a physical page.
1068    fn read_crc_record_from_page(page_bytes: &[u8]) -> Checksum {
1069        let crc_start = page_bytes.len() - CHECKSUM_SIZE as usize;
1070        Checksum::read(&mut &page_bytes[crc_start..]).unwrap()
1071    }
1072
    /// Dummy marker bytes with len=0 so the mangled slot is never authoritative.
    /// Format: [len_hi=0, len_lo=0, 0xDE, 0xAD, 0xBE, 0xEF]
    ///
    /// Tests in this module write these six bytes directly into the raw blob (over a CRC slot or
    /// into page padding) and later check whether that region still holds them.
    const DUMMY_MARKER: [u8; 6] = [0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF];
1076
1077    #[test]
1078    fn test_identify_protected_regions_equal_lengths() {
1079        // When lengths are equal, the first CRC should be protected (tie-breaking rule).
1080        let record = Checksum {
1081            len1: 50,
1082            crc1: 0xAAAAAAAA,
1083            len2: 50,
1084            crc2: 0xBBBBBBBB,
1085        };
1086
1087        let result =
1088            Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record));
1089        assert!(result.is_some());
1090        let (prefix_len, protected_crc) = result.unwrap();
1091        assert_eq!(prefix_len, 50);
1092        assert!(
1093            matches!(protected_crc, ProtectedCrc::First),
1094            "First CRC should be protected when lengths are equal"
1095        );
1096    }
1097
1098    #[test]
1099    fn test_identify_protected_regions_len1_larger() {
1100        // When len1 > len2, the first CRC should be protected.
1101        let record = Checksum {
1102            len1: 100,
1103            crc1: 0xAAAAAAAA,
1104            len2: 50,
1105            crc2: 0xBBBBBBBB,
1106        };
1107
1108        let result =
1109            Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record));
1110        assert!(result.is_some());
1111        let (prefix_len, protected_crc) = result.unwrap();
1112        assert_eq!(prefix_len, 100);
1113        assert!(
1114            matches!(protected_crc, ProtectedCrc::First),
1115            "First CRC should be protected when len1 > len2"
1116        );
1117    }
1118
1119    #[test]
1120    fn test_identify_protected_regions_len2_larger() {
1121        // When len2 > len1, the second CRC should be protected.
1122        let record = Checksum {
1123            len1: 50,
1124            crc1: 0xAAAAAAAA,
1125            len2: 100,
1126            crc2: 0xBBBBBBBB,
1127        };
1128
1129        let result =
1130            Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record));
1131        assert!(result.is_some());
1132        let (prefix_len, protected_crc) = result.unwrap();
1133        assert_eq!(prefix_len, 100);
1134        assert!(
1135            matches!(protected_crc, ProtectedCrc::Second),
1136            "Second CRC should be protected when len2 > len1"
1137        );
1138    }
1139
1140    /// Test that slot 1 is NOT overwritten when it's the protected slot.
1141    ///
1142    /// Strategy: After extending twice (so slot 1 becomes authoritative with larger len),
1143    /// mangle the non-authoritative slot 0. Then extend again - slot 0 should be overwritten
1144    /// with the new CRC, while slot 1 (protected) should remain untouched.
1145    #[test_traced("DEBUG")]
1146    fn test_crc_slot1_protected() {
1147        let executor = deterministic::Runner::default();
1148        executor.start(|context: deterministic::Context| async move {
1149            let pool_ref = PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1150            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
1151            let slot0_offset = PAGE_SIZE.get() as u64;
1152            let slot1_offset = PAGE_SIZE.get() as u64 + 6;
1153
1154            // === Step 1: Write 10 bytes → slot 0 authoritative (len=10) ===
1155            let (blob, _) = context.open("test_partition", b"slot1_prot").await.unwrap();
1156            let append = Append::new(blob, 0, BUFFER_SIZE, pool_ref.clone())
1157                .await
1158                .unwrap();
1159            append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
1160            append.sync().await.unwrap();
1161            drop(append);
1162
1163            // === Step 2: Extend to 30 bytes → slot 1 authoritative (len=30) ===
1164            let (blob, size) = context.open("test_partition", b"slot1_prot").await.unwrap();
1165            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
1166                .await
1167                .unwrap();
1168            append
1169                .append(&(11..=30).collect::<Vec<u8>>())
1170                .await
1171                .unwrap();
1172            append.sync().await.unwrap();
1173            drop(append);
1174
1175            // Verify slot 1 is now authoritative
1176            let (blob, size) = context.open("test_partition", b"slot1_prot").await.unwrap();
1177            let page = blob
1178                .read_at(vec![0u8; physical_page_size], 0)
1179                .await
1180                .unwrap();
1181            let crc = read_crc_record_from_page(page.as_ref());
1182            assert!(
1183                crc.len2 > crc.len1,
1184                "Slot 1 should be authoritative (len2={} > len1={})",
1185                crc.len2,
1186                crc.len1
1187            );
1188
1189            // Capture slot 1 bytes before mangling slot 0
1190            let slot1_before: Vec<u8> = blob
1191                .read_at(vec![0u8; 6], slot1_offset)
1192                .await
1193                .unwrap()
1194                .into();
1195
1196            // === Step 3: Mangle slot 0 (non-authoritative) ===
1197            blob.write_at(DUMMY_MARKER.to_vec(), slot0_offset)
1198                .await
1199                .unwrap();
1200            blob.sync().await.unwrap();
1201
1202            // Verify mangle worked
1203            let slot0_mangled: Vec<u8> = blob
1204                .read_at(vec![0u8; 6], slot0_offset)
1205                .await
1206                .unwrap()
1207                .into();
1208            assert_eq!(slot0_mangled, DUMMY_MARKER, "Mangle failed");
1209
1210            // === Step 4: Extend to 50 bytes → new CRC goes to slot 0, slot 1 protected ===
1211            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
1212                .await
1213                .unwrap();
1214            append
1215                .append(&(31..=50).collect::<Vec<u8>>())
1216                .await
1217                .unwrap();
1218            append.sync().await.unwrap();
1219            drop(append);
1220
1221            // === Step 5: Verify slot 0 was overwritten, slot 1 unchanged ===
1222            let (blob, _) = context.open("test_partition", b"slot1_prot").await.unwrap();
1223
1224            // Slot 0 should have new CRC (not our dummy marker)
1225            let slot0_after: Vec<u8> = blob
1226                .read_at(vec![0u8; 6], slot0_offset)
1227                .await
1228                .unwrap()
1229                .into();
1230            assert_ne!(
1231                slot0_after, DUMMY_MARKER,
1232                "Slot 0 should have been overwritten with new CRC"
1233            );
1234
1235            // Slot 1 should be UNCHANGED (protected)
1236            let slot1_after: Vec<u8> = blob
1237                .read_at(vec![0u8; 6], slot1_offset)
1238                .await
1239                .unwrap()
1240                .into();
1241            assert_eq!(
1242                slot1_before, slot1_after,
1243                "Slot 1 was modified! Protected region violated."
1244            );
1245
1246            // Verify the new CRC in slot 0 has len=50
1247            let page = blob
1248                .read_at(vec![0u8; physical_page_size], 0)
1249                .await
1250                .unwrap();
1251            let crc = read_crc_record_from_page(page.as_ref());
1252            assert_eq!(crc.len1, 50, "Slot 0 should have len=50");
1253        });
1254    }
1255
1256    /// Test that slot 0 is NOT overwritten when it's the protected slot.
1257    ///
1258    /// Strategy: After extending three times (slot 0 becomes authoritative again with largest len),
1259    /// mangle the non-authoritative slot 1. Then extend again - slot 1 should be overwritten
1260    /// with the new CRC, while slot 0 (protected) should remain untouched.
1261    #[test_traced("DEBUG")]
1262    fn test_crc_slot0_protected() {
1263        let executor = deterministic::Runner::default();
1264        executor.start(|context: deterministic::Context| async move {
1265            let pool_ref = PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1266            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
1267            let slot0_offset = PAGE_SIZE.get() as u64;
1268            let slot1_offset = PAGE_SIZE.get() as u64 + 6;
1269
1270            // === Step 1: Write 10 bytes → slot 0 authoritative (len=10) ===
1271            let (blob, _) = context.open("test_partition", b"slot0_prot").await.unwrap();
1272            let append = Append::new(blob, 0, BUFFER_SIZE, pool_ref.clone())
1273                .await
1274                .unwrap();
1275            append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
1276            append.sync().await.unwrap();
1277            drop(append);
1278
1279            // === Step 2: Extend to 30 bytes → slot 1 authoritative (len=30) ===
1280            let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
1281            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
1282                .await
1283                .unwrap();
1284            append
1285                .append(&(11..=30).collect::<Vec<u8>>())
1286                .await
1287                .unwrap();
1288            append.sync().await.unwrap();
1289            drop(append);
1290
1291            // === Step 3: Extend to 50 bytes → slot 0 authoritative (len=50) ===
1292            let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
1293            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
1294                .await
1295                .unwrap();
1296            append
1297                .append(&(31..=50).collect::<Vec<u8>>())
1298                .await
1299                .unwrap();
1300            append.sync().await.unwrap();
1301            drop(append);
1302
1303            // Verify slot 0 is now authoritative
1304            let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
1305            let page = blob
1306                .read_at(vec![0u8; physical_page_size], 0)
1307                .await
1308                .unwrap();
1309            let crc = read_crc_record_from_page(page.as_ref());
1310            assert!(
1311                crc.len1 > crc.len2,
1312                "Slot 0 should be authoritative (len1={} > len2={})",
1313                crc.len1,
1314                crc.len2
1315            );
1316
1317            // Capture slot 0 bytes before mangling slot 1
1318            let slot0_before: Vec<u8> = blob
1319                .read_at(vec![0u8; 6], slot0_offset)
1320                .await
1321                .unwrap()
1322                .into();
1323
1324            // === Step 4: Mangle slot 1 (non-authoritative) ===
1325            blob.write_at(DUMMY_MARKER.to_vec(), slot1_offset)
1326                .await
1327                .unwrap();
1328            blob.sync().await.unwrap();
1329
1330            // Verify mangle worked
1331            let slot1_mangled: Vec<u8> = blob
1332                .read_at(vec![0u8; 6], slot1_offset)
1333                .await
1334                .unwrap()
1335                .into();
1336            assert_eq!(slot1_mangled, DUMMY_MARKER, "Mangle failed");
1337
1338            // === Step 5: Extend to 70 bytes → new CRC goes to slot 1, slot 0 protected ===
1339            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
1340                .await
1341                .unwrap();
1342            append
1343                .append(&(51..=70).collect::<Vec<u8>>())
1344                .await
1345                .unwrap();
1346            append.sync().await.unwrap();
1347            drop(append);
1348
1349            // === Step 6: Verify slot 1 was overwritten, slot 0 unchanged ===
1350            let (blob, _) = context.open("test_partition", b"slot0_prot").await.unwrap();
1351
1352            // Slot 1 should have new CRC (not our dummy marker)
1353            let slot1_after: Vec<u8> = blob
1354                .read_at(vec![0u8; 6], slot1_offset)
1355                .await
1356                .unwrap()
1357                .into();
1358            assert_ne!(
1359                slot1_after, DUMMY_MARKER,
1360                "Slot 1 should have been overwritten with new CRC"
1361            );
1362
1363            // Slot 0 should be UNCHANGED (protected)
1364            let slot0_after: Vec<u8> = blob
1365                .read_at(vec![0u8; 6], slot0_offset)
1366                .await
1367                .unwrap()
1368                .into();
1369            assert_eq!(
1370                slot0_before, slot0_after,
1371                "Slot 0 was modified! Protected region violated."
1372            );
1373
1374            // Verify the new CRC in slot 1 has len=70
1375            let page = blob
1376                .read_at(vec![0u8; physical_page_size], 0)
1377                .await
1378                .unwrap();
1379            let crc = read_crc_record_from_page(page.as_ref());
1380            assert_eq!(crc.len2, 70, "Slot 1 should have len=70");
1381        });
1382    }
1383
1384    /// Test that the data prefix is NOT overwritten when extending a partial page.
1385    ///
1386    /// Strategy: Write data, then mangle the padding area (between data end and CRC start).
1387    /// After extending, the original data should be unchanged but the mangled padding
1388    /// should be overwritten with new data.
1389    #[test_traced("DEBUG")]
1390    fn test_data_prefix_not_overwritten() {
1391        let executor = deterministic::Runner::default();
1392        executor.start(|context: deterministic::Context| async move {
1393            let pool_ref = PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1394            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
1395
1396            // === Step 1: Write 20 bytes ===
1397            let (blob, _) = context
1398                .open("test_partition", b"prefix_test")
1399                .await
1400                .unwrap();
1401            let append = Append::new(blob, 0, BUFFER_SIZE, pool_ref.clone())
1402                .await
1403                .unwrap();
1404            let data1: Vec<u8> = (1..=20).collect();
1405            append.append(&data1).await.unwrap();
1406            append.sync().await.unwrap();
1407            drop(append);
1408
1409            // === Step 2: Capture the first 20 bytes and mangle bytes 25-30 (in padding area) ===
1410            let (blob, size) = context
1411                .open("test_partition", b"prefix_test")
1412                .await
1413                .unwrap();
1414            assert_eq!(size, physical_page_size as u64);
1415
1416            let prefix_before: Vec<u8> = blob.read_at(vec![0u8; 20], 0).await.unwrap().into();
1417
1418            // Mangle bytes 25-30 (safely in the padding area, after our 20 bytes of data)
1419            blob.write_at(DUMMY_MARKER.to_vec(), 25).await.unwrap();
1420            blob.sync().await.unwrap();
1421
1422            // === Step 3: Extend to 40 bytes ===
1423            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
1424                .await
1425                .unwrap();
1426            append
1427                .append(&(21..=40).collect::<Vec<u8>>())
1428                .await
1429                .unwrap();
1430            append.sync().await.unwrap();
1431            drop(append);
1432
1433            // === Step 4: Verify prefix unchanged, mangled area overwritten ===
1434            let (blob, _) = context
1435                .open("test_partition", b"prefix_test")
1436                .await
1437                .unwrap();
1438
1439            // Original 20 bytes should be unchanged
1440            let prefix_after: Vec<u8> = blob.read_at(vec![0u8; 20], 0).await.unwrap().into();
1441            assert_eq!(prefix_before, prefix_after, "Data prefix was modified!");
1442
1443            // Bytes at offset 25-30: data (21..=40) starts at offset 20, so offset 25 has value 26
1444            let overwritten: Vec<u8> = blob.read_at(vec![0u8; 6], 25).await.unwrap().into();
1445            assert_eq!(
1446                overwritten,
1447                vec![26, 27, 28, 29, 30, 31],
1448                "New data should overwrite padding area"
1449            );
1450        });
1451    }
1452
1453    /// Test CRC slot protection when extending past a page boundary.
1454    ///
1455    /// Strategy: Write partial page, mangle slot 0 (non-authoritative after we do first extend),
1456    /// then extend past page boundary. Verify slot 0 gets new full-page CRC while
1457    /// the mangled marker is overwritten, and second page is written correctly.
1458    #[test_traced("DEBUG")]
1459    fn test_crc_slot_protection_across_page_boundary() {
1460        let executor = deterministic::Runner::default();
1461        executor.start(|context: deterministic::Context| async move {
1462            let pool_ref = PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1463            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
1464            let slot0_offset = PAGE_SIZE.get() as u64;
1465            let slot1_offset = PAGE_SIZE.get() as u64 + 6;
1466
1467            // === Step 1: Write 50 bytes → slot 0 authoritative ===
1468            let (blob, _) = context.open("test_partition", b"boundary").await.unwrap();
1469            let append = Append::new(blob, 0, BUFFER_SIZE, pool_ref.clone())
1470                .await
1471                .unwrap();
1472            append.append(&(1..=50).collect::<Vec<u8>>()).await.unwrap();
1473            append.sync().await.unwrap();
1474            drop(append);
1475
1476            // === Step 2: Extend to 80 bytes → slot 1 authoritative ===
1477            let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
1478            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
1479                .await
1480                .unwrap();
1481            append
1482                .append(&(51..=80).collect::<Vec<u8>>())
1483                .await
1484                .unwrap();
1485            append.sync().await.unwrap();
1486            drop(append);
1487
1488            // Verify slot 1 is authoritative
1489            let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
1490            let page = blob
1491                .read_at(vec![0u8; physical_page_size], 0)
1492                .await
1493                .unwrap();
1494            let crc = read_crc_record_from_page(page.as_ref());
1495            assert!(crc.len2 > crc.len1, "Slot 1 should be authoritative");
1496
1497            // Capture slot 1 before extending past page boundary
1498            let slot1_before: Vec<u8> = blob
1499                .read_at(vec![0u8; 6], slot1_offset)
1500                .await
1501                .unwrap()
1502                .into();
1503
1504            // Mangle slot 0 (non-authoritative)
1505            blob.write_at(DUMMY_MARKER.to_vec(), slot0_offset)
1506                .await
1507                .unwrap();
1508            blob.sync().await.unwrap();
1509
1510            // === Step 3: Extend past page boundary (80 + 40 = 120, PAGE_SIZE=103) ===
1511            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
1512                .await
1513                .unwrap();
1514            append
1515                .append(&(81..=120).collect::<Vec<u8>>())
1516                .await
1517                .unwrap();
1518            append.sync().await.unwrap();
1519            drop(append);
1520
1521            // === Step 4: Verify results ===
1522            let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
1523            assert_eq!(size, (physical_page_size * 2) as u64, "Should have 2 pages");
1524
1525            // Slot 0 should have been overwritten with full-page CRC (not dummy marker)
1526            let slot0_after: Vec<u8> = blob
1527                .read_at(vec![0u8; 6], slot0_offset)
1528                .await
1529                .unwrap()
1530                .into();
1531            assert_ne!(
1532                slot0_after, DUMMY_MARKER,
1533                "Slot 0 should have full-page CRC"
1534            );
1535
1536            // Slot 1 should be UNCHANGED (protected during boundary crossing)
1537            let slot1_after: Vec<u8> = blob
1538                .read_at(vec![0u8; 6], slot1_offset)
1539                .await
1540                .unwrap()
1541                .into();
1542            assert_eq!(
1543                slot1_before, slot1_after,
1544                "Slot 1 was modified during page boundary crossing!"
1545            );
1546
1547            // Verify page 0 has correct CRC structure
1548            let page0 = blob
1549                .read_at(vec![0u8; physical_page_size], 0)
1550                .await
1551                .unwrap();
1552            let crc0 = read_crc_record_from_page(page0.as_ref());
1553            assert_eq!(
1554                crc0.len1,
1555                PAGE_SIZE.get(),
1556                "Slot 0 should have full page length"
1557            );
1558
1559            // Verify data integrity
1560            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
1561                .await
1562                .unwrap();
1563            assert_eq!(append.size().await, 120);
1564            let all_data: Vec<u8> = append.read_at(vec![0u8; 120], 0).await.unwrap().into();
1565            let expected: Vec<u8> = (1..=120).collect();
1566            assert_eq!(all_data, expected);
1567        });
1568    }
1569
1570    /// Test that corrupting the primary CRC (but not its length) causes fallback to the previous
1571    /// partial page contents.
1572    ///
1573    /// Strategy:
1574    /// 1. Write 10 bytes → slot 0 authoritative (len=10, valid crc)
1575    /// 2. Extend to 30 bytes → slot 1 authoritative (len=30, valid crc)
1576    /// 3. Corrupt ONLY the crc2 value in slot 1 (not the length)
1577    /// 4. Re-open and verify we fall back to slot 0's 10 bytes
1578    #[test_traced("DEBUG")]
1579    fn test_crc_fallback_on_corrupted_primary() {
1580        let executor = deterministic::Runner::default();
1581        executor.start(|context: deterministic::Context| async move {
1582            let pool_ref = PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1583            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
1584            // crc2 is at offset: PAGE_SIZE + 6 (for len2) + 2 (skip len2 bytes) = PAGE_SIZE + 8
1585            let crc2_offset = PAGE_SIZE.get() as u64 + 8;
1586
1587            // === Step 1: Write 10 bytes → slot 0 authoritative (len=10) ===
1588            let (blob, _) = context
1589                .open("test_partition", b"crc_fallback")
1590                .await
1591                .unwrap();
1592            let append = Append::new(blob, 0, BUFFER_SIZE, pool_ref.clone())
1593                .await
1594                .unwrap();
1595            let data1: Vec<u8> = (1..=10).collect();
1596            append.append(&data1).await.unwrap();
1597            append.sync().await.unwrap();
1598            drop(append);
1599
1600            // === Step 2: Extend to 30 bytes → slot 1 authoritative (len=30) ===
1601            let (blob, size) = context
1602                .open("test_partition", b"crc_fallback")
1603                .await
1604                .unwrap();
1605            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
1606                .await
1607                .unwrap();
1608            append
1609                .append(&(11..=30).collect::<Vec<u8>>())
1610                .await
1611                .unwrap();
1612            append.sync().await.unwrap();
1613            drop(append);
1614
1615            // Verify slot 1 is now authoritative and data reads correctly
1616            let (blob, size) = context
1617                .open("test_partition", b"crc_fallback")
1618                .await
1619                .unwrap();
1620            assert_eq!(size, physical_page_size as u64);
1621
1622            let page = blob
1623                .read_at(vec![0u8; physical_page_size], 0)
1624                .await
1625                .unwrap();
1626            let crc = read_crc_record_from_page(page.as_ref());
1627            assert!(
1628                crc.len2 > crc.len1,
1629                "Slot 1 should be authoritative (len2={} > len1={})",
1630                crc.len2,
1631                crc.len1
1632            );
1633            assert_eq!(crc.len2, 30, "Slot 1 should have len=30");
1634            assert_eq!(crc.len1, 10, "Slot 0 should have len=10");
1635
1636            // Verify we can read all 30 bytes before corruption
1637            let append = Append::new(blob.clone(), size, BUFFER_SIZE, pool_ref.clone())
1638                .await
1639                .unwrap();
1640            assert_eq!(append.size().await, 30);
1641            let all_data: Vec<u8> = append.read_at(vec![0u8; 30], 0).await.unwrap().into();
1642            let expected: Vec<u8> = (1..=30).collect();
1643            assert_eq!(all_data, expected);
1644            drop(append);
1645
1646            // === Step 3: Corrupt ONLY crc2 (not len2) ===
1647            // crc2 is 4 bytes at offset PAGE_SIZE + 8
1648            blob.write_at(vec![0xDE, 0xAD, 0xBE, 0xEF], crc2_offset)
1649                .await
1650                .unwrap();
1651            blob.sync().await.unwrap();
1652
1653            // Verify corruption: len2 should still be 30, but crc2 is now garbage
1654            let page = blob
1655                .read_at(vec![0u8; physical_page_size], 0)
1656                .await
1657                .unwrap();
1658            let crc = read_crc_record_from_page(page.as_ref());
1659            assert_eq!(crc.len2, 30, "len2 should still be 30 after corruption");
1660            assert_eq!(crc.crc2, 0xDEADBEEF, "crc2 should be our corrupted value");
1661
1662            // === Step 4: Re-open and verify fallback to slot 0's 10 bytes ===
1663            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
1664                .await
1665                .unwrap();
1666
1667            // Should fall back to 10 bytes (slot 0's length)
1668            assert_eq!(
1669                append.size().await,
1670                10,
1671                "Should fall back to slot 0's 10 bytes after primary CRC corruption"
1672            );
1673
1674            // Verify the data is the original 10 bytes
1675            let fallback_data: Vec<u8> = append.read_at(vec![0u8; 10], 0).await.unwrap().into();
1676            assert_eq!(
1677                fallback_data, data1,
1678                "Fallback data should match original 10 bytes"
1679            );
1680
1681            // Reading beyond 10 bytes should fail
1682            let result = append.read_at(vec![0u8; 11], 0).await;
1683            assert!(result.is_err(), "Reading beyond fallback size should fail");
1684        });
1685    }
1686
1687    /// Test that corrupting a non-last page's primary CRC fails even if fallback is valid.
1688    ///
1689    /// Non-last pages must always be full. If the primary CRC is corrupted and the fallback
1690    /// indicates a partial page, validation should fail entirely (not fall back to partial).
1691    ///
1692    /// Strategy:
1693    /// 1. Write 10 bytes → slot 0 has len=10 (partial)
1694    /// 2. Extend to full page (103 bytes) → slot 1 has len=103 (full, authoritative)
1695    /// 3. Extend past page boundary (e.g., 110 bytes) → page 0 is now non-last
1696    /// 4. Corrupt the primary CRC of page 0 (slot 1's crc, which has len=103)
1697    /// 5. Re-open and verify that reading from page 0 fails (fallback has len=10, not full)
1698    #[test_traced("DEBUG")]
1699    fn test_non_last_page_rejects_partial_fallback() {
1700        let executor = deterministic::Runner::default();
1701        executor.start(|context: deterministic::Context| async move {
1702            let pool_ref = PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1703            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
1704            // crc2 for page 0 is at offset: PAGE_SIZE + 8
1705            let page0_crc2_offset = PAGE_SIZE.get() as u64 + 8;
1706
1707            // === Step 1: Write 10 bytes → slot 0 has len=10 ===
1708            let (blob, _) = context
1709                .open("test_partition", b"non_last_page")
1710                .await
1711                .unwrap();
1712            let append = Append::new(blob, 0, BUFFER_SIZE, pool_ref.clone())
1713                .await
1714                .unwrap();
1715            append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
1716            append.sync().await.unwrap();
1717            drop(append);
1718
1719            // === Step 2: Extend to exactly full page (103 bytes) → slot 1 has len=103 ===
1720            let (blob, size) = context
1721                .open("test_partition", b"non_last_page")
1722                .await
1723                .unwrap();
1724            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
1725                .await
1726                .unwrap();
1727            // Add bytes 11 through 103 (93 more bytes)
1728            append
1729                .append(&(11..=PAGE_SIZE.get() as u8).collect::<Vec<u8>>())
1730                .await
1731                .unwrap();
1732            append.sync().await.unwrap();
1733            drop(append);
1734
1735            // Verify page 0 slot 1 is authoritative with len=103 (full page)
1736            let (blob, size) = context
1737                .open("test_partition", b"non_last_page")
1738                .await
1739                .unwrap();
1740            let page = blob
1741                .read_at(vec![0u8; physical_page_size], 0)
1742                .await
1743                .unwrap();
1744            let crc = read_crc_record_from_page(page.as_ref());
1745            assert_eq!(crc.len1, 10, "Slot 0 should have len=10");
1746            assert_eq!(
1747                crc.len2,
1748                PAGE_SIZE.get(),
1749                "Slot 1 should have len=103 (full page)"
1750            );
1751            assert!(crc.len2 > crc.len1, "Slot 1 should be authoritative");
1752
1753            // === Step 3: Extend past page boundary (add 10 more bytes for total of 113) ===
1754            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
1755                .await
1756                .unwrap();
1757            // Add bytes 104 through 113 (10 more bytes, now on page 1)
1758            append
1759                .append(&(104..=113).collect::<Vec<u8>>())
1760                .await
1761                .unwrap();
1762            append.sync().await.unwrap();
1763            drop(append);
1764
1765            // Verify we now have 2 pages
1766            let (blob, size) = context
1767                .open("test_partition", b"non_last_page")
1768                .await
1769                .unwrap();
1770            assert_eq!(
1771                size,
1772                (physical_page_size * 2) as u64,
1773                "Should have 2 physical pages"
1774            );
1775
1776            // Verify data is readable before corruption
1777            let append = Append::new(blob.clone(), size, BUFFER_SIZE, pool_ref.clone())
1778                .await
1779                .unwrap();
1780            assert_eq!(append.size().await, 113);
1781            let all_data: Vec<u8> = append.read_at(vec![0u8; 113], 0).await.unwrap().into();
1782            let expected: Vec<u8> = (1..=113).collect();
1783            assert_eq!(all_data, expected);
1784            drop(append);
1785
1786            // === Step 4: Corrupt page 0's primary CRC (slot 1's crc2) ===
1787            blob.write_at(vec![0xDE, 0xAD, 0xBE, 0xEF], page0_crc2_offset)
1788                .await
1789                .unwrap();
1790            blob.sync().await.unwrap();
1791
1792            // Verify corruption: page 0's slot 1 still has len=103 but bad CRC
1793            let page = blob
1794                .read_at(vec![0u8; physical_page_size], 0)
1795                .await
1796                .unwrap();
1797            let crc = read_crc_record_from_page(page.as_ref());
1798            assert_eq!(crc.len2, PAGE_SIZE.get(), "len2 should still be 103");
1799            assert_eq!(crc.crc2, 0xDEADBEEF, "crc2 should be corrupted");
1800            // Slot 0 fallback has len=10 (partial), which is invalid for non-last page
1801            assert_eq!(crc.len1, 10, "Fallback slot 0 has partial length");
1802
1803            // === Step 5: Re-open and try to read from page 0 ===
1804            // The first page's primary CRC is bad, and fallback indicates partial (len=10).
1805            // Since page 0 is not the last page, a partial fallback is invalid.
1806            // Reading from page 0 should fail because the fallback CRC indicates a partial
1807            // page, which is not allowed for non-last pages.
1808            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
1809                .await
1810                .unwrap();
1811
1812            // The blob still reports 113 bytes because init only validates the last page.
1813            // But reading from page 0 should fail because the CRC fallback is partial.
1814            assert_eq!(append.size().await, 113);
1815
1816            // Try to read from page 0 - this should fail with InvalidChecksum because
1817            // the fallback CRC has len=10 (partial), which is invalid for a non-last page.
1818            let result = append.read_at(vec![0u8; 10], 0).await;
1819            assert!(
1820                result.is_err(),
1821                "Reading from corrupted non-last page via Append should fail, but got: {:?}",
1822                result
1823            );
1824            drop(append);
1825
1826            // Also verify that reading via Replay fails the same way.
1827            let (blob, size) = context
1828                .open("test_partition", b"non_last_page")
1829                .await
1830                .unwrap();
1831            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
1832                .await
1833                .unwrap();
1834            let mut replay = append.replay(NZUsize!(1024)).await.unwrap();
1835
1836            // Try to fill pages - should fail on CRC validation.
1837            let result = replay.ensure(1).await;
1838            assert!(
1839                result.is_err(),
1840                "Reading from corrupted non-last page via Replay should fail, but got: {:?}",
1841                result
1842            );
1843        });
1844    }
1845
1846    #[test]
1847    fn test_resize_shrink_validates_crc() {
1848        // Verify that shrinking a blob to a partial page validates the CRC, rather than
1849        // blindly reading raw bytes which could silently load corrupted data.
1850        let executor = deterministic::Runner::default();
1851
1852        executor.start(|context| async move {
1853            let pool_ref = PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1854            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
1855
1856            let (blob, size) = context
1857                .open("test_partition", b"resize_crc_test")
1858                .await
1859                .unwrap();
1860
1861            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
1862                .await
1863                .unwrap();
1864
1865            // Write data across 3 pages: page 0 (full), page 1 (full), page 2 (partial).
1866            // PAGE_SIZE = 103, so 250 bytes = 103 + 103 + 44.
1867            let data: Vec<u8> = (0..=249).collect();
1868            append.append(&data).await.unwrap();
1869            append.sync().await.unwrap();
1870            assert_eq!(append.size().await, 250);
1871            drop(append);
1872
1873            // Corrupt the CRC record of page 1 (middle page).
1874            let (blob, size) = context
1875                .open("test_partition", b"resize_crc_test")
1876                .await
1877                .unwrap();
1878            assert_eq!(size as usize, physical_page_size * 3);
1879
1880            // Page 1 CRC record is at the end of the second physical page.
1881            let page1_crc_offset = (physical_page_size * 2 - CHECKSUM_SIZE as usize) as u64;
1882            blob.write_at(vec![0xFF; CHECKSUM_SIZE as usize], page1_crc_offset)
1883                .await
1884                .unwrap();
1885            blob.sync().await.unwrap();
1886
1887            // Open the blob - Append::new() validates the LAST page (page 2), which is still valid.
1888            // So it should open successfully with size 250.
1889            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
1890                .await
1891                .unwrap();
1892            assert_eq!(append.size().await, 250);
1893
1894            // Try to shrink to 150 bytes, which ends in page 1 (the corrupted page).
1895            // 150 bytes = page 0 (103 full) + page 1 (47 partial).
1896            // This should fail because page 1's CRC is corrupted.
1897            let result = append.resize(150).await;
1898            assert!(
1899                matches!(result, Err(crate::Error::InvalidChecksum)),
1900                "Expected InvalidChecksum when shrinking to corrupted page, got: {:?}",
1901                result
1902            );
1903        });
1904    }
1905
1906    #[test]
1907    fn test_immutable_blob_rejects_append_and_resize() {
1908        let executor = deterministic::Runner::default();
1909
1910        executor.start(|context| async move {
1911            const PAGE_SIZE: NonZeroU16 = NZU16!(64);
1912            const BUFFER_SIZE: usize = 256;
1913
1914            let pool_ref = PoolRef::new(PAGE_SIZE, NZUsize!(4));
1915
1916            let (blob, size) = context
1917                .open("test_partition", b"immutable_test")
1918                .await
1919                .unwrap();
1920
1921            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
1922                .await
1923                .unwrap();
1924
1925            // Write some initial data.
1926            append.append(&[1, 2, 3, 4, 5]).await.unwrap();
1927            append.sync().await.unwrap();
1928            assert_eq!(append.size().await, 5);
1929
1930            // Convert to immutable.
1931            append.to_immutable().await.unwrap();
1932            assert!(append.is_immutable().await);
1933
1934            // Verify append() returns ImmutableBlob error.
1935            let result = append.append(&[6, 7, 8]).await;
1936            assert!(
1937                matches!(result, Err(crate::Error::ImmutableBlob)),
1938                "Expected ImmutableBlob error from append(), got: {:?}",
1939                result
1940            );
1941
1942            // Verify resize() returns ImmutableBlob error.
1943            let result = append.resize(100).await;
1944            assert!(
1945                matches!(result, Err(crate::Error::ImmutableBlob)),
1946                "Expected ImmutableBlob error from resize(), got: {:?}",
1947                result
1948            );
1949
1950            // Verify sync() returns Ok.
1951            let result = append.sync().await;
1952            assert!(
1953                result.is_ok(),
1954                "sync() on immutable blob should return Ok, got: {:?}",
1955                result
1956            );
1957
1958            // Verify data is still readable.
1959            let data: Vec<u8> = append.read_at(vec![0u8; 5], 0).await.unwrap().into();
1960            assert_eq!(data, vec![1, 2, 3, 4, 5]);
1961        });
1962    }
1963
1964    #[test]
1965    fn test_corrupted_crc_len_too_large() {
1966        let executor = deterministic::Runner::default();
1967
1968        executor.start(|context| async move {
1969            let pool_ref = PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1970            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
1971
1972            // Step 1: Create blob with valid data
1973            let (blob, size) = context
1974                .open("test_partition", b"crc_len_test")
1975                .await
1976                .unwrap();
1977
1978            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
1979                .await
1980                .unwrap();
1981
1982            append.append(&[0x42; 50]).await.unwrap();
1983            append.sync().await.unwrap();
1984            drop(append);
1985
1986            // Step 2: Corrupt the CRC record to have len > page_size
1987            let (blob, size) = context
1988                .open("test_partition", b"crc_len_test")
1989                .await
1990                .unwrap();
1991            assert_eq!(size as usize, physical_page_size);
1992
1993            // CRC record is at the end of the physical page
1994            let crc_offset = PAGE_SIZE.get() as u64;
1995
1996            // Create a CRC record with len1 = 0xFFFF (65535), which is >> page_size (103)
1997            // Format: [len1_hi, len1_lo, crc1 (4 bytes), len2_hi, len2_lo, crc2 (4 bytes)]
1998            let bad_crc_record: [u8; 12] = [
1999                0xFF, 0xFF, // len1 = 65535 (way too large)
2000                0xDE, 0xAD, 0xBE, 0xEF, // crc1 (garbage)
2001                0x00, 0x00, // len2 = 0
2002                0x00, 0x00, 0x00, 0x00, // crc2 = 0
2003            ];
2004            blob.write_at(bad_crc_record.to_vec(), crc_offset)
2005                .await
2006                .unwrap();
2007            blob.sync().await.unwrap();
2008
2009            // Step 3: Try to open the blob - should NOT panic, should return error or handle gracefully
2010            let result = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone()).await;
2011
2012            // Either returns InvalidChecksum error OR truncates the corrupted data
2013            // (both are acceptable behaviors - panicking is NOT acceptable)
2014            match result {
2015                Ok(append) => {
2016                    // If it opens successfully, the corrupted page should have been truncated
2017                    let recovered_size = append.size().await;
2018                    assert_eq!(
2019                        recovered_size, 0,
2020                        "Corrupted page should be truncated, size should be 0"
2021                    );
2022                }
2023                Err(e) => {
2024                    // Error is also acceptable (for immutable blobs)
2025                    assert!(
2026                        matches!(e, crate::Error::InvalidChecksum),
2027                        "Expected InvalidChecksum error, got: {:?}",
2028                        e
2029                    );
2030                }
2031            }
2032        });
2033    }
2034
2035    #[test]
2036    fn test_corrupted_crc_both_slots_len_too_large() {
2037        let executor = deterministic::Runner::default();
2038
2039        executor.start(|context| async move {
2040            let pool_ref = PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_SIZE));
2041
2042            // Step 1: Create blob with valid data
2043            let (blob, size) = context
2044                .open("test_partition", b"crc_both_bad")
2045                .await
2046                .unwrap();
2047
2048            let append = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
2049                .await
2050                .unwrap();
2051
2052            append.append(&[0x42; 50]).await.unwrap();
2053            append.sync().await.unwrap();
2054            drop(append);
2055
2056            // Step 2: Corrupt BOTH CRC slots to have len > page_size
2057            let (blob, size) = context
2058                .open("test_partition", b"crc_both_bad")
2059                .await
2060                .unwrap();
2061
2062            let crc_offset = PAGE_SIZE.get() as u64;
2063
2064            // Both slots have len > page_size
2065            let bad_crc_record: [u8; 12] = [
2066                0x01, 0x00, // len1 = 256 (> 103)
2067                0xDE, 0xAD, 0xBE, 0xEF, // crc1 (garbage)
2068                0x02, 0x00, // len2 = 512 (> 103)
2069                0xCA, 0xFE, 0xBA, 0xBE, // crc2 (garbage)
2070            ];
2071            blob.write_at(bad_crc_record.to_vec(), crc_offset)
2072                .await
2073                .unwrap();
2074            blob.sync().await.unwrap();
2075
2076            // Step 3: Try to open - should NOT panic
2077            let result = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone()).await;
2078
2079            match result {
2080                Ok(append) => {
2081                    // Corrupted page truncated
2082                    assert_eq!(append.size().await, 0);
2083                }
2084                Err(e) => {
2085                    assert!(
2086                        matches!(e, crate::Error::InvalidChecksum),
2087                        "Expected InvalidChecksum, got: {:?}",
2088                        e
2089                    );
2090                }
2091            }
2092        });
2093    }
2094}