// commonware_runtime/utils/buffer/paged/read.rs
use super::Checksum;
2use crate::{Blob, Buf, Error, IoBuf};
3use commonware_codec::FixedSize;
4use std::{collections::VecDeque, num::NonZeroU16};
5use tracing::error;
6
/// A batch of CRC-validated pages read from the blob, along with the metadata
/// needed to recover each page's logical (payload) length.
pub(super) struct BufferState {
    /// Physical bytes of `num_pages` consecutive pages (each page is the
    /// logical payload followed by its checksum record).
    buffer: IoBuf,
    /// Number of pages contained in `buffer`.
    num_pages: usize,
    /// Logical length of the final page in `buffer`. Pages before the last
    /// one in the blob are validated to be full, so only the final page of
    /// the final batch may be partial.
    last_page_len: usize,
}
20
/// Sequentially reads checksummed pages from a [Blob], validating CRCs and
/// prefetching up to a configured number of pages per read.
pub(super) struct PageReader<B: Blob> {
    /// Underlying blob to read pages from.
    blob: B,
    /// Physical page size in bytes: logical payload plus checksum record.
    page_size: usize,
    /// Logical (payload) bytes per full page, excluding the checksum.
    logical_page_size: usize,
    /// On-disk size of the blob, including per-page checksum overhead.
    physical_blob_size: u64,
    /// Logical size of the blob's content (checksums excluded).
    logical_blob_size: u64,
    /// Index of the next page to read from the blob.
    blob_page: u64,
    /// Maximum number of pages fetched per call to [Self::fill].
    prefetch_count: usize,
}
41
impl<B: Blob> PageReader<B> {
    /// Creates a reader over `blob` that starts at page 0 and fetches up to
    /// `prefetch_count` pages per [Self::fill] call.
    ///
    /// `physical_blob_size` is the on-disk size (checksums included) while
    /// `logical_blob_size` counts payload bytes only.
    pub(super) const fn new(
        blob: B,
        physical_blob_size: u64,
        logical_blob_size: u64,
        prefetch_count: usize,
        logical_page_size: NonZeroU16,
    ) -> Self {
        let logical_page_size = logical_page_size.get() as usize;
        // A physical page is the logical payload followed by its checksum record.
        let page_size = logical_page_size + Checksum::SIZE;

        Self {
            blob,
            page_size,
            logical_page_size,
            physical_blob_size,
            logical_blob_size,
            blob_page: 0,
            prefetch_count,
        }
    }

    /// Returns the logical size of the blob (payload bytes only).
    pub(super) const fn blob_size(&self) -> u64 {
        self.logical_blob_size
    }

    /// Returns the physical page size (payload + checksum) in bytes.
    pub(super) const fn page_size(&self) -> usize {
        self.page_size
    }

    /// Returns the logical payload size of a full page in bytes.
    pub(super) const fn logical_page_size(&self) -> usize {
        self.logical_page_size
    }

