fwob-v2 1.0.0

Fixed-page compressed storage for fast FWOB range queries and bulk append
Documentation
use std::{
    fs::File,
    io::{Read, Seek, SeekFrom},
    path::Path,
};

use fwob_core::{Key, KeyType, OwnedFrame};

use crate::{
    encoding::decode_page_payload,
    file_header::{read_file_header, FileHeader},
    page::PageHeader,
    Result, V2Error,
};

pub struct Reader<R> {
    inner: R,
    header: FileHeader,
    key_type: KeyType,
}

impl Reader<File> {
    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
        let file = File::open(path)?;
        Self::new(file)
    }
}

impl<R: Read + Seek> Reader<R> {
    pub fn new(mut inner: R) -> Result<Self> {
        let header = read_file_header(&mut inner)?;
        let key_type = KeyType::from_field(header.schema.key_field())?;
        Ok(Self {
            inner,
            header,
            key_type,
        })
    }

    pub fn header(&self) -> &FileHeader {
        &self.header
    }

    pub fn read_page_header(&mut self, page_index: u64) -> Result<PageHeader> {
        if page_index >= self.header.page_count {
            return Err(V2Error::InvalidPageHeader(page_index));
        }
        self.inner
            .seek(SeekFrom::Start(self.header.page_offset(page_index)))?;
        PageHeader::read(&mut self.inner, page_index)
    }

    pub fn read_page_frames(&mut self, page_index: u64) -> Result<Vec<OwnedFrame>> {
        let raw = self.read_page_raw_frames(page_index)?;
        let frame_len = self.header.schema.frame_len as usize;
        let mut frames = Vec::with_capacity(raw.len() / frame_len);
        for chunk in raw.chunks_exact(frame_len) {
            frames.push(OwnedFrame::new(&self.header.schema, chunk.to_vec())?);
        }
        Ok(frames)
    }

    pub fn read_page_raw_frames(&mut self, page_index: u64) -> Result<Vec<u8>> {
        let page_header = self.read_page_header(page_index)?;
        let mut compressed = vec![0u8; page_header.compressed_len as usize];
        self.inner.read_exact(&mut compressed)?;
        page_header.validate_payload(&compressed)?;
        let encoded = page_header
            .codec
            .decompress(&compressed, page_header.uncompressed_len as usize)?;
        let raw = decode_page_payload(
            &self.header.schema,
            page_header.encoding,
            &encoded,
            page_header.frame_count as usize,
        )?;
        Ok(raw)
    }

    pub fn find_page_for_key(&mut self, key: Key) -> Result<Option<u64>> {
        if self.header.page_count == 0 {
            return Ok(None);
        }
        let mut lo = 0;
        let mut hi = self.header.page_count;
        while lo < hi {
            let mid = lo + ((hi - lo) >> 1);
            let page = self.read_page_header(mid)?;
            if page.last_key < key {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        if lo >= self.header.page_count {
            return Ok(None);
        }
        let page = self.read_page_header(lo)?;
        if key < page.first_key {
            Ok(None)
        } else {
            Ok(Some(lo))
        }
    }

    pub fn frames_between(&mut self, first: Key, last: Key) -> Result<Vec<OwnedFrame>> {
        if first > last {
            return Ok(Vec::new());
        }
        let mut out = Vec::new();
        let Some(mut page_index) = self.find_page_for_key(first)? else {
            return Ok(out);
        };
        while page_index < self.header.page_count {
            let page = self.read_page_header(page_index)?;
            if page.first_key > last {
                break;
            }
            let frames = self.read_page_frames(page_index)?;
            for frame in frames {
                let key = frame.as_ref().key(&self.header.schema, self.key_type)?;
                if key >= first && key <= last {
                    out.push(frame);
                }
            }
            page_index += 1;
        }
        Ok(out)
    }

    pub fn read_all_frames(&mut self) -> Result<Vec<OwnedFrame>> {
        let mut out = Vec::with_capacity(self.header.frame_count as usize);
        for page_index in 0..self.header.page_count {
            out.extend(self.read_page_frames(page_index)?);
        }
        Ok(out)
    }

    pub fn verify(&mut self) -> Result<()> {
        let mut last_key = None;
        let mut count = 0u64;
        for page_index in 0..self.header.page_count {
            let page = self.read_page_header(page_index)?;
            let frames = self.read_page_frames(page_index)?;
            if frames.len() != page.frame_count as usize {
                return Err(V2Error::InvalidPageHeader(page_index));
            }
            for frame in &frames {
                let key = frame.as_ref().key(&self.header.schema, self.key_type)?;
                if let Some(last) = last_key {
                    if key < last {
                        return Err(V2Error::KeyOrderViolation);
                    }
                }
                last_key = Some(key);
            }
            count += frames.len() as u64;
        }
        if count != self.header.frame_count {
            return Err(V2Error::InvalidFileHeader);
        }
        Ok(())
    }
}