use crate::error::{Error, Result};
use super::{Bitmap, DataType};
/// Column of plain values of type `T` with an optional validity bitmap.
#[derive(Clone, Debug, PartialEq)]
pub struct PrimitiveCol<T> {
/// Stored values; the column length is the length of this vector.
pub values: Vec<T>,
/// Optional validity bitmap. When present, `Column::is_null` reports a row as
/// null if its bit is not set; when absent, no row is reported as null.
pub validity: Option<Bitmap>,
}
impl<T> PrimitiveCol<T> {
pub fn new(values: Vec<T>, validity: Option<Bitmap>) -> Self {
Self { values, validity }
}
pub fn len(&self) -> usize {
self.values.len()
}
pub fn is_empty(&self) -> bool {
self.values.is_empty()
}
}
impl<T: Copy> PrimitiveCol<T> {
    /// Returns a copy of the value stored at row `index`, or `None` when
    /// `index` is past the end of the value buffer.
    ///
    /// The validity bitmap is not consulted here.
    pub fn value(&self, index: u32) -> Option<T> {
        let slot = self.values.get(index as usize)?;
        Some(*slot)
    }
}
/// Column of boolean values stored in a bitmap, with an optional validity bitmap.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BooleanCol {
/// Bitmap holding the boolean value of each row; its length is the column length.
pub values: Bitmap,
/// Optional validity bitmap. When present, `Column::is_null` reports a row as
/// null if its bit is not set; when absent, no row is reported as null.
pub validity: Option<Bitmap>,
}
impl BooleanCol {
pub fn new(values: Bitmap, validity: Option<Bitmap>) -> Self {
Self { values, validity }
}
pub fn len(&self) -> usize {
self.values.len() as usize
}
pub fn is_empty(&self) -> bool {
self.values.is_empty()
}
}
/// Column of strings stored as one contiguous byte buffer plus row offsets.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Utf8Col {
/// Row boundaries into `bytes`: row `i` spans `bytes[offsets[i]..offsets[i + 1]]`,
/// so a column with `n` rows carries `n + 1` offsets.
pub offsets: Vec<u32>,
/// Concatenated bytes of all rows.
pub bytes: Vec<u8>,
/// Optional validity bitmap. When present, `Column::is_null` reports a row as
/// null if its bit is not set; when absent, no row is reported as null.
pub validity: Option<Bitmap>,
}
impl Utf8Col {
    /// Builds a UTF-8 column after checking that its buffers agree with each other.
    ///
    /// Row `i` occupies `bytes[offsets[i]..offsets[i + 1]]`, so a column with
    /// `n` rows carries `n + 1` offsets.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Schema`] when
    /// - `offsets` is empty,
    /// - an offset is smaller than the one before it,
    /// - the final offset differs from `bytes.len()`, or
    /// - `validity` is present and its length differs from the row count.
    pub fn new(offsets: Vec<u32>, bytes: Vec<u8>, validity: Option<Bitmap>) -> Result<Self> {
        let last = match offsets.last() {
            Some(&offset) => offset as usize,
            None => {
                return Err(Error::Schema(
                    "utf8 columns must contain at least one offset".to_string(),
                ));
            }
        };
        if offsets.windows(2).any(|window| window[0] > window[1]) {
            return Err(Error::Schema(
                "utf8 offsets must be monotonically increasing".to_string(),
            ));
        }
        if last != bytes.len() {
            return Err(Error::Schema(format!(
                "utf8 offsets end at {last}, but byte buffer length is {}",
                bytes.len()
            )));
        }
        if let Some(validity) = &validity {
            // Compare in `usize`: narrowing the row count to `u32` could wrap
            // and let a bitmap of the wrong length pass the check.
            // `offsets` is known to be non-empty here, so the subtraction is safe.
            let rows = offsets.len() - 1;
            if validity.len() as usize != rows {
                return Err(Error::Schema(
                    "utf8 validity bitmap length does not match row count".to_string(),
                ));
            }
        }
        Ok(Self {
            offsets,
            bytes,
            validity,
        })
    }

    /// Returns the number of rows, i.e. one less than the number of offsets.
    pub fn len(&self) -> usize {
        self.offsets.len().saturating_sub(1)
    }