    /// Reads and validates the next batch of pages, advancing the reader past
    /// the pages returned.
    ///
    /// Returns `Ok(None)` once no complete page remains; otherwise returns the
    /// validated [BufferState] plus the total number of logical bytes it
    /// contains.
    ///
    /// # Errors
    ///
    /// - [Error::OffsetOverflow] if the next page's byte offset overflows u64.
    /// - [Error::InvalidChecksum] if a page fails CRC validation, or a page
    ///   that is not the blob's final page reports a partial length.
    pub(super) async fn fill(&mut self) -> Result<Option<(BufferState, usize)>, Error> {
        // Byte offset of the next unread page; guard against u64 overflow.
        let start_offset = match self.blob_page.checked_mul(self.page_size as u64) {
            Some(o) => o,
            None => return Err(Error::OffsetOverflow),
        };
        if start_offset >= self.physical_blob_size {
            return Ok(None);
        }

        // Only whole pages are readable; a trailing fragment yields 0 pages.
        let remaining_physical = (self.physical_blob_size - start_offset) as usize;
        let max_pages = remaining_physical / self.page_size;
        let pages_to_read = max_pages.min(self.prefetch_count);
        if pages_to_read == 0 {
            return Ok(None);
        }
        let bytes_to_read = pages_to_read * self.page_size;

        let physical_buf = self
            .blob
            .read_at(start_offset, bytes_to_read)
            .await?
            .coalesce()
            .freeze();

        // Validate every page's checksum and accumulate logical lengths.
        let mut total_logical = 0usize;
        let mut last_len = 0usize;
        // If this batch reaches the end of the blob, its final page may be
        // partial; any other page must be full.
        let is_final_batch = pages_to_read == max_pages;
        for page_idx in 0..pages_to_read {
            let page_start = page_idx * self.page_size;
            let page_slice = &physical_buf.as_ref()[page_start..page_start + self.page_size];
            let Some(record) = Checksum::validate_page(page_slice) else {
                error!(page = self.blob_page + page_idx as u64, "CRC mismatch");
                return Err(Error::InvalidChecksum);
            };
            let (len, _) = record.get_crc();
            let len = len as usize;

            let is_last_page_in_blob = is_final_batch && page_idx + 1 == pages_to_read;
            if !is_last_page_in_blob && len != self.logical_page_size {
                error!(
                    page = self.blob_page + page_idx as u64,
                    expected = self.logical_page_size,
                    actual = len,
                    "non-last page has partial length"
                );
                return Err(Error::InvalidChecksum);
            }

            total_logical += len;
            last_len = len;
        }
        self.blob_page += pages_to_read as u64;

        let state = BufferState {
            buffer: physical_buf,
            num_pages: pages_to_read,
            last_page_len: last_len,
        };

        Ok(Some((state, total_logical)))
    }
}
158
/// A queue of prefetched page batches that exposes their logical bytes as a
/// contiguous stream via the [Buf] trait.
struct ReplayBuf {
    /// Physical page size in bytes (payload + checksum).
    page_size: usize,
    /// Logical payload bytes per full page.
    logical_page_size: usize,
    /// Prefetched batches, consumed front to back.
    buffers: VecDeque<BufferState>,
    /// Index of the page currently being consumed within the front buffer.
    current_page: usize,
    /// Logical byte offset within the current page. May be pre-seeded by a
    /// seek before the first buffer is pushed.
    offset_in_page: usize,
    /// Total logical bytes remaining across all queued buffers.
    remaining: usize,
}
178
179impl ReplayBuf {
180 const fn new(page_size: usize, logical_page_size: usize) -> Self {
182 Self {
183 page_size,
184 logical_page_size,
185 buffers: VecDeque::new(),
186 current_page: 0,
187 offset_in_page: 0,
188 remaining: 0,
189 }
190 }
191
192 fn clear(&mut self) {
194 self.buffers.clear();
195 self.current_page = 0;
196 self.offset_in_page = 0;
197 self.remaining = 0;
198 }
199
200 fn push(&mut self, state: BufferState, logical_bytes: usize) {
202 let skip = if self.buffers.is_empty() {
205 self.offset_in_page
206 } else {
207 0
208 };
209 self.buffers.push_back(state);
210 self.remaining += logical_bytes.saturating_sub(skip);
211 }
212
213 const fn page_len(buf: &BufferState, page_idx: usize, logical_page_size: usize) -> usize {
215 if page_idx + 1 == buf.num_pages {
216 buf.last_page_len
217 } else {
218 logical_page_size
219 }
220 }
221}
222
impl Buf for ReplayBuf {
    /// Total logical bytes left to consume.
    fn remaining(&self) -> usize {
        self.remaining
    }

    /// Returns the unconsumed logical bytes of the current page, or an empty
    /// slice if no buffered data remains.
    fn chunk(&self) -> &[u8] {
        let Some(buf) = self.buffers.front() else {
            return &[];
        };
        if self.current_page >= buf.num_pages {
            return &[];
        }
        let page_len = Self::page_len(buf, self.current_page, self.logical_page_size);
        // Map the logical cursor to physical coordinates: pages are laid out
        // back-to-back at `page_size` strides, payload first (checksum bytes
        // past `page_len` are never exposed).
        let physical_start = self.current_page * self.page_size + self.offset_in_page;
        let physical_end = self.current_page * self.page_size + page_len;
        &buf.buffer.as_ref()[physical_start..physical_end]
    }

