use super::Checksum;
use crate::{Blob, Buf, Error, IoBuf};
use commonware_codec::FixedSize;
use std::{collections::VecDeque, num::NonZeroU16};
use tracing::error;
/// A batch of checksummed pages read from the blob, plus enough metadata to
/// slice out the logical (checksum-free) bytes of each page.
pub(super) struct BufferState {
    /// Physical bytes of one or more consecutive pages; each page's logical
    /// payload sits at the front of its physical page slot.
    buffer: IoBuf,
    /// Number of complete physical pages contained in `buffer`.
    num_pages: usize,
    /// Logical length of the final page in `buffer`; all earlier pages are
    /// full (logical_page_size) by construction.
    last_page_len: usize,
}
/// Sequentially reads checksummed pages from a [`Blob`], fetching up to
/// `prefetch_count` pages per call to `fill`.
pub(super) struct PageReader<B: Blob> {
    /// Underlying blob the pages are read from.
    blob: B,
    /// Physical page size: logical payload plus the checksum record.
    page_size: usize,
    /// Payload bytes per page (physical page minus checksum overhead).
    logical_page_size: usize,
    /// Total size of the blob on disk, in physical bytes.
    physical_blob_size: u64,
    /// Total logical (payload-only) size of the blob.
    logical_blob_size: u64,
    /// Index of the next physical page to read.
    blob_page: u64,
    /// Maximum number of pages fetched per `fill` call.
    prefetch_count: usize,
}
impl<B: Blob> PageReader<B> {
    /// Creates a reader positioned at the first page of `blob`.
    ///
    /// `logical_page_size` is the payload size per page; the physical page
    /// size is derived by adding the checksum record size.
    pub(super) const fn new(
        blob: B,
        physical_blob_size: u64,
        logical_blob_size: u64,
        prefetch_count: usize,
        logical_page_size: NonZeroU16,
    ) -> Self {
        let logical_page_size = logical_page_size.get() as usize;
        // Each physical page stores the payload plus its checksum record.
        let page_size = logical_page_size + Checksum::SIZE;
        Self {
            blob,
            page_size,
            logical_page_size,
            physical_blob_size,
            logical_blob_size,
            blob_page: 0,
            prefetch_count,
        }
    }

    /// Logical (payload-only) size of the blob.
    pub(super) const fn blob_size(&self) -> u64 {
        self.logical_blob_size
    }

    /// Physical page size (payload + checksum record).
    pub(super) const fn page_size(&self) -> usize {
        self.page_size
    }

    /// Payload bytes per page.
    pub(super) const fn logical_page_size(&self) -> usize {
        self.logical_page_size
    }

    /// Reads and validates the next batch of up to `prefetch_count` pages.
    ///
    /// Returns `Ok(None)` once no complete page remains. On success, returns
    /// the validated [`BufferState`] together with the total number of
    /// logical bytes it contains. Fails with [`Error::InvalidChecksum`] if a
    /// page's CRC does not validate or a non-final page reports a partial
    /// length.
    pub(super) async fn fill(&mut self) -> Result<Option<(BufferState, usize)>, Error> {
        // Physical offset of the next unread page.
        let start_offset = match self.blob_page.checked_mul(self.page_size as u64) {
            Some(o) => o,
            None => return Err(Error::OffsetOverflow),
        };
        if start_offset >= self.physical_blob_size {
            return Ok(None);
        }
        let remaining_physical = (self.physical_blob_size - start_offset) as usize;
        // Only whole physical pages are read; trailing bytes short of a full
        // page are not returned by this reader.
        let max_pages = remaining_physical / self.page_size;
        let pages_to_read = max_pages.min(self.prefetch_count);
        if pages_to_read == 0 {
            return Ok(None);
        }
        let bytes_to_read = pages_to_read * self.page_size;
        let physical_buf = self
            .blob
            .read_at(start_offset, bytes_to_read)
            .await?
            .coalesce()
            .freeze();
        let mut total_logical = 0usize;
        let mut last_len = 0usize;
        // True when this batch reaches the last complete page of the blob;
        // only then may the batch's final page carry a partial payload.
        let is_final_batch = pages_to_read == max_pages;
        for page_idx in 0..pages_to_read {
            let page_start = page_idx * self.page_size;
            let page_slice = &physical_buf.as_ref()[page_start..page_start + self.page_size];
            let Some(record) = Checksum::validate_page(page_slice) else {
                error!(page = self.blob_page + page_idx as u64, "CRC mismatch");
                return Err(Error::InvalidChecksum);
            };
            // The checksum record encodes the page's logical length.
            let (len, _) = record.get_crc();
            let len = len as usize;
            let is_last_page_in_blob = is_final_batch && page_idx + 1 == pages_to_read;
            // Every page except the blob's final one must be full; a short
            // interior page indicates corruption.
            if !is_last_page_in_blob && len != self.logical_page_size {
                error!(
                    page = self.blob_page + page_idx as u64,
                    expected = self.logical_page_size,
                    actual = len,
                    "non-last page has partial length"
                );
                return Err(Error::InvalidChecksum);
            }
            total_logical += len;
            last_len = len;
        }
        self.blob_page += pages_to_read as u64;
        let state = BufferState {
            buffer: physical_buf,
            num_pages: pages_to_read,
            last_page_len: last_len,
        };
        Ok(Some((state, total_logical)))
    }
}
/// Queue of filled page buffers plus a cursor tracking how far replay has
/// consumed the logical bytes of the front buffer.
struct ReplayBuf {
    /// Physical page size (payload + checksum record).
    page_size: usize,
    /// Payload bytes per page.
    logical_page_size: usize,
    /// Filled buffers, consumed front-to-back.
    buffers: VecDeque<BufferState>,
    /// Page index within the front buffer currently being consumed.
    current_page: usize,
    /// Logical byte offset already consumed within `current_page`; also
    /// pre-set by a seek before any buffer is pushed.
    offset_in_page: usize,
    /// Total logical bytes still available across all queued buffers.
    remaining: usize,
}
impl ReplayBuf {
    /// Creates an empty buffer for the given physical/logical page sizes.
    const fn new(page_size: usize, logical_page_size: usize) -> Self {
        Self {
            page_size,
            logical_page_size,
            buffers: VecDeque::new(),
            current_page: 0,
            offset_in_page: 0,
            remaining: 0,
        }
    }

