// commonware_runtime/utils/buffer/paged/read.rs

1use super::Checksum;
2use crate::{Blob, Buf, Error, IoBuf};
3use commonware_codec::FixedSize;
4use std::{collections::VecDeque, num::NonZeroU16};
5use tracing::error;
6
/// State for a single buffer of pages read from the blob.
///
/// Each fill produces one `BufferState` containing all pages read in that batch.
/// Navigation skips CRCs by computing offsets rather than creating separate
/// `Bytes` slices per page.
pub(super) struct BufferState {
    /// The raw physical buffer containing pages with interleaved CRCs.
    /// Sized to hold exactly `num_pages` physical pages.
    buffer: IoBuf,
    /// Number of pages in this buffer. Always at least 1: a fill that would
    /// yield zero pages returns `None` instead of an empty `BufferState`.
    num_pages: usize,
    /// Logical length of the last page in this buffer. May be partial only
    /// when it is the final page of the blob; otherwise it equals the
    /// logical page size.
    last_page_len: usize,
}
20
/// Async I/O component that prefetches pages and validates CRCs.
///
/// This handles reading batches of pages from the blob, validating their
/// checksums, and producing `BufferState` for the sync buffering layer.
pub(super) struct PageReader<B: Blob> {
    /// The underlying blob to read from.
    blob: B,
    /// Physical page size (logical_page_size + CHECKSUM_SIZE).
    page_size: usize,
    /// Logical page size (data bytes per page, not including CRC).
    logical_page_size: usize,
    /// The physical size of the blob. Must already exclude any trailing
    /// invalid data (see [`PageReader::new`]).
    physical_blob_size: u64,
    /// The logical size of the blob (total data bytes, CRCs excluded).
    logical_blob_size: u64,
    /// Next page index to read from the blob. Also rewritten directly by
    /// `Replay::seek_to` to reposition subsequent fills.
    blob_page: u64,
    /// Number of pages to prefetch at once.
    prefetch_count: usize,
}
41
42impl<B: Blob> PageReader<B> {
43    /// Creates a new PageReader.
44    ///
45    /// The `physical_blob_size` must already exclude any trailing invalid data
46    /// (e.g., junk pages from an interrupted write). Each physical page is the same
47    /// size on disk, but the CRC record indicates how much logical data it contains.
48    /// The last page may be logically partial (CRC length < logical page size), but
49    /// all preceding pages must be logically full. A logically partial non-last page
50    /// indicates corruption and will cause an `Error::InvalidChecksum`.
51    pub(super) const fn new(
52        blob: B,
53        physical_blob_size: u64,
54        logical_blob_size: u64,
55        prefetch_count: usize,
56        logical_page_size: NonZeroU16,
57    ) -> Self {
58        let logical_page_size = logical_page_size.get() as usize;
59        let page_size = logical_page_size + Checksum::SIZE;
60
61        Self {
62            blob,
63            page_size,
64            logical_page_size,
65            physical_blob_size,
66            logical_blob_size,
67            blob_page: 0,
68            prefetch_count,
69        }
70    }
71
72    /// Returns the logical size of the blob.
73    pub(super) const fn blob_size(&self) -> u64 {
74        self.logical_blob_size
75    }
76
77    /// Returns the physical page size.
78    pub(super) const fn page_size(&self) -> usize {
79        self.page_size
80    }
81
82    /// Returns the logical page size.
83    pub(super) const fn logical_page_size(&self) -> usize {
84        self.logical_page_size
85    }
86
87    /// Fills a buffer with the next batch of pages.
88    ///
89    /// Returns `Some((BufferState, logical_bytes))` if data was loaded,
90    /// `None` if no more data available.
91    pub(super) async fn fill(&mut self) -> Result<Option<(BufferState, usize)>, Error> {
92        // Calculate physical read offset
93        let start_offset = match self.blob_page.checked_mul(self.page_size as u64) {
94            Some(o) => o,
95            None => return Err(Error::OffsetOverflow),
96        };
97        if start_offset >= self.physical_blob_size {
98            return Ok(None); // No more data
99        }
100
101        // Calculate how many pages to read
102        let remaining_physical = (self.physical_blob_size - start_offset) as usize;
103        let max_pages = remaining_physical / self.page_size;
104        let pages_to_read = max_pages.min(self.prefetch_count);
105        if pages_to_read == 0 {
106            return Ok(None);
107        }
108        let bytes_to_read = pages_to_read * self.page_size;
109
110        // Read physical data
111        let physical_buf = self
112            .blob
113            .read_at(start_offset, bytes_to_read)
114            .await?
115            .coalesce()
116            .freeze();
117
118        // Validate CRCs and compute total logical bytes
119        let mut total_logical = 0usize;
120        let mut last_len = 0usize;
121        let is_final_batch = pages_to_read == max_pages;
122        for page_idx in 0..pages_to_read {
123            let page_start = page_idx * self.page_size;
124            let page_slice = &physical_buf.as_ref()[page_start..page_start + self.page_size];
125            let Some(record) = Checksum::validate_page(page_slice) else {
126                error!(page = self.blob_page + page_idx as u64, "CRC mismatch");
127                return Err(Error::InvalidChecksum);
128            };
129            let (len, _) = record.get_crc();
130            let len = len as usize;
131
132            // Only the final page in the blob may have partial length
133            let is_last_page_in_blob = is_final_batch && page_idx + 1 == pages_to_read;
134            if !is_last_page_in_blob && len != self.logical_page_size {
135                error!(
136                    page = self.blob_page + page_idx as u64,
137                    expected = self.logical_page_size,
138                    actual = len,
139                    "non-last page has partial length"
140                );
141                return Err(Error::InvalidChecksum);
142            }
143
144            total_logical += len;
145            last_len = len;
146        }
147        self.blob_page += pages_to_read as u64;
148
149        let state = BufferState {
150            buffer: physical_buf,
151            num_pages: pages_to_read,
152            last_page_len: last_len,
153        };
154
155        Ok(Some((state, total_logical)))
156    }
157}
158
/// Sync buffering component that implements the `Buf` trait.
///
/// This accumulates `BufferState` from multiple fills and provides navigation
/// across pages while skipping CRCs. Consumed buffers are cleaned up in
/// `advance()`.
struct ReplayBuf {
    /// Physical page size (logical_page_size + CHECKSUM_SIZE).
    page_size: usize,
    /// Logical page size (data bytes per page, not including CRC).
    logical_page_size: usize,
    /// Accumulated buffers from fills, in blob order; the front buffer is
    /// the one currently being consumed.
    buffers: VecDeque<BufferState>,
    /// Current page index within the front buffer.
    current_page: usize,
    /// Current offset within the current page's logical data. Immediately
    /// after a seek (while `buffers` is empty) this holds the intra-page
    /// seek offset, which `push` discounts from `remaining`.
    offset_in_page: usize,
    /// Total remaining logical bytes across all buffers.
    remaining: usize,
}
178
179impl ReplayBuf {
180    /// Creates a new ReplayBuf.
181    const fn new(page_size: usize, logical_page_size: usize) -> Self {
182        Self {
183            page_size,
184            logical_page_size,
185            buffers: VecDeque::new(),
186            current_page: 0,
187            offset_in_page: 0,
188            remaining: 0,
189        }
190    }
191
192    /// Clears the buffer and resets the read offset to 0.
193    fn clear(&mut self) {
194        self.buffers.clear();
195        self.current_page = 0;
196        self.offset_in_page = 0;
197        self.remaining = 0;
198    }
199
200    /// Adds a buffer from a fill operation.
201    fn push(&mut self, state: BufferState, logical_bytes: usize) {
202        // If buffers is empty, this is the first fill after a seek.
203        // Skip bytes before the seek offset (offset_in_page).
204        let skip = if self.buffers.is_empty() {
205            self.offset_in_page
206        } else {
207            0
208        };
209        self.buffers.push_back(state);
210        self.remaining += logical_bytes.saturating_sub(skip);
211    }
212
213    /// Returns the logical length of the given page in the given buffer.
214    const fn page_len(buf: &BufferState, page_idx: usize, logical_page_size: usize) -> usize {
215        if page_idx + 1 == buf.num_pages {
216            buf.last_page_len
217        } else {
218            logical_page_size
219        }
220    }
221}
222
impl Buf for ReplayBuf {
    fn remaining(&self) -> usize {
        self.remaining
    }