    /// Consumes `cnt` logical bytes, crossing page and buffer boundaries as
    /// needed. Consuming more than `remaining` drains everything (saturating).
    fn advance(&mut self, mut cnt: usize) {
        self.remaining = self.remaining.saturating_sub(cnt);

        while cnt > 0 {
            let Some(buf) = self.buffers.front() else {
                break;
            };

            // Walk pages within the front buffer.
            while cnt > 0 && self.current_page < buf.num_pages {
                let page_len = Self::page_len(buf, self.current_page, self.logical_page_size);
                let available = page_len - self.offset_in_page;
                if cnt < available {
                    // Cursor stays strictly inside this page, preserving the
                    // invariant offset_in_page < page_len.
                    self.offset_in_page += cnt;
                    return;
                }
                cnt -= available;
                self.current_page += 1;
                self.offset_in_page = 0;
            }

            // Front buffer fully consumed: drop it and restart at page 0.
            if self.current_page >= buf.num_pages {
                self.buffers.pop_front();
                self.current_page = 0;
                self.offset_in_page = 0;
            }
        }
    }
}
271
/// A forward reader over a paged blob's logical content, implementing [Buf].
/// Data is prefetched in page batches via [PageReader] and buffered until
/// consumed.
pub struct Replay<B: Blob> {
    /// Source of validated page batches.
    reader: PageReader<B>,
    /// Buffered batches exposed as a contiguous logical byte stream.
    buffer: ReplayBuf,
    /// Set once the reader has no more pages to deliver.
    exhausted: bool,
}
284
285impl<B: Blob> Replay<B> {
286 pub(super) const fn new(reader: PageReader<B>) -> Self {
288 let page_size = reader.page_size();
289 let logical_page_size = reader.logical_page_size();
290 Self {
291 reader,
292 buffer: ReplayBuf::new(page_size, logical_page_size),
293 exhausted: false,
294 }
295 }
296
297 pub const fn blob_size(&self) -> u64 {
299 self.reader.blob_size()
300 }
301
302 pub const fn is_exhausted(&self) -> bool {
307 self.exhausted
308 }
309
310 pub async fn ensure(&mut self, n: usize) -> Result<bool, Error> {
321 while self.buffer.remaining < n && !self.exhausted {
322 match self.reader.fill().await? {
323 Some((state, logical_bytes)) => {
324 self.buffer.push(state, logical_bytes);
325 }
326 None => {
327 self.exhausted = true;
328 }
329 }
330 }
331 Ok(self.buffer.remaining >= n)
332 }
333
334 pub fn seek_to(&mut self, offset: u64) -> Result<(), Error> {
337 if offset > self.reader.blob_size() {
338 return Err(Error::BlobInsufficientLength);
339 }
340
341 self.buffer.clear();
342 self.exhausted = false;
343
344 let page_size = self.reader.logical_page_size as u64;
345 self.reader.blob_page = offset / page_size;
346 self.buffer.current_page = 0;
347 self.buffer.offset_in_page = (offset % page_size) as usize;
348
349 Ok(())
350 }
351}
352
353impl<B: Blob> Buf for Replay<B> {
354 fn remaining(&self) -> usize {
355 self.buffer.remaining()
356 }
357
358 fn chunk(&self) -> &[u8] {
359 self.buffer.chunk()
360 }
361
362 fn advance(&mut self, cnt: usize) {
363 self.buffer.advance(cnt);
364 }
365}
366
#[cfg(test)]
mod tests {
    use super::{super::append::Append, *};
    use crate::{deterministic, Runner as _, Storage as _};
    use commonware_macros::test_traced;
    use commonware_utils::{NZUsize, NZU16};

    // Odd page size to exercise non-aligned page boundaries.
    const PAGE_SIZE: NonZeroU16 = NZU16!(103);
    const BUFFER_PAGES: usize = 2;

