commonware_runtime/utils/buffer/pool/read.rs

use super::Checksum;
use crate::{Blob, Error};
use bytes::Buf;
use commonware_codec::FixedSize;
use std::{collections::VecDeque, num::NonZeroU16};
use tracing::error;

/// State for a single buffer of pages read from the blob.
///
/// Each fill produces one `BufferState` containing all pages read in that batch.
/// Navigation skips CRCs by computing offsets rather than creating separate
/// `Bytes` slices per page.
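///
/// Pages are laid out back to back, with each page's CRC record trailing its
/// data (the tests below use a 103-byte logical page and a 12-byte record,
/// giving a 115-byte physical page):
///
/// ```text
/// | page 0 data | CRC | page 1 data | CRC | ... | last page data | CRC |
/// ```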
pub(super) struct BufferState {
    /// The raw physical buffer containing pages with interleaved CRCs.
    buffer: Vec<u8>,
    /// Number of pages in this buffer.
    num_pages: usize,
    /// Logical length of the last page (may be partial).
    last_page_len: usize,
}

/// Async I/O component that prefetches pages and validates CRCs.
///
/// This handles reading batches of pages from the blob, validating their
/// checksums, and producing `BufferState` for the sync buffering layer.
pub(super) struct PageReader<B: Blob> {
    /// The underlying blob to read from.
    blob: B,
    /// Physical page size (`logical_page_size + Checksum::SIZE`).
    page_size: usize,
    /// Logical page size (data bytes per page, not including the CRC).
    logical_page_size: usize,
    /// The physical size of the blob.
    physical_blob_size: u64,
    /// The logical size of the blob.
    logical_blob_size: u64,
    /// Next page index to read from the blob.
    blob_page: u64,
    /// Number of pages to prefetch at once.
    prefetch_count: usize,
}

impl<B: Blob> PageReader<B> {
    /// Creates a new PageReader.
    ///
    /// The `physical_blob_size` must already exclude any trailing invalid data
    /// (e.g., junk pages from an interrupted write). Each physical page is the
    /// same size on disk, but the CRC record indicates how much logical data it
    /// contains. The last page may be logically partial (CRC length < logical
    /// page size), but all preceding pages must be logically full. A logically
    /// partial non-last page indicates corruption and will cause an
    /// `Error::InvalidChecksum`.
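    ///
    /// For example, with the 103-byte logical page used in the tests below, a
    /// 300-byte blob occupies three pages with logical lengths 103, 103, and
    /// 94; only the final 94-byte page may be partial.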
    pub(super) const fn new(
        blob: B,
        physical_blob_size: u64,
        logical_blob_size: u64,
        prefetch_count: usize,
        logical_page_size: NonZeroU16,
    ) -> Self {
        let logical_page_size = logical_page_size.get() as usize;
        let page_size = logical_page_size + Checksum::SIZE;

        Self {
            blob,
            page_size,
            logical_page_size,
            physical_blob_size,
            logical_blob_size,
            blob_page: 0,
            prefetch_count,
        }
    }

    /// Returns the logical size of the blob.
    pub(super) const fn blob_size(&self) -> u64 {
        self.logical_blob_size
    }

    /// Returns the physical page size.
    pub(super) const fn page_size(&self) -> usize {
        self.page_size
    }

    /// Returns the logical page size.
    pub(super) const fn logical_page_size(&self) -> usize {
        self.logical_page_size
    }

    /// Fills a buffer with the next batch of pages.
    ///
    /// Returns `Some((BufferState, logical_bytes))` if data was loaded, or
    /// `None` if no more data is available.
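    ///
    /// For example, with the 115-byte physical page used in the tests below,
    /// a `physical_blob_size` of 460 (four pages), and `prefetch_count = 2`,
    /// the first call reads pages 0 and 1, the second reads pages 2 and 3,
    /// and the third returns `None`.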
    pub(super) async fn fill(&mut self) -> Result<Option<(BufferState, usize)>, Error> {
        // Calculate the physical read offset.
        let start_offset = match self.blob_page.checked_mul(self.page_size as u64) {
            Some(o) => o,
            None => return Err(Error::OffsetOverflow),
        };
        if start_offset >= self.physical_blob_size {
            return Ok(None); // No more data
        }

        // Calculate how many pages to read.
        let remaining_physical = (self.physical_blob_size - start_offset) as usize;
        let max_pages = remaining_physical / self.page_size;
        let pages_to_read = max_pages.min(self.prefetch_count);
        if pages_to_read == 0 {
            return Ok(None);
        }
        let bytes_to_read = pages_to_read * self.page_size;

        // Read the physical data.
        let physical_buf: Vec<u8> = self
            .blob
            .read_at(vec![0u8; bytes_to_read], start_offset)
            .await?
            .into();

        // Validate CRCs and compute the total logical bytes.
        let mut total_logical = 0usize;
        let mut last_len = 0usize;
        let is_final_batch = pages_to_read == max_pages;
        for page_idx in 0..pages_to_read {
            let page_start = page_idx * self.page_size;
            let page_slice = &physical_buf[page_start..page_start + self.page_size];
            let Some(record) = Checksum::validate_page(page_slice) else {
                error!(page = self.blob_page + page_idx as u64, "CRC mismatch");
                return Err(Error::InvalidChecksum);
            };
            let (len, _) = record.get_crc();
            let len = len as usize;

            // Only the final page in the blob may have a partial length.
            let is_last_page_in_blob = is_final_batch && page_idx + 1 == pages_to_read;
            if !is_last_page_in_blob && len != self.logical_page_size {
                error!(
                    page = self.blob_page + page_idx as u64,
                    expected = self.logical_page_size,
                    actual = len,
                    "non-last page has partial length"
                );
                return Err(Error::InvalidChecksum);
            }

            total_logical += len;
            last_len = len;
        }
        self.blob_page += pages_to_read as u64;

        let state = BufferState {
            buffer: physical_buf,
            num_pages: pages_to_read,
            last_page_len: last_len,
        };

        Ok(Some((state, total_logical)))
    }
}

/// Sync buffering component that implements the `Buf` trait.
///
/// This accumulates `BufferState`s from multiple fills and provides navigation
/// across pages while skipping CRCs. Consumed buffers are cleaned up in
/// `advance()`.
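///
/// Note that `chunk()` never spans a CRC record: each call yields at most the
/// unread remainder of a single page's logical data, so a full read of the
/// buffer typically takes one `chunk()`/`advance()` round per page.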
struct ReplayBuf {
    /// Physical page size (`logical_page_size + Checksum::SIZE`).
    page_size: usize,
    /// Logical page size (data bytes per page, not including the CRC).
    logical_page_size: usize,
    /// Accumulated buffers from fills.
    buffers: VecDeque<BufferState>,
    /// Current page index within the front buffer.
    current_page: usize,
    /// Current offset within the current page's logical data.
    offset_in_page: usize,
    /// Total remaining logical bytes across all buffers.
    remaining: usize,
}

impl ReplayBuf {
    /// Creates a new ReplayBuf.
    const fn new(page_size: usize, logical_page_size: usize) -> Self {
        Self {
            page_size,
            logical_page_size,
            buffers: VecDeque::new(),
            current_page: 0,
            offset_in_page: 0,
            remaining: 0,
        }
    }

    /// Clears the buffer and resets the read offset to 0.
    fn clear(&mut self) {
        self.buffers.clear();
        self.current_page = 0;
        self.offset_in_page = 0;
        self.remaining = 0;
    }

    /// Adds a buffer from a fill operation.
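    ///
    /// For example, after seeking to logical offset 150 with a 103-byte
    /// logical page, `offset_in_page` is 47, so the first buffer pushed after
    /// the seek contributes `logical_bytes - 47` to `remaining`.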
    fn push(&mut self, state: BufferState, logical_bytes: usize) {
        // If `buffers` is empty, this is the first fill after a seek: skip
        // the bytes before the seek offset (`offset_in_page`).
        let skip = if self.buffers.is_empty() {
            self.offset_in_page
        } else {
            0
        };
        self.buffers.push_back(state);
        self.remaining += logical_bytes.saturating_sub(skip);
    }

    /// Returns the logical length of the given page in the given buffer.
    const fn page_len(buf: &BufferState, page_idx: usize, logical_page_size: usize) -> usize {
        if page_idx + 1 == buf.num_pages {
            buf.last_page_len
        } else {
            logical_page_size
        }
    }
}

impl Buf for ReplayBuf {
    fn remaining(&self) -> usize {
        self.remaining
    }

    fn chunk(&self) -> &[u8] {
        let Some(buf) = self.buffers.front() else {
            return &[];
        };
        if self.current_page >= buf.num_pages {
            return &[];
        }
        // Return the unread logical remainder of the current page. Both
        // bounds are relative to the page's start, so the slice never
        // includes the page's trailing CRC record.
        let page_len = Self::page_len(buf, self.current_page, self.logical_page_size);
        let physical_start = self.current_page * self.page_size + self.offset_in_page;
        let physical_end = self.current_page * self.page_size + page_len;
        &buf.buffer[physical_start..physical_end]
    }

    fn advance(&mut self, mut cnt: usize) {
        self.remaining = self.remaining.saturating_sub(cnt);

        while cnt > 0 {
            let Some(buf) = self.buffers.front() else {
                break;
            };

            // Advance within the current buffer.
            while cnt > 0 && self.current_page < buf.num_pages {
                let page_len = Self::page_len(buf, self.current_page, self.logical_page_size);
                let available = page_len - self.offset_in_page;
                if cnt < available {
                    self.offset_in_page += cnt;
                    return;
                }
                cnt -= available;
                self.current_page += 1;
                self.offset_in_page = 0;
            }

            // The current buffer is exhausted; move to the next one.
            if self.current_page >= buf.num_pages {
                self.buffers.pop_front();
                self.current_page = 0;
                self.offset_in_page = 0;
            }
        }
    }
}

/// Replays logical data from a blob containing pages with interleaved CRCs.
///
/// This combines async I/O (`PageReader`) with sync buffering (`ReplayBuf`)
/// to provide an `ensure(n)` + `Buf` interface for codec decoding.
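///
/// A sketch of typical use (hypothetical decode loop; `replay` would come
/// from this pool's `Append::replay`, and `process` stands in for caller
/// logic):
///
/// ```ignore
/// while replay.ensure(8).await? {
///     let value = replay.get_u64(); // `get_u64` is provided by `bytes::Buf`
///     process(value);
/// }
/// // Fewer than 8 bytes remain; check `replay.remaining()` for leftovers.
/// ```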
pub struct Replay<B: Blob> {
    /// Async I/O component.
    reader: PageReader<B>,
    /// Sync buffering component.
    buffer: ReplayBuf,
    /// Whether the blob has been fully read.
    exhausted: bool,
}

impl<B: Blob> Replay<B> {
    /// Creates a new Replay from a PageReader.
    pub(super) const fn new(reader: PageReader<B>) -> Self {
        let page_size = reader.page_size();
        let logical_page_size = reader.logical_page_size();
        Self {
            reader,
            buffer: ReplayBuf::new(page_size, logical_page_size),
            exhausted: false,
        }
    }

    /// Returns the logical size of the blob.
    pub const fn blob_size(&self) -> u64 {
        self.reader.blob_size()
    }

    /// Returns true if the reader has been exhausted (no more pages to read).
    ///
    /// When exhausted, the buffer may still contain data that hasn't been consumed.
    /// Callers should check `remaining()` to see if there's data left to process.
    pub const fn is_exhausted(&self) -> bool {
        self.exhausted
    }

    /// Ensures at least `n` bytes are available in the buffer.
    ///
    /// This method fills the buffer from the blob until either:
    /// - At least `n` bytes are available (returns `Ok(true)`)
    /// - The blob is exhausted with fewer than `n` bytes (returns `Ok(false)`)
    /// - A read error occurs (returns `Err`)
    ///
    /// When `Ok(false)` is returned, callers should still attempt to process
    /// the remaining bytes in the buffer (check `remaining()`), as they may
    /// contain valid data that doesn't require the full `n` bytes.
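    ///
    /// Note that `ensure(n)` only guarantees `remaining() >= n`; a single
    /// `chunk()` may still return fewer than `n` bytes because chunks never
    /// span page boundaries. Use `Buf` helpers such as `copy_to_slice` to
    /// read across pages.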
    pub async fn ensure(&mut self, n: usize) -> Result<bool, Error> {
        while self.buffer.remaining < n && !self.exhausted {
            match self.reader.fill().await? {
                Some((state, logical_bytes)) => {
                    self.buffer.push(state, logical_bytes);
                }
                None => {
                    self.exhausted = true;
                }
            }
        }
        Ok(self.buffer.remaining >= n)
    }

    /// Seeks to `offset` in the blob, returning `Err(BlobInsufficientLength)` if `offset`
    /// exceeds the blob size.
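    ///
    /// Seeking discards any buffered data, so the next `ensure` re-reads from
    /// the page containing `offset`. Seeking backward is valid but pays the
    /// cost of refilling the buffer.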
    pub async fn seek_to(&mut self, offset: u64) -> Result<(), Error> {
        if offset > self.reader.blob_size() {
            return Err(Error::BlobInsufficientLength);
        }

        self.buffer.clear();
        self.exhausted = false;

        // Translate the logical offset into a page index plus an offset within
        // that page; `offset_in_page` is skipped by the first `push` after the
        // next fill.
        let logical_page_size = self.reader.logical_page_size as u64;
        self.reader.blob_page = offset / logical_page_size;
        self.buffer.current_page = 0;
        self.buffer.offset_in_page = (offset % logical_page_size) as usize;

        Ok(())
    }
}

impl<B: Blob> Buf for Replay<B> {
    fn remaining(&self) -> usize {
        self.buffer.remaining()
    }

    fn chunk(&self) -> &[u8] {
        self.buffer.chunk()
    }

    fn advance(&mut self, cnt: usize) {
        self.buffer.advance(cnt);
    }
}

#[cfg(test)]
mod tests {
    use super::{super::append::Append, *};
    use crate::{deterministic, Runner as _, Storage as _};
    use commonware_macros::test_traced;
    use commonware_utils::{NZUsize, NZU16};

    /// Logical page size used by the tests; with the 12-byte CRC record this
    /// gives a 115-byte physical page (hence the `* 115` sizes below).
    const PAGE_SIZE: NonZeroU16 = NZU16!(103);
    const BUFFER_PAGES: usize = 2;

    #[test_traced("DEBUG")]
    fn test_replay_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            let pool_ref = super::super::PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, pool_ref)
                .await
                .unwrap();

            // Write data spanning multiple pages.
            let data: Vec<u8> = (0u8..=255).cycle().take(300).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();

            // Create the Replay.
            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();

            // Ensure all data is available.
            replay.ensure(300).await.unwrap();

            // Verify we got all the data.
            assert_eq!(replay.remaining(), 300);

            // Read all data via the Buf interface.
            let mut collected = Vec::new();
            while replay.remaining() > 0 {
                let chunk = replay.chunk();
                collected.extend_from_slice(chunk);
                let len = chunk.len();
                replay.advance(len);
            }
            assert_eq!(collected, data);
        });
    }

    #[test_traced("DEBUG")]
    fn test_replay_partial_page() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();

            let pool_ref = super::super::PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, pool_ref)
                .await
                .unwrap();

            // Write data that doesn't fill the last page.
            let data: Vec<u8> = (1u8..=(PAGE_SIZE.get() + 10) as u8).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();

            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();

            // Ensure all data is available.
            replay.ensure(data.len()).await.unwrap();

            assert_eq!(replay.remaining(), data.len());
        });
    }

    #[test_traced("DEBUG")]
    fn test_replay_cross_buffer_boundary() {
        // Use prefetch_count=1 to force separate BufferStates per page.
        // This tests navigation across multiple BufferStates in the VecDeque.
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            let pool_ref = super::super::PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, pool_ref)
                .await
                .unwrap();

            // Write 400 bytes spanning 4 pages: three full 103-byte pages
            // plus a partial 91-byte last page.
            let data: Vec<u8> = (0u8..=255).cycle().take(400).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();

            // Create a Replay with a buffer size that results in prefetch_count=1.
            // Physical page size = 103 + 12 = 115 bytes.
            // Buffer size of 115 gives prefetch_pages = 115/115 = 1.
            let mut replay = append.replay(NZUsize!(115)).await.unwrap();

            // Ensure all data - this requires 4 separate fill() calls (one per page).
            // Each fill() creates a new BufferState, so we'll have 4 BufferStates.
            assert!(replay.ensure(400).await.unwrap());
            assert_eq!(replay.remaining(), 400);

            // Read all data via the Buf interface, verifying navigation across
            // BufferStates.
            let mut collected = Vec::new();
            let mut chunks_read = 0;
            while replay.remaining() > 0 {
                let chunk = replay.chunk();
                assert!(!chunk.is_empty(), "chunk() returned empty but remaining > 0");
                collected.extend_from_slice(chunk);
                let len = chunk.len();
                replay.advance(len);
                chunks_read += 1;
            }

            assert_eq!(collected, data);
            // With prefetch_count=1 and 4 pages, we expect at least 4 chunks
            // (one per page, though partial reads could result in more).
            assert!(
                chunks_read >= 4,
                "Expected at least 4 chunks for 4 pages, got {}",
                chunks_read
            );
        });
    }

    #[test_traced("DEBUG")]
    fn test_replay_empty_blob() {
        // Test that replaying an empty blob works correctly:
        // ensure() should return Ok(false) when no data is available.
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            let pool_ref = super::super::PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, pool_ref)
                .await
                .unwrap();

            // Don't write any data - the blob remains empty.
            assert_eq!(append.size().await, 0);

            // Create a Replay on the empty blob.
            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();

            // Verify the initial state - remaining is 0, but not yet marked
            // exhausted (exhausted is only set after the first fill attempt).
            assert_eq!(replay.remaining(), 0);

            // ensure(0) should succeed (we have >= 0 bytes).
            assert!(replay.ensure(0).await.unwrap());

            // ensure(1) should return Ok(false) - not enough data - and mark
            // the reader exhausted.
            assert!(!replay.ensure(1).await.unwrap());

            // Now the reader should be marked exhausted after the fill attempt.
            assert!(replay.is_exhausted());

            // chunk() should return an empty slice.
            assert!(replay.chunk().is_empty());

            // remaining should still be 0.
            assert_eq!(replay.remaining(), 0);
        });
    }

    #[test_traced("DEBUG")]
    fn test_replay_seek_to() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();

            let pool_ref = super::super::PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, pool_ref)
                .await
                .unwrap();

            // Write data spanning multiple pages.
            let data: Vec<u8> = (0u8..=255).cycle().take(300).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();

            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();

            // Seek forward, read, then seek backward.
            replay.seek_to(150).await.unwrap();
            replay.ensure(50).await.unwrap();
            assert_eq!(replay.chunk()[0], data[150]);

            // Seek back to the start.
            replay.seek_to(0).await.unwrap();
            replay.ensure(1).await.unwrap();
            assert_eq!(replay.chunk()[0], data[0]);

            // Seeking beyond the blob size should error.
            assert!(replay.seek_to(data.len() as u64 + 1).await.is_err());

            // Test that remaining() is correct after a seek by reading all data.
            let seek_offset = 150usize;
            replay.seek_to(seek_offset as u64).await.unwrap();
            let expected_remaining = data.len() - seek_offset;

            // Read all bytes and verify the content.
            let mut collected = Vec::new();
            loop {
                // Load more data if needed.
                if !replay.ensure(1).await.unwrap() {
                    break; // No more data available
                }
                let chunk = replay.chunk();
                if chunk.is_empty() {
                    break;
                }
                collected.extend_from_slice(chunk);
                let len = chunk.len();
                replay.advance(len);
            }
            assert_eq!(
                collected.len(),
                expected_remaining,
                "After seeking to {}, should read {} bytes but got {}",
                seek_offset,
                expected_remaining,
                collected.len()
            );
            assert_eq!(collected, &data[seek_offset..]);
        });
    }
}