fwob-v2 1.5.2

Fixed-page compressed storage for fast FWOB range queries and bulk append
Documentation
use std::io::{Read, Write};

use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use crc32fast::Hasher;
use fwob_core::Key;

use crate::{Codec, Result, V2Error};

pub const PAGE_MAGIC: &[u8; 4] = b"FWP2";
pub const PAGE_HEADER_LEN: usize = 80;
const PAGE_HEADER_VERSION: u8 = 2;
const KEY_SLOT_LEN: usize = 16;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum Encoding {
    RowRawV1 = 0,
    ColumnarBasicV1 = 1,
    ColumnarDeltaV1 = 2,
}

impl Encoding {
    pub fn from_id(id: u8) -> Result<Self> {
        match id {
            0 => Ok(Self::RowRawV1),
            1 => Ok(Self::ColumnarBasicV1),
            2 => Ok(Self::ColumnarDeltaV1),
            other => Err(V2Error::UnsupportedEncoding(other)),
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PageHeader {
    pub header_version: u8,
    pub codec: Codec,
    pub encoding: Encoding,
    pub flags: u8,
    pub header_crc32: u32,
    pub payload_crc32: u32,
    pub first_key: Key,
    pub last_key: Key,
    pub frame_count: u32,
    pub uncompressed_len: u32,
    pub compressed_len: u32,
    pub first_frame_index: u64,
}

impl PageHeader {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        codec: Codec,
        encoding: Encoding,
        first_key: Key,
        last_key: Key,
        frame_count: u32,
        uncompressed_len: u32,
        compressed_len: u32,
        first_frame_index: u64,
        payload: &[u8],
    ) -> Self {
        let payload_crc32 = crc32(payload);
        let mut header = Self {
            header_version: PAGE_HEADER_VERSION,
            codec,
            encoding,
            flags: 0,
            header_crc32: 0,
            payload_crc32,
            first_key,
            last_key,
            frame_count,
            uncompressed_len,
            compressed_len,
            first_frame_index,
        };
        header.header_crc32 = crc32(&header.bytes_with_zero_crc());
        header
    }

    pub fn read<R: Read>(reader: &mut R, page_index: u64) -> Result<Self> {
        let mut magic = [0u8; 4];
        reader.read_exact(&mut magic)?;
        if &magic != PAGE_MAGIC {
            return Err(V2Error::InvalidPageHeader(page_index));
        }
        let header_version = reader.read_u8()?;
        if header_version != PAGE_HEADER_VERSION {
            return Err(V2Error::InvalidPageHeader(page_index));
        }
        let codec = Codec::from_id(reader.read_u8()?)?;
        let encoding = Encoding::from_id(reader.read_u8()?)?;
        let flags = reader.read_u8()?;
        let header_crc32 = reader.read_u32::<LittleEndian>()?;
        let payload_crc32 = reader.read_u32::<LittleEndian>()?;
        let first_key = read_key(reader)?;
        let last_key = read_key(reader)?;
        let frame_count = reader.read_u32::<LittleEndian>()?;
        let uncompressed_len = reader.read_u32::<LittleEndian>()?;
        let compressed_len = reader.read_u32::<LittleEndian>()?;
        let first_frame_index = reader.read_u64::<LittleEndian>()?;

        let mut reserved = [0u8; 10];
        reader.read_exact(&mut reserved)?;

        let header = Self {
            header_version,
            codec,
            encoding,
            flags,
            header_crc32,
            payload_crc32,
            first_key,
            last_key,
            frame_count,
            uncompressed_len,
            compressed_len,
            first_frame_index,
        };
        if crc32(&header.bytes_with_zero_crc()) != header.header_crc32 {
            return Err(V2Error::InvalidPageHeader(page_index));
        }
        Ok(header)
    }

    pub fn write<W: Write>(&self, writer: &mut W) -> Result<()> {
        writer.write_all(PAGE_MAGIC)?;
        writer.write_u8(self.header_version)?;
        writer.write_u8(self.codec as u8)?;
        writer.write_u8(self.encoding as u8)?;
        writer.write_u8(self.flags)?;
        writer.write_u32::<LittleEndian>(self.header_crc32)?;
        writer.write_u32::<LittleEndian>(self.payload_crc32)?;
        write_key(writer, self.first_key)?;
        write_key(writer, self.last_key)?;
        writer.write_u32::<LittleEndian>(self.frame_count)?;
        writer.write_u32::<LittleEndian>(self.uncompressed_len)?;
        writer.write_u32::<LittleEndian>(self.compressed_len)?;
        writer.write_u64::<LittleEndian>(self.first_frame_index)?;
        writer.write_all(&[0u8; 10])?;
        Ok(())
    }

    pub fn validate_payload(&self, payload: &[u8]) -> Result<()> {
        if crc32(payload) == self.payload_crc32 {
            Ok(())
        } else {
            Err(V2Error::ChecksumMismatch)
        }
    }

    pub fn set_first_frame_index(&mut self, first_frame_index: u64) {
        self.first_frame_index = first_frame_index;
        self.header_crc32 = crc32(&self.bytes_with_zero_crc());
    }

    fn bytes_with_zero_crc(&self) -> Vec<u8> {
        let mut bytes = Vec::with_capacity(PAGE_HEADER_LEN);
        bytes.extend_from_slice(PAGE_MAGIC);
        bytes.push(self.header_version);
        bytes.push(self.codec as u8);
        bytes.push(self.encoding as u8);
        bytes.push(self.flags);
        bytes.extend_from_slice(&0u32.to_le_bytes());
        bytes.extend_from_slice(&self.payload_crc32.to_le_bytes());
        write_key(&mut bytes, self.first_key).unwrap();
        write_key(&mut bytes, self.last_key).unwrap();
        bytes.extend_from_slice(&self.frame_count.to_le_bytes());
        bytes.extend_from_slice(&self.uncompressed_len.to_le_bytes());
        bytes.extend_from_slice(&self.compressed_len.to_le_bytes());
        bytes.extend_from_slice(&self.first_frame_index.to_le_bytes());
        bytes.extend_from_slice(&[0u8; 10]);
        bytes
    }
}

fn crc32(bytes: &[u8]) -> u32 {
    let mut hasher = Hasher::new();
    hasher.update(bytes);
    hasher.finalize()
}

fn write_key<W: Write>(writer: &mut W, key: Key) -> std::io::Result<()> {
    let tag = match key {
        Key::I8(_) => 0,
        Key::I16(_) => 1,
        Key::I32(_) => 2,
        Key::I64(_) => 3,
        Key::U8(_) => 4,
        Key::U16(_) => 5,
        Key::U32(_) => 6,
        Key::U64(_) => 7,
        Key::F32(_) => 8,
        Key::F64(_) => 9,
        Key::Decimal(_) => 10,
    };
    writer.write_all(&[tag])?;
    let mut raw = Vec::with_capacity(KEY_SLOT_LEN);
    key.encode(&mut raw);
    raw.resize(KEY_SLOT_LEN, 0);
    writer.write_all(&raw)?;
    Ok(())
}

fn read_key<R: Read>(reader: &mut R) -> Result<Key> {
    let tag = reader.read_u8()?;
    let mut raw = [0u8; KEY_SLOT_LEN];
    reader.read_exact(&mut raw)?;
    Ok(match tag {
        0 => Key::I8(raw[0] as i8),
        1 => Key::I16(i16::from_le_bytes(raw[..2].try_into().unwrap())),
        2 => Key::I32(i32::from_le_bytes(raw[..4].try_into().unwrap())),
        3 => Key::I64(i64::from_le_bytes(raw[..8].try_into().unwrap())),
        4 => Key::U8(raw[0]),
        5 => Key::U16(u16::from_le_bytes(raw[..2].try_into().unwrap())),
        6 => Key::U32(u32::from_le_bytes(raw[..4].try_into().unwrap())),
        7 => Key::U64(u64::from_le_bytes(raw[..8].try_into().unwrap())),
        8 => Key::F32(f32::from_le_bytes(raw[..4].try_into().unwrap())),
        9 => Key::F64(f64::from_le_bytes(raw[..8].try_into().unwrap())),
        10 => Key::decode(fwob_core::KeyType::Decimal, &raw)?,
        _ => return Err(V2Error::InvalidPageHeader(0)),
    })
}