rgwml 2.0.0

Typed, local-first tabular data library with columnar in-memory storage.
Documentation
use crate::error::{Error, Result};

use super::{Bitmap, DataType};

/// A fixed-width column holding one `T` per row, plus an optional validity bitmap.
///
/// The row count is `values.len()`. When `validity` is `None`, every row is
/// treated as non-null.
#[derive(Clone, Debug, PartialEq)]
pub struct PrimitiveCol<T> {
    /// Raw per-row values.
    pub values: Vec<T>,
    /// Optional per-row validity bitmap; `None` means no row is null.
    pub validity: Option<Bitmap>,
}

impl<T> PrimitiveCol<T> {
    /// Wraps `values` and `validity` without checking that their lengths agree.
    pub fn new(values: Vec<T>, validity: Option<Bitmap>) -> Self {
        PrimitiveCol { values, validity }
    }

    /// Number of rows, i.e. the length of `values`.
    pub fn len(&self) -> usize {
        Vec::len(&self.values)
    }

    /// `true` when the column has no rows.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

impl<T: Copy> PrimitiveCol<T> {
    /// Returns a copy of the raw value at `index`, or `None` when `index` is past the end.
    ///
    /// The validity bitmap is not consulted, so a null row still yields its stored value.
    pub fn value(&self, index: u32) -> Option<T> {
        let slot = index as usize;
        self.values.get(slot).copied()
    }
}

/// A boolean column whose values are bit-packed into a [`Bitmap`].
///
/// When `validity` is `None`, every row is treated as non-null.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BooleanCol {
    /// Packed truth values, one bit per row.
    pub values: Bitmap,
    /// Optional per-row validity bitmap; `None` means no row is null.
    pub validity: Option<Bitmap>,
}

impl BooleanCol {
    /// Wraps `values` and `validity` without checking that their lengths agree.
    pub fn new(values: Bitmap, validity: Option<Bitmap>) -> Self {
        BooleanCol { values, validity }
    }

    /// Number of rows, i.e. the bit length of `values`.
    pub fn len(&self) -> usize {
        let bits = self.values.len();
        bits as usize
    }

    /// `true` when the value bitmap holds no bits.
    pub fn is_empty(&self) -> bool {
        self.values.is_empty()
    }
}

/// A variable-length string column stored as an offsets + bytes pair.
///
/// Row `i` occupies `bytes[offsets[i]..offsets[i + 1]]`, so `offsets` always has
/// one more entry than there are rows.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Utf8Col {
    /// Row boundaries into `bytes`; length is `rows + 1`.
    pub offsets: Vec<u32>,
    /// Concatenated string payloads of every row.
    pub bytes: Vec<u8>,
    /// Optional per-row validity bitmap; `None` means no row is null.
    pub validity: Option<Bitmap>,
}

impl Utf8Col {
    /// Builds a column after checking the structural invariants of the layout.
    ///
    /// The bytes are *not* checked for UTF-8 validity here. [`Utf8Col::value`]
    /// returns `None` for a row whose bytes are not valid UTF-8.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Schema`] in any of these cases:
    /// - `offsets` is empty.
    /// - `offsets` decreases anywhere.
    /// - The last offset is not exactly `bytes.len()`.
    /// - `validity` is present and its length differs from the row count.
    pub fn new(offsets: Vec<u32>, bytes: Vec<u8>, validity: Option<Bitmap>) -> Result<Self> {
        let last = match offsets.last() {
            Some(&last) => last as usize,
            None => {
                return Err(Error::Schema(
                    "utf8 columns must contain at least one offset".to_string(),
                ))
            }
        };

        if offsets.windows(2).any(|window| window[0] > window[1]) {
            return Err(Error::Schema(
                "utf8 offsets must be monotonically increasing".to_string(),
            ));
        }

        if last != bytes.len() {
            return Err(Error::Schema(format!(
                "utf8 offsets end at {last}, but byte buffer length is {}",
                bytes.len()
            )));
        }

        if let Some(validity) = &validity {
            // Widen the bitmap length instead of narrowing the row count. A
            // `usize -> u32` cast could wrap on huge columns and let a wrong-sized
            // bitmap pass the check.
            let rows = offsets.len() - 1;
            if validity.len() as usize != rows {
                return Err(Error::Schema(
                    "utf8 validity bitmap length does not match row count".to_string(),
                ));
            }
        }

        Ok(Self {
            offsets,
            bytes,
            validity,
        })
    }