    /// Drops all buffered pages and resets the consumption cursor.
    fn clear(&mut self) {
        self.remaining = 0;
        self.offset_in_page = 0;
        self.current_page = 0;
        self.buffers.clear();
    }

    /// Queues a freshly filled buffer holding `logical_bytes` payload bytes.
    ///
    /// When this is the first buffer queued, any pre-set `offset_in_page`
    /// (left behind by a prior seek) counts as already consumed and is
    /// excluded from `remaining`.
    fn push(&mut self, state: BufferState, logical_bytes: usize) {
        let already_consumed = match self.buffers.is_empty() {
            true => self.offset_in_page,
            false => 0,
        };
        self.remaining += logical_bytes.saturating_sub(already_consumed);
        self.buffers.push_back(state);
    }

    /// Logical length of page `page_idx` within `buf`: every page is full
    /// except possibly the buffer's final one.
    const fn page_len(buf: &BufferState, page_idx: usize, logical_page_size: usize) -> usize {
        let is_last = page_idx + 1 == buf.num_pages;
        if is_last {
            buf.last_page_len
        } else {
            logical_page_size
        }
    }
}
impl Buf for ReplayBuf {
    /// Total logical bytes left across all queued buffers.
    fn remaining(&self) -> usize {
        self.remaining
    }

    /// Returns the unconsumed logical bytes of the current page in the front
    /// buffer, or an empty slice when nothing is buffered.
    fn chunk(&self) -> &[u8] {
        let Some(buf) = self.buffers.front() else {
            return &[];
        };
        if self.current_page >= buf.num_pages {
            return &[];
        }
        let page_len = Self::page_len(buf, self.current_page, self.logical_page_size);
        // Pages are laid out back-to-back physically; each page's logical
        // payload occupies the first `page_len` bytes of its physical slot.
        let physical_start = self.current_page * self.page_size + self.offset_in_page;
        let physical_end = self.current_page * self.page_size + page_len;
        &buf.buffer.as_ref()[physical_start..physical_end]
    }

