Skip to main content

commonware_runtime/utils/buffer/paged/
append.rs

1//! The [Append] wrapper consists of a [Blob] and a write buffer, and provides a logical view over
2//! the underlying blob which has a page-oriented structure that provides integrity guarantees. The
3//! wrapper also provides read caching managed by a page cache.
4//!
5//! # Recovery
6//!
7//! On `sync`, this wrapper will durably write buffered data to the underlying blob in pages. All
8//! pages have a [Checksum] at the end. If no CRC record existed before for the page being written,
9//! then one of the checksums will be all zero. If a checksum already existed for the page being
10//! written, then the write will overwrite only the checksum with the lesser length value. Should
11//! this write fail, the previously committed page state can still be recovered.
12//!
13//! During initialization, the wrapper will back up over any page that is not accompanied by a
14//! valid CRC, treating it as the result of an incomplete write that may be invalid.
15
16use super::read::{PageReader, Replay};
17use crate::{
18    buffer::{
19        paged::{CacheRef, Checksum, CHECKSUM_SIZE},
20        tip::Buffer,
21    },
22    Blob, Error, IoBuf, IoBufMut, IoBufs,
23};
24use bytes::BufMut;
25use commonware_cryptography::Crc32;
26use commonware_utils::sync::{AsyncRwLock, AsyncRwLockWriteGuard};
27use std::{
28    num::{NonZeroU16, NonZeroUsize},
29    sync::Arc,
30};
31use tracing::warn;
32
/// Identifies which of the two CRC slots in a page's CRC record holds the checksum that must be
/// preserved (i.e. must not be overwritten) by the next write to that page.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ProtectedCrc {
    /// The first slot is protected; only the second slot may be rewritten.
    First,
    /// The second slot is protected; only the first slot may be rewritten.
    Second,
}
39
40/// Describes the state of the underlying blob with respect to the buffer.
41#[derive(Clone)]
42struct BlobState<B: Blob> {
43    blob: B,
44
45    /// The page where the next appended byte will be written to.
46    current_page: u64,
47
48    /// The state of the partial page in the blob. If it was written due to a sync call, then this
49    /// will contain its CRC record.
50    partial_page_state: Option<Checksum>,
51}
52
53/// A [Blob] wrapper that supports write-cached appending of data, with checksums for data integrity
54/// and page cache managed caching.
55#[derive(Clone)]
56pub struct Append<B: Blob> {
57    /// The underlying blob being wrapped.
58    blob_state: Arc<AsyncRwLock<BlobState<B>>>,
59
60    /// Unique id assigned to this blob by the page cache.
61    id: u64,
62
63    /// A reference to the page cache that manages read caching for this blob.
64    cache_ref: CacheRef,
65
66    /// The write buffer containing any logical bytes following the last full page boundary in the
67    /// underlying blob.
68    buffer: Arc<AsyncRwLock<Buffer>>,
69}
70
71/// Returns the capacity with a floor applied to ensure it can hold at least one full page of new
72/// data even when caching a nearly-full page of already written data.
73fn capacity_with_floor(capacity: usize, page_size: u64) -> usize {
74    let floor = page_size as usize * 2;
75    if capacity < floor {
76        warn!(
77            floor,
78            "requested buffer capacity is too low, increasing it to floor"
79        );
80        floor
81    } else {
82        capacity
83    }
84}
85
86impl<B: Blob> Append<B> {
87    /// Create a new [Append] wrapper of the provided `blob` that is known to have `blob_size`
88    /// underlying physical bytes, using the provided `cache_ref` for read caching, and a write
89    /// buffer with capacity `capacity`. Rewinds the blob if necessary to ensure it only contains
90    /// checksum-validated data.
91    pub async fn new(
92        blob: B,
93        original_blob_size: u64,
94        capacity: usize,
95        cache_ref: CacheRef,
96    ) -> Result<Self, Error> {
97        let (partial_page_state, pages, invalid_data_found) =
98            Self::read_last_valid_page(&blob, original_blob_size, cache_ref.page_size()).await?;
99        if invalid_data_found {
100            // Invalid data was detected, trim it from the blob.
101            let new_blob_size = pages * (cache_ref.page_size() + CHECKSUM_SIZE);
102            warn!(
103                original_blob_size,
104                new_blob_size, "truncating blob to remove invalid data"
105            );
106            blob.resize(new_blob_size).await?;
107            blob.sync().await?;
108        }
109
110        let capacity = capacity_with_floor(capacity, cache_ref.page_size());
111
112        let (blob_state, partial_data) = match partial_page_state {
113            Some((partial_page, crc_record)) => (
114                BlobState {
115                    blob,
116                    current_page: pages - 1,
117                    partial_page_state: Some(crc_record),
118                },
119                Some(partial_page),
120            ),
121            None => (
122                BlobState {
123                    blob,
124                    current_page: pages,
125                    partial_page_state: None,
126                },
127                None,
128            ),
129        };
130
131        let buffer = Buffer::from(
132            blob_state.current_page * cache_ref.page_size(),
133            partial_data.unwrap_or_default(),
134            capacity,
135            cache_ref.pool().clone(),
136        );
137
138        Ok(Self {
139            blob_state: Arc::new(AsyncRwLock::new(blob_state)),
140            id: cache_ref.next_id(),
141            cache_ref,
142            buffer: Arc::new(AsyncRwLock::new(buffer)),
143        })
144    }
145
146    /// Scans backwards from the end of the blob, stopping when it finds a valid page.
147    ///
148    /// # Returns
149    ///
150    /// A tuple of `(partial_page, page_count, invalid_data_found)`:
151    ///
152    /// - `partial_page`: If the last valid page is partial (contains fewer than `page_size` logical
153    ///   bytes), returns `Some((data, crc_record))` containing the logical data and its CRC record.
154    ///   Returns `None` if the last valid page is full or if no valid pages exist.
155    ///
156    /// - `page_count`: The number of pages in the blob up to and including the last valid page
157    ///   found (whether or not it's partial). Note that it's possible earlier pages may be invalid
158    ///   since this function stops scanning when it finds one valid page.
159    ///
160    /// - `invalid_data_found`: `true` if there are any bytes in the blob that follow the last valid
161    ///   page. Typically the blob should be resized to eliminate them since their integrity cannot
162    ///   be guaranteed.
163    async fn read_last_valid_page(
164        blob: &B,
165        blob_size: u64,
166        page_size: u64,
167    ) -> Result<(Option<(IoBuf, Checksum)>, u64, bool), Error> {
168        let physical_page_size = page_size + CHECKSUM_SIZE;
169        let partial_bytes = blob_size % physical_page_size;
170        let mut last_page_end = blob_size - partial_bytes;
171
172        // If the last physical page in the blob is truncated, it can't have a valid CRC record and
173        // must be invalid.
174        let mut invalid_data_found = partial_bytes != 0;
175
176        while last_page_end != 0 {
177            // Read the last page and parse its CRC record.
178            let page_start = last_page_end - physical_page_size;
179            let buf = blob
180                .read_at(page_start, physical_page_size as usize)
181                .await?
182                .coalesce()
183                .freeze();
184
185            match Checksum::validate_page(buf.as_ref()) {
186                Some(crc_record) => {
187                    // Found a valid page.
188                    let (len, _) = crc_record.get_crc();
189                    let len = len as u64;
190                    if len != page_size {
191                        // The page is partial (logical data doesn't fill the page).
192                        let logical_bytes = buf.slice(..len as usize);
193                        return Ok((
194                            Some((logical_bytes, crc_record)),
195                            last_page_end / physical_page_size,
196                            invalid_data_found,
197                        ));
198                    }
199                    // The page is full.
200                    return Ok((None, last_page_end / physical_page_size, invalid_data_found));
201                }
202                None => {
203                    // The page is invalid.
204                    last_page_end = page_start;
205                    invalid_data_found = true;
206                }
207            }
208        }
209
210        // No valid page exists in the blob.
211        Ok((None, 0, invalid_data_found))
212    }
213
214    /// Append all bytes in `buf` to the tip of the blob.
215    pub async fn append(&self, buf: &[u8]) -> Result<(), Error> {
216        let mut buffer = self.buffer.write().await;
217
218        if !buffer.append(buf) {
219            return Ok(());
220        }
221
222        // Buffer is over capacity, so we need to write data to the blob.
223        self.flush_internal(buffer, false).await
224    }
225
226    /// Flush all full pages from the buffer to disk, resetting the buffer to contain only the bytes
227    /// in any final partial page. If `write_partial_page` is true, the partial page will be written
228    /// to the blob as well along with a CRC record.
229    ///
230    /// # Serialization
231    ///
232    /// This method reads `partial_page_state` from `blob_state` under a read lock, then later
233    /// acquires `blob_state` as a write lock to commit the new state. This is safe because the
234    /// caller always holds the buffer write lock (`buf_guard`), and all paths into `flush_internal`
235    /// require that lock, so concurrent flushes are impossible.
236    async fn flush_internal(
237        &self,
238        mut buf_guard: AsyncRwLockWriteGuard<'_, Buffer>,
239        write_partial_page: bool,
240    ) -> Result<(), Error> {
241        let buffer = &mut *buf_guard;
242
243        // Read the old partial page state before doing the heavy work of preparing physical pages.
244        // This is safe because partial_page_state is only modified by flush_internal, and we hold
245        // the buffer write lock which prevents concurrent flushes.
246        let old_partial_page_state = {
247            let blob_state = self.blob_state.read().await;
248            blob_state.partial_page_state.clone()
249        };
250
251        // Prepare the *physical* pages corresponding to the data in the buffer.
252        // Pass the old partial page state so the CRC record is constructed correctly.
253        let (mut physical_pages, partial_page_state) = self.to_physical_pages(
254            &*buffer,
255            write_partial_page,
256            old_partial_page_state.as_ref(),
257        );
258
259        // If there's nothing to write, return early.
260        if physical_pages.is_empty() {
261            return Ok(());
262        }
263
264        // Split buffered bytes into full logical pages to hand off now, leaving any trailing
265        // partial page in tip for continued buffering.
266        let logical_page_size = self.cache_ref.page_size() as usize;
267        let pages_to_cache = buffer.len() / logical_page_size;
268        let bytes_to_drain = pages_to_cache * logical_page_size;
269
270        // Remember the logical start offset and page bytes for caching of flushed full pages.
271        let cache_pages = if pages_to_cache > 0 {
272            Some((buffer.offset, buffer.slice(..bytes_to_drain)))
273        } else {
274            None
275        };
276
277        // Drain full pages from the buffered logical data. If the tip is fully drained, detach its
278        // backing so empty append buffers don't retain pooled storage.
279        if bytes_to_drain == buffer.len() && bytes_to_drain != 0 {
280            let _ = buffer
281                .take()
282                .expect("take must succeed when flush drains all buffered bytes");
283        } else if bytes_to_drain != 0 {
284            buffer.drop_prefix(bytes_to_drain);
285            buffer.offset += bytes_to_drain as u64;
286        }
287        let new_offset = buffer.offset;
288
289        // Cache full pages before releasing the tip lock so reads don't observe stale persisted
290        // bytes during the handoff from tip to cache.
291        if let Some((cache_offset, pages)) = cache_pages {
292            let remaining = self.cache_ref.cache(self.id, pages.as_ref(), cache_offset);
293            assert_eq!(remaining, 0, "cached full-page prefix must be page-aligned");
294        }
295
296        // Acquire a write lock on the blob state so nobody tries to read or modify the blob while
297        // we're writing to it.
298        let mut blob_state = self.blob_state.write().await;
299
300        // Release the buffer lock to allow for concurrent reads & buffered writes while we write
301        // the physical pages.
302        drop(buf_guard);
303
304        let physical_page_size = logical_page_size + CHECKSUM_SIZE as usize;
305        let write_at_offset = blob_state.current_page * physical_page_size as u64;
306
307        // Identify protected regions based on the OLD partial page state
308        let protected_regions = Self::identify_protected_regions(old_partial_page_state.as_ref());
309
310        // Update state before writing. This may appear to risk data loss if writes fail,
311        // but write failures are fatal per this codebase's design - callers must not use
312        // the blob after any mutable method returns an error.
313        blob_state.current_page += pages_to_cache as u64;
314        blob_state.partial_page_state = partial_page_state;
315
316        // Make sure the buffer offset and underlying blob agree on the state of the tip.
317        assert_eq!(
318            blob_state.current_page * self.cache_ref.page_size(),
319            new_offset
320        );
321
322        // Write the physical pages to the blob.
323        // If there are protected regions in the first page, we need to write around them.
324        if let Some((prefix_len, protected_crc)) = protected_regions {
325            match protected_crc {
326                ProtectedCrc::First => {
327                    // Protected CRC is first: [page_size..page_size+6]
328                    // Write 1: New data in first page [prefix_len..page_size]
329                    if prefix_len < logical_page_size {
330                        let _ = physical_pages.split_to(prefix_len);
331                        let first_payload = physical_pages.split_to(logical_page_size - prefix_len);
332                        blob_state
333                            .blob
334                            .write_at(write_at_offset + prefix_len as u64, first_payload)
335                            .await?;
336                    } else {
337                        // Skip the protected first page bytes when they are fully covered.
338                        let _ = physical_pages.split_to(logical_page_size);
339                    }
340
341                    // Write 2: Second CRC of first page + all remaining pages [page_size+6..end]
342                    if physical_pages.len() > 6 {
343                        let _ = physical_pages.split_to(6);
344                        blob_state
345                            .blob
346                            .write_at(
347                                write_at_offset + (logical_page_size + 6) as u64,
348                                physical_pages,
349                            )
350                            .await?;
351                    }
352                }
353                ProtectedCrc::Second => {
354                    // Protected CRC is second: [page_size+6..page_size+12]
355                    // Write 1: New data + first CRC of first page [prefix_len..page_size+6]
356                    let first_crc_end = logical_page_size + 6;
357                    if prefix_len < first_crc_end {
358                        let _ = physical_pages.split_to(prefix_len);
359                        let first_payload = physical_pages.split_to(first_crc_end - prefix_len);
360                        blob_state
361                            .blob
362                            .write_at(write_at_offset + prefix_len as u64, first_payload)
363                            .await?;
364                    } else {
365                        // Skip the fully protected first segment when no bytes from it need update.
366                        let _ = physical_pages.split_to(first_crc_end);
367                    }
368
369                    // Write 2: All remaining pages (if any) [physical_page_size..end]
370                    let skip = physical_page_size - first_crc_end;
371                    if physical_pages.len() > skip {
372                        let _ = physical_pages.split_to(skip);
373                        blob_state
374                            .blob
375                            .write_at(write_at_offset + physical_page_size as u64, physical_pages)
376                            .await?;
377                    }
378                }
379            }
380        } else {
381            // No protected regions, write everything in one operation
382            blob_state
383                .blob
384                .write_at(write_at_offset, physical_pages)
385                .await?;
386        }
387
388        Ok(())
389    }
390
391    /// Returns the logical size of the blob. This accounts for both written and buffered data.
392    pub async fn size(&self) -> u64 {
393        let buffer = self.buffer.read().await;
394        buffer.size()
395    }
396
397    /// Returns the logical size of the blob if it can be observed without waiting.
398    ///
399    /// This is useful for opportunistic fast paths that should fall back rather than contend with
400    /// concurrent writers.
401    pub fn try_size(&self) -> Option<u64> {
402        let buffer = self.buffer.try_read().ok()?;
403        Some(buffer.size())
404    }
405
406    /// Read into `buf` if it can be done synchronously (e.g. without I/O), returning `false` otherwise.
407    ///
408    /// Returns `true` only if all `buf.len()` bytes were satisfied. The caller must have
409    /// already validated that `offset + buf.len()` is within the blob's logical size.
410    pub fn try_read_sync(&self, offset: u64, buf: &mut [u8]) -> bool {
411        self.cache_ref.read_cached(self.id, buf, offset) == buf.len()
412    }
413
414    /// Read exactly `len` immutable bytes starting at `offset`.
415    pub async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufs, Error> {
416        // Read into a temporary contiguous buffer and copy back to preserve structure.
417        // SAFETY: read_into below initializes all `len` bytes.
418        let mut buf = unsafe { self.cache_ref.pool().alloc_len(len) };
419        self.read_into(buf.as_mut(), offset).await?;
420        Ok(buf.into())
421    }
422
423    /// Reads up to `buf.len()` bytes starting at `logical_offset`, but only as many as are
424    /// available.
425    ///
426    /// This is useful for reading variable-length prefixes (like varints) where you want to read
427    /// up to a maximum number of bytes but the actual data might be shorter.
428    ///
429    /// Returns the buffer (truncated to actual bytes read) and the number of bytes read.
430    /// Returns an error if no bytes are available at the given offset.
431    pub async fn read_up_to(
432        &self,
433        logical_offset: u64,
434        len: usize,
435        bufs: impl Into<IoBufMut> + Send,
436    ) -> Result<(IoBufMut, usize), Error> {
437        let mut bufs = bufs.into();
438        if len == 0 {
439            bufs.truncate(0);
440            return Ok((bufs, 0));
441        }
442        let blob_size = self.size().await;
443        let available = (blob_size.saturating_sub(logical_offset) as usize).min(len);
444        if available == 0 {
445            return Err(Error::BlobInsufficientLength);
446        }
447        // SAFETY: read_into below fills all `available` bytes.
448        unsafe { bufs.set_len(available) };
449        self.read_into(bufs.as_mut(), logical_offset).await?;
450
451        Ok((bufs, available))
452    }
453
454    /// Reads bytes starting at `logical_offset` into `buf`.
455    ///
456    /// This method allows reading directly into a mutable slice without taking ownership of the
457    /// buffer or requiring a specific buffer type.
458    pub async fn read_into(&self, buf: &mut [u8], logical_offset: u64) -> Result<(), Error> {
459        // Ensure the read doesn't overflow.
460        let end_offset = logical_offset
461            .checked_add(buf.len() as u64)
462            .ok_or(Error::OffsetOverflow)?;
463
464        // Acquire a read lock on the buffer.
465        let buffer = self.buffer.read().await;
466
467        // If the data required is beyond the size of the blob, return an error.
468        if end_offset > buffer.size() {
469            return Err(Error::BlobInsufficientLength);
470        }
471
472        // Extract any bytes from the buffer that overlap with the requested range.
473        let remaining = if end_offset <= buffer.offset {
474            // No overlap with tip.
475            buf.len()
476        } else {
477            // Overlap is always a suffix of requested range.
478            let overlap_start = buffer.offset.max(logical_offset);
479            let dst_start = (overlap_start - logical_offset) as usize;
480            let src_start = (overlap_start - buffer.offset) as usize;
481            let copied = buf.len() - dst_start;
482            buf[dst_start..].copy_from_slice(&buffer.as_ref()[src_start..src_start + copied]);
483            dst_start
484        };
485
486        // Release buffer lock before potential I/O.
487        drop(buffer);
488
489        if remaining == 0 {
490            return Ok(());
491        }
492
493        // Fast path: try to read *only* from page cache without acquiring blob lock. This allows
494        // concurrent reads even while a flush is in progress.
495        let cached = self
496            .cache_ref
497            .read_cached(self.id, &mut buf[..remaining], logical_offset);
498
499        if cached == remaining {
500            // All bytes found in cache.
501            return Ok(());
502        }
503
504        // Slow path: cache miss (partial or full), acquire blob read lock to ensure any in-flight
505        // write completes before we read from the blob.
506        let blob_guard = self.blob_state.read().await;
507
508        // Read remaining bytes that were not already obtained from the earlier cache read.
509        let uncached_offset = logical_offset + cached as u64;
510        let uncached_len = remaining - cached;
511        self.cache_ref
512            .read(
513                &blob_guard.blob,
514                self.id,
515                &mut buf[cached..cached + uncached_len],
516                uncached_offset,
517            )
518            .await
519    }
520
521    /// Returns the protected region info for a partial page, if any.
522    ///
523    /// # Returns
524    ///
525    /// `None` if there's no existing partial page.
526    ///
527    /// `Some((prefix_len, protected_crc))` where:
528    /// - `prefix_len`: bytes `[0..prefix_len]` were already written and can be substituted with
529    ///   zeros (skip writing)
530    /// - `protected_crc`: which CRC slot must not be overwritten
531    fn identify_protected_regions(
532        partial_page_state: Option<&Checksum>,
533    ) -> Option<(usize, ProtectedCrc)> {
534        let crc_record = partial_page_state?;
535        let (old_len, _) = crc_record.get_crc();
536        // The protected CRC is the one with the larger (authoritative) length.
537        let protected_crc = if crc_record.len1 >= crc_record.len2 {
538            ProtectedCrc::First
539        } else {
540            ProtectedCrc::Second
541        };
542        Some((old_len as usize, protected_crc))
543    }
544
545    /// Prepare physical-page writes from buffered logical bytes.
546    ///
547    /// Each physical page contains one logical page plus CRC record. If the last page is not yet
548    /// full, it will be included only if `include_partial_page` is true.
549    ///
550    /// # Arguments
551    ///
552    /// * `buffer` - The buffer containing logical page data
553    /// * `include_partial_page` - Whether to include a partial page if one exists
554    /// * `old_crc_record` - The CRC record from a previously committed partial page, if any.
555    ///   When present, the first page's CRC record will preserve the old CRC in its original slot
556    ///   and place the new CRC in the other slot.
557    fn to_physical_pages(
558        &self,
559        buffer: &Buffer,
560        include_partial_page: bool,
561        old_crc_record: Option<&Checksum>,
562    ) -> (IoBufs, Option<Checksum>) {
563        let logical_page_size = self.cache_ref.page_size() as usize;
564        let physical_page_size = logical_page_size + CHECKSUM_SIZE as usize;
565        let pages_to_write = buffer.len() / logical_page_size;
566        let mut write_buffer = IoBufs::default();
567        let buffer_data = buffer.as_ref();
568
569        if pages_to_write > 0 {
570            let logical_page_size_u16 =
571                u16::try_from(logical_page_size).expect("page size must fit in u16 for CRC record");
572
573            // Build CRC bytes for full pages once. Full-page payload bytes are appended below as
574            // slices from tip, so we avoid copying logical payload here.
575            let mut crcs = self
576                .cache_ref
577                .pool()
578                .alloc(CHECKSUM_SIZE as usize * pages_to_write);
579            for page in 0..pages_to_write {
580                let start_read_idx = page * logical_page_size;
581                let end_read_idx = start_read_idx + logical_page_size;
582                let logical_page = &buffer_data[start_read_idx..end_read_idx];
583                let crc = Crc32::checksum(logical_page);
584
585                // For the first page, if there's an old partial page CRC, construct the record
586                // to preserve the old CRC in its original slot.
587                let crc_record = if let (0, Some(old_crc)) = (page, old_crc_record) {
588                    Self::build_crc_record_preserving_old(logical_page_size_u16, crc, old_crc)
589                } else {
590                    Checksum::new(logical_page_size_u16, crc)
591                };
592                crcs.put_slice(&crc_record.to_bytes());
593            }
594            let crc_blob = crcs.freeze();
595
596            // Physical full-page layout is [logical_page_bytes, crc_record_bytes].
597            for page in 0..pages_to_write {
598                let start_read_idx = page * logical_page_size;
599                let end_read_idx = start_read_idx + logical_page_size;
600                write_buffer.append(buffer.slice(start_read_idx..end_read_idx));
601
602                let crc_start = page * CHECKSUM_SIZE as usize;
603                write_buffer.append(crc_blob.slice(crc_start..crc_start + CHECKSUM_SIZE as usize));
604            }
605        }
606
607        if !include_partial_page {
608            return (write_buffer, None);
609        }
610
611        let partial_page = &buffer_data[pages_to_write * logical_page_size..];
612        if partial_page.is_empty() {
613            // No partial page data to write.
614            return (write_buffer, None);
615        }
616
617        // If there are no full pages and the partial page length matches what was already
618        // written, there's nothing new to write.
619        if pages_to_write == 0 {
620            if let Some(old_crc) = old_crc_record {
621                let (old_len, _) = old_crc.get_crc();
622                if partial_page.len() == old_len as usize {
623                    return (write_buffer, None);
624                }
625            }
626        }
627        let partial_len = partial_page.len();
628        let crc = Crc32::checksum(partial_page);
629
630        // For partial pages: if this is the first page and there's an old CRC, preserve it.
631        // Otherwise just use the new CRC in slot 0.
632        let crc_record = if let (0, Some(old_crc)) = (pages_to_write, old_crc_record) {
633            Self::build_crc_record_preserving_old(partial_len as u16, crc, old_crc)
634        } else {
635            Checksum::new(partial_len as u16, crc)
636        };
637
638        // A persisted partial page still occupies one full physical page:
639        // [partial logical bytes, zero padding, crc record].
640        let mut padded = self.cache_ref.pool().alloc(physical_page_size);
641        padded.put_slice(partial_page);
642        let zero_count = logical_page_size - partial_len;
643        if zero_count > 0 {
644            padded.put_bytes(0, zero_count);
645        }
646        padded.put_slice(&crc_record.to_bytes());
647        write_buffer.append(padded.freeze());
648
649        // Return the CRC record that matches what we wrote to disk, so that future flushes
650        // correctly identify which slot is protected.
651        (write_buffer, Some(crc_record))
652    }
653
654    /// Build a CRC record that preserves the old CRC in its original slot and places
655    /// the new CRC in the other slot.
656    const fn build_crc_record_preserving_old(
657        new_len: u16,
658        new_crc: u32,
659        old_crc: &Checksum,
660    ) -> Checksum {
661        let (old_len, old_crc_val) = old_crc.get_crc();
662        // The old CRC is in the slot with the larger length value (first slot wins ties).
663        if old_crc.len1 >= old_crc.len2 {
664            // Old CRC is in slot 0, put new CRC in slot 1
665            Checksum {
666                len1: old_len,
667                crc1: old_crc_val,
668                len2: new_len,
669                crc2: new_crc,
670            }
671        } else {
672            // Old CRC is in slot 1, put new CRC in slot 0
673            Checksum {
674                len1: new_len,
675                crc1: new_crc,
676                len2: old_len,
677                crc2: old_crc_val,
678            }
679        }
680    }
681
682    /// Flushes any buffered data, then returns a [Replay] for the underlying blob.
683    ///
684    /// The returned replay can be used to sequentially read all pages from the blob while ensuring
685    /// all data passes integrity verification. CRCs are validated but not included in the output.
686    pub async fn replay(&self, buffer_size: NonZeroUsize) -> Result<Replay<B>, Error> {
687        let logical_page_size = self.cache_ref.page_size();
688        let logical_page_size_nz =
689            NonZeroU16::new(logical_page_size as u16).expect("page_size is non-zero");
690
691        // Flush any buffered data (without fsync) so the reader sees all written data.
692        {
693            let buf_guard = self.buffer.write().await;
694            self.flush_internal(buf_guard, true).await?;
695        }
696
697        // Convert buffer size (bytes) to page count
698        let physical_page_size = logical_page_size + CHECKSUM_SIZE;
699        let prefetch_pages = buffer_size.get() / physical_page_size as usize;
700        let prefetch_pages = prefetch_pages.max(1); // At least 1 page
701        let blob_guard = self.blob_state.read().await;
702
703        // Compute both physical and logical blob sizes.
704        let (physical_blob_size, logical_blob_size) =
705            blob_guard.partial_page_state.as_ref().map_or_else(
706                || {
707                    // All pages are full.
708                    let physical = physical_page_size * blob_guard.current_page;
709                    let logical = logical_page_size * blob_guard.current_page;
710                    (physical, logical)
711                },
712                |crc_record| {
713                    // There's a partial page with a checksum.
714                    let (partial_len, _) = crc_record.get_crc();
715                    let partial_len = partial_len as u64;
716                    // Physical: all pages including the partial one (which is padded to full size).
717                    let physical = physical_page_size * (blob_guard.current_page + 1);
718                    // Logical: full pages before this + partial page's actual data length.
719                    let logical = logical_page_size * blob_guard.current_page + partial_len;
720                    (physical, logical)
721                },
722            );
723
724        let reader = PageReader::new(
725            blob_guard.blob.clone(),
726            physical_blob_size,
727            logical_blob_size,
728            prefetch_pages,
729            logical_page_size_nz,
730        );
731        Ok(Replay::new(reader))
732    }
733}
734
735impl<B: Blob> Append<B> {
736    pub async fn sync(&self) -> Result<(), Error> {
737        // Flush any buffered data, including any partial page. When flush_internal returns,
738        // write_at has completed and data has been written to the underlying blob.
739        let buf_guard = self.buffer.write().await;
740        self.flush_internal(buf_guard, true).await?;
741
742        // Sync the underlying blob. We need the blob read lock here since sync() requires access
743        // to the blob, but only a read lock since we're not modifying blob state.
744        let blob_state = self.blob_state.read().await;
745        blob_state.blob.sync().await
746    }
747
748    /// Resize the blob to the provided logical `size`.
749    ///
750    /// This truncates the blob to contain only `size` logical bytes. The physical blob size will
751    /// be adjusted to include the necessary CRC records for the remaining pages.
752    ///
753    /// # Warning
754    ///
755    /// - Concurrent mutable operations (append, resize) are not supported and will cause data loss.
756    /// - Concurrent readers which try to read past the new size during the resize may error.
757    /// - The resize is not guaranteed durable until the next sync.
758    pub async fn resize(&self, size: u64) -> Result<(), Error> {
759        let current_size = self.size().await;
760
761        // Handle growing by appending zero bytes.
762        if size > current_size {
763            let zeros_needed = (size - current_size) as usize;
764            let mut zeros = self.cache_ref.pool().alloc(zeros_needed);
765            zeros.put_bytes(0, zeros_needed);
766            self.append(zeros.as_ref()).await?;
767            return Ok(());
768        }
769
770        // Implementation note: rewinding the blob across a page boundary potentially results in
771        // stale data remaining in the page cache. We don't proactively purge the data
772        // within this function since it would be inaccessible anyway. Instead we ensure it is
773        // always updated should the blob grow back to the point where we have new data for the same
774        // page, if any old data hasn't expired naturally by then.
775
776        let logical_page_size = self.cache_ref.page_size();
777        let physical_page_size = logical_page_size + CHECKSUM_SIZE;
778
779        // Flush any buffered data first to ensure we have a consistent state on disk.
780        self.sync().await?;
781
782        // Acquire both locks to prevent concurrent operations.
783        let mut buf_guard = self.buffer.write().await;
784        let mut blob_guard = self.blob_state.write().await;
785
786        // Calculate the physical size needed for the new logical size.
787        let full_pages = size / logical_page_size;
788        let partial_bytes = size % logical_page_size;
789        let new_physical_size = if partial_bytes > 0 {
790            // We need full_pages + 1 physical pages to hold the partial data.
791            // The partial page will be padded to full physical page size.
792            (full_pages + 1) * physical_page_size
793        } else {
794            // No partial page needed.
795            full_pages * physical_page_size
796        };
797
798        // Resize the underlying blob.
799        blob_guard.blob.resize(new_physical_size).await?;
800        blob_guard.partial_page_state = None;
801
802        // Update blob state and buffer based on the desired logical size. The partial page data is
803        // read with CRC validation; the validated length may exceed partial_bytes (reflecting the
804        // old data length), but we only load the prefix we need. The next sync will write the
805        // correct CRC for the new length.
806        //
807        // Note: This updates state before validation completes, which could leave state
808        // inconsistent if validation fails. This is acceptable because failures from mutable
809        // methods are fatal - callers must not use the blob after any error.
810
811        blob_guard.current_page = full_pages;
812        buf_guard.offset = full_pages * logical_page_size;
813
814        if partial_bytes > 0 {
815            // There's a partial page. Read its data from disk with CRC validation.
816            let page_data =
817                super::get_page_from_blob(&blob_guard.blob, full_pages, logical_page_size).await?;
818
819            // Ensure the validated data covers what we need.
820            if (page_data.len() as u64) < partial_bytes {
821                return Err(Error::InvalidChecksum);
822            }
823
824            buf_guard.clear();
825            let over_capacity = buf_guard.append(&page_data.as_ref()[..partial_bytes as usize]);
826            assert!(!over_capacity);
827        } else {
828            // No partial page - all pages are full or blob is empty.
829            buf_guard.clear();
830        }
831
832        Ok(())
833    }
834}
835
836#[cfg(test)]
837mod tests {
838    use super::*;
839    use crate::{deterministic, BufferPool, BufferPoolConfig, Runner as _, Storage as _};
840    use commonware_codec::ReadExt;
841    use commonware_macros::test_traced;
842    use commonware_utils::{NZUsize, NZU16};
843    use prometheus_client::registry::Registry;
844    use std::num::NonZeroU16;
845
    /// Logical page size (in bytes) used by the tests in this module.
    const PAGE_SIZE: NonZeroU16 = NZU16!(103); // janky size to ensure we test page alignment
    /// Two logical pages' worth of bytes; passed as the buffer size to `Append::new` and as the
    /// size argument to `CacheRef::from_pooler` in the tests below.
    const BUFFER_SIZE: usize = PAGE_SIZE.get() as usize * 2;
848
849    #[test_traced("DEBUG")]
850    fn test_append_crc_empty() {
851        let executor = deterministic::Runner::default();
852        executor.start(|context: deterministic::Context| async move {
853            // Open a new blob.
854            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
855            assert_eq!(blob_size, 0);
856
857            // Create a page cache reference.
858            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
859
860            // Create an Append wrapper.
861            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
862                .await
863                .unwrap();
864
865            // Verify initial size is 0.
866            assert_eq!(append.size().await, 0);
867
868            // Close & re-open.
869            append.sync().await.unwrap();
870            drop(append);
871
872            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
873            assert_eq!(blob_size, 0); // There was no need to write a crc since there was no data.
874
875            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
876                .await
877                .unwrap();
878
879            assert_eq!(append.size().await, 0);
880        });
881    }
882
883    #[test_traced("DEBUG")]
884    fn test_append_crc_basic() {
885        let executor = deterministic::Runner::default();
886        executor.start(|context: deterministic::Context| async move {
887            // Open a new blob.
888            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
889            assert_eq!(blob_size, 0);
890
891            // Create a page cache reference.
892            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
893
894            // Create an Append wrapper.
895            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
896                .await
897                .unwrap();
898
899            // Verify initial size is 0.
900            assert_eq!(append.size().await, 0);
901
902            // Append some bytes.
903            let data = vec![1, 2, 3, 4, 5];
904            append.append(&data).await.unwrap();
905
906            // Verify size reflects appended data.
907            assert_eq!(append.size().await, 5);
908
909            // Append more bytes.
910            let more_data = vec![6, 7, 8, 9, 10];
911            append.append(&more_data).await.unwrap();
912
913            // Verify size is cumulative.
914            assert_eq!(append.size().await, 10);
915
916            // Read back the first chunk and verify.
917            let read_buf = append.read_at(0, 5).await.unwrap().coalesce();
918            assert_eq!(read_buf, &data[..]);
919
920            // Read back the second chunk and verify.
921            let read_buf = append.read_at(5, 5).await.unwrap().coalesce();
922            assert_eq!(read_buf, &more_data[..]);
923
924            // Read all data at once and verify.
925            let read_buf = append.read_at(0, 10).await.unwrap().coalesce();
926            assert_eq!(read_buf, &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
927
928            // Close and reopen the blob and make sure the data is still there and the trailing
929            // checksum is written & stripped as expected.
930            append.sync().await.unwrap();
931            drop(append);
932
933            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
934            // Physical page = 103 logical + 12 Checksum = 115 bytes (padded partial page)
935            assert_eq!(blob_size, 115);
936            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
937                .await
938                .unwrap();
939            assert_eq!(append.size().await, 10); // CRC should be stripped after verification
940
941            // Append data that spans a page boundary.
942            // PAGE_SIZE=103 is the logical page size. We have 10 bytes, so writing
943            // 100 more bytes (total 110) will cross the page boundary at byte 103.
944            let spanning_data: Vec<u8> = (11..=110).collect();
945            append.append(&spanning_data).await.unwrap();
946            assert_eq!(append.size().await, 110);
947
948            // Read back data that spans the page boundary.
949            let read_buf = append.read_at(10, 100).await.unwrap().coalesce();
950            assert_eq!(read_buf, &spanning_data[..]);
951
952            // Read all 110 bytes at once.
953            let read_buf = append.read_at(0, 110).await.unwrap().coalesce();
954            let expected: Vec<u8> = (1..=110).collect();
955            assert_eq!(read_buf, &expected[..]);
956
957            // Drop and re-open and make sure bytes are still there.
958            append.sync().await.unwrap();
959            drop(append);
960
961            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
962            // 2 physical pages: 2 * 115 = 230 bytes
963            assert_eq!(blob_size, 230);
964            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
965                .await
966                .unwrap();
967            assert_eq!(append.size().await, 110);
968
969            // Append data to reach exactly a page boundary.
970            // Logical page size is 103. We have 110 bytes, next boundary is 206 (103 * 2).
971            // So we need 96 more bytes.
972            let boundary_data: Vec<u8> = (111..=206).collect();
973            assert_eq!(boundary_data.len(), 96);
974            append.append(&boundary_data).await.unwrap();
975            assert_eq!(append.size().await, 206);
976
977            // Verify we can read it back.
978            let read_buf = append.read_at(0, 206).await.unwrap().coalesce();
979            let expected: Vec<u8> = (1..=206).collect();
980            assert_eq!(read_buf, &expected[..]);
981
982            // Drop and re-open at the page boundary.
983            append.sync().await.unwrap();
984            drop(append);
985
986            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
987            // Physical size should be exactly 2 pages: 115 * 2 = 230 bytes
988            assert_eq!(blob_size, 230);
989            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
990                .await
991                .unwrap();
992            assert_eq!(append.size().await, 206);
993
994            // Verify data is still readable after reopen.
995            let read_buf = append.read_at(0, 206).await.unwrap().coalesce();
996            assert_eq!(read_buf, &expected[..]);
997        });
998    }
999
1000    #[test_traced("DEBUG")]
1001    fn test_sync_releases_tip_pool_slot_after_full_drain() {
1002        let executor = deterministic::Runner::default();
1003        executor.start(|context: deterministic::Context| async move {
1004            let mut registry = Registry::default();
1005            let pool = BufferPool::new(
1006                BufferPoolConfig::for_storage()
1007                    .with_pool_min_size(PAGE_SIZE.get() as usize)
1008                    .with_max_per_class(NZUsize!(2)),
1009                &mut registry,
1010            );
1011            let cache_ref = CacheRef::new(pool.clone(), PAGE_SIZE, NZUsize!(1));
1012
1013            let (blob, blob_size) = context
1014                .open("test_partition", b"release_tip_backing")
1015                .await
1016                .unwrap();
1017            assert_eq!(blob_size, 0);
1018
1019            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1020                .await
1021                .unwrap();
1022
1023            append
1024                .append(&vec![7; PAGE_SIZE.get() as usize])
1025                .await
1026                .unwrap();
1027
1028            // One pooled slot backs the page cache and one backs the mutable tip.
1029            assert!(
1030                matches!(
1031                    pool.try_alloc(BUFFER_SIZE),
1032                    Err(crate::iobuf::PoolError::Exhausted)
1033                ),
1034                "full-page tip should occupy the remaining pooled slot before sync"
1035            );
1036
1037            append.sync().await.unwrap();
1038
1039            // After a full drain, the tip should no longer pin that slot.
1040            assert!(
1041                pool.try_alloc(BUFFER_SIZE).is_ok(),
1042                "sync should release pooled backing when no partial tail remains"
1043            );
1044        });
1045    }
1046
1047    #[test_traced("DEBUG")]
1048    fn test_read_up_to_zero_len_truncates_buffer() {
1049        let executor = deterministic::Runner::default();
1050        executor.start(|context: deterministic::Context| async move {
1051            // Open a new blob.
1052            let (blob, blob_size) = context
1053                .open("test_partition", b"read_up_to_zero_len")
1054                .await
1055                .unwrap();
1056            assert_eq!(blob_size, 0);
1057
1058            // Create a page cache reference.
1059            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1060
1061            // Create an Append wrapper and write some data.
1062            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1063                .await
1064                .unwrap();
1065            append.append(&[1, 2, 3, 4]).await.unwrap();
1066
1067            // Request a zero-length read with a reused, non-empty buffer.
1068            let stale = vec![9, 8, 7, 6];
1069            let (buf, read) = append.read_up_to(0, 0, stale).await.unwrap();
1070
1071            assert_eq!(read, 0);
1072            assert_eq!(buf.len(), 0, "read_up_to must truncate returned buffer");
1073            assert_eq!(buf.freeze().as_ref(), b"");
1074        });
1075    }
1076
1077    /// Helper to read the CRC record from raw blob bytes at the end of a physical page.
1078    fn read_crc_record_from_page(page_bytes: &[u8]) -> Checksum {
1079        let crc_start = page_bytes.len() - CHECKSUM_SIZE as usize;
1080        Checksum::read(&mut &page_bytes[crc_start..]).unwrap()
1081    }
1082
    /// Dummy marker bytes with len=0 so the mangled slot is never authoritative.
    /// Format: [len_hi=0, len_lo=0, 0xDE, 0xAD, 0xBE, 0xEF]
    ///
    /// The slot-protection tests write these 6 bytes over one CRC slot of a page and later check
    /// whether the slot still holds them, i.e. whether a subsequent sync overwrote that slot.
    const DUMMY_MARKER: [u8; 6] = [0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF];
1086
1087    #[test]
1088    fn test_identify_protected_regions_equal_lengths() {
1089        // When lengths are equal, the first CRC should be protected (tie-breaking rule).
1090        let record = Checksum {
1091            len1: 50,
1092            crc1: 0xAAAAAAAA,
1093            len2: 50,
1094            crc2: 0xBBBBBBBB,
1095        };
1096
1097        let result =
1098            Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record));
1099        assert!(result.is_some());
1100        let (prefix_len, protected_crc) = result.unwrap();
1101        assert_eq!(prefix_len, 50);
1102        assert!(
1103            matches!(protected_crc, ProtectedCrc::First),
1104            "First CRC should be protected when lengths are equal"
1105        );
1106    }
1107
1108    #[test]
1109    fn test_identify_protected_regions_len1_larger() {
1110        // When len1 > len2, the first CRC should be protected.
1111        let record = Checksum {
1112            len1: 100,
1113            crc1: 0xAAAAAAAA,
1114            len2: 50,
1115            crc2: 0xBBBBBBBB,
1116        };
1117
1118        let result =
1119            Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record));
1120        assert!(result.is_some());
1121        let (prefix_len, protected_crc) = result.unwrap();
1122        assert_eq!(prefix_len, 100);
1123        assert!(
1124            matches!(protected_crc, ProtectedCrc::First),
1125            "First CRC should be protected when len1 > len2"
1126        );
1127    }
1128
1129    #[test]
1130    fn test_identify_protected_regions_len2_larger() {
1131        // When len2 > len1, the second CRC should be protected.
1132        let record = Checksum {
1133            len1: 50,
1134            crc1: 0xAAAAAAAA,
1135            len2: 100,
1136            crc2: 0xBBBBBBBB,
1137        };
1138
1139        let result =
1140            Append::<crate::storage::memory::Blob>::identify_protected_regions(Some(&record));
1141        assert!(result.is_some());
1142        let (prefix_len, protected_crc) = result.unwrap();
1143        assert_eq!(prefix_len, 100);
1144        assert!(
1145            matches!(protected_crc, ProtectedCrc::Second),
1146            "Second CRC should be protected when len2 > len1"
1147        );
1148    }
1149
1150    /// Test that `to_physical_pages` emits full pages zero-copy while still materializing the
1151    /// trailing partial page into one padded physical page.
1152    #[test_traced("DEBUG")]
1153    fn test_to_physical_pages_zero_copy_full_pages_and_materialized_partial() {
1154        // Build a tip buffer containing two full logical pages plus a trailing partial
1155        // page, convert it with `to_physical_pages`, then verify:
1156        // - the result is chunked rather than one contiguous buffer for the full-page portion
1157        // - the logical payload bytes for the first two pages are preserved in order
1158        // - the partial page is padded with zeros up to one full logical page
1159        // - all three resulting physical pages validate their CRC records
1160        let executor = deterministic::Runner::default();
1161        executor.start(|context: deterministic::Context| async move {
1162            // Open a new blob.
1163            let (blob, blob_size) = context
1164                .open("test_partition", b"to_physical_pages_zero_copy")
1165                .await
1166                .unwrap();
1167            assert_eq!(blob_size, 0);
1168
1169            // Create a page cache reference.
1170            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1171
1172            // Create an Append wrapper.
1173            let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref.clone())
1174                .await
1175                .unwrap();
1176
1177            // Build logical data with exactly two full pages followed by one trailing partial page.
1178            // This lets us verify that only the partial page is materialized.
1179            let logical_page_size = PAGE_SIZE.get() as usize;
1180            let partial_len = 17usize;
1181            let data: Vec<u8> = (0..(logical_page_size * 2 + partial_len))
1182                .map(|i| (i % 251) as u8)
1183                .collect();
1184
1185            // Seed a tip buffer with the logical bytes exactly as flush_internal would see them.
1186            let mut buffer = Buffer::new(0, data.len(), cache_ref.pool().clone());
1187            let over_capacity = buffer.append(&data);
1188            assert!(!over_capacity);
1189
1190            // Convert buffered logical bytes into physical-page writes.
1191            let (physical_pages, partial_page_state) =
1192                append.to_physical_pages(&buffer, true, None);
1193
1194            // Two full pages should each contribute a logical slice and a CRC slice, and the
1195            // trailing partial page should contribute one materialized padded physical page.
1196            assert_eq!(physical_pages.chunk_count(), 5);
1197
1198            // The returned partial-page CRC state must describe the exact trailing logical length.
1199            let crc_record = partial_page_state.expect("partial page state must be returned");
1200            let (len, _) = crc_record.get_crc();
1201            assert_eq!(len as usize, partial_len);
1202
1203            // Coalesce for easier content inspection. The assembled bytes should still form three
1204            // full physical pages on disk.
1205            let physical_page_size = logical_page_size + CHECKSUM_SIZE as usize;
1206            let coalesced = physical_pages.coalesce();
1207            assert_eq!(coalesced.len(), physical_page_size * 3);
1208
1209            // The first two physical pages must preserve the two full logical pages verbatim.
1210            assert_eq!(
1211                &coalesced.as_ref()[..logical_page_size],
1212                &data[..logical_page_size]
1213            );
1214            assert_eq!(
1215                &coalesced.as_ref()[physical_page_size..physical_page_size + logical_page_size],
1216                &data[logical_page_size..logical_page_size * 2],
1217            );
1218
1219            // The trailing partial page must contain the remaining logical bytes followed by zero
1220            // padding up to one full logical page.
1221            let partial_start = physical_page_size * 2;
1222            assert_eq!(
1223                &coalesced.as_ref()[partial_start..partial_start + partial_len],
1224                &data[logical_page_size * 2..],
1225            );
1226            assert!(coalesced.as_ref()
1227                [partial_start + partial_len..partial_start + logical_page_size]
1228                .iter()
1229                .all(|byte| *byte == 0));
1230
1231            // Each assembled physical page must carry a valid CRC record.
1232            assert!(Checksum::validate_page(&coalesced.as_ref()[..physical_page_size]).is_some());
1233            assert!(Checksum::validate_page(
1234                &coalesced.as_ref()[physical_page_size..physical_page_size * 2]
1235            )
1236            .is_some());
1237            assert!(Checksum::validate_page(
1238                &coalesced.as_ref()[physical_page_size * 2..physical_page_size * 3]
1239            )
1240            .is_some());
1241        });
1242    }
1243
1244    /// Test that slot 1 is NOT overwritten when it's the protected slot.
1245    ///
1246    /// Strategy: After extending twice (so slot 1 becomes authoritative with larger len),
1247    /// mangle the non-authoritative slot 0. Then extend again - slot 0 should be overwritten
1248    /// with the new CRC, while slot 1 (protected) should remain untouched.
1249    #[test_traced("DEBUG")]
1250    fn test_crc_slot1_protected() {
1251        let executor = deterministic::Runner::default();
1252        executor.start(|context: deterministic::Context| async move {
1253            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1254            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
1255            let slot0_offset = PAGE_SIZE.get() as u64;
1256            let slot1_offset = PAGE_SIZE.get() as u64 + 6;
1257
1258            // === Step 1: Write 10 bytes → slot 0 authoritative (len=10) ===
1259            let (blob, _) = context.open("test_partition", b"slot1_prot").await.unwrap();
1260            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
1261                .await
1262                .unwrap();
1263            append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
1264            append.sync().await.unwrap();
1265            drop(append);
1266
1267            // === Step 2: Extend to 30 bytes → slot 1 authoritative (len=30) ===
1268            let (blob, size) = context.open("test_partition", b"slot1_prot").await.unwrap();
1269            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1270                .await
1271                .unwrap();
1272            append
1273                .append(&(11..=30).collect::<Vec<u8>>())
1274                .await
1275                .unwrap();
1276            append.sync().await.unwrap();
1277            drop(append);
1278
1279            // Verify slot 1 is now authoritative
1280            let (blob, size) = context.open("test_partition", b"slot1_prot").await.unwrap();
1281            let page = blob
1282                .read_at(0, physical_page_size)
1283                .await
1284                .unwrap()
1285                .coalesce();
1286            let crc = read_crc_record_from_page(page.as_ref());
1287            assert!(
1288                crc.len2 > crc.len1,
1289                "Slot 1 should be authoritative (len2={} > len1={})",
1290                crc.len2,
1291                crc.len1
1292            );
1293
1294            // Capture slot 1 bytes before mangling slot 0
1295            let slot1_before: Vec<u8> = blob
1296                .read_at(slot1_offset, 6)
1297                .await
1298                .unwrap()
1299                .coalesce()
1300                .freeze()
1301                .into();
1302
1303            // === Step 3: Mangle slot 0 (non-authoritative) ===
1304            blob.write_at(slot0_offset, DUMMY_MARKER.to_vec())
1305                .await
1306                .unwrap();
1307            blob.sync().await.unwrap();
1308
1309            // Verify mangle worked
1310            let slot0_mangled: Vec<u8> = blob
1311                .read_at(slot0_offset, 6)
1312                .await
1313                .unwrap()
1314                .coalesce()
1315                .freeze()
1316                .into();
1317            assert_eq!(slot0_mangled, DUMMY_MARKER, "Mangle failed");
1318
1319            // === Step 4: Extend to 50 bytes → new CRC goes to slot 0, slot 1 protected ===
1320            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1321                .await
1322                .unwrap();
1323            append
1324                .append(&(31..=50).collect::<Vec<u8>>())
1325                .await
1326                .unwrap();
1327            append.sync().await.unwrap();
1328            drop(append);
1329
1330            // === Step 5: Verify slot 0 was overwritten, slot 1 unchanged ===
1331            let (blob, _) = context.open("test_partition", b"slot1_prot").await.unwrap();
1332
1333            // Slot 0 should have new CRC (not our dummy marker)
1334            let slot0_after: Vec<u8> = blob
1335                .read_at(slot0_offset, 6)
1336                .await
1337                .unwrap()
1338                .coalesce()
1339                .freeze()
1340                .into();
1341            assert_ne!(
1342                slot0_after, DUMMY_MARKER,
1343                "Slot 0 should have been overwritten with new CRC"
1344            );
1345
1346            // Slot 1 should be UNCHANGED (protected)
1347            let slot1_after: Vec<u8> = blob
1348                .read_at(slot1_offset, 6)
1349                .await
1350                .unwrap()
1351                .coalesce()
1352                .freeze()
1353                .into();
1354            assert_eq!(
1355                slot1_before, slot1_after,
1356                "Slot 1 was modified! Protected region violated."
1357            );
1358
1359            // Verify the new CRC in slot 0 has len=50
1360            let page = blob
1361                .read_at(0, physical_page_size)
1362                .await
1363                .unwrap()
1364                .coalesce();
1365            let crc = read_crc_record_from_page(page.as_ref());
1366            assert_eq!(crc.len1, 50, "Slot 0 should have len=50");
1367        });
1368    }
1369
1370    /// Test that slot 0 is NOT overwritten when it's the protected slot.
1371    ///
1372    /// Strategy: After three synced writes (the initial write plus two extensions), slot 0 is
1373    /// authoritative again with the largest len. Mangle the non-authoritative slot 1, then extend
1374    /// again - slot 1 should be overwritten with the new CRC, while slot 0 stays untouched.
1375    #[test_traced("DEBUG")]
1376    fn test_crc_slot0_protected() {
1377        let executor = deterministic::Runner::default();
1378        executor.start(|context: deterministic::Context| async move {
1379            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1380            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
            // The CRC record follows the logical page data: slot 0 starts at PAGE_SIZE and slot 1
            // starts 6 bytes later (each slot is read and written as 6 bytes in this test).
1381            let slot0_offset = PAGE_SIZE.get() as u64;
1382            let slot1_offset = PAGE_SIZE.get() as u64 + 6;
1383
1384            // === Step 1: Write 10 bytes → slot 0 authoritative (len=10) ===
1385            let (blob, _) = context.open("test_partition", b"slot0_prot").await.unwrap();
1386            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
1387                .await
1388                .unwrap();
1389            append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
1390            append.sync().await.unwrap();
1391            drop(append);
1392
1393            // === Step 2: Extend to 30 bytes → slot 1 authoritative (len=30) ===
            // Each step reopens the blob so the Append is rebuilt from the blob and size
            // returned by `open`.
1394            let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
1395            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1396                .await
1397                .unwrap();
1398            append
1399                .append(&(11..=30).collect::<Vec<u8>>())
1400                .await
1401                .unwrap();
1402            append.sync().await.unwrap();
1403            drop(append);
1404
1405            // === Step 3: Extend to 50 bytes → slot 0 authoritative (len=50) ===
1406            let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
1407            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1408                .await
1409                .unwrap();
1410            append
1411                .append(&(31..=50).collect::<Vec<u8>>())
1412                .await
1413                .unwrap();
1414            append.sync().await.unwrap();
1415            drop(append);
1416
1417            // Verify slot 0 is now authoritative
            // This raw blob handle is kept: it is used to mangle slot 1 and is then handed to
            // the next Append in step 5.
1418            let (blob, size) = context.open("test_partition", b"slot0_prot").await.unwrap();
1419            let page = blob
1420                .read_at(0, physical_page_size)
1421                .await
1422                .unwrap()
1423                .coalesce();
1424            let crc = read_crc_record_from_page(page.as_ref());
1425            assert!(
1426                crc.len1 > crc.len2,
1427                "Slot 0 should be authoritative (len1={} > len2={})",
1428                crc.len1,
1429                crc.len2
1430            );
1431
1432            // Capture slot 0 bytes before mangling slot 1
1433            let slot0_before: Vec<u8> = blob
1434                .read_at(slot0_offset, 6)
1435                .await
1436                .unwrap()
1437                .coalesce()
1438                .freeze()
1439                .into();
1440
1441            // === Step 4: Mangle slot 1 (non-authoritative) ===
1442            blob.write_at(slot1_offset, DUMMY_MARKER.to_vec())
1443                .await
1444                .unwrap();
1445            blob.sync().await.unwrap();
1446
1447            // Verify mangle worked
            // The 6-byte read is compared against the whole marker, so DUMMY_MARKER must be
            // exactly one slot (6 bytes) long for this assertion to hold.
1448            let slot1_mangled: Vec<u8> = blob
1449                .read_at(slot1_offset, 6)
1450                .await
1451                .unwrap()
1452                .coalesce()
1453                .freeze()
1454                .into();
1455            assert_eq!(slot1_mangled, DUMMY_MARKER, "Mangle failed");
1456
1457            // === Step 5: Extend to 70 bytes → new CRC goes to slot 1, slot 0 protected ===
1458            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1459                .await
1460                .unwrap();
1461            append
1462                .append(&(51..=70).collect::<Vec<u8>>())
1463                .await
1464                .unwrap();
1465            append.sync().await.unwrap();
1466            drop(append);
1467
1468            // === Step 6: Verify slot 1 was overwritten, slot 0 unchanged ===
1469            let (blob, _) = context.open("test_partition", b"slot0_prot").await.unwrap();
1470
1471            // Slot 1 should have new CRC (not our dummy marker)
1472            let slot1_after: Vec<u8> = blob
1473                .read_at(slot1_offset, 6)
1474                .await
1475                .unwrap()
1476                .coalesce()
1477                .freeze()
1478                .into();
1479            assert_ne!(
1480                slot1_after, DUMMY_MARKER,
1481                "Slot 1 should have been overwritten with new CRC"
1482            );
1483
1484            // Slot 0 should be UNCHANGED (protected)
1485            let slot0_after: Vec<u8> = blob
1486                .read_at(slot0_offset, 6)
1487                .await
1488                .unwrap()
1489                .coalesce()
1490                .freeze()
1491                .into();
1492            assert_eq!(
1493                slot0_before, slot0_after,
1494                "Slot 0 was modified! Protected region violated."
1495            );
1496
1497            // Verify the new CRC in slot 1 has len=70
1498            let page = blob
1499                .read_at(0, physical_page_size)
1500                .await
1501                .unwrap()
1502                .coalesce();
1503            let crc = read_crc_record_from_page(page.as_ref());
1504            assert_eq!(crc.len2, 70, "Slot 1 should have len=70");
1505        });
1506    }
1507
1508    /// Test that the data prefix is NOT overwritten when extending a partial page.
1509    ///
1510    /// Strategy: Write data, then mangle the padding area (between data end and CRC start).
1511    /// After extending, the original data should be unchanged but the mangled padding
1512    /// should be overwritten with new data.
1513    #[test_traced("DEBUG")]
1514    fn test_data_prefix_not_overwritten() {
1515        let executor = deterministic::Runner::default();
1516        executor.start(|context: deterministic::Context| async move {
1517            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1518            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
1519
1520            // === Step 1: Write 20 bytes ===
1521            let (blob, _) = context
1522                .open("test_partition", b"prefix_test")
1523                .await
1524                .unwrap();
1525            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
1526                .await
1527                .unwrap();
1528            let data1: Vec<u8> = (1..=20).collect();
1529            append.append(&data1).await.unwrap();
1530            append.sync().await.unwrap();
1531            drop(append);
1532
1533            // === Step 2: Capture the first 20 bytes and mangle bytes 25-30 (in padding area) ===
1534            let (blob, size) = context
1535                .open("test_partition", b"prefix_test")
1536                .await
1537                .unwrap();
            // Even though only 20 bytes were appended, the synced blob is expected to occupy
            // one whole physical page (logical page plus CRC record).
1538            assert_eq!(size, physical_page_size as u64);
1539
1540            let prefix_before: Vec<u8> = blob
1541                .read_at(0, 20)
1542                .await
1543                .unwrap()
1544                .coalesce()
1545                .freeze()
1546                .into();
1547
1548            // Mangle bytes 25-30 (safely in the padding area, after our 20 bytes of data)
            // NOTE(review): "25-30" assumes DUMMY_MARKER is 6 bytes long, matching the 6-byte
            // read of the same range in step 4 - confirm against its definition.
1549            blob.write_at(25, DUMMY_MARKER.to_vec()).await.unwrap();
1550            blob.sync().await.unwrap();
1551
1552            // === Step 3: Extend to 40 bytes ===
            // The same blob handle and the size captured before the mangle are reused here.
1553            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1554                .await
1555                .unwrap();
1556            append
1557                .append(&(21..=40).collect::<Vec<u8>>())
1558                .await
1559                .unwrap();
1560            append.sync().await.unwrap();
1561            drop(append);
1562
1563            // === Step 4: Verify prefix unchanged, mangled area overwritten ===
1564            let (blob, _) = context
1565                .open("test_partition", b"prefix_test")
1566                .await
1567                .unwrap();
1568
1569            // Original 20 bytes should be unchanged
1570            let prefix_after: Vec<u8> = blob
1571                .read_at(0, 20)
1572                .await
1573                .unwrap()
1574                .coalesce()
1575                .freeze()
1576                .into();
1577            assert_eq!(prefix_before, prefix_after, "Data prefix was modified!");
1578
1579            // Bytes at offset 25-30: data (21..=40) starts at offset 20, so offset 25 has value 26
1580            let overwritten: Vec<u8> = blob
1581                .read_at(25, 6)
1582                .await
1583                .unwrap()
1584                .coalesce()
1585                .freeze()
1586                .into();
1587            assert_eq!(
1588                overwritten,
1589                vec![26, 27, 28, 29, 30, 31],
1590                "New data should overwrite padding area"
1591            );
1592        });
1593    }
1594
1595    /// Test CRC slot protection when extending past a page boundary.
1596    ///
1597    /// Strategy: Write a partial page and extend it once so slot 1 is authoritative, mangle the
1598    /// non-authoritative slot 0, then extend past the page boundary. Verify slot 0 gets the new
1599    /// full-page CRC (replacing the mangled marker), slot 1 is left unchanged, and the data
    /// spanning both pages reads back correctly.
