commonware_runtime/utils/buffer/pool/read.rs

use super::Checksum;
use crate::{Blob, Error};
use bytes::Buf;
use commonware_codec::FixedSize;
use std::{collections::VecDeque, num::NonZeroU16};
use tracing::error;

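/// A batch of pages read from a [Blob], with their checksums already validated.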
pub(super) struct BufferState {
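    /// The raw physical bytes of the pages, including their checksum records.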
    buffer: Vec<u8>,
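    /// The number of pages in `buffer`.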
    num_pages: usize,
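    /// The logical length of the last page, which may be less than a full page.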
    last_page_len: usize,
}

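/// Sequentially reads checksummed pages from a [Blob] in batches of up to
/// `prefetch_count` pages.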
pub(super) struct PageReader<B: Blob> {
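    /// The blob to read pages from.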
    blob: B,
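    /// The physical page size (logical page size plus the checksum record).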
    page_size: usize,
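    /// The number of logical data bytes in a full page.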
    logical_page_size: usize,
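    /// The size of the blob on disk, including checksums.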
    physical_blob_size: u64,
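    /// The size of the blob's logical data, excluding checksums.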
    logical_blob_size: u64,
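    /// The index of the next page to read.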
    blob_page: u64,
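    /// The maximum number of pages to read per call to [Self::fill].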
    prefetch_count: usize,
}

impl<B: Blob> PageReader<B> {
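    /// Creates a [PageReader] that reads `blob` in batches of `prefetch_count` pages.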
    pub(super) const fn new(
        blob: B,
        physical_blob_size: u64,
        logical_blob_size: u64,
        prefetch_count: usize,
        logical_page_size: NonZeroU16,
    ) -> Self {
        let logical_page_size = logical_page_size.get() as usize;
        let page_size = logical_page_size + Checksum::SIZE;

        Self {
            blob,
            page_size,
            logical_page_size,
            physical_blob_size,
            logical_blob_size,
            blob_page: 0,
            prefetch_count,
        }
    }

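    /// Returns the size of the blob's logical data in bytes.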
    pub(super) const fn blob_size(&self) -> u64 {
        self.logical_blob_size
    }

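    /// Returns the physical page size (logical page size plus the checksum record).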
    pub(super) const fn page_size(&self) -> usize {
        self.page_size
    }

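    /// Returns the logical page size.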
    pub(super) const fn logical_page_size(&self) -> usize {
        self.logical_page_size
    }

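    /// Reads up to `prefetch_count` pages starting at the current position,
    /// validating each page's checksum.
    ///
    /// Returns the resulting [BufferState] and the total number of logical
    /// bytes read, or `None` if there is nothing left to read.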
    pub(super) async fn fill(&mut self) -> Result<Option<(BufferState, usize)>, Error> {
        let start_offset = match self.blob_page.checked_mul(self.page_size as u64) {
            Some(o) => o,
            None => return Err(Error::OffsetOverflow),
        };
        if start_offset >= self.physical_blob_size {
            return Ok(None);
        }

        let remaining_physical = (self.physical_blob_size - start_offset) as usize;
        let max_pages = remaining_physical / self.page_size;
        let pages_to_read = max_pages.min(self.prefetch_count);
        if pages_to_read == 0 {
            return Ok(None);
        }
        let bytes_to_read = pages_to_read * self.page_size;

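        // Fetch all of the pages in a single read from the blob.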
        let physical_buf: Vec<u8> = self
            .blob
            .read_at(vec![0u8; bytes_to_read], start_offset)
            .await?
            .into();

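        // Validate each page and tally the logical bytes it contributes.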
        let mut total_logical = 0usize;
        let mut last_len = 0usize;
        let is_final_batch = pages_to_read == max_pages;
        for page_idx in 0..pages_to_read {
            let page_start = page_idx * self.page_size;
            let page_slice = &physical_buf[page_start..page_start + self.page_size];
            let Some(record) = Checksum::validate_page(page_slice) else {
                error!(page = self.blob_page + page_idx as u64, "CRC mismatch");
                return Err(Error::InvalidChecksum);
            };
            let (len, _) = record.get_crc();
            let len = len as usize;

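            // Only the final page of the blob may hold fewer than logical_page_size bytes.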
            let is_last_page_in_blob = is_final_batch && page_idx + 1 == pages_to_read;
            if !is_last_page_in_blob && len != self.logical_page_size {
                error!(
                    page = self.blob_page + page_idx as u64,
                    expected = self.logical_page_size,
                    actual = len,
                    "non-last page has partial length"
                );
                return Err(Error::InvalidChecksum);
            }

            total_logical += len;
            last_len = len;
        }
        self.blob_page += pages_to_read as u64;

        let state = BufferState {
            buffer: physical_buf,
            num_pages: pages_to_read,
            last_page_len: last_len,
        };

        Ok(Some((state, total_logical)))
    }
}

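/// A [Buf] over a queue of [BufferState]s that yields only the logical bytes of
/// each page, skipping the checksum records.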
struct ReplayBuf {
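    /// The physical page size (logical page size plus the checksum record).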
    page_size: usize,
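    /// The number of logical data bytes in a full page.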
    logical_page_size: usize,
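    /// Filled buffers that have not yet been fully consumed.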
    buffers: VecDeque<BufferState>,
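    /// The index of the page being consumed in the front buffer.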
    current_page: usize,
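    /// The logical byte offset within the current page.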
    offset_in_page: usize,
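    /// The total number of logical bytes remaining across all buffers.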
    remaining: usize,
}

impl ReplayBuf {
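    /// Creates an empty [ReplayBuf] for pages of the given sizes.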
    const fn new(page_size: usize, logical_page_size: usize) -> Self {
        Self {
            page_size,
            logical_page_size,
            buffers: VecDeque::new(),
            current_page: 0,
            offset_in_page: 0,
            remaining: 0,
        }
    }

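    /// Discards all buffered data and resets the read position.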
    fn clear(&mut self) {
        self.buffers.clear();
        self.current_page = 0;
        self.offset_in_page = 0;
        self.remaining = 0;
    }

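    /// Enqueues `state`, crediting its `logical_bytes` to `remaining`. If this
    /// is the first buffer after a seek, the bytes before the seek offset
    /// within its first page are excluded from `remaining`.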
    fn push(&mut self, state: BufferState, logical_bytes: usize) {
        let skip = if self.buffers.is_empty() {
            self.offset_in_page
        } else {
            0
        };
        self.buffers.push_back(state);
        self.remaining += logical_bytes.saturating_sub(skip);
    }

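    /// Returns the logical length of the page at `page_idx` in `buf`.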
    const fn page_len(buf: &BufferState, page_idx: usize, logical_page_size: usize) -> usize {
        if page_idx + 1 == buf.num_pages {
            buf.last_page_len
        } else {
            logical_page_size
        }
    }
}

impl Buf for ReplayBuf {
    fn remaining(&self) -> usize {
        self.remaining
    }

    fn chunk(&self) -> &[u8] {
        let Some(buf) = self.buffers.front() else {
            return &[];
        };
        if self.current_page >= buf.num_pages {
            return &[];
        }
        let page_len = Self::page_len(buf, self.current_page, self.logical_page_size);
        let physical_start = self.current_page * self.page_size + self.offset_in_page;
        let physical_end = self.current_page * self.page_size + page_len;
        &buf.buffer[physical_start..physical_end]
    }

    fn advance(&mut self, mut cnt: usize) {
        self.remaining = self.remaining.saturating_sub(cnt);

        while cnt > 0 {
            let Some(buf) = self.buffers.front() else {
                break;
            };

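            // Consume pages from the front buffer until `cnt` is satisfied.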
            while cnt > 0 && self.current_page < buf.num_pages {
                let page_len = Self::page_len(buf, self.current_page, self.logical_page_size);
                let available = page_len - self.offset_in_page;
                if cnt < available {
                    self.offset_in_page += cnt;
                    return;
                }
                cnt -= available;
                self.current_page += 1;
                self.offset_in_page = 0;
            }

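            // The front buffer is fully consumed; drop it and continue with the next.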
            if self.current_page >= buf.num_pages {
                self.buffers.pop_front();
                self.current_page = 0;
                self.offset_in_page = 0;
            }
        }
    }
}

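/// A [Buf] over the logical contents of a checksummed blob, refilled on demand
/// from a [PageReader].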
pub struct Replay<B: Blob> {
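    /// The reader used to fetch pages from the blob.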
    reader: PageReader<B>,
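    /// Buffered pages that have not yet been consumed.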
    buffer: ReplayBuf,
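    /// Whether the underlying blob has been read to the end.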
    exhausted: bool,
}

impl<B: Blob> Replay<B> {
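    /// Creates a [Replay] that reads pages via `reader`.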
    pub(super) const fn new(reader: PageReader<B>) -> Self {
        let page_size = reader.page_size();
        let logical_page_size = reader.logical_page_size();
        Self {
            reader,
            buffer: ReplayBuf::new(page_size, logical_page_size),
            exhausted: false,
        }
    }

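    /// Returns the size of the blob's logical data in bytes.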
    pub const fn blob_size(&self) -> u64 {
        self.reader.blob_size()
    }

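    /// Returns true if the underlying blob has been read to the end. Buffered
    /// bytes may still remain to be consumed.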
    pub const fn is_exhausted(&self) -> bool {
        self.exhausted
    }

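    /// Buffers data until at least `n` logical bytes are available or the blob
    /// is exhausted.
    ///
    /// Returns true if at least `n` bytes are buffered after filling.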
    pub async fn ensure(&mut self, n: usize) -> Result<bool, Error> {
        while self.buffer.remaining < n && !self.exhausted {
            match self.reader.fill().await? {
                Some((state, logical_bytes)) => {
                    self.buffer.push(state, logical_bytes);
                }
                None => {
                    self.exhausted = true;
                }
            }
        }
        Ok(self.buffer.remaining >= n)
    }

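    /// Moves the read position to the logical `offset`, discarding any buffered
    /// data. Returns [Error::BlobInsufficientLength] if `offset` is past the
    /// end of the blob.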
    pub async fn seek_to(&mut self, offset: u64) -> Result<(), Error> {
        if offset > self.reader.blob_size() {
            return Err(Error::BlobInsufficientLength);
        }

        self.buffer.clear();
        self.exhausted = false;

        let page_size = self.reader.logical_page_size as u64;
        self.reader.blob_page = offset / page_size;
        self.buffer.current_page = 0;
        self.buffer.offset_in_page = (offset % page_size) as usize;

        Ok(())
    }
}

impl<B: Blob> Buf for Replay<B> {
    fn remaining(&self) -> usize {
        self.buffer.remaining()
    }

    fn chunk(&self) -> &[u8] {
        self.buffer.chunk()
    }

    fn advance(&mut self, cnt: usize) {
        self.buffer.advance(cnt);
    }
}

#[cfg(test)]
mod tests {
    use super::{super::append::Append, *};
    use crate::{deterministic, Runner as _, Storage as _};
    use commonware_macros::test_traced;
    use commonware_utils::{NZUsize, NZU16};

    const PAGE_SIZE: NonZeroU16 = NZU16!(103);
    const BUFFER_PAGES: usize = 2;

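    // NOTE: the literal 115 used below is the physical page size: PAGE_SIZE plus
    // the checksum record (assumed here to be 12 bytes, so 103 + 12 = 115).
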
    #[test_traced("DEBUG")]
    fn test_replay_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            let pool_ref = super::super::PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, pool_ref)
                .await
                .unwrap();

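            // Write 300 bytes (two full pages and one partial page) and sync.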
            let data: Vec<u8> = (0u8..=255).cycle().take(300).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();

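            // Replay the blob and verify everything is buffered.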
            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();

            replay.ensure(300).await.unwrap();

            assert_eq!(replay.remaining(), 300);

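            // Drain the replay chunk by chunk and verify the contents.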
            let mut collected = Vec::new();
            while replay.remaining() > 0 {
                let chunk = replay.chunk();
                collected.extend_from_slice(chunk);
                let len = chunk.len();
                replay.advance(len);
            }
            assert_eq!(collected, data);
        });
    }

    #[test_traced("DEBUG")]
    fn test_replay_partial_page() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();

            let pool_ref = super::super::PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, pool_ref)
                .await
                .unwrap();

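            // Write one full page plus 10 extra bytes so the final page is partial.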
            let data: Vec<u8> = (1u8..=(PAGE_SIZE.get() + 10) as u8).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();

            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();

            replay.ensure(data.len()).await.unwrap();

            assert_eq!(replay.remaining(), data.len());
        });
    }

    #[test_traced("DEBUG")]
    fn test_replay_cross_buffer_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            let pool_ref = super::super::PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, pool_ref)
                .await
                .unwrap();

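            // Write 400 bytes, spanning four pages (three full and one partial).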
            let data: Vec<u8> = (0u8..=255).cycle().take(400).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();

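            // Use a small replay buffer so reading 400 bytes must cross
            // multiple buffer boundaries.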
            let mut replay = append.replay(NZUsize!(115)).await.unwrap();

            assert!(replay.ensure(400).await.unwrap());
            assert_eq!(replay.remaining(), 400);

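            // Drain the replay, counting chunks to confirm page-sized reads.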
            let mut collected = Vec::new();
            let mut chunks_read = 0;
            while replay.remaining() > 0 {
                let chunk = replay.chunk();
                assert!(
                    !chunk.is_empty(),
                    "chunk() returned empty but remaining > 0"
                );
                collected.extend_from_slice(chunk);
                let len = chunk.len();
                replay.advance(len);
                chunks_read += 1;
            }

            assert_eq!(collected, data);
            assert!(
                chunks_read >= 4,
                "Expected at least 4 chunks for 4 pages, got {}",
                chunks_read
            );
        });
    }

    #[test_traced("DEBUG")]
    fn test_replay_empty_blob() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);

            let pool_ref = super::super::PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, pool_ref)
                .await
                .unwrap();

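            // Nothing was written, so the replay should have nothing to read.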
            assert_eq!(append.size().await, 0);

            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();

            assert_eq!(replay.remaining(), 0);

            assert!(replay.ensure(0).await.unwrap());

            assert!(!replay.ensure(1).await.unwrap());

            assert!(replay.is_exhausted());

            assert!(replay.chunk().is_empty());

            assert_eq!(replay.remaining(), 0);
        });
    }

    #[test_traced("DEBUG")]
    fn test_replay_seek_to() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();

            let pool_ref = super::super::PoolRef::new(PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, pool_ref)
                .await
                .unwrap();

            let data: Vec<u8> = (0u8..=255).cycle().take(300).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();

            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();

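            // Seek into the middle of the data and verify the next byte.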
            replay.seek_to(150).await.unwrap();
            replay.ensure(50).await.unwrap();
            assert_eq!(replay.chunk()[0], data[150]);

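            // Seek back to the beginning.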
            replay.seek_to(0).await.unwrap();
            replay.ensure(1).await.unwrap();
            assert_eq!(replay.chunk()[0], data[0]);

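            // Seeking past the end of the data must fail.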
            assert!(replay.seek_to(data.len() as u64 + 1).await.is_err());

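            // Seek to 150 again and drain the rest, verifying length and content.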
            let seek_offset = 150usize;
            replay.seek_to(seek_offset as u64).await.unwrap();
            let expected_remaining = data.len() - seek_offset;
            let mut collected = Vec::new();
            loop {
                if !replay.ensure(1).await.unwrap() {
                    break;
                }
                let chunk = replay.chunk();
                if chunk.is_empty() {
                    break;
                }
                collected.extend_from_slice(chunk);
                let len = chunk.len();
                replay.advance(len);
            }
            assert_eq!(
                collected.len(),
                expected_remaining,
                "After seeking to {}, should read {} bytes but got {}",
                seek_offset,
                expected_remaining,
                collected.len()
            );
            assert_eq!(collected, &data[seek_offset..]);
        });
    }
}