    /// Number of rows, i.e. `offsets.len() - 1` (zero if `offsets` is empty).
    pub fn len(&self) -> usize {
        self.offsets.len().saturating_sub(1)
    }

    /// `true` when the column has no rows.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns the string stored at row `index`.
    ///
    /// Returns `None` in any of these cases:
    /// - `index` is out of range.
    /// - The row's offsets do not describe a valid range of `bytes`.
    /// - The bytes are not valid UTF-8.
    ///
    /// The validity bitmap is not consulted.
    pub fn value(&self, index: u32) -> Option<&str> {
        let row = index as usize;
        let start = *self.offsets.get(row)? as usize;
        let end = *self.offsets.get(row + 1)? as usize;
        // The fields are public, so the offsets may have been edited after `new`
        // validated them. Use a checked slice so a bad range yields `None`
        // instead of panicking.
        let raw = self.bytes.get(start..end)?;
        std::str::from_utf8(raw).ok()
    }
}

/// A dictionary-encoded string column.
///
/// Each row stores a key that indexes into the `values` dictionary.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DictionaryCol<K> {
    /// Per-row dictionary keys.
    pub keys: Vec<K>,
    /// Dictionary entries that `keys` refer to.
    pub values: Utf8Col,
    /// Optional per-row validity bitmap; `None` means no row is null.
    pub validity: Option<Bitmap>,
}

impl<K> DictionaryCol<K> {
    /// Wraps the parts without checking keys against the dictionary or lengths against each other.
    pub fn new(keys: Vec<K>, values: Utf8Col, validity: Option<Bitmap>) -> Self {
        DictionaryCol {
            keys,
            values,
            validity,
        }
    }

    /// Number of rows, i.e. the number of keys.
    pub fn len(&self) -> usize {
        Vec::len(&self.keys)
    }

    /// `true` when the column has no rows.
    pub fn is_empty(&self) -> bool {
        self.keys.is_empty()
    }
}

impl DictionaryCol<u32> {
    /// Resolves row `index` through its key to the dictionary string.
    ///
    /// Returns `None` if either the row or the key is out of range. The validity
    /// bitmap is not consulted.
    pub fn value(&self, index: u32) -> Option<&str> {
        self.keys
            .get(index as usize)
            .and_then(|&key| self.values.value(key))
    }
}

#[derive(Clone, Debug, PartialEq)]
pub enum Column {
    Bool(BooleanCol),
    I64(PrimitiveCol<i64>),
    F64(PrimitiveCol<f64>),
    Utf8(Utf8Col),
    DictUtf8(DictionaryCol<u32>),
    Date32(PrimitiveCol<i32>),
    TimestampMs(PrimitiveCol<i64>),
}

