// commonware_runtime/utils/buffer/paged/read.rs

use super::Checksum;
use crate::{Blob, Buf, Error, IoBufMut};
use commonware_codec::FixedSize;
use std::{collections::VecDeque, num::NonZeroU16};
use tracing::error;

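/// A batch of pages read from the blob, with every page's checksum already
/// validated by [`PageReader::fill`].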
pub(super) struct BufferState {
    /// Raw physical bytes of the pages in this batch, including each page's
    /// trailing checksum record.
    buffer: Vec<u8>,
    /// Number of pages held in `buffer`.
    num_pages: usize,
    /// Logical length of the final page in `buffer`, which may be partial.
    last_page_len: usize,
}

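/// A sequential reader that prefetches batches of checksummed pages from a
/// [`Blob`].
///
/// Physical layout per page, as implied by the size arithmetic in
/// [`PageReader::new`] (a sketch of the on-disk framing, not a normative
/// spec):
///
/// ```text
/// +-------------------------------------+---------------------------+
/// | logical payload (logical_page_size) | checksum (Checksum::SIZE) |
/// +-------------------------------------+---------------------------+
/// page_size = logical_page_size + Checksum::SIZE
/// ```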
pub(super) struct PageReader<B: Blob> {
    /// The underlying blob being read.
    blob: B,
    /// Physical page size: logical page size plus the checksum record.
    page_size: usize,
    /// Number of logical payload bytes per page.
    logical_page_size: usize,
    /// Size of the blob on disk, including checksum records.
    physical_blob_size: u64,
    /// Size of the blob's logical contents, excluding checksum records.
    logical_blob_size: u64,
    /// Index of the next page to read.
    blob_page: u64,
    /// Maximum number of pages fetched per call to [`Self::fill`].
    prefetch_count: usize,
}

impl<B: Blob> PageReader<B> {
    pub(super) const fn new(
        blob: B,
        physical_blob_size: u64,
        logical_blob_size: u64,
        prefetch_count: usize,
        logical_page_size: NonZeroU16,
    ) -> Self {
        let logical_page_size = logical_page_size.get() as usize;
        let page_size = logical_page_size + Checksum::SIZE;

        Self {
            blob,
            page_size,
            logical_page_size,
            physical_blob_size,
            logical_blob_size,
            blob_page: 0,
            prefetch_count,
        }
    }

    /// Returns the logical size of the blob (excluding checksum records).
    pub(super) const fn blob_size(&self) -> u64 {
        self.logical_blob_size
    }

    /// Returns the physical page size (logical payload plus checksum record).
    pub(super) const fn page_size(&self) -> usize {
        self.page_size
    }

    /// Returns the number of logical payload bytes per page.
    pub(super) const fn logical_page_size(&self) -> usize {
        self.logical_page_size
    }

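    /// Reads and checksum-validates the next batch of up to `prefetch_count`
    /// pages, returning the batch together with its total logical byte count,
    /// or `Ok(None)` once the physical blob is exhausted.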
    pub(super) async fn fill(&mut self) -> Result<Option<(BufferState, usize)>, Error> {
        // Compute the physical offset of the next page, guarding against overflow.
        let start_offset = match self.blob_page.checked_mul(self.page_size as u64) {
            Some(o) => o,
            None => return Err(Error::OffsetOverflow),
        };
        if start_offset >= self.physical_blob_size {
            return Ok(None);
        }

        // Determine how many complete pages remain and cap them at the prefetch budget.
        let remaining_physical = (self.physical_blob_size - start_offset) as usize;
        let max_pages = remaining_physical / self.page_size;
        let pages_to_read = max_pages.min(self.prefetch_count);
        if pages_to_read == 0 {
            return Ok(None);
        }
        let bytes_to_read = pages_to_read * self.page_size;

        // Issue a single read covering the entire batch.
        let buf = IoBufMut::zeroed(bytes_to_read);
        let physical_buf = self
            .blob
            .read_at(start_offset, buf)
            .await?
            .coalesce()
            .freeze();

        // Validate every page's checksum and accumulate the logical byte count.
        let mut total_logical = 0usize;
        let mut last_len = 0usize;
        let is_final_batch = pages_to_read == max_pages;
        for page_idx in 0..pages_to_read {
            let page_start = page_idx * self.page_size;
            let page_slice = &physical_buf.as_ref()[page_start..page_start + self.page_size];
            let Some(record) = Checksum::validate_page(page_slice) else {
                error!(page = self.blob_page + page_idx as u64, "CRC mismatch");
                return Err(Error::InvalidChecksum);
            };
            let (len, _) = record.get_crc();
            let len = len as usize;

            // Only the final page of the blob may hold a partial payload.
            let is_last_page_in_blob = is_final_batch && page_idx + 1 == pages_to_read;
            if !is_last_page_in_blob && len != self.logical_page_size {
                error!(
                    page = self.blob_page + page_idx as u64,
                    expected = self.logical_page_size,
                    actual = len,
                    "non-last page has partial length"
                );
                return Err(Error::InvalidChecksum);
            }

            total_logical += len;
            last_len = len;
        }
        self.blob_page += pages_to_read as u64;

        let state = BufferState {
            buffer: physical_buf.into(),
            num_pages: pages_to_read,
            last_page_len: last_len,
        };

        Ok(Some((state, total_logical)))
    }
}

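/// A queue of validated page batches exposed as one logical byte stream via
/// [`Buf`], skipping the checksum record at the end of each physical page.
///
/// Cursor layout (a sketch of how the fields below cooperate):
///
/// ```text
/// buffers: [ front batch | ... | back batch ]
///              ^ current_page / offset_in_page index into the front batch
/// remaining: logical bytes left across all queued batches
/// ```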
struct ReplayBuf {
    /// Physical page size (logical payload plus checksum record).
    page_size: usize,
    /// Number of logical payload bytes per page.
    logical_page_size: usize,
    /// Queued batches of validated pages, consumed front to back.
    buffers: VecDeque<BufferState>,
    /// Index of the page currently being consumed in the front batch.
    current_page: usize,
    /// Byte offset into the current page's logical payload.
    offset_in_page: usize,
    /// Logical bytes remaining across all queued batches.
    remaining: usize,
}

impl ReplayBuf {
    const fn new(page_size: usize, logical_page_size: usize) -> Self {
        Self {
            page_size,
            logical_page_size,
            buffers: VecDeque::new(),
            current_page: 0,
            offset_in_page: 0,
            remaining: 0,
        }
    }

    /// Drops all queued batches and resets the cursor.
    fn clear(&mut self) {
        self.buffers.clear();
        self.current_page = 0;
        self.offset_in_page = 0;
        self.remaining = 0;
    }

    /// Queues a batch containing `logical_bytes` of payload.
    fn push(&mut self, state: BufferState, logical_bytes: usize) {
        // If this is the first batch, the cursor may already sit mid-page
        // (after a seek), so the skipped prefix does not count as remaining.
        let skip = if self.buffers.is_empty() {
            self.offset_in_page
        } else {
            0
        };
        self.buffers.push_back(state);
        self.remaining += logical_bytes.saturating_sub(skip);
    }

    /// Returns the logical length of page `page_idx` within `buf`: full pages
    /// carry `logical_page_size` bytes, while the last page may be shorter.
    const fn page_len(buf: &BufferState, page_idx: usize, logical_page_size: usize) -> usize {
        if page_idx + 1 == buf.num_pages {
            buf.last_page_len
        } else {
            logical_page_size
        }
    }
}

impl Buf for ReplayBuf {
    fn remaining(&self) -> usize {
        self.remaining
    }

    fn chunk(&self) -> &[u8] {
        let Some(buf) = self.buffers.front() else {
            return &[];
        };
        if self.current_page >= buf.num_pages {
            return &[];
        }
        let page_len = Self::page_len(buf, self.current_page, self.logical_page_size);
        let physical_start = self.current_page * self.page_size + self.offset_in_page;
        let physical_end = self.current_page * self.page_size + page_len;
        &buf.buffer[physical_start..physical_end]
    }

    fn advance(&mut self, mut cnt: usize) {
        self.remaining = self.remaining.saturating_sub(cnt);

        while cnt > 0 {
            let Some(buf) = self.buffers.front() else {
                break;
            };

            // Walk pages within the front batch until `cnt` is consumed.
            while cnt > 0 && self.current_page < buf.num_pages {
                let page_len = Self::page_len(buf, self.current_page, self.logical_page_size);
                let available = page_len - self.offset_in_page;
                if cnt < available {
                    self.offset_in_page += cnt;
                    return;
                }
                cnt -= available;
                self.current_page += 1;
                self.offset_in_page = 0;
            }

            // The front batch is fully consumed; move on to the next one.
            if self.current_page >= buf.num_pages {
                self.buffers.pop_front();
                self.current_page = 0;
                self.offset_in_page = 0;
            }
        }
    }
}

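/// A sequential reader over the logical contents of a paged blob, combining a
/// [`PageReader`] with a [`ReplayBuf`] and refilling on demand via
/// [`Replay::ensure`].
///
/// A minimal consumption sketch (hypothetical driver code; the tests below
/// obtain a `Replay` from `Append::replay`, and the prefetch count of 4 is
/// arbitrary):
///
/// ```ignore
/// let mut replay = append.replay(NZUsize!(4)).await?;
/// while replay.ensure(1).await? {
///     let len = replay.chunk().len();
///     // ... inspect replay.chunk() ...
///     replay.advance(len);
/// }
/// ```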
pub struct Replay<B: Blob> {
    /// Source of validated page batches.
    reader: PageReader<B>,
    /// Buffered batches awaiting consumption.
    buffer: ReplayBuf,
    /// Set once the reader has no further data to return.
    exhausted: bool,
}

impl<B: Blob> Replay<B> {
    pub(super) const fn new(reader: PageReader<B>) -> Self {
        let page_size = reader.page_size();
        let logical_page_size = reader.logical_page_size();
        Self {
            reader,
            buffer: ReplayBuf::new(page_size, logical_page_size),
            exhausted: false,
        }
    }

    /// Returns the logical size of the underlying blob.
    pub const fn blob_size(&self) -> u64 {
        self.reader.blob_size()
    }

    /// Returns whether the underlying reader has run out of data.
    pub const fn is_exhausted(&self) -> bool {
        self.exhausted
    }

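    /// Buffers data until at least `n` logical bytes are available or the
    /// blob is exhausted, returning whether `n` bytes can now be read.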
    pub async fn ensure(&mut self, n: usize) -> Result<bool, Error> {
        while self.buffer.remaining < n && !self.exhausted {
            match self.reader.fill().await? {
                Some((state, logical_bytes)) => {
                    self.buffer.push(state, logical_bytes);
                }
                None => {
                    self.exhausted = true;
                }
            }
        }
        Ok(self.buffer.remaining >= n)
    }

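    /// Seeks to an absolute logical `offset`, clearing any buffered data; the
    /// next fill resumes from the page containing `offset`.
    ///
    /// The cursor arithmetic below, restated for a concrete (hypothetical)
    /// case with a logical page size of 100 bytes:
    ///
    /// ```text
    /// offset = 250  =>  blob_page = 250 / 100 = 2
    ///                   offset_in_page = 250 % 100 = 50
    /// ```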
    pub async fn seek_to(&mut self, offset: u64) -> Result<(), Error> {
        if offset > self.reader.blob_size() {
            return Err(Error::BlobInsufficientLength);
        }

        self.buffer.clear();
        self.exhausted = false;

        // Position the reader on the containing page; `ReplayBuf::push` will
        // discount the skipped prefix of that page from `remaining`.
        let page_size = self.reader.logical_page_size as u64;
        self.reader.blob_page = offset / page_size;
        self.buffer.current_page = 0;
        self.buffer.offset_in_page = (offset % page_size) as usize;

        Ok(())
    }
}


impl<B: Blob> Buf for Replay<B> {
    fn remaining(&self) -> usize {
        self.buffer.remaining()
    }

    fn chunk(&self) -> &[u8] {
        self.buffer.chunk()
    }

    fn advance(&mut self, cnt: usize) {
        self.buffer.advance(cnt);
    }
}


#[cfg(test)]
mod tests {
    use super::{super::append::Append, *};
    use crate::{deterministic, Runner as _, Storage as _};
    use commonware_macros::test_traced;
    use commonware_utils::{NZUsize, NZU16};

    /// Logical page size used by these tests; the literal `115` below is the
    /// corresponding physical page size (payload plus checksum record).
    const PAGE_SIZE: NonZeroU16 = NZU16!(103);
    const BUFFER_PAGES: usize = 2;

    #[test_traced("DEBUG")]
    fn test_replay_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            let cache_ref = super::super::CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, cache_ref)
                .await
                .unwrap();

            // Write 300 bytes (three logical pages, the last one partial).
            let data: Vec<u8> = (0u8..=255).cycle().take(300).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();

            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();

            replay.ensure(300).await.unwrap();

            assert_eq!(replay.remaining(), 300);

            // Drain the replay chunk by chunk and confirm the data round-trips.
            let mut collected = Vec::new();
            while replay.remaining() > 0 {
                let chunk = replay.chunk();
                collected.extend_from_slice(chunk);
                let len = chunk.len();
                replay.advance(len);
            }
            assert_eq!(collected, data);
        });
    }

    #[test_traced("DEBUG")]
    fn test_replay_partial_page() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();

            let cache_ref = super::super::CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, cache_ref)
                .await
                .unwrap();

            // Write one full page plus ten bytes so the final page is partial.
            let data: Vec<u8> = (1u8..=(PAGE_SIZE.get() + 10) as u8).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();

            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();

            replay.ensure(data.len()).await.unwrap();

            assert_eq!(replay.remaining(), data.len());
        });
    }

    #[test_traced("DEBUG")]
    fn test_replay_cross_buffer_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            let cache_ref = super::super::CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, cache_ref)
                .await
                .unwrap();

            // Write 400 bytes, spanning four logical pages.
            let data: Vec<u8> = (0u8..=255).cycle().take(400).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();

            let mut replay = append.replay(NZUsize!(115)).await.unwrap();

            assert!(replay.ensure(400).await.unwrap());
            assert_eq!(replay.remaining(), 400);

            // Chunks never span a page boundary, so draining four pages must
            // take at least four chunk/advance iterations.
            let mut collected = Vec::new();
            let mut chunks_read = 0;
            while replay.remaining() > 0 {
                let chunk = replay.chunk();
                assert!(
                    !chunk.is_empty(),
                    "chunk() returned empty but remaining > 0"
                );
                collected.extend_from_slice(chunk);
                let len = chunk.len();
                replay.advance(len);
                chunks_read += 1;
            }

            assert_eq!(collected, data);
            assert!(
                chunks_read >= 4,
                "Expected at least 4 chunks for 4 pages, got {}",
                chunks_read
            );
        });
    }

    #[test_traced("DEBUG")]
    fn test_replay_empty_blob() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            let cache_ref = super::super::CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, cache_ref)
                .await
                .unwrap();

            assert_eq!(append.size().await, 0);

            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();

            assert_eq!(replay.remaining(), 0);

            // Ensuring zero bytes trivially succeeds on an empty blob...
            assert!(replay.ensure(0).await.unwrap());

            // ...while ensuring one byte fails and marks the reader exhausted.
            assert!(!replay.ensure(1).await.unwrap());

            assert!(replay.is_exhausted());

            assert!(replay.chunk().is_empty());

            assert_eq!(replay.remaining(), 0);
        });
    }

    #[test_traced("DEBUG")]
    fn test_replay_seek_to() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();

            let cache_ref = super::super::CacheRef::new(PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, cache_ref)
                .await
                .unwrap();

            let data: Vec<u8> = (0u8..=255).cycle().take(300).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();

            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();

            // Seek into the middle of the data and read one byte.
            replay.seek_to(150).await.unwrap();
            replay.ensure(50).await.unwrap();
            assert_eq!(replay.get_u8(), data[150]);

            // Seek back to the beginning.
            replay.seek_to(0).await.unwrap();
            replay.ensure(1).await.unwrap();
            assert_eq!(replay.get_u8(), data[0]);

            // Seeking past the end of the blob must fail.
            assert!(replay.seek_to(data.len() as u64 + 1).await.is_err());

            // Seek again and drain to the end, verifying count and contents.
            let seek_offset = 150usize;
            replay.seek_to(seek_offset as u64).await.unwrap();
            let expected_remaining = data.len() - seek_offset;
            let mut collected = Vec::new();
            loop {
                if !replay.ensure(1).await.unwrap() {
                    break;
                }
                let chunk = replay.chunk();
                if chunk.is_empty() {
                    break;
                }
                collected.extend_from_slice(chunk);
                let len = chunk.len();
                replay.advance(len);
            }
            assert_eq!(
                collected.len(),
                expected_remaining,
                "After seeking to {}, should read {} bytes but got {}",
                seek_offset,
                expected_remaining,
                collected.len()
            );
            assert_eq!(collected, &data[seek_offset..]);
        });
    }
}