    /// Advances the cursor by `cnt` logical bytes, popping buffers as they
    /// are fully consumed.
    fn advance(&mut self, mut cnt: usize) {
        // Account for the consumed bytes up front; the loops below move the
        // page/offset cursor to match.
        self.remaining = self.remaining.saturating_sub(cnt);
        while cnt > 0 {
            let Some(buf) = self.buffers.front() else {
                break;
            };
            // Walk pages within the front buffer.
            while cnt > 0 && self.current_page < buf.num_pages {
                let page_len = Self::page_len(buf, self.current_page, self.logical_page_size);
                let available = page_len - self.offset_in_page;
                if cnt < available {
                    // Stops mid-page; the buffer is not yet exhausted.
                    self.offset_in_page += cnt;
                    return;
                }
                cnt -= available;
                self.current_page += 1;
                self.offset_in_page = 0;
            }
            // Front buffer fully consumed: drop it and reset the cursor so
            // the next buffer starts at its first page.
            if self.current_page >= buf.num_pages {
                self.buffers.pop_front();
                self.current_page = 0;
                self.offset_in_page = 0;
            }
        }
    }
}
/// Streaming reader over a blob's logical contents, implementing [`Buf`] on
/// top of batches of validated pages pulled on demand via `ensure`.
pub struct Replay<B: Blob> {
    /// Source of validated page batches.
    reader: PageReader<B>,
    /// Buffered, not-yet-consumed pages.
    buffer: ReplayBuf,
    /// Set once the reader has no more complete pages to return.
    exhausted: bool,
}
impl<B: Blob> Replay<B> {
    /// Wraps `reader` in a replay stream positioned at logical offset zero.
    pub(super) const fn new(reader: PageReader<B>) -> Self {
        let physical = reader.page_size();
        let logical = reader.logical_page_size();
        Self {
            reader,
            buffer: ReplayBuf::new(physical, logical),
            exhausted: false,
        }
    }

    /// Logical size of the underlying blob.
    pub const fn blob_size(&self) -> u64 {
        self.reader.blob_size()
    }

    /// Whether the underlying reader has run out of pages.
    pub const fn is_exhausted(&self) -> bool {
        self.exhausted
    }

    /// Fills the internal buffer until at least `n` logical bytes are
    /// available or the blob is exhausted.
    ///
    /// Returns whether `n` bytes are now buffered; propagates any read or
    /// checksum error from the underlying reader.
    pub async fn ensure(&mut self, n: usize) -> Result<bool, Error> {
        loop {
            if self.buffer.remaining >= n || self.exhausted {
                break;
            }
            if let Some((state, logical_bytes)) = self.reader.fill().await? {
                self.buffer.push(state, logical_bytes);
            } else {
                self.exhausted = true;
            }
        }
        Ok(self.buffer.remaining >= n)
    }

    /// Repositions the stream at logical `offset`, discarding buffered data.
    ///
    /// Fails with [`Error::BlobInsufficientLength`] when `offset` lies past
    /// the end of the blob (seeking exactly to the end is allowed).
    pub fn seek_to(&mut self, offset: u64) -> Result<(), Error> {
        if offset > self.reader.blob_size() {
            return Err(Error::BlobInsufficientLength);
        }
        self.buffer.clear();
        self.exhausted = false;
        // Translate the logical offset into a page index plus an intra-page
        // offset; the intra-page skip is applied when the next buffer is
        // pushed (see ReplayBuf::push).
        let logical = self.reader.logical_page_size as u64;
        self.reader.blob_page = offset / logical;
        self.buffer.current_page = 0;
        self.buffer.offset_in_page = (offset % logical) as usize;
        Ok(())
    }
}
impl<B: Blob> Buf for Replay<B> {
fn remaining(&self) -> usize {
self.buffer.remaining()
}
fn chunk(&self) -> &[u8] {
self.buffer.chunk()
}
fn advance(&mut self, cnt: usize) {
self.buffer.advance(cnt);
}
}
#[cfg(test)]
mod tests {
    use super::{super::append::Append, *};
    use crate::{deterministic, Runner as _, Storage as _};
    use commonware_macros::test_traced;
    use commonware_utils::{NZUsize, NZU16};

    // Logical page size used by all tests below.
    const PAGE_SIZE: NonZeroU16 = NZU16!(103);
    // Number of pages held by the append-side buffer/cache.
    const BUFFER_PAGES: usize = 2;

