cobble 0.1.0

A flexible embedded key-value storage engine for distributed systems as well as single-node applications.
Documentation
use crate::error::{Error, Result};
use bytes::{BufMut, Bytes, BytesMut};
use std::convert::TryInto;

/// Four-byte tag placed at the very start of an encoded `ParquetMeta` buffer.
///
/// `ParquetMeta::encode` writes it first and `ParquetMeta::decode` rejects any
/// buffer whose first four bytes differ from it.
const PARQUET_META_MAGIC: &[u8; 4] = b"pqm1";

/// Pair of keys recorded for one row group inside a [`ParquetMeta`].
///
/// Both keys are stored and serialized as opaque byte strings; this file never
/// compares or orders them.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct ParquetRowGroupRange {
    /// Lower bound of the row group.
    /// NOTE(review): presumably the smallest key in the group — confirm against the writer.
    pub(crate) start_key: Vec<u8>,
    /// Upper bound of the row group.
    /// NOTE(review): whether this bound is inclusive is not established here — confirm.
    pub(crate) end_key: Vec<u8>,
}

/// In-memory form of the metadata blob: a row count plus one key range per row group.
///
/// Fields are private; use `ParquetMeta::new` to build a value and the
/// `row_count` / `row_groups` accessors to read it.
#[derive(Clone, Debug)]
pub(crate) struct ParquetMeta {
    /// Row count carried by the metadata (serialized as a little-endian `u64`).
    row_count: u64,
    /// Key ranges, kept in the order they were supplied or decoded.
    row_groups: Vec<ParquetRowGroupRange>,
}

impl ParquetMeta {
    pub(crate) fn new(row_count: u64, row_groups: Vec<ParquetRowGroupRange>) -> Self {
        Self {
            row_count,
            row_groups,
        }
    }

    pub(crate) fn row_count(&self) -> u64 {
        self.row_count
    }

    pub(crate) fn row_groups(&self) -> &[ParquetRowGroupRange] {
        &self.row_groups
    }

    pub(crate) fn encode(self) -> Bytes {
        let mut out = BytesMut::with_capacity(
            4 + 8
                + 4
                + self
                    .row_groups
                    .iter()
                    .map(|group| 8 + group.start_key.len() + group.end_key.len())
                    .sum::<usize>(),
        );
        out.extend_from_slice(PARQUET_META_MAGIC);
        out.put_u64_le(self.row_count);
        out.put_u32_le(self.row_groups.len() as u32);
        for group in self.row_groups {
            out.put_u32_le(group.start_key.len() as u32);
            out.extend_from_slice(&group.start_key);
            out.put_u32_le(group.end_key.len() as u32);
            out.extend_from_slice(&group.end_key);
        }
        out.freeze()
    }

    fn decode(bytes: &[u8]) -> Result<Self> {
        if bytes.len() < 16 {
            return Err(Error::IoError(format!(
                "Invalid parquet meta size: expected at least 16, got {}",
                bytes.len()
            )));
        }
        if &bytes[..4] != PARQUET_META_MAGIC {
            return Err(Error::IoError("Invalid parquet meta magic".to_string()));
        }

        let row_count = u64::from_le_bytes(
            bytes[4..12]
                .try_into()
                .map_err(|_| Error::IoError("Invalid parquet meta row_count".to_string()))?,
        );
        let row_group_count = u32::from_le_bytes(
            bytes[12..16]
                .try_into()
                .map_err(|_| Error::IoError("Invalid parquet meta row_group_count".to_string()))?,
        ) as usize;

        let mut offset = 16usize;
        let mut row_groups = Vec::with_capacity(row_group_count);
        for _ in 0..row_group_count {
            if offset + 4 > bytes.len() {
                return Err(Error::IoError(
                    "Invalid parquet meta: missing start key length".to_string(),
                ));
            }
            let start_len =
                u32::from_le_bytes(bytes[offset..offset + 4].try_into().map_err(|_| {
                    Error::IoError("Invalid parquet meta start key length".to_string())
                })?) as usize;
            offset += 4;
            if offset + start_len > bytes.len() {
                return Err(Error::IoError(
                    "Invalid parquet meta: start key out of range".to_string(),
                ));
            }
            let start_key = bytes[offset..offset + start_len].to_vec();
            offset += start_len;

            if offset + 4 > bytes.len() {
                return Err(Error::IoError(
                    "Invalid parquet meta: missing end key length".to_string(),
                ));
            }
            let end_len =
                u32::from_le_bytes(bytes[offset..offset + 4].try_into().map_err(|_| {
                    Error::IoError("Invalid parquet meta end key length".to_string())
                })?) as usize;
            offset += 4;
            if offset + end_len > bytes.len() {
                return Err(Error::IoError(
                    "Invalid parquet meta: end key out of range".to_string(),
                ));
            }
            let end_key = bytes[offset..offset + end_len].to_vec();
            offset += end_len;

            row_groups.push(ParquetRowGroupRange { start_key, end_key });
        }
        if offset != bytes.len() {
            return Err(Error::IoError(
                "Invalid parquet meta: trailing bytes detected".to_string(),
            ));
        }
        Ok(Self {
            row_count,
            row_groups,
        })
    }
}

/// Decodes an optional metadata buffer.
///
/// `None` passes through as `Ok(None)`; a present buffer is parsed with
/// `ParquetMeta::decode`, and any parse failure is returned as the error.
pub(crate) fn decode_meta(meta_bytes: Option<Bytes>) -> Result<Option<ParquetMeta>> {
    match meta_bytes {
        None => Ok(None),
        Some(raw) => {
            let parsed = ParquetMeta::decode(raw.as_ref())?;
            Ok(Some(parsed))
        }
    }
}

pub(crate) fn decode_meta_row_count(meta_bytes: Option<Bytes>) -> Result<Option<u64>> {
    Ok(decode_meta(meta_bytes)?.map(|meta| meta.row_count()))
}

/// Decodes an optional metadata buffer and returns only its row-group key ranges.
///
/// Yields `Ok(None)` when no buffer is supplied and forwards any decoding error.
pub(crate) fn decode_meta_row_group_ranges(
    meta_bytes: Option<Bytes>,
) -> Result<Option<Vec<ParquetRowGroupRange>>> {
    // The decoded value is owned and dropped right here, so move its vector out
    // instead of deep-copying every key through `row_groups().to_vec()`.
    Ok(decode_meta(meta_bytes)?.map(|meta| meta.row_groups))
}