commonware_runtime/utils/buffer/paged/read.rs

use super::Checksum;
use crate::{Blob, Buf, Error, IoBufMut};
use commonware_codec::FixedSize;
use std::{collections::VecDeque, num::NonZeroU16};
use tracing::error;

/// State for a single buffer of pages read from the blob.
///
/// Each fill produces one `BufferState` containing all pages read in that batch.
/// Navigation skips CRCs by computing offsets rather than creating separate
/// `Bytes` slices per page.
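///
/// Physical layout sketch (illustrative; each page occupies `page_size` bytes
/// on disk, with a CRC record trailing its logical data):
///
/// ```text
/// [ page 0 data | CRC ][ page 1 data | CRC ] ... [ last page data | CRC ]
///    <- logical ->        <- logical ->             <- last_page_len ->
/// ```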
pub(super) struct BufferState {
    /// The raw physical buffer containing pages with interleaved CRCs.
    buffer: Vec<u8>,
    /// Number of pages in this buffer.
    num_pages: usize,
    /// Logical length of the last page (may be partial).
    last_page_len: usize,
}

/// Async I/O component that prefetches pages and validates CRCs.
///
/// This handles reading batches of pages from the blob, validating their
/// checksums, and producing `BufferState` for the sync buffering layer.
pub(super) struct PageReader<B: Blob> {
    /// The underlying blob to read from.
    blob: B,
    /// Physical page size (logical_page_size + CHECKSUM_SIZE).
    page_size: usize,
    /// Logical page size (data bytes per page, not including CRC).
    logical_page_size: usize,
    /// The physical size of the blob.
    physical_blob_size: u64,
    /// The logical size of the blob.
    logical_blob_size: u64,
    /// Next page index to read from the blob.
    blob_page: u64,
    /// Number of pages to prefetch at once.
    prefetch_count: usize,
}

impl<B: Blob> PageReader<B> {
    /// Creates a new PageReader.
    ///
    /// The `physical_blob_size` must already exclude any trailing invalid data
    /// (e.g., junk pages from an interrupted write). Each physical page is the same
    /// size on disk, but the CRC record indicates how much logical data it contains.
    /// The last page may be logically partial (CRC length < logical page size), but
    /// all preceding pages must be logically full. A logically partial non-last page
    /// indicates corruption and will cause an `Error::InvalidChecksum`.
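    ///
    /// Construction sketch (illustrative values; `blob` and both sizes are
    /// assumptions supplied by the caller and must describe the same on-disk
    /// state):
    ///
    /// ```rust,ignore
    /// // 4096-byte logical pages, prefetching 8 physical pages per fill.
    /// let reader = PageReader::new(
    ///     blob,
    ///     physical_blob_size,
    ///     logical_blob_size,
    ///     8,
    ///     NonZeroU16::new(4096).unwrap(),
    /// );
    /// ```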
    pub(super) const fn new(
        blob: B,
        physical_blob_size: u64,
        logical_blob_size: u64,
        prefetch_count: usize,
        logical_page_size: NonZeroU16,
    ) -> Self {
        let logical_page_size = logical_page_size.get() as usize;
        let page_size = logical_page_size + Checksum::SIZE;

        Self {
            blob,
            page_size,
            logical_page_size,
            physical_blob_size,
            logical_blob_size,
            blob_page: 0,
            prefetch_count,
        }
    }

    /// Returns the logical size of the blob.
    pub(super) const fn blob_size(&self) -> u64 {
        self.logical_blob_size
    }

    /// Returns the physical page size.
    pub(super) const fn page_size(&self) -> usize {
        self.page_size
    }

    /// Returns the logical page size.
    pub(super) const fn logical_page_size(&self) -> usize {
        self.logical_page_size
    }

    /// Fills a buffer with the next batch of pages.
    ///
    /// Returns `Some((BufferState, logical_bytes))` if data was loaded, or
    /// `None` if no more data is available.
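    ///
    /// Drive-loop sketch (illustrative; this mirrors what `Replay::ensure`
    /// does below):
    ///
    /// ```rust,ignore
    /// while let Some((state, logical_bytes)) = reader.fill().await? {
    ///     buffer.push(state, logical_bytes);
    /// }
    /// ```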
    pub(super) async fn fill(&mut self) -> Result<Option<(BufferState, usize)>, Error> {
        // Calculate physical read offset
        let start_offset = match self.blob_page.checked_mul(self.page_size as u64) {
            Some(o) => o,
            None => return Err(Error::OffsetOverflow),
        };
        if start_offset >= self.physical_blob_size {
            return Ok(None); // No more data
        }

        // Calculate how many pages to read
        let remaining_physical = (self.physical_blob_size - start_offset) as usize;
        let max_pages = remaining_physical / self.page_size;
        let pages_to_read = max_pages.min(self.prefetch_count);
        if pages_to_read == 0 {
            return Ok(None);
        }
        let bytes_to_read = pages_to_read * self.page_size;

        // Read physical data
        let buf = IoBufMut::zeroed(bytes_to_read);
        let physical_buf = self
            .blob
            .read_at(start_offset, buf)
            .await?
            .coalesce()
            .freeze();

        // Validate CRCs and compute total logical bytes
        let mut total_logical = 0usize;
        let mut last_len = 0usize;
        let is_final_batch = pages_to_read == max_pages;
        for page_idx in 0..pages_to_read {
            let page_start = page_idx * self.page_size;
            let page_slice = &physical_buf.as_ref()[page_start..page_start + self.page_size];
            let Some(record) = Checksum::validate_page(page_slice) else {
                error!(page = self.blob_page + page_idx as u64, "CRC mismatch");
                return Err(Error::InvalidChecksum);
            };
            let (len, _) = record.get_crc();
            let len = len as usize;

            // Only the final page in the blob may have a partial length
            let is_last_page_in_blob = is_final_batch && page_idx + 1 == pages_to_read;
            if !is_last_page_in_blob && len != self.logical_page_size {
                error!(
                    page = self.blob_page + page_idx as u64,
                    expected = self.logical_page_size,
                    actual = len,
                    "non-last page has partial length"
                );
                return Err(Error::InvalidChecksum);
            }

            total_logical += len;
            last_len = len;
        }
        self.blob_page += pages_to_read as u64;

        let state = BufferState {
            buffer: physical_buf.into(),
            num_pages: pages_to_read,
            last_page_len: last_len,
        };

        Ok(Some((state, total_logical)))
    }
}

/// Sync buffering component that implements the `Buf` trait.
///
/// This accumulates `BufferState` from multiple fills and provides navigation
/// across pages while skipping CRCs. Consumed buffers are cleaned up in
/// `advance()`.
struct ReplayBuf {
    /// Physical page size (logical_page_size + CHECKSUM_SIZE).
    page_size: usize,
    /// Logical page size (data bytes per page, not including CRC).
    logical_page_size: usize,
    /// Accumulated buffers from fills.
    buffers: VecDeque<BufferState>,
    /// Current page index within the front buffer.
    current_page: usize,
    /// Current offset within the current page's logical data.
    offset_in_page: usize,
    /// Total remaining logical bytes across all buffers.
    remaining: usize,
}

impl ReplayBuf {
    /// Creates a new ReplayBuf.
    const fn new(page_size: usize, logical_page_size: usize) -> Self {
        Self {
            page_size,
            logical_page_size,
            buffers: VecDeque::new(),
            current_page: 0,
            offset_in_page: 0,
            remaining: 0,
        }
    }

    /// Clears the buffer and resets the read offset to 0.
    fn clear(&mut self) {
        self.buffers.clear();
        self.current_page = 0;
        self.offset_in_page = 0;
        self.remaining = 0;
    }

    /// Adds a buffer from a fill operation.
    fn push(&mut self, state: BufferState, logical_bytes: usize) {
        // If buffers is empty, this is the first fill after a seek.
        // Skip bytes before the seek offset (offset_in_page).
        let skip = if self.buffers.is_empty() {
            self.offset_in_page
        } else {
            0
        };
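        // Worked example (hypothetical numbers): after `seek_to(150)` with a
        // logical page size of 103, `blob_page = 150 / 103 = 1` and
        // `offset_in_page = 150 % 103 = 47`. The first fill then starts at
        // page 1, so its first 47 logical bytes precede the seek target and
        // must not be counted toward `remaining`.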
        self.buffers.push_back(state);
        self.remaining += logical_bytes.saturating_sub(skip);
    }

    /// Returns the logical length of the given page in the given buffer.
    const fn page_len(buf: &BufferState, page_idx: usize, logical_page_size: usize) -> usize {
        if page_idx + 1 == buf.num_pages {
            buf.last_page_len
        } else {
            logical_page_size
        }
    }
}

impl Buf for ReplayBuf {
    fn remaining(&self) -> usize {
        self.remaining
    }

    fn chunk(&self) -> &[u8] {
        let Some(buf) = self.buffers.front() else {
            return &[];
        };
        if self.current_page >= buf.num_pages {
            return &[];
        }
        let page_len = Self::page_len(buf, self.current_page, self.logical_page_size);
        let physical_start = self.current_page * self.page_size + self.offset_in_page;
        let physical_end = self.current_page * self.page_size + page_len;
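        // Worked example (hypothetical numbers): with a 103-byte logical page
        // and a 12-byte CRC record (page_size = 115), current_page = 2 and
        // offset_in_page = 5 give physical_start = 2 * 115 + 5 = 235 and
        // physical_end = 2 * 115 + 103 = 333, returning the rest of page 2's
        // logical data while skipping the CRCs of pages 0 and 1.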
        &buf.buffer[physical_start..physical_end]
    }

    fn advance(&mut self, mut cnt: usize) {
        self.remaining = self.remaining.saturating_sub(cnt);

        while cnt > 0 {
            let Some(buf) = self.buffers.front() else {
                break;
            };

            // Advance within current buffer
            while cnt > 0 && self.current_page < buf.num_pages {
                let page_len = Self::page_len(buf, self.current_page, self.logical_page_size);
                let available = page_len - self.offset_in_page;
                if cnt < available {
                    self.offset_in_page += cnt;
                    return;
                }
                cnt -= available;
                self.current_page += 1;
                self.offset_in_page = 0;
            }

            // Current buffer exhausted, move to next
            if self.current_page >= buf.num_pages {
                self.buffers.pop_front();
                self.current_page = 0;
                self.offset_in_page = 0;
            }
        }
    }
}

/// Replays logical data from a blob containing pages with interleaved CRCs.
///
/// This combines async I/O (`PageReader`) with sync buffering (`ReplayBuf`)
/// to provide an `ensure(n)` + `Buf` interface for codec decoding.
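///
/// Usage sketch (illustrative; in practice a `Replay` is obtained from
/// `Append::replay`, as in the tests below):
///
/// ```rust,ignore
/// while replay.ensure(1).await? {
///     let chunk = replay.chunk();
///     process(chunk); // `process` is a hypothetical consumer
///     let len = chunk.len();
///     replay.advance(len);
/// }
/// ```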
pub struct Replay<B: Blob> {
    /// Async I/O component.
    reader: PageReader<B>,
    /// Sync buffering component.
    buffer: ReplayBuf,
    /// Whether the blob has been fully read.
    exhausted: bool,
}

impl<B: Blob> Replay<B> {
    /// Creates a new Replay from a PageReader.
    pub(super) const fn new(reader: PageReader<B>) -> Self {
        let page_size = reader.page_size();
        let logical_page_size = reader.logical_page_size();
        Self {
            reader,
            buffer: ReplayBuf::new(page_size, logical_page_size),
            exhausted: false,
        }
    }

    /// Returns the logical size of the blob.
    pub const fn blob_size(&self) -> u64 {
        self.reader.blob_size()
    }

    /// Returns true if the reader has been exhausted (no more pages to read).
    ///
    /// When exhausted, the buffer may still contain data that hasn't been consumed.
    /// Callers should check `remaining()` to see if there's data left to process.
    pub const fn is_exhausted(&self) -> bool {
        self.exhausted
    }

    /// Ensures at least `n` bytes are available in the buffer.
    ///
    /// This method fills the buffer from the blob until either:
    /// - At least `n` bytes are available (returns `Ok(true)`)
    /// - The blob is exhausted with fewer than `n` bytes (returns `Ok(false)`)
    /// - A read error occurs (returns `Err`)
    ///
    /// When `Ok(false)` is returned, callers should still attempt to process
    /// the remaining bytes in the buffer (check `remaining()`), as they may
    /// contain valid data that doesn't require the full `n` bytes.
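    ///
    /// A decode-loop sketch (illustrative; `Frame` and its fixed header size
    /// are hypothetical stand-ins for a codec type):
    ///
    /// ```rust,ignore
    /// while replay.ensure(Frame::HEADER_SIZE).await? {
    ///     let frame = Frame::decode(&mut replay)?; // reads via the `Buf` impl
    ///     handle(frame); // hypothetical handler
    /// }
    /// ```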
    pub async fn ensure(&mut self, n: usize) -> Result<bool, Error> {
        while self.buffer.remaining < n && !self.exhausted {
            match self.reader.fill().await? {
                Some((state, logical_bytes)) => {
                    self.buffer.push(state, logical_bytes);
                }
                None => {
                    self.exhausted = true;
                }
            }
        }
        Ok(self.buffer.remaining >= n)
    }

    /// Seeks to `offset` in the blob, returning `Err(BlobInsufficientLength)` if `offset` exceeds
    /// the blob size.
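    ///
    /// Sketch (illustrative): rewind and re-read the first byte.
    ///
    /// ```rust,ignore
    /// replay.seek_to(0).await?;
    /// assert!(replay.ensure(1).await?);
    /// let first = replay.get_u8();
    /// ```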
    pub async fn seek_to(&mut self, offset: u64) -> Result<(), Error> {
        if offset > self.reader.blob_size() {
            return Err(Error::BlobInsufficientLength);
        }

        self.buffer.clear();
        self.exhausted = false;

        let logical_page_size = self.reader.logical_page_size as u64;
        self.reader.blob_page = offset / logical_page_size;
        self.buffer.current_page = 0;
        self.buffer.offset_in_page = (offset % logical_page_size) as usize;

        Ok(())
    }
}

impl<B: Blob> Buf for Replay<B> {
    fn remaining(&self) -> usize {
        self.buffer.remaining()
    }

    fn chunk(&self) -> &[u8] {
        self.buffer.chunk()
    }

    fn advance(&mut self, cnt: usize) {
        self.buffer.advance(cnt);
    }
}

#[cfg(test)]
mod tests {
    use super::{super::append::Append, *};
    use crate::{deterministic, Runner as _, Storage as _};
    use commonware_macros::test_traced;
    use commonware_utils::{NZUsize, NZU16};

    const PAGE_SIZE: NonZeroU16 = NZU16!(103);
    const BUFFER_PAGES: usize = 2;

    #[test_traced("DEBUG")]
    fn test_replay_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            let cache_ref = super::super::CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, cache_ref)
                .await
                .unwrap();

            // Write data spanning multiple pages
            let data: Vec<u8> = (0u8..=255).cycle().take(300).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();

            // Create Replay
            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();

            // Ensure all data is available
            replay.ensure(300).await.unwrap();

            // Verify we got all the data
            assert_eq!(replay.remaining(), 300);

            // Read all data via Buf interface
            let mut collected = Vec::new();
            while replay.remaining() > 0 {
                let chunk = replay.chunk();
                collected.extend_from_slice(chunk);
                let len = chunk.len();
                replay.advance(len);
            }
            assert_eq!(collected, data);
        });
    }

    #[test_traced("DEBUG")]
    fn test_replay_partial_page() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();

            let cache_ref = super::super::CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, cache_ref)
                .await
                .unwrap();

            // Write data that doesn't fill the last page
            let data: Vec<u8> = (1u8..=(PAGE_SIZE.get() + 10) as u8).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();

            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();

            // Ensure all data is available
            replay.ensure(data.len()).await.unwrap();

            assert_eq!(replay.remaining(), data.len());
        });
    }

    #[test_traced("DEBUG")]
    fn test_replay_cross_buffer_boundary() {
        // Use prefetch_count=1 to force separate BufferStates per page.
        // This tests navigation across multiple BufferStates in the VecDeque.
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            let cache_ref = super::super::CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, cache_ref)
                .await
                .unwrap();
            // Write data spanning 4 pages: 3 full 103-byte pages plus a partial 91-byte last page
            let data: Vec<u8> = (0u8..=255).cycle().take(400).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();

            // Create Replay with buffer size that results in prefetch_count=1.
            // Physical page size = 103 + 12 = 115 bytes.
            // Buffer size of 115 gives prefetch_pages = 115/115 = 1.
            let mut replay = append.replay(NZUsize!(115)).await.unwrap();

            // Ensure all data - this requires 4 separate fill() calls (one per page).
            // Each fill() creates a new BufferState, so we'll have 4 BufferStates.
            assert!(replay.ensure(400).await.unwrap());
            assert_eq!(replay.remaining(), 400);

            // Read all data via Buf interface, verifying navigation across BufferStates.
            let mut collected = Vec::new();
            let mut chunks_read = 0;
            while replay.remaining() > 0 {
                let chunk = replay.chunk();
                assert!(
                    !chunk.is_empty(),
                    "chunk() returned empty but remaining > 0"
                );
                collected.extend_from_slice(chunk);
                let len = chunk.len();
                replay.advance(len);
                chunks_read += 1;
            }

            assert_eq!(collected, data);
            // With prefetch_count=1 and 4 pages, we expect at least 4 chunks
            // (one per page, though partial reads could result in more).
            assert!(
                chunks_read >= 4,
                "Expected at least 4 chunks for 4 pages, got {}",
                chunks_read
            );
        });
    }

    #[test_traced("DEBUG")]
    fn test_replay_empty_blob() {
        // Test that replaying an empty blob works correctly.
        // ensure() should return Ok(false) when no data is available.
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            let cache_ref = super::super::CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, cache_ref)
                .await
                .unwrap();

            // Don't write any data - blob remains empty
            assert_eq!(append.size().await, 0);

            // Create Replay on empty blob
            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();

            // Verify initial state - remaining is 0, but not yet marked exhausted
            // (exhausted is set after first fill attempt)
            assert_eq!(replay.remaining(), 0);

            // ensure(0) should succeed (we have >= 0 bytes)
            assert!(replay.ensure(0).await.unwrap());

            // ensure(1) should return Ok(false) - not enough data, and marks exhausted
            assert!(!replay.ensure(1).await.unwrap());

            // Now should be marked as exhausted after the fill attempt
            assert!(replay.is_exhausted());

            // chunk() should return empty slice
            assert!(replay.chunk().is_empty());

            // remaining should still be 0
            assert_eq!(replay.remaining(), 0);
        });
    }

    #[test_traced("DEBUG")]
    fn test_replay_seek_to() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();

            let cache_ref = super::super::CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, cache_ref)
                .await
                .unwrap();

            // Write data spanning multiple pages
            let data: Vec<u8> = (0u8..=255).cycle().take(300).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();

            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();

            // Seek forward, read, then seek backward
            replay.seek_to(150).await.unwrap();
            replay.ensure(50).await.unwrap();
            assert_eq!(replay.get_u8(), data[150]);

            // Seek back to start
            replay.seek_to(0).await.unwrap();
            replay.ensure(1).await.unwrap();
            assert_eq!(replay.get_u8(), data[0]);

            // Seek beyond blob size should error
            assert!(replay.seek_to(data.len() as u64 + 1).await.is_err());

            // Test that remaining() is correct after seek by reading all data.
            let seek_offset = 150usize;
            replay.seek_to(seek_offset as u64).await.unwrap();
            let expected_remaining = data.len() - seek_offset;
            // Read all bytes and verify content
            let mut collected = Vec::new();
            loop {
                // Load more data if needed
                if !replay.ensure(1).await.unwrap() {
                    break; // No more data available
                }
                let chunk = replay.chunk();
                if chunk.is_empty() {
                    break;
                }
                collected.extend_from_slice(chunk);
                let len = chunk.len();
                replay.advance(len);
            }
            assert_eq!(
                collected.len(),
                expected_remaining,
                "After seeking to {}, should read {} bytes but got {}",
                seek_offset,
                expected_remaining,
                collected.len()
            );
            assert_eq!(collected, &data[seek_offset..]);
        });
    }
}