    /// Returns the unread logical bytes of the current page. The returned
    /// slice stops at the page's logical length, so the trailing CRC is
    /// skipped implicitly.
    fn chunk(&self) -> &[u8] {
        let Some(buf) = self.buffers.front() else {
            return &[];
        };
        // Defensive guard: return empty if the cursor sits past the final
        // page of this buffer.
        if self.current_page >= buf.num_pages {
            return &[];
        }
        let page_len = Self::page_len(buf, self.current_page, self.logical_page_size);
        // Translate the (page, intra-page offset) cursor into physical
        // indices within the raw buffer.
        let physical_start = self.current_page * self.page_size + self.offset_in_page;
        let physical_end = self.current_page * self.page_size + page_len;
        &buf.buffer.as_ref()[physical_start..physical_end]
    }

    fn advance(&mut self, mut cnt: usize) {
        // Saturating: advancing past the end simply drains everything.
        self.remaining = self.remaining.saturating_sub(cnt);

        while cnt > 0 {
            let Some(buf) = self.buffers.front() else {
                break;
            };

            // Advance within current buffer, page by page.
            while cnt > 0 && self.current_page < buf.num_pages {
                let page_len = Self::page_len(buf, self.current_page, self.logical_page_size);
                let available = page_len - self.offset_in_page;
                if cnt < available {
                    // The advance stops inside the current page.
                    self.offset_in_page += cnt;
                    return;
                }
                cnt -= available;
                self.current_page += 1;
                self.offset_in_page = 0;
            }

            // Current buffer exhausted, move to next. This also runs when the
            // inner loop consumed exactly through the buffer's final page, so
            // fully-consumed buffers are always released here.
            if self.current_page >= buf.num_pages {
                self.buffers.pop_front();
                self.current_page = 0;
                self.offset_in_page = 0;
            }
        }
    }
}
271
/// Replays logical data from a blob containing pages with interleaved CRCs.
///
/// This combines async I/O (`PageReader`) with sync buffering (`ReplayBuf`)
/// to provide an `ensure(n)` + `Buf` interface for codec decoding.
pub struct Replay<B: Blob> {
    /// Async I/O component.
    reader: PageReader<B>,
    /// Sync buffering component.
    buffer: ReplayBuf,
    /// Whether the blob has been fully read. Set when a fill returns no
    /// data; reset by `seek_to` so reads can resume from the new position.
    exhausted: bool,
}
284
285impl<B: Blob> Replay<B> {
286    /// Creates a new Replay from a PageReader.
287    pub(super) const fn new(reader: PageReader<B>) -> Self {
288        let page_size = reader.page_size();
289        let logical_page_size = reader.logical_page_size();
290        Self {
291            reader,
292            buffer: ReplayBuf::new(page_size, logical_page_size),
293            exhausted: false,
294        }
295    }
296
297    /// Returns the logical size of the blob.
298    pub const fn blob_size(&self) -> u64 {
299        self.reader.blob_size()
300    }
301
302    /// Returns true if the reader has been exhausted (no more pages to read).
303    ///
304    /// When exhausted, the buffer may still contain data that hasn't been consumed.
305    /// Callers should check `remaining()` to see if there's data left to process.
306    pub const fn is_exhausted(&self) -> bool {
307        self.exhausted
308    }
309
310    /// Ensures at least `n` bytes are available in the buffer.
311    ///
312    /// This method fills the buffer from the blob until either:
313    /// - At least `n` bytes are available (returns `Ok(true)`)
314    /// - The blob is exhausted with fewer than `n` bytes (returns `Ok(false)`)
315    /// - A read error occurs (returns `Err`)
316    ///
317    /// When `Ok(false)` is returned, callers should still attempt to process
318    /// the remaining bytes in the buffer (check `remaining()`), as they may
319    /// contain valid data that doesn't require the full `n` bytes.
320    pub async fn ensure(&mut self, n: usize) -> Result<bool, Error> {
321        while self.buffer.remaining < n && !self.exhausted {
322            match self.reader.fill().await? {
323                Some((state, logical_bytes)) => {
324                    self.buffer.push(state, logical_bytes);
325                }
326                None => {
327                    self.exhausted = true;
328                }
329            }
330        }
331        Ok(self.buffer.remaining >= n)
332    }
333
334    /// Seeks to `offset` in the blob, returning `Err(BlobInsufficientLength)` if `offset` exceeds
335    /// the blob size.
336    pub fn seek_to(&mut self, offset: u64) -> Result<(), Error> {
337        if offset > self.reader.blob_size() {
338            return Err(Error::BlobInsufficientLength);
339        }
340
341        self.buffer.clear();
342        self.exhausted = false;
343
344        let page_size = self.reader.logical_page_size as u64;
345        self.reader.blob_page = offset / page_size;
346        self.buffer.current_page = 0;
347        self.buffer.offset_in_page = (offset % page_size) as usize;
348
349        Ok(())
350    }
351}
352
353impl<B: Blob> Buf for Replay<B> {
354    fn remaining(&self) -> usize {
355        self.buffer.remaining()
356    }
357
358    fn chunk(&self) -> &[u8] {
359        self.buffer.chunk()
360    }
361
362    fn advance(&mut self, cnt: usize) {
363        self.buffer.advance(cnt);
364    }
365}
366
#[cfg(test)]
mod tests {
    use super::{super::append::Append, *};
    use crate::{deterministic, Runner as _, Storage as _};
    use commonware_macros::test_traced;
    use commonware_utils::{NZUsize, NZU16};