impl Column {
    pub fn len(&self) -> usize {
        match self {
            Self::Bool(column) => column.len(),
            Self::I64(column) => column.len(),
            Self::F64(column) => column.len(),
            Self::Utf8(column) => column.len(),
            Self::DictUtf8(column) => column.len(),
            Self::Date32(column) => column.len(),
            Self::TimestampMs(column) => column.len(),
        }
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    pub fn dtype(&self) -> DataType {
        match self {
            Self::Bool(_) => DataType::Bool,
            Self::I64(_) => DataType::I64,
            Self::F64(_) => DataType::F64,
            Self::Utf8(_) => DataType::Utf8,
            Self::DictUtf8(_) => DataType::DictUtf8,
            Self::Date32(_) => DataType::Date32,
            Self::TimestampMs(_) => DataType::TimestampMs,
        }
    }

    pub fn is_null(&self, index: u32) -> bool {
        match self {
            Self::Bool(column) => validity_is_null(&column.validity, index),
            Self::I64(column) => validity_is_null(&column.validity, index),
            Self::F64(column) => validity_is_null(&column.validity, index),
            Self::Utf8(column) => validity_is_null(&column.validity, index),
            Self::DictUtf8(column) => validity_is_null(&column.validity, index),
            Self::Date32(column) => validity_is_null(&column.validity, index),
            Self::TimestampMs(column) => validity_is_null(&column.validity, index),
        }
    }

    pub fn bool_value(&self, index: u32) -> Option<bool> {
        match self {
            Self::Bool(column) if !self.is_null(index) => Some(column.values.is_set(index)),
            _ => None,
        }
    }

    pub fn i64_value(&self, index: u32) -> Option<i64> {
        match self {
            Self::I64(column) if !self.is_null(index) => column.value(index),
            Self::TimestampMs(column) if !self.is_null(index) => column.value(index),
            _ => None,
        }
    }

    pub fn i32_value(&self, index: u32) -> Option<i32> {
        match self {
            Self::Date32(column) if !self.is_null(index) => column.value(index),
            _ => None,
        }
    }

    pub fn f64_value(&self, index: u32) -> Option<f64> {
        match self {
            Self::F64(column) if !self.is_null(index) => column.value(index),
            _ => None,
        }
    }

    pub fn utf8_value(&self, index: u32) -> Option<&str> {
        match self {
            Self::Utf8(column) if !self.is_null(index) => column.value(index),
            Self::DictUtf8(column) if !self.is_null(index) => column.value(index),
            _ => None,
        }
    }

    pub fn dict_key_value(&self, index: u32) -> Option<u32> {
        match self {
            Self::DictUtf8(column) if !self.is_null(index) => {
                column.keys.get(index as usize).copied()
            }
            _ => None,
        }
    }

    pub(crate) fn gather(&self, indices: &[u32]) -> Result<Self> {
        match self {
            Self::Bool(column) => {
                let values = Bitmap::from_bools(
                    &indices
                        .iter()
                        .map(|&index| column.values.is_set(index))
                        .collect::<Vec<_>>(),
                );
                let validity = gather_validity(&column.validity, indices);
                Ok(Self::Bool(BooleanCol::new(values, validity)))
            }
            Self::I64(column) => Ok(Self::I64(PrimitiveCol::new(
                indices
                    .iter()
                    .filter_map(|&index| column.value(index))
                    .collect::<Vec<_>>(),
                gather_validity(&column.validity, indices),
            ))),
            Self::F64(column) => Ok(Self::F64(PrimitiveCol::new(
                indices
                    .iter()
                    .filter_map(|&index| column.value(index))
                    .collect::<Vec<_>>(),
                gather_validity(&column.validity, indices),
            ))),
            Self::Date32(column) => Ok(Self::Date32(PrimitiveCol::new(
                indices
                    .iter()
                    .filter_map(|&index| column.value(index))
                    .collect::<Vec<_>>(),
                gather_validity(&column.validity, indices),
            ))),
            Self::TimestampMs(column) => Ok(Self::TimestampMs(PrimitiveCol::new(
                indices
                    .iter()
                    .filter_map(|&index| column.value(index))
                    .collect::<Vec<_>>(),
                gather_validity(&column.validity, indices),
            ))),
            Self::Utf8(column) => gather_utf8(column, indices).map(Self::Utf8),
            Self::DictUtf8(column) => Ok(Self::DictUtf8(DictionaryCol::new(
                indices
                    .iter()
                    .filter_map(|&index| column.keys.get(index as usize).copied())
                    .collect::<Vec<_>>(),
                column.values.clone(),
                gather_validity(&column.validity, indices),
            ))),
        }
    }
}

fn gather_utf8(column: &Utf8Col, indices: &[u32]) -> Result<Utf8Col> {
    let mut offsets = Vec::with_capacity(indices.len() + 1);
    let mut bytes = Vec::new();
    offsets.push(0);

    for &index in indices {
        let start =
            *column.offsets.get(index as usize).ok_or_else(|| {
                Error::InvalidSelection(format!("row index {index} is out of bounds"))
            })? as usize;
        let end =
            *column.offsets.get(index as usize + 1).ok_or_else(|| {
                Error::InvalidSelection(format!("row index {index} is out of bounds"))
            })? as usize;
        bytes.extend_from_slice(&column.bytes[start..end]);
        offsets.push(bytes.len() as u32);
    }

    Utf8Col::new(offsets, bytes, gather_validity(&column.validity, indices))
}

/// Selects the validity bits at `indices` into a new bitmap.
///
/// Returns `None` when the source column has no validity bitmap, meaning every
/// row is valid.
fn gather_validity(validity: &Option<Bitmap>, indices: &[u32]) -> Option<Bitmap> {
    validity.as_ref().map(|bitmap| {
        let picked: Vec<bool> = indices.iter().map(|&row| bitmap.is_set(row)).collect();
        Bitmap::from_bools(&picked)
    })
}

/// Reports whether row `index` is null under an optional validity bitmap.
///
/// A missing bitmap means no row is null. Otherwise a cleared bit marks a null row.
fn validity_is_null(validity: &Option<Bitmap>, index: u32) -> bool {
    match validity {
        Some(bitmap) => !bitmap.is_set(index),
        None => false,
    }
}