    /// Returns `true` when the column has no rows.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns the string stored at row `index`.
    ///
    /// Yields `None` when `index` is past the last row, when the stored
    /// offsets do not describe a valid range of `bytes`, or when the bytes of
    /// the row are not valid UTF-8. The validity bitmap is not consulted.
    pub fn value(&self, index: u32) -> Option<&str> {
        let row = index as usize;
        let start = *self.offsets.get(row)? as usize;
        let end = *self.offsets.get(row.checked_add(1)?)? as usize;
        // The fields are public, so they may have been changed after `new`
        // validated them; a checked slice keeps this accessor panic-free.
        let raw = self.bytes.get(start..end)?;
        std::str::from_utf8(raw).ok()
    }
}
/// Dictionary-encoded string column: one key per row referring to a row of a
/// shared `Utf8Col` of values.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DictionaryCol<K> {
/// One key per row; the column length is the length of this vector.
pub keys: Vec<K>,
/// Dictionary values that the keys refer to.
pub values: Utf8Col,
/// Optional validity bitmap. When present, `Column::is_null` reports a row as
/// null if its bit is not set; when absent, no row is reported as null.
pub validity: Option<Bitmap>,
}
impl<K> DictionaryCol<K> {
pub fn new(keys: Vec<K>, values: Utf8Col, validity: Option<Bitmap>) -> Self {
Self {
keys,
values,
validity,
}
}
pub fn len(&self) -> usize {
self.keys.len()
}
pub fn is_empty(&self) -> bool {
self.keys.is_empty()
}
}
impl DictionaryCol<u32> {
    /// Resolves row `index` to its dictionary string.
    ///
    /// Yields `None` when `index` is past the last key or when the dictionary
    /// lookup for that key yields `None`. The validity bitmap is not consulted.
    pub fn value(&self, index: u32) -> Option<&str> {
        self.keys
            .get(index as usize)
            .and_then(|&key| self.values.value(key))
    }
}
/// A column of one of the supported data types; `dtype` maps each variant to
/// the `DataType` of the same name.
#[derive(Clone, Debug, PartialEq)]
pub enum Column {
/// Boolean values.
Bool(BooleanCol),
/// `i64` values.
I64(PrimitiveCol<i64>),
/// `f64` values.
F64(PrimitiveCol<f64>),
/// Strings stored with offsets into a byte buffer.
Utf8(Utf8Col),
/// Dictionary-encoded strings with `u32` keys.
DictUtf8(DictionaryCol<u32>),
/// `i32` values. NOTE(review): presumably a day count relative to an epoch — confirm.
Date32(PrimitiveCol<i32>),
/// `i64` values. NOTE(review): presumably milliseconds relative to an epoch — confirm.
TimestampMs(PrimitiveCol<i64>),
}
impl Column {
pub fn len(&self) -> usize {
match self {
Self::Bool(column) => column.len(),
Self::I64(column) => column.len(),
Self::F64(column) => column.len(),
Self::Utf8(column) => column.len(),
Self::DictUtf8(column) => column.len(),
Self::Date32(column) => column.len(),
Self::TimestampMs(column) => column.len(),
}
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn dtype(&self) -> DataType {
match self {
Self::Bool(_) => DataType::Bool,
Self::I64(_) => DataType::I64,
Self::F64(_) => DataType::F64,
Self::Utf8(_) => DataType::Utf8,
Self::DictUtf8(_) => DataType::DictUtf8,
Self::Date32(_) => DataType::Date32,
Self::TimestampMs(_) => DataType::TimestampMs,
}
}
pub fn is_null(&self, index: u32) -> bool {
match self {
Self::Bool(column) => validity_is_null(&column.validity, index),
Self::I64(column) => validity_is_null(&column.validity, index),
Self::F64(column) => validity_is_null(&column.validity, index),
Self::Utf8(column) => validity_is_null(&column.validity, index),
Self::DictUtf8(column) => validity_is_null(&column.validity, index),
Self::Date32(column) => validity_is_null(&column.validity, index),
Self::TimestampMs(column) => validity_is_null(&column.validity, index),
}
}
pub fn bool_value(&self, index: u32) -> Option<bool> {
match self {
Self::Bool(column) if !self.is_null(index) => Some(column.values.is_set(index)),
_ => None,
}
}
pub fn i64_value(&self, index: u32) -> Option<i64> {
match self {
Self::I64(column) if !self.is_null(index) => column.value(index),
Self::TimestampMs(column) if !self.is_null(index) => column.value(index),
_ => None,
}
}
pub fn i32_value(&self, index: u32) -> Option<i32> {
match self {
Self::Date32(column) if !self.is_null(index) => column.value(index),
_ => None,
}
}
pub fn f64_value(&self, index: u32) -> Option<f64> {
match self {
Self::F64(column) if !self.is_null(index) => column.value(index),
_ => None,
}
}
pub fn utf8_value(&self, index: u32) -> Option<&str> {
match self {
Self::Utf8(column) if !self.is_null(index) => column.value(index),
Self::DictUtf8(column) if !self.is_null(index) => column.value(index),
_ => None,
}
}
pub fn dict_key_value(&self, index: u32) -> Option<u32> {
match self {
Self::DictUtf8(column) if !self.is_null(index) => {
column.keys.get(index as usize).copied()
}
_ => None,
}
}
pub(crate) fn gather(&self, indices: &[u32]) -> Result<Self> {
match self {
Self::Bool(column) => {
let values = Bitmap::from_bools(
&indices
.iter()
.map(|&index| column.values.is_set(index))
.collect::<Vec<_>>(),
);
let validity = gather_validity(&column.validity, indices);
Ok(Self::Bool(BooleanCol::new(values, validity)))
}
Self::I64(column) => Ok(Self::I64(PrimitiveCol::new(
indices
.iter()
.filter_map(|&index| column.value(index))
.collect::<Vec<_>>(),
gather_validity(&column.validity, indices),
))),
Self::F64(column) => Ok(Self::F64(PrimitiveCol::new(
indices
.iter()
.filter_map(|&index| column.value(index))
.collect::<Vec<_>>(),
gather_validity(&column.validity, indices),
))),
Self::Date32(column) => Ok(Self::Date32(PrimitiveCol::new(
indices
.iter()
.filter_map(|&index| column.value(index))
.collect::<Vec<_>>(),
gather_validity(&column.validity, indices),
))),
Self::TimestampMs(column) => Ok(Self::TimestampMs(PrimitiveCol::new(
indices
.iter()
.filter_map(|&index| column.value(index))
.collect::<Vec<_>>(),
gather_validity(&column.validity, indices),
))),
Self::Utf8(column) => gather_utf8(column, indices).map(Self::Utf8),
Self::DictUtf8(column) => Ok(Self::DictUtf8(DictionaryCol::new(
indices
.iter()
.filter_map(|&index| column.keys.get(index as usize).copied())
.collect::<Vec<_>>(),
column.values.clone(),
gather_validity(&column.validity, indices),
))),
}
}
}
fn gather_utf8(column: &Utf8Col, indices: &[u32]) -> Result<Utf8Col> {
let mut offsets = Vec::with_capacity(indices.len() + 1);
let mut bytes = Vec::new();
offsets.push(0);
for &index in indices {
let start =
*column.offsets.get(index as usize).ok_or_else(|| {
Error::InvalidSelection(format!("row index {index} is out of bounds"))
})? as usize;
let end =
*column.offsets.get(index as usize + 1).ok_or_else(|| {
Error::InvalidSelection(format!("row index {index} is out of bounds"))
})? as usize;
bytes.extend_from_slice(&column.bytes[start..end]);
offsets.push(bytes.len() as u32);
}
Utf8Col::new(offsets, bytes, gather_validity(&column.validity, indices))
}
/// Gathers the validity bits for the rows selected by `indices`.
///
/// Returns `None` when the source column has no validity bitmap; otherwise
/// returns a bitmap with one bit per entry of `indices`, each read from the
/// source bitmap at that index.
fn gather_validity(validity: &Option<Bitmap>, indices: &[u32]) -> Option<Bitmap> {
    match validity {
        None => None,
        Some(source) => {
            let mut picked = Vec::with_capacity(indices.len());
            for &row in indices {
                picked.push(source.is_set(row));
            }
            Some(Bitmap::from_bools(&picked))
        }
    }
}
/// Reports whether row `index` is null according to an optional validity bitmap.
///
/// A missing bitmap means no row is null; otherwise the row is null exactly
/// when its bit is not set.
fn validity_is_null(validity: &Option<Bitmap>, index: u32) -> bool {
    match validity {
        Some(bitmap) => !bitmap.is_set(index),
        None => false,
    }
}