    /// Appends 300 bytes (spanning multiple pages) and verifies replay
    /// reproduces them exactly via chunk()/advance().
    #[test_traced("DEBUG")]
    fn test_replay_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            let cache_ref =
                super::super::CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, cache_ref)
                .await
                .unwrap();

            // Write data spanning several pages.
            let data: Vec<u8> = (0u8..=255).cycle().take(300).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();

            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();

            replay.ensure(300).await.unwrap();

            assert_eq!(replay.remaining(), 300);

            // Drain chunk-by-chunk and compare against the original bytes.
            let mut collected = Vec::new();
            while replay.remaining() > 0 {
                let chunk = replay.chunk();
                collected.extend_from_slice(chunk);
                let len = chunk.len();
                replay.advance(len);
            }
            assert_eq!(collected, data);
        });
    }

    /// Replays data ending mid-page to check partial-page length handling.
    #[test_traced("DEBUG")]
    fn test_replay_partial_page() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();

            let cache_ref =
                super::super::CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, cache_ref)
                .await
                .unwrap();

            // One full page plus 10 extra bytes leaves a partial final page.
            let data: Vec<u8> = (1u8..=(PAGE_SIZE.get() + 10) as u8).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();

            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();

            replay.ensure(data.len()).await.unwrap();

            assert_eq!(replay.remaining(), data.len());
        });
    }

    /// Reads data larger than one prefetch batch so the replay buffer must
    /// queue and cross multiple BufferStates.
    #[test_traced("DEBUG")]
    fn test_replay_cross_buffer_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            let cache_ref =
                super::super::CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, cache_ref)
                .await
                .unwrap();

            // 400 bytes spans about 4 pages at PAGE_SIZE = 103.
            let data: Vec<u8> = (0u8..=255).cycle().take(400).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();

            let mut replay = append.replay(NZUsize!(115)).await.unwrap();

            assert!(replay.ensure(400).await.unwrap());
            assert_eq!(replay.remaining(), 400);

            // chunk() yields at most one page at a time; count the chunks.
            let mut collected = Vec::new();
            let mut chunks_read = 0;
            while replay.remaining() > 0 {
                let chunk = replay.chunk();
                assert!(
                    !chunk.is_empty(),
                    "chunk() returned empty but remaining > 0"
                );
                collected.extend_from_slice(chunk);
                let len = chunk.len();
                replay.advance(len);
                chunks_read += 1;
            }

            assert_eq!(collected, data);
            assert!(
                chunks_read >= 4,
                "Expected at least 4 chunks for 4 pages, got {}",
                chunks_read
            );
        });
    }

    /// Replay over an empty blob: ensure(0) succeeds, ensure(1) fails and
    /// marks the stream exhausted.
    #[test_traced("DEBUG")]
    fn test_replay_empty_blob() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            let cache_ref =
                super::super::CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, cache_ref)
                .await
                .unwrap();

            assert_eq!(append.size().await, 0);

            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();

            assert_eq!(replay.remaining(), 0);

            // Requesting zero bytes always succeeds.
            assert!(replay.ensure(0).await.unwrap());

            // Requesting any data from an empty blob must fail...
            assert!(!replay.ensure(1).await.unwrap());

            // ...and leave the stream flagged as exhausted.
            assert!(replay.is_exhausted());

            assert!(replay.chunk().is_empty());

            assert_eq!(replay.remaining(), 0);
        });
    }

    /// Exercises seek_to: mid-stream, back to start, past-the-end rejection,
    /// and a full drain from a seeked position.
    #[test_traced("DEBUG")]
    fn test_replay_seek_to() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();

            let cache_ref =
                super::super::CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, cache_ref)
                .await
                .unwrap();

            let data: Vec<u8> = (0u8..=255).cycle().take(300).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();

            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();

            // Seek into the middle of the data (crosses a page boundary).
            replay.seek_to(150).unwrap();
            replay.ensure(50).await.unwrap();
            assert_eq!(replay.get_u8(), data[150]);

            // Seek back to the beginning.
            replay.seek_to(0).unwrap();
            replay.ensure(1).await.unwrap();
            assert_eq!(replay.get_u8(), data[0]);

            // Seeking past the logical end must be rejected.
            assert!(replay.seek_to(data.len() as u64 + 1).is_err());

            // Drain from a seeked offset and verify the tail matches.
            let seek_offset = 150usize;
            replay.seek_to(seek_offset as u64).unwrap();
            let expected_remaining = data.len() - seek_offset;
            let mut collected = Vec::new();
            loop {
                if !replay.ensure(1).await.unwrap() {
                    break;
                }
                let chunk = replay.chunk();
                if chunk.is_empty() {
                    break;
                }
                collected.extend_from_slice(chunk);
                let len = chunk.len();
                replay.advance(len);
            }
            assert_eq!(
                collected.len(),
                expected_remaining,
                "After seeking to {}, should read {} bytes but got {}",
                seek_offset,
                expected_remaining,
                collected.len()
            );
            assert_eq!(collected, &data[seek_offset..]);
        });
    }
}