use std::{
fs::File,
io::{Read, Seek, SeekFrom},
path::Path,
};
use fwob_core::{Key, KeyType, OwnedFrame};
use crate::{
encoding::decode_page_payload,
file_header::{read_file_header, FileHeader},
page::PageHeader,
Result, V2Error,
};
pub struct Reader<R> {
inner: R,
header: FileHeader,
key_type: KeyType,
cached_page: Option<CachedPage>,
}
struct CachedPage {
index: u64,
header: PageHeader,
raw: Vec<u8>,
}
impl Reader<File> {
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
let file = File::open(path)?;
Self::new(file)
}
}
impl<R: Read + Seek> Reader<R> {
pub fn new(mut inner: R) -> Result<Self> {
let header = read_file_header(&mut inner)?;
let key_type = KeyType::from_field(header.schema.key_field())?;
Ok(Self {
inner,
header,
key_type,
cached_page: None,
})
}
pub fn header(&self) -> &FileHeader {
&self.header
}
pub fn read_page_header(&mut self, page_index: u64) -> Result<PageHeader> {
if page_index >= self.header.page_count {
return Err(V2Error::InvalidPageHeader(page_index));
}
self.inner
.seek(SeekFrom::Start(self.header.page_offset(page_index)))?;
PageHeader::read(&mut self.inner, page_index)
}
pub fn read_page_frames(&mut self, page_index: u64) -> Result<Vec<OwnedFrame>> {
self.load_page(page_index)?;
let raw = &self.cached_page.as_ref().expect("page loaded").raw;
let frame_len = self.header.schema.frame_len as usize;
let mut frames = Vec::with_capacity(raw.len() / frame_len);
for chunk in raw.chunks_exact(frame_len) {
frames.push(OwnedFrame::new(&self.header.schema, chunk.to_vec())?);
}
Ok(frames)
}
pub fn read_page_raw_frames(&mut self, page_index: u64) -> Result<Vec<u8>> {
self.load_page(page_index)?;
Ok(self.cached_page.as_ref().expect("page loaded").raw.clone())
}
pub fn read_frame_at(&mut self, index: u64) -> Result<Option<OwnedFrame>> {
let Some(page_index) = self.page_for_index_with_cache(index)? else {
return Ok(None);
};
self.load_page(page_index)?;
let cached = self.cached_page.as_ref().expect("page loaded");
let local_index = (index - cached.header.first_frame_index) as usize;
let frame_len = self.header.schema.frame_len as usize;
let offset = local_index * frame_len;
Ok(Some(OwnedFrame::new(
&self.header.schema,
cached.raw[offset..offset + frame_len].to_vec(),
)?))
}
pub fn first_frame(&mut self) -> Result<Option<OwnedFrame>> {
if self.header.page_count == 0 {
return Ok(None);
}
self.read_frame_from_page(0, 0).map(Some)
}
pub fn last_frame(&mut self) -> Result<Option<OwnedFrame>> {
if self.header.page_count == 0 {
return Ok(None);
}
let page_index = self.header.page_count - 1;
let page = self.read_page_header(page_index)?;
let local_index = page
.frame_count
.checked_sub(1)
.ok_or(V2Error::InvalidPageHeader(page_index))? as usize;
self.read_frame_from_page(page_index, local_index).map(Some)
}
pub fn read_key_at(&mut self, index: u64) -> Result<Option<Key>> {
let Some(page_index) = self.page_for_index_with_cache(index)? else {
return Ok(None);
};
self.load_page(page_index)?;
let cached = self.cached_page.as_ref().expect("page loaded");
let local_index = index - cached.header.first_frame_index;
Ok(Some(self.cached_key(local_index as usize)?))
}
pub fn first_key(&mut self) -> Result<Option<Key>> {
if self.header.page_count == 0 {
Ok(None)
} else {
Ok(Some(self.read_page_header(0)?.first_key))
}
}
pub fn last_key(&mut self) -> Result<Option<Key>> {
if self.header.page_count == 0 {
Ok(None)
} else {
Ok(Some(
self.read_page_header(self.header.page_count - 1)?.last_key,
))
}
}
pub fn find_page_for_index(&mut self, index: u64) -> Result<Option<u64>> {
if index >= self.header.frame_count || self.header.page_count == 0 {
return Ok(None);
}
let mut lo = 0;
let mut hi = self.header.page_count;
while lo < hi {
let mid = lo + ((hi - lo) >> 1);
let page = self.read_page_header(mid)?;
if page.first_frame_index <= index {
lo = mid + 1;
} else {
hi = mid;
}
}
let page_index = lo.saturating_sub(1);
let page = self.read_page_header(page_index)?;
if index < page.first_frame_index + u64::from(page.frame_count) {
Ok(Some(page_index))
} else {
Err(V2Error::InvalidPageHeader(page_index))
}
}
fn page_for_index_with_cache(&mut self, index: u64) -> Result<Option<u64>> {
if index >= self.header.frame_count {
return Ok(None);
}
if let Some(cached) = &self.cached_page {
let start = cached.header.first_frame_index;
let end = start + u64::from(cached.header.frame_count);
if (start..end).contains(&index) {
return Ok(Some(cached.index));
}
if index == end && cached.index + 1 < self.header.page_count {
return Ok(Some(cached.index + 1));
}
}
self.find_page_for_index(index)
}
pub fn lower_bound(&mut self, key: Key) -> Result<u64> {
let mut lo = 0;
let mut hi = self.header.page_count;
while lo < hi {
let mid = lo + ((hi - lo) >> 1);
if self.read_page_header(mid)?.last_key < key {
lo = mid + 1;
} else {
hi = mid;
}
}
if lo == self.header.page_count {
return Ok(self.header.frame_count);
}
self.load_page(lo)?;
let cached = self.cached_page.as_ref().expect("page loaded");
let mut frame_lo = 0usize;
let mut frame_hi = cached.header.frame_count as usize;
while frame_lo < frame_hi {
let mid = frame_lo + ((frame_hi - frame_lo) >> 1);
if self.cached_key(mid)? < key {
frame_lo = mid + 1;
} else {
frame_hi = mid;
}
}
Ok(cached.header.first_frame_index + frame_lo as u64)
}
pub fn upper_bound(&mut self, key: Key) -> Result<u64> {
let mut lo = 0;
let mut hi = self.header.page_count;
while lo < hi {
let mid = lo + ((hi - lo) >> 1);
if self.read_page_header(mid)?.last_key <= key {
lo = mid + 1;
} else {
hi = mid;
}
}
if lo == self.header.page_count {
return Ok(self.header.frame_count);
}
self.load_page(lo)?;
let cached = self.cached_page.as_ref().expect("page loaded");
let mut frame_lo = 0usize;
let mut frame_hi = cached.header.frame_count as usize;
while frame_lo < frame_hi {
let mid = frame_lo + ((frame_hi - frame_lo) >> 1);
if self.cached_key(mid)? <= key {
frame_lo = mid + 1;
} else {
frame_hi = mid;
}
}
Ok(cached.header.first_frame_index + frame_lo as u64)
}
pub fn equal_range(&mut self, key: Key) -> Result<(u64, u64)> {
let Some(lower_page) = self.first_page_with_last_key(key, false)? else {
return Ok((self.header.frame_count, self.header.frame_count));
};
self.load_page(lower_page)?;
let lower = self.bound_in_cached_page(key, false)?;
let cached = self.cached_page.as_ref().expect("page loaded");
if cached.header.last_key > key {
let upper = self.bound_in_cached_page(key, true)?;
return Ok((lower, upper));
}
let Some(upper_page) = self.first_page_with_last_key_from(key, true, lower_page + 1)?
else {
return Ok((lower, self.header.frame_count));
};
self.load_page(upper_page)?;
let upper = self.bound_in_cached_page(key, true)?;
Ok((lower, upper))
}
fn first_page_with_last_key(&mut self, key: Key, strict: bool) -> Result<Option<u64>> {
self.first_page_with_last_key_from(key, strict, 0)
}
fn first_page_with_last_key_from(
&mut self,
key: Key,
strict: bool,
start: u64,
) -> Result<Option<u64>> {
let mut lo = start;
let mut hi = self.header.page_count;
while lo < hi {
let mid = lo + ((hi - lo) >> 1);
let last_key = self.read_page_header(mid)?.last_key;
if last_key < key || (strict && last_key == key) {
lo = mid + 1;
} else {
hi = mid;
}
}
Ok((lo < self.header.page_count).then_some(lo))
}
fn bound_in_cached_page(&self, key: Key, upper: bool) -> Result<u64> {
let cached = self.cached_page.as_ref().expect("page loaded");
let mut lo = 0usize;
let mut hi = cached.header.frame_count as usize;
while lo < hi {
let mid = lo + ((hi - lo) >> 1);
let mid_key = self.cached_key(mid)?;
if mid_key < key || (upper && mid_key == key) {
lo = mid + 1;
} else {
hi = mid;
}
}
Ok(cached.header.first_frame_index + lo as u64)
}
fn load_page(&mut self, page_index: u64) -> Result<()> {
if self
.cached_page
.as_ref()
.is_some_and(|cached| cached.index == page_index)
{
return Ok(());
}
let page_header = self.read_page_header(page_index)?;
let mut compressed = vec![0u8; page_header.compressed_len as usize];
self.inner.read_exact(&mut compressed)?;
page_header.validate_payload(&compressed)?;
let encoded = page_header
.codec
.decompress(&compressed, page_header.uncompressed_len as usize)?;
let raw = decode_page_payload(
&self.header.schema,
page_header.encoding,
&encoded,
page_header.frame_count as usize,
)?;
self.cached_page = Some(CachedPage {
index: page_index,
header: page_header,
raw,
});
Ok(())
}
fn read_frame_from_page(&mut self, page_index: u64, local_index: usize) -> Result<OwnedFrame> {
self.load_page(page_index)?;
let cached = self.cached_page.as_ref().expect("page loaded");
let frame_len = self.header.schema.frame_len as usize;
let offset = local_index * frame_len;
OwnedFrame::new(
&self.header.schema,
cached.raw[offset..offset + frame_len].to_vec(),
)
.map_err(Into::into)
}
fn cached_key(&self, local_index: usize) -> Result<Key> {
let cached = self.cached_page.as_ref().expect("page loaded");
let frame_len = self.header.schema.frame_len as usize;
let key_field = self.header.schema.key_field();
let offset = local_index * frame_len + key_field.offset as usize;
let end = offset + key_field.length as usize;
Ok(Key::decode(self.key_type, &cached.raw[offset..end])?)
}
pub fn find_page_for_key(&mut self, key: Key) -> Result<Option<u64>> {
if self.header.page_count == 0 {
return Ok(None);
}
let mut lo = 0;
let mut hi = self.header.page_count;
while lo < hi {
let mid = lo + ((hi - lo) >> 1);
let page = self.read_page_header(mid)?;
if page.last_key < key {
lo = mid + 1;
} else {
hi = mid;
}
}
if lo >= self.header.page_count {
return Ok(None);
}
let page = self.read_page_header(lo)?;
if key < page.first_key {
Ok(None)
} else {
Ok(Some(lo))
}
}
pub fn frames_between(&mut self, first: Key, last: Key) -> Result<Vec<OwnedFrame>> {
if first > last {
return Ok(Vec::new());
}
let start = self.lower_bound(first)?;
let end = self.upper_bound(last)?;
let mut out = Vec::with_capacity((end - start) as usize);
for index in start..end {
out.push(self.read_frame_at(index)?.expect("index is in range"));
}
Ok(out)
}
pub fn read_all_frames(&mut self) -> Result<Vec<OwnedFrame>> {
let mut out = Vec::with_capacity(self.header.frame_count as usize);
for page_index in 0..self.header.page_count {
out.extend(self.read_page_frames(page_index)?);
}
Ok(out)
}
pub fn verify(&mut self) -> Result<()> {
let mut last_key = None;
let mut count = 0u64;
for page_index in 0..self.header.page_count {
let page = self.read_page_header(page_index)?;
if page.first_frame_index != count {
return Err(V2Error::InvalidPageHeader(page_index));
}
let frames = self.read_page_frames(page_index)?;
if frames.len() != page.frame_count as usize {
return Err(V2Error::InvalidPageHeader(page_index));
}
for frame in &frames {
let key = frame.as_ref().key(&self.header.schema, self.key_type)?;
if let Some(last) = last_key {
if key < last {
return Err(V2Error::KeyOrderViolation);
}
}
last_key = Some(key);
}
count += frames.len() as u64;
}
if count != self.header.frame_count {
return Err(V2Error::InvalidFileHeader);
}
Ok(())
}
}