    // Replays a multi-page blob end-to-end and checks every byte round-trips.
    #[test_traced("DEBUG")]
    fn test_replay_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);
            let cache_ref =
                super::super::CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            // NOTE(review): 115 is presumably the physical page size
            // (logical 103 + checksum overhead) — confirm against
            // Checksum::SIZE if either constant changes.
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, cache_ref)
                .await
                .unwrap();
            let data: Vec<u8> = (0u8..=255).cycle().take(300).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();
            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();
            replay.ensure(300).await.unwrap();
            assert_eq!(replay.remaining(), 300);
            // Drain chunk-by-chunk and verify the payload round-trips.
            let mut collected = Vec::new();
            while replay.remaining() > 0 {
                let chunk = replay.chunk();
                collected.extend_from_slice(chunk);
                let len = chunk.len();
                replay.advance(len);
            }
            assert_eq!(collected, data);
        });
    }

    // A payload slightly longer than one page leaves a partial final page;
    // the replay must still account for all logical bytes.
    #[test_traced("DEBUG")]
    fn test_replay_partial_page() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            let cache_ref =
                super::super::CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, cache_ref)
                .await
                .unwrap();
            let data: Vec<u8> = (1u8..=(PAGE_SIZE.get() + 10) as u8).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();
            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();
            replay.ensure(data.len()).await.unwrap();
            assert_eq!(replay.remaining(), data.len());
        });
    }

    // Data spanning several prefetch batches must replay seamlessly across
    // the internal buffer boundary.
    #[test_traced("DEBUG")]
    fn test_replay_cross_buffer_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);
            let cache_ref =
                super::super::CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, cache_ref)
                .await
                .unwrap();
            let data: Vec<u8> = (0u8..=255).cycle().take(400).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();
            let mut replay = append.replay(NZUsize!(115)).await.unwrap();
            assert!(replay.ensure(400).await.unwrap());
            assert_eq!(replay.remaining(), 400);
            let mut collected = Vec::new();
            let mut chunks_read = 0;
            while replay.remaining() > 0 {
                let chunk = replay.chunk();
                // chunk() must never be empty while bytes remain buffered.
                assert!(
                    !chunk.is_empty(),
                    "chunk() returned empty but remaining > 0"
                );
                collected.extend_from_slice(chunk);
                let len = chunk.len();
                replay.advance(len);
                chunks_read += 1;
            }
            assert_eq!(collected, data);
            // 400 bytes at 103 bytes/page span 4 pages, so at least 4 chunks.
            assert!(
                chunks_read >= 4,
                "Expected at least 4 chunks for 4 pages, got {}",
                chunks_read
            );
        });
    }

    // An empty blob: ensure(0) succeeds, ensure(1) fails and marks the
    // replay exhausted, and chunk() is empty.
    #[test_traced("DEBUG")]
    fn test_replay_empty_blob() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            assert_eq!(blob_size, 0);
            let cache_ref =
                super::super::CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, cache_ref)
                .await
                .unwrap();
            assert_eq!(append.size().await, 0);
            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();
            assert_eq!(replay.remaining(), 0);
            assert!(replay.ensure(0).await.unwrap());
            assert!(!replay.ensure(1).await.unwrap());
            assert!(replay.is_exhausted());
            assert!(replay.chunk().is_empty());
            assert_eq!(replay.remaining(), 0);
        });
    }

    // Seeking: mid-stream, back to the start, past-the-end rejection, and a
    // full drain from a seeked position.
    #[test_traced("DEBUG")]
    fn test_replay_seek_to() {
        let executor = deterministic::Runner::default();
        executor.start(|context: deterministic::Context| async move {
            let (blob, blob_size) = context.open("test_partition", b"test_blob").await.unwrap();
            let cache_ref =
                super::super::CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_PAGES));
            let append = Append::new(blob.clone(), blob_size, BUFFER_PAGES * 115, cache_ref)
                .await
                .unwrap();
            let data: Vec<u8> = (0u8..=255).cycle().take(300).collect();
            append.append(&data).await.unwrap();
            append.sync().await.unwrap();
            let mut replay = append.replay(NZUsize!(BUFFER_PAGES)).await.unwrap();
            // Seek forward into the middle of the data.
            replay.seek_to(150).unwrap();
            replay.ensure(50).await.unwrap();
            assert_eq!(replay.get_u8(), data[150]);
            // Seek back to the start.
            replay.seek_to(0).unwrap();
            replay.ensure(1).await.unwrap();
            assert_eq!(replay.get_u8(), data[0]);
            // Seeking past the logical end must fail.
            assert!(replay.seek_to(data.len() as u64 + 1).is_err());
            // Drain everything from a seeked position and verify the suffix.
            let seek_offset = 150usize;
            replay.seek_to(seek_offset as u64).unwrap();
            let expected_remaining = data.len() - seek_offset;
            let mut collected = Vec::new();
            loop {
                if !replay.ensure(1).await.unwrap() {
                    break;
                }
                let chunk = replay.chunk();
                if chunk.is_empty() {
                    break;
                }
                collected.extend_from_slice(chunk);
                let len = chunk.len();
                replay.advance(len);
            }
            assert_eq!(
                collected.len(),
                expected_remaining,
                "After seeking to {}, should read {} bytes but got {}",
                seek_offset,
                expected_remaining,
                collected.len()
            );
            assert_eq!(collected, &data[seek_offset..]);
        });
    }
}