    // Logical page size used by all tests; the physical page adds the CRC
    // record (103 + 12 = 115 bytes, per test_replay_cross_buffer_boundary).
    const PAGE_SIZE: NonZeroU16 = NZU16!(103);
    const BUFFER_PAGES: usize = 2;

    /// Writes data spanning multiple pages, then reads it all back through
    /// the `Buf` interface and verifies it matches byte-for-byte.
    #[test_traced("DEBUG")]
    fn test_replay_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            let cache_ref =
                super::super::CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, cache_ref)
                .await
                .unwrap();

            // Write data spanning multiple pages
            let data: Vec<u8> = (0u8..=255).cycle().take(300).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();

            // Create Replay
            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();

            // Ensure all data is available
            replay.ensure(300).await.unwrap();

            // Verify we got all the data
            assert_eq!(replay.remaining(), 300);

            // Read all data via Buf interface
            let mut collected = Vec::new();
            while replay.remaining() > 0 {
                let chunk = replay.chunk();
                collected.extend_from_slice(chunk);
                let len = chunk.len();
                replay.advance(len);
            }
            assert_eq!(collected, data);
        });
    }

    /// Verifies that a blob whose final page is logically partial reports
    /// the correct number of remaining bytes.
    #[test_traced("DEBUG")]
    fn test_replay_partial_page() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();

            let cache_ref =
                super::super::CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, cache_ref)
                .await
                .unwrap();

            // Write data that doesn't fill the last page
            let data: Vec<u8> = (1u8..=(PAGE_SIZE.get() + 10) as u8).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();

            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();

            // Ensure all data is available
            replay.ensure(data.len()).await.unwrap();

            assert_eq!(replay.remaining(), data.len());
        });
    }

    #[test_traced("DEBUG")]
    fn test_replay_cross_buffer_boundary() {
        // Use prefetch_count=1 to force separate BufferStates per page.
        // This tests navigation across multiple BufferStates in the VecDeque.
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            let cache_ref =
                super::super::CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, cache_ref)
                .await
                .unwrap();

            // Write data spanning 4 pages (4 * 103 = 412 bytes, with last page partial)
            let data: Vec<u8> = (0u8..=255).cycle().take(400).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();

            // Create Replay with buffer size that results in prefetch_count=1.
            // Physical page size = 103 + 12 = 115 bytes.
            // Buffer size of 115 gives prefetch_pages = 115/115 = 1.
            let mut replay = append.replay(NZUsize!(115)).await.unwrap();

            // Ensure all data - this requires 4 separate fill() calls (one per page).
            // Each fill() creates a new BufferState, so we'll have 4 BufferStates.
            assert!(replay.ensure(400).await.unwrap());
            assert_eq!(replay.remaining(), 400);

            // Read all data via Buf interface, verifying navigation across BufferStates.
            let mut collected = Vec::new();
            let mut chunks_read = 0;
            while replay.remaining() > 0 {
                let chunk = replay.chunk();
                assert!(
                    !chunk.is_empty(),
                    "chunk() returned empty but remaining > 0"
                );
                collected.extend_from_slice(chunk);
                let len = chunk.len();
                replay.advance(len);
                chunks_read += 1;
            }

            assert_eq!(collected, data);
            // With prefetch_count=1 and 4 pages, we expect at least 4 chunks
            // (one per page, though partial reads could result in more).
            assert!(
                chunks_read >= 4,
                "Expected at least 4 chunks for 4 pages, got {}",
                chunks_read
            );
        });
    }

    #[test_traced("DEBUG")]
    fn test_replay_empty_blob() {
        // Test that replaying an empty blob works correctly.
        // ensure() should return Ok(false) when no data is available.
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            let cache_ref =
                super::super::CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, cache_ref)
                .await
                .unwrap();

            // Don't write any data - blob remains empty
            assert_eq!(append.size().await, 0);

            // Create Replay on empty blob
            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();

            // Verify initial state - remaining is 0, but not yet marked exhausted
            // (exhausted is set after first fill attempt)
            assert_eq!(replay.remaining(), 0);

            // ensure(0) should succeed (we have >= 0 bytes)
            assert!(replay.ensure(0).await.unwrap());

            // ensure(1) should return Ok(false) - not enough data, and marks exhausted
            assert!(!replay.ensure(1).await.unwrap());

            // Now should be marked as exhausted after the fill attempt
            assert!(replay.is_exhausted());

            // chunk() should return empty slice
            assert!(replay.chunk().is_empty());

            // remaining should still be 0
            assert_eq!(replay.remaining(), 0);
        });
    }

    /// Exercises `seek_to`: forward seek, backward seek, out-of-range error,
    /// and that a post-seek read yields exactly the bytes after the offset.
    #[test_traced("DEBUG")]
    fn test_replay_seek_to() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();

            let cache_ref =
                super::super::CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, cache_ref)
                .await
                .unwrap();

            // Write data spanning multiple pages
            let data: Vec<u8> = (0u8..=255).cycle().take(300).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();

            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();

            // Seek forward, read, then seek backward
            replay.seek_to(150).unwrap();
            replay.ensure(50).await.unwrap();
            assert_eq!(replay.get_u8(), data[150]);

            // Seek back to start
            replay.seek_to(0).unwrap();
            replay.ensure(1).await.unwrap();
            assert_eq!(replay.get_u8(), data[0]);

            // Seek beyond blob size should error
            assert!(replay.seek_to(data.len() as u64 + 1).is_err());

            // Test that remaining() is correct after seek by reading all data.
            let seek_offset = 150usize;
            replay.seek_to(seek_offset as u64).unwrap();
            let expected_remaining = data.len() - seek_offset;
            // Read all bytes and verify content
            let mut collected = Vec::new();
            loop {
                // Load more data if needed
                if !replay.ensure(1).await.unwrap() {
                    break; // No more data available
                }
                let chunk = replay.chunk();
                if chunk.is_empty() {
                    break;
                }
                collected.extend_from_slice(chunk);
                let len = chunk.len();
                replay.advance(len);
            }
            assert_eq!(
                collected.len(),
                expected_remaining,
                "After seeking to {}, should read {} bytes but got {}",
                seek_offset,
                expected_remaining,
                collected.len()
            );
            assert_eq!(collected, &data[seek_offset..]);
        });
    }
}