1600    #[test_traced("DEBUG")]
1601    fn test_crc_slot_protection_across_page_boundary() {
1602        let executor = deterministic::Runner::default();
1603        executor.start(|context: deterministic::Context| async move {
1604            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1605            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
            // Slot 0 starts right after the logical page data; slot 1 starts 6 bytes later.
1606            let slot0_offset = PAGE_SIZE.get() as u64;
1607            let slot1_offset = PAGE_SIZE.get() as u64 + 6;
1608
1609            // === Step 1: Write 50 bytes → slot 0 authoritative ===
1610            let (blob, _) = context.open("test_partition", b"boundary").await.unwrap();
1611            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
1612                .await
1613                .unwrap();
1614            append.append(&(1..=50).collect::<Vec<u8>>()).await.unwrap();
1615            append.sync().await.unwrap();
1616            drop(append);
1617
1618            // === Step 2: Extend to 80 bytes → slot 1 authoritative ===
1619            let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
1620            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1621                .await
1622                .unwrap();
1623            append
1624                .append(&(51..=80).collect::<Vec<u8>>())
1625                .await
1626                .unwrap();
1627            append.sync().await.unwrap();
1628            drop(append);
1629
1630            // Verify slot 1 is authoritative
1631            let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
1632            let page = blob
1633                .read_at(0, physical_page_size)
1634                .await
1635                .unwrap()
1636                .coalesce();
1637            let crc = read_crc_record_from_page(page.as_ref());
1638            assert!(crc.len2 > crc.len1, "Slot 1 should be authoritative");
1639
1640            // Capture slot 1 before extending past page boundary
1641            let slot1_before: Vec<u8> = blob
1642                .read_at(slot1_offset, 6)
1643                .await
1644                .unwrap()
1645                .coalesce()
1646                .freeze()
1647                .into();
1648
1649            // Mangle slot 0 (non-authoritative)
1650            blob.write_at(slot0_offset, DUMMY_MARKER.to_vec())
1651                .await
1652                .unwrap();
1653            blob.sync().await.unwrap();
1654
1655            // === Step 3: Extend past page boundary (80 + 40 = 120, PAGE_SIZE=103) ===
            // The mangled blob handle and the pre-mangle size are passed straight to Append.
1656            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1657                .await
1658                .unwrap();
1659            append
1660                .append(&(81..=120).collect::<Vec<u8>>())
1661                .await
1662                .unwrap();
1663            append.sync().await.unwrap();
1664            drop(append);
1665
1666            // === Step 4: Verify results ===
1667            let (blob, size) = context.open("test_partition", b"boundary").await.unwrap();
1668            assert_eq!(size, (physical_page_size * 2) as u64, "Should have 2 pages");
1669
1670            // Slot 0 should have been overwritten with full-page CRC (not dummy marker)
1671            let slot0_after: Vec<u8> = blob
1672                .read_at(slot0_offset, 6)
1673                .await
1674                .unwrap()
1675                .coalesce()
1676                .freeze()
1677                .into();
1678            assert_ne!(
1679                slot0_after, DUMMY_MARKER,
1680                "Slot 0 should have full-page CRC"
1681            );
1682
1683            // Slot 1 should be UNCHANGED (protected during boundary crossing)
1684            let slot1_after: Vec<u8> = blob
1685                .read_at(slot1_offset, 6)
1686                .await
1687                .unwrap()
1688                .coalesce()
1689                .freeze()
1690                .into();
1691            assert_eq!(
1692                slot1_before, slot1_after,
1693                "Slot 1 was modified during page boundary crossing!"
1694            );
1695
1696            // Verify page 0 has correct CRC structure
1697            let page0 = blob
1698                .read_at(0, physical_page_size)
1699                .await
1700                .unwrap()
1701                .coalesce();
1702            let crc0 = read_crc_record_from_page(page0.as_ref());
1703            assert_eq!(
1704                crc0.len1,
1705                PAGE_SIZE.get(),
1706                "Slot 0 should have full page length"
1707            );
1708
1709            // Verify data integrity
            // Read all 120 logical bytes back through a fresh Append over both pages.
1710            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1711                .await
1712                .unwrap();
1713            assert_eq!(append.size().await, 120);
1714            let all_data: Vec<u8> = append.read_at(0, 120).await.unwrap().coalesce().into();
1715            let expected: Vec<u8> = (1..=120).collect();
1716            assert_eq!(all_data, expected);
1717        });
1718    }
1719
1720    /// Test that corrupting the primary CRC (but not its length) causes fallback to the previous
1721    /// partial page contents.
1722    ///
1723    /// Strategy:
1724    /// 1. Write 10 bytes → slot 0 authoritative (len=10, valid crc)
1725    /// 2. Extend to 30 bytes → slot 1 authoritative (len=30, valid crc)
1726    /// 3. Corrupt ONLY the crc2 value in slot 1 (not the length)
1727    /// 4. Re-open and verify we fall back to slot 0's 10 bytes
1728    #[test_traced("DEBUG")]
1729    fn test_crc_fallback_on_corrupted_primary() {
1730        let executor = deterministic::Runner::default();
1731        executor.start(|context: deterministic::Context| async move {
1732            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1733            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
1734            // crc2 offset: PAGE_SIZE + 6 (skip slot 0: len1 + crc1) + 2 (skip len2) = PAGE_SIZE + 8
1735            let crc2_offset = PAGE_SIZE.get() as u64 + 8;
1736
1737            // === Step 1: Write 10 bytes → slot 0 authoritative (len=10) ===
1738            let (blob, _) = context
1739                .open("test_partition", b"crc_fallback")
1740                .await
1741                .unwrap();
1742            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
1743                .await
1744                .unwrap();
            // `data1` is kept so the fallback contents can be compared against it in step 4.
1745            let data1: Vec<u8> = (1..=10).collect();
1746            append.append(&data1).await.unwrap();
1747            append.sync().await.unwrap();
1748            drop(append);
1749
1750            // === Step 2: Extend to 30 bytes → slot 1 authoritative (len=30) ===
1751            let (blob, size) = context
1752                .open("test_partition", b"crc_fallback")
1753                .await
1754                .unwrap();
1755            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1756                .await
1757                .unwrap();
1758            append
1759                .append(&(11..=30).collect::<Vec<u8>>())
1760                .await
1761                .unwrap();
1762            append.sync().await.unwrap();
1763            drop(append);
1764
1765            // Verify slot 1 is now authoritative and data reads correctly
1766            let (blob, size) = context
1767                .open("test_partition", b"crc_fallback")
1768                .await
1769                .unwrap();
1770            assert_eq!(size, physical_page_size as u64);
1771
1772            let page = blob
1773                .read_at(0, physical_page_size)
1774                .await
1775                .unwrap()
1776                .coalesce();
1777            let crc = read_crc_record_from_page(page.as_ref());
1778            assert!(
1779                crc.len2 > crc.len1,
1780                "Slot 1 should be authoritative (len2={} > len1={})",
1781                crc.len2,
1782                crc.len1
1783            );
1784            assert_eq!(crc.len2, 30, "Slot 1 should have len=30");
1785            assert_eq!(crc.len1, 10, "Slot 0 should have len=10");
1786
1787            // Verify we can read all 30 bytes before corruption
            // The blob handle is cloned so the original stays available for the raw CRC
            // corruption in step 3 after this Append is dropped.
1788            let append = Append::new(blob.clone(), size, BUFFER_SIZE, cache_ref.clone())
1789                .await
1790                .unwrap();
1791            assert_eq!(append.size().await, 30);
1792            let all_data: Vec<u8> = append.read_at(0, 30).await.unwrap().coalesce().into();
1793            let expected: Vec<u8> = (1..=30).collect();
1794            assert_eq!(all_data, expected);
1795            drop(append);
1796
1797            // === Step 3: Corrupt ONLY crc2 (not len2) ===
1798            // crc2 is 4 bytes at offset PAGE_SIZE + 8
1799            blob.write_at(crc2_offset, vec![0xDE, 0xAD, 0xBE, 0xEF])
1800                .await
1801                .unwrap();
1802            blob.sync().await.unwrap();
1803
1804            // Verify corruption: len2 should still be 30, but crc2 is now garbage
1805            let page = blob
1806                .read_at(0, physical_page_size)
1807                .await
1808                .unwrap()
1809                .coalesce();
1810            let crc = read_crc_record_from_page(page.as_ref());
1811            assert_eq!(crc.len2, 30, "len2 should still be 30 after corruption");
1812            assert_eq!(crc.crc2, 0xDEADBEEF, "crc2 should be our corrupted value");
1813
1814            // === Step 4: Re-open and verify fallback to slot 0's 10 bytes ===
1815            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1816                .await
1817                .unwrap();
1818
1819            // Should fall back to 10 bytes (slot 0's length)
1820            assert_eq!(
1821                append.size().await,
1822                10,
1823                "Should fall back to slot 0's 10 bytes after primary CRC corruption"
1824            );
1825
1826            // Verify the data is the original 10 bytes
1827            let fallback_data: Vec<u8> = append.read_at(0, 10).await.unwrap().coalesce().into();
1828            assert_eq!(
1829                fallback_data, data1,
1830                "Fallback data should match original 10 bytes"
1831            );
1832
1833            // Reading beyond 10 bytes should fail
1834            let result = append.read_at(0, 11).await;
1835            assert!(result.is_err(), "Reading beyond fallback size should fail");
1836        });
1837    }
1838
1839    /// Test that corrupting a non-last page's primary CRC fails even if fallback is valid.
1840    ///
1841    /// Non-last pages must always be full. If the primary CRC is corrupted and the fallback
1842    /// indicates a partial page, validation should fail entirely (not fall back to partial).
1843    ///
1844    /// Strategy:
1845    /// 1. Write 10 bytes → slot 0 has len=10 (partial)
1846    /// 2. Extend to full page (103 bytes) → slot 1 has len=103 (full, authoritative)
1847    /// 3. Extend past page boundary (113 bytes total) → page 0 is now non-last
1848    /// 4. Corrupt the primary CRC of page 0 (slot 1's crc, which has len=103)
1849    /// 5. Re-open and verify that reading from page 0 fails (fallback has len=10, not full)
1850    #[test_traced("DEBUG")]
1851    fn test_non_last_page_rejects_partial_fallback() {
1852        let executor = deterministic::Runner::default();
1853        executor.start(|context: deterministic::Context| async move {
1854            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
1855            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
1856            // crc2 for page 0 is at offset: PAGE_SIZE + 8
1857            let page0_crc2_offset = PAGE_SIZE.get() as u64 + 8;
1858
1859            // === Step 1: Write 10 bytes → slot 0 has len=10 ===
1860            let (blob, _) = context
1861                .open("test_partition", b"non_last_page")
1862                .await
1863                .unwrap();
1864            let append = Append::new(blob, 0, BUFFER_SIZE, cache_ref.clone())
1865                .await
1866                .unwrap();
1867            append.append(&(1..=10).collect::<Vec<u8>>()).await.unwrap();
1868            append.sync().await.unwrap();
1869            drop(append);
1870
1871            // === Step 2: Extend to exactly full page (103 bytes) → slot 1 has len=103 ===
1872            let (blob, size) = context
1873                .open("test_partition", b"non_last_page")
1874                .await
1875                .unwrap();
1876            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1877                .await
1878                .unwrap();
1879            // Add bytes 11 through 103 (93 more bytes)
            // NOTE(review): the `as u8` cast relies on PAGE_SIZE (103 per the surrounding
            // comments) fitting in a u8; a larger page size would silently truncate here.
1880            append
1881                .append(&(11..=PAGE_SIZE.get() as u8).collect::<Vec<u8>>())
1882                .await
1883                .unwrap();
1884            append.sync().await.unwrap();
1885            drop(append);
1886
1887            // Verify page 0 slot 1 is authoritative with len=103 (full page)
1888            let (blob, size) = context
1889                .open("test_partition", b"non_last_page")
1890                .await
1891                .unwrap();
1892            let page = blob
1893                .read_at(0, physical_page_size)
1894                .await
1895                .unwrap()
1896                .coalesce();
1897            let crc = read_crc_record_from_page(page.as_ref());
1898            assert_eq!(crc.len1, 10, "Slot 0 should have len=10");
1899            assert_eq!(
1900                crc.len2,
1901                PAGE_SIZE.get(),
1902                "Slot 1 should have len=103 (full page)"
1903            );
1904            assert!(crc.len2 > crc.len1, "Slot 1 should be authoritative");
1905
1906            // === Step 3: Extend past page boundary (add 10 more bytes for total of 113) ===
1907            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1908                .await
1909                .unwrap();
1910            // Add bytes 104 through 113 (10 more bytes, now on page 1)
1911            append
1912                .append(&(104..=113).collect::<Vec<u8>>())
1913                .await
1914                .unwrap();
1915            append.sync().await.unwrap();
1916            drop(append);
1917
1918            // Verify we now have 2 pages
1919            let (blob, size) = context
1920                .open("test_partition", b"non_last_page")
1921                .await
1922                .unwrap();
1923            assert_eq!(
1924                size,
1925                (physical_page_size * 2) as u64,
1926                "Should have 2 physical pages"
1927            );
1928
1929            // Verify data is readable before corruption
            // The blob handle is cloned so the original stays available for the raw CRC
            // corruption in step 4 after this Append is dropped.
1930            let append = Append::new(blob.clone(), size, BUFFER_SIZE, cache_ref.clone())
1931                .await
1932                .unwrap();
1933            assert_eq!(append.size().await, 113);
1934            let all_data: Vec<u8> = append.read_at(0, 113).await.unwrap().coalesce().into();
1935            let expected: Vec<u8> = (1..=113).collect();
1936            assert_eq!(all_data, expected);
1937            drop(append);
1938
1939            // === Step 4: Corrupt page 0's primary CRC (slot 1's crc2) ===
1940            blob.write_at(page0_crc2_offset, vec![0xDE, 0xAD, 0xBE, 0xEF])
1941                .await
1942                .unwrap();
1943            blob.sync().await.unwrap();
1944
1945            // Verify corruption: page 0's slot 1 still has len=103 but bad CRC
1946            let page = blob
1947                .read_at(0, physical_page_size)
1948                .await
1949                .unwrap()
1950                .coalesce();
1951            let crc = read_crc_record_from_page(page.as_ref());
1952            assert_eq!(crc.len2, PAGE_SIZE.get(), "len2 should still be 103");
1953            assert_eq!(crc.crc2, 0xDEADBEEF, "crc2 should be corrupted");
1954            // Slot 0 fallback has len=10 (partial), which is invalid for non-last page
1955            assert_eq!(crc.len1, 10, "Fallback slot 0 has partial length");
1956
1957            // === Step 5: Re-open and try to read from page 0 ===
1958            // The first page's primary CRC is bad, and fallback indicates partial (len=10).
1959            // Since page 0 is not the last page, a partial fallback is invalid.
1960            // Reading from page 0 should fail because the fallback CRC indicates a partial
1961            // page, which is not allowed for non-last pages.
1962            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1963                .await
1964                .unwrap();
1965
1966            // The blob still reports 113 bytes because init only validates the last page.
1967            // But reading from page 0 should fail because the CRC fallback is partial.
1968            assert_eq!(append.size().await, 113);
1969
1970            // Try to read from page 0 - this should fail with InvalidChecksum because
1971            // the fallback CRC has len=10 (partial), which is invalid for a non-last page.
            // NOTE: only `is_err()` is asserted below; the specific error variant is not checked.
1972            let result = append.read_at(0, 10).await;
1973            assert!(
1974                result.is_err(),
1975                "Reading from corrupted non-last page via Append should fail, but got: {:?}",
1976                result
1977            );
1978            drop(append);
1979
1980            // Also verify that reading via Replay fails the same way.
1981            let (blob, size) = context
1982                .open("test_partition", b"non_last_page")
1983                .await
1984                .unwrap();
1985            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
1986                .await
1987                .unwrap();
1988            let mut replay = append.replay(NZUsize!(1024)).await.unwrap();
1989
1990            // Try to fill pages - should fail on CRC validation.
1991            let result = replay.ensure(1).await;
1992            assert!(
1993                result.is_err(),
1994                "Reading from corrupted non-last page via Replay should fail, but got: {:?}",
1995                result
1996            );
1997        });
1998    }
1999
2000    #[test]
2001    fn test_resize_shrink_validates_crc() {
2002        // Verify that shrinking a blob to a partial page validates the CRC, rather than
2003        // blindly reading raw bytes which could silently load corrupted data.
2004        let executor = deterministic::Runner::default();
2005
2006        executor.start(|context| async move {
2007            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
2008            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
2009
2010            let (blob, size) = context
2011                .open("test_partition", b"resize_crc_test")
2012                .await
2013                .unwrap();
2014
2015            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2016                .await
2017                .unwrap();
2018
2019            // Write data across 3 pages: page 0 (full), page 1 (full), page 2 (partial).
2020            // PAGE_SIZE = 103, so 250 bytes = 103 + 103 + 44.
            // Byte i of the payload has value i (0..=249).
2021            let data: Vec<u8> = (0..=249).collect();
2022            append.append(&data).await.unwrap();
2023            append.sync().await.unwrap();
2024            assert_eq!(append.size().await, 250);
2025            drop(append);
2026
2027            // Corrupt the CRC record of page 1 (middle page).
2028            let (blob, size) = context
2029                .open("test_partition", b"resize_crc_test")
2030                .await
2031                .unwrap();
2032            assert_eq!(size as usize, physical_page_size * 3);
2033
2034            // Page 1 CRC record is at the end of the second physical page.
            // The whole record (CHECKSUM_SIZE bytes) is overwritten with 0xFF.
2035            let page1_crc_offset = (physical_page_size * 2 - CHECKSUM_SIZE as usize) as u64;
2036            blob.write_at(page1_crc_offset, vec![0xFF; CHECKSUM_SIZE as usize])
2037                .await
2038                .unwrap();
2039            blob.sync().await.unwrap();
2040
2041            // Open the blob - Append::new() validates the LAST page (page 2), which is still valid.
2042            // So it should open successfully with size 250.
2043            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2044                .await
2045                .unwrap();
2046            assert_eq!(append.size().await, 250);
2047
2048            // Try to shrink to 150 bytes, which ends in page 1 (the corrupted page).
2049            // 150 bytes = page 0 (103 full) + page 1 (47 partial).
2050            // This should fail because page 1's CRC is corrupted.
2051            let result = append.resize(150).await;
            // Unlike the read-path tests, this pins the exact error variant.
2052            assert!(
2053                matches!(result, Err(crate::Error::InvalidChecksum)),
2054                "Expected InvalidChecksum when shrinking to corrupted page, got: {:?}",
2055                result
2056            );
2057        });
2058    }
2059
2060    #[test]
2061    fn test_reopen_partial_tail_append_and_resize() {
        // Reopen a blob whose tail is a synced partial page, append to it, shrink it with
        // `resize`, sync, and check that the surviving bytes read back as expected.
2062        let executor = deterministic::Runner::default();
2063
2064        executor.start(|context| async move {
            // Test-local constants shadow the PAGE_SIZE / BUFFER_SIZE used by the other tests.
2065            const PAGE_SIZE: NonZeroU16 = NZU16!(64);
2066            const BUFFER_SIZE: usize = 256;
2067
2068            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(4));
2069
2070            let (blob, size) = context
2071                .open("test_partition", b"partial_tail_test")
2072                .await
2073                .unwrap();
2074
2075            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2076                .await
2077                .unwrap();
2078
2079            // Write some initial data.
2080            append.append(&[1, 2, 3, 4, 5]).await.unwrap();
2081            append.sync().await.unwrap();
2082            assert_eq!(append.size().await, 5);
2083            drop(append);
2084
            // Reopen: the 5 synced bytes (less than the 64-byte page) must be recovered.
2085            let (blob, size) = context
2086                .open("test_partition", b"partial_tail_test")
2087                .await
2088                .unwrap();
2089
2090            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2091                .await
2092                .unwrap();
2093            assert_eq!(append.size().await, 5);
2094
            // Append three more bytes, then shrink to 6 before syncing, so only the first of
            // the newly appended bytes should survive.
2095            append.append(&[6, 7, 8]).await.unwrap();
2096            append.resize(6).await.unwrap();
2097            append.sync().await.unwrap();
2098
            // Expect the 5 recovered bytes followed by the first newly appended byte.
2099            let data: Vec<u8> = append.read_at(0, 6).await.unwrap().coalesce().into();
2100            assert_eq!(data, vec![1, 2, 3, 4, 5, 6]);
2101        });
2102    }
2103
2104    #[test]
2105    fn test_corrupted_crc_len_too_large() {
2106        let executor = deterministic::Runner::default();
2107
2108        executor.start(|context| async move {
2109            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
2110            let physical_page_size = PAGE_SIZE.get() as usize + CHECKSUM_SIZE as usize;
2111
2112            // Step 1: Create blob with valid data
2113            let (blob, size) = context
2114                .open("test_partition", b"crc_len_test")
2115                .await
2116                .unwrap();
2117
2118            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2119                .await
2120                .unwrap();
2121
2122            append.append(&[0x42; 50]).await.unwrap();
2123            append.sync().await.unwrap();
2124            drop(append);
2125
2126            // Step 2: Corrupt the CRC record to have len > page_size
2127            let (blob, size) = context
2128                .open("test_partition", b"crc_len_test")
2129                .await
2130                .unwrap();
2131            assert_eq!(size as usize, physical_page_size);
2132
2133            // CRC record is at the end of the physical page
2134            let crc_offset = PAGE_SIZE.get() as u64;
2135
2136            // Create a CRC record with len1 = 0xFFFF (65535), which is >> page_size (103)
2137            // Format: [len1_hi, len1_lo, crc1 (4 bytes), len2_hi, len2_lo, crc2 (4 bytes)]
2138            let bad_crc_record: [u8; 12] = [
2139                0xFF, 0xFF, // len1 = 65535 (way too large)
2140                0xDE, 0xAD, 0xBE, 0xEF, // crc1 (garbage)
2141                0x00, 0x00, // len2 = 0
2142                0x00, 0x00, 0x00, 0x00, // crc2 = 0
2143            ];
2144            blob.write_at(crc_offset, bad_crc_record.to_vec())
2145                .await
2146                .unwrap();
2147            blob.sync().await.unwrap();
2148
2149            // Step 3: Try to open the blob - should NOT panic, should return error or handle gracefully
2150            let result = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()).await;
2151
2152            // Either returns InvalidChecksum error OR truncates the corrupted data
2153            // (both are acceptable behaviors - panicking is NOT acceptable)
2154            match result {
2155                Ok(append) => {
2156                    // If it opens successfully, the corrupted page should have been truncated
2157                    let recovered_size = append.size().await;
2158                    assert_eq!(
2159                        recovered_size, 0,
2160                        "Corrupted page should be truncated, size should be 0"
2161                    );
2162                }
2163                Err(e) => {
2164                    // Error is also acceptable
2165                    assert!(
2166                        matches!(e, crate::Error::InvalidChecksum),
2167                        "Expected InvalidChecksum error, got: {:?}",
2168                        e
2169                    );
2170                }
2171            }
2172        });
2173    }
2174
2175    #[test]
2176    fn test_corrupted_crc_both_slots_len_too_large() {
2177        let executor = deterministic::Runner::default();
2178
2179        executor.start(|context| async move {
2180            let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
2181
2182            // Step 1: Create blob with valid data
2183            let (blob, size) = context
2184                .open("test_partition", b"crc_both_bad")
2185                .await
2186                .unwrap();
2187
2188            let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2189                .await
2190                .unwrap();
2191
2192            append.append(&[0x42; 50]).await.unwrap();
2193            append.sync().await.unwrap();
2194            drop(append);
2195
2196            // Step 2: Corrupt BOTH CRC slots to have len > page_size
2197            let (blob, size) = context
2198                .open("test_partition", b"crc_both_bad")
2199                .await
2200                .unwrap();
2201
2202            let crc_offset = PAGE_SIZE.get() as u64;
2203
2204            // Both slots have len > page_size
2205            let bad_crc_record: [u8; 12] = [
2206                0x01, 0x00, // len1 = 256 (> 103)
2207                0xDE, 0xAD, 0xBE, 0xEF, // crc1 (garbage)
2208                0x02, 0x00, // len2 = 512 (> 103)
2209                0xCA, 0xFE, 0xBA, 0xBE, // crc2 (garbage)
2210            ];
2211            blob.write_at(crc_offset, bad_crc_record.to_vec())
2212                .await
2213                .unwrap();
2214            blob.sync().await.unwrap();
2215
2216            // Step 3: Try to open - should NOT panic
2217            let result = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()).await;
2218
2219            match result {
2220                Ok(append) => {
2221                    // Corrupted page truncated
2222                    assert_eq!(append.size().await, 0);
2223                }
2224                Err(e) => {
2225                    assert!(
2226                        matches!(e, crate::Error::InvalidChecksum),
2227                        "Expected InvalidChecksum, got: {:?}",
2228                        e
2229                    );
2230                }
2231            }
2232        });
2233    }
2234}