use ndarray::{Array, Array1, Array2, ArrayView1, ArrayView2, Axis, ShapeBuilder};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use crate::dataframe::column_store::typed_array::{TypedData, TypedDataArray};
use crate::error::Error;
use crate::{dataframe::index::Index, CandidateData, JoinBy, JoinRelation, Key};
use data_value::{DataValue, Extract};
use tracing::*;
mod from;
mod key_index;
mod ops;
pub mod sorted_df;
pub use key_index::KeyIndex;
pub mod filter_df;
pub mod typed_array;
/// Column-oriented data frame: a key index plus one typed array per column.
#[derive(Debug, Clone, Default, PartialEq, Serialize)]
pub struct ColumnFrame {
/// Index of the column keys; used to resolve a key to its column position.
pub index: KeyIndex,
/// Column storage, one `TypedDataArray` per column.
pub data_frame: Vec<TypedDataArray>,
}
impl<'de> Deserialize<'de> for ColumnFrame {
    /// Deserializes a frame whose `data_frame` payload may use any of three
    /// wire layouts; whichever layout parses is normalized into
    /// `Vec<TypedDataArray>`.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // Untagged: serde tries the variants in declaration order.
        #[derive(Debug, Deserialize)]
        #[serde(untagged)]
        enum WireDataframe {
            V3(Vec<TypedDataArray>),
            V2(Vec<TypedData>),
            V1(Array2<DataValue>),
        }
        #[derive(Debug, Deserialize)]
        struct WireData {
            index: KeyIndex,
            data_frame: WireDataframe,
        }

        let WireData {
            index,
            data_frame: payload,
        } = WireData::deserialize(deserializer)?;

        let data_frame: Vec<TypedDataArray> = match payload {
            // Already in the current layout.
            WireDataframe::V3(columns) => columns,
            WireDataframe::V2(columns) => {
                columns.into_iter().map(TypedDataArray::from).collect()
            }
            // 2-D matrix: split it column by column, taking each column's
            // dtype from the key at the same position (Unknown if absent).
            WireDataframe::V1(matrix) => {
                let column_count = matrix.ncols();
                let mut columns = Vec::with_capacity(column_count);
                for i in 0..column_count {
                    let values: Vec<DataValue> = matrix.column(i).iter().cloned().collect();
                    let dtype = match index.get_keys().get(i) {
                        Some(key) => key.ctype,
                        None => crate::DataType::Unknown,
                    };
                    columns.push(TypedDataArray::new(dtype, values));
                }
                columns
            }
        };

        Ok(ColumnFrame { index, data_frame })
    }
}
/// Tells the caller of a pre-check whether the operation still has work to
/// do (`Continue`) or has already been fully handled (`End`).
enum Continue {
    Continue,
    End,
}
impl Continue {
    /// Returns `true` only for [`Continue::End`].
    pub fn should_end(&self) -> bool {
        match self {
            Self::End => true,
            Self::Continue => false,
        }
    }
}
use std::fmt;
impl fmt::Display for ColumnFrame {
    /// Renders the frame as a pipe-separated table: a header line with the
    /// keys, a line with each key's dtype, then at most `MAX_ROWS` data rows
    /// between `---` separators.
    ///
    /// The "too long" notice is written only when rows were actually left
    /// out, i.e. when the frame has more than `MAX_ROWS` rows.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Maximum number of data rows written before truncating.
        const MAX_ROWS: usize = 257;

        write!(f, "\n|")?;
        for key in &self.index.keys {
            write!(f, " {key} |")?;
        }
        if self.index.is_empty() {
            writeln!(f, "|")?;
        }
        write!(f, "\n|")?;
        for key in &self.index.keys {
            write!(f, " {:10?} |", key.ctype)?;
        }
        writeln!(f)?;
        writeln!(f, "---")?;

        let total_rows = self.nrows();
        for row_idx in 0..total_rows.min(MAX_ROWS) {
            write!(f, "|")?;
            for col in &self.data_frame {
                // A column shorter than the first one prints the default value.
                write!(f, " {} |", col.get(row_idx).unwrap_or_default())?;
            }
            writeln!(f)?;
        }
        if total_rows > MAX_ROWS {
            writeln!(f, "... (dataframe is too long)")?;
        }
        writeln!(f, "---")
    }
}
pub fn convert_data_value(item: DataValue, dtype: crate::DataType) -> DataValue {
let x = &item;
match dtype {
crate::DataType::Bool => DataValue::Bool(bool::extract(x)),
crate::DataType::U32 => DataValue::U32(u32::extract(x)),
crate::DataType::I32 => DataValue::I32(i32::extract(x)),
crate::DataType::U64 => DataValue::U64(u64::extract(x)),
crate::DataType::I64 => DataValue::I64(i64::extract(x)),
crate::DataType::F32 => DataValue::F32(f32::extract(x)),
crate::DataType::U128 => DataValue::U128(u128::extract(x)),
crate::DataType::I128 => DataValue::I128(i128::extract(x)),
crate::DataType::F64 => DataValue::F64(f64::extract(x)),
crate::DataType::U8 => DataValue::U8(u8::extract(x)),
crate::DataType::String => DataValue::String(String::extract(x).into()),
crate::DataType::Bytes => item,
crate::DataType::Map => item,
crate::DataType::Vec => item,
crate::DataType::Unknown => {
if matches!(item, DataValue::Null) {
return item;
}
let dtype = crate::detect_dtype(&item);
if matches!(dtype, crate::DataType::Unknown) {
tracing::error!("Unknown datatype {dtype:?} - {item:?}");
return item;
}
convert_data_value(item, dtype)
}
}
}
/// Converts `item` to the dtype declared on `key` (`key.ctype`).
pub fn convert_dv_to_dtype(key: &Key, item: DataValue) -> DataValue {
    let target = key.ctype;
    convert_data_value(item, target)
}
/// Either a borrowed 2-D view of `DataValue`s or an owned 2-D array.
pub enum MaybeView<'v> {
    View(ArrayView2<'v, DataValue>),
    Array(Array2<DataValue>),
}
impl MaybeView<'_> {
    /// Returns a 2-D view of the data: a borrowed view is returned as is,
    /// an owned array is returned transposed.
    pub fn row_view(&self) -> ArrayView2<'_, DataValue> {
        match *self {
            Self::Array(ref owned) => owned.t(),
            Self::View(ref borrowed) => borrowed.view(),
        }
    }
}
impl ColumnFrame {
/// Builds a frame from anything convertible into a `KeyIndex` and a list
/// of values convertible into columns. No dtype coercion is performed.
pub fn new<K, V>(index: K, data_frame: Vec<V>) -> Self
where
    K: Into<KeyIndex>,
    V: Into<TypedDataArray>,
{
    let index: KeyIndex = index.into();
    let mut columns: Vec<TypedDataArray> = Vec::with_capacity(data_frame.len());
    columns.extend(data_frame.into_iter().map(V::into));
    Self {
        index,
        data_frame: columns,
    }
}
/// Like `new`, but tries to convert each column to the dtype declared on
/// the key at the same position.
///
/// Columns are paired with keys positionally, so pairing stops at the
/// shorter of the two lists. Conversion is skipped when the key's dtype is
/// `Unknown` or already matches; a failed conversion is ignored and the
/// column is kept as it is.
pub fn new_coerced<K, V>(index: K, data_frame: Vec<V>) -> Self
where
    K: Into<KeyIndex>,
    V: Into<TypedDataArray>,
{
    let index: KeyIndex = index.into();
    let mut columns: Vec<TypedDataArray> = Vec::new();
    for (raw, key) in data_frame.into_iter().zip(index.keys.iter()) {
        let mut column: TypedDataArray = raw.into();
        let target = key.ctype;
        let needs_cast =
            !matches!(target, crate::DataType::Unknown) && column.data_type() != target;
        if needs_cast {
            // Best effort: a column that cannot be converted is stored unchanged.
            let _ = column.try_convert_to_dtype(target);
        }
        columns.push(column);
    }
    Self {
        data_frame: columns,
        index,
    }
}
/// Returns the keys held by this frame's index.
pub fn keys(&self) -> &[Key] {
self.index.get_keys()
}
/// Number of rows, taken from the length of the first column
/// (0 when the frame has no columns).
pub fn nrows(&self) -> usize {
    self.data_frame.first().map_or(0, |column| column.len())
}
/// Number of stored columns (length of `data_frame`).
pub fn ncolumns(&self) -> usize {
self.data_frame.len()
}
/// Returns `true` when the frame has no rows; a frame with keys and
/// zero-length columns is therefore also considered empty.
pub fn is_empty(&self) -> bool {
self.nrows() == 0
}
/// Currently a no-op (empty body).
pub fn shrink(&mut self) {}
/// Updates the dtype stored on index keys by detecting it from the first
/// value of the corresponding column.
///
/// With `force == false` only keys whose dtype is `Unknown` are touched;
/// with `force == true` every key is re-detected.
///
/// # Errors
/// Returns `Error::EmptyData` when a column cannot be looked up or has no
/// first value.
pub fn try_fix_dtype_for_keys(&mut self, force: bool) -> Result<(), Error> {
    for i in 0..self.index.keys.len() {
        let already_typed = !matches!(self.index.keys[i].ctype, crate::DataType::Unknown);
        if already_typed && !force {
            continue;
        }
        let detected = {
            let column = self
                .get_column(&self.index.keys[i])
                .map_err(|_| Error::EmptyData)?;
            let first = column.get(0).ok_or(Error::EmptyData)?;
            crate::detect_dtype(&first)
        };
        self.index.keys[i].ctype = detected;
    }
    Ok(())
}
pub fn try_fix_dtype(&mut self) -> Result<(), Error> {
let mut errors = vec![];
let keys = self.index.keys.clone();
for key in keys {
tracing::trace!("key: {key:?}- {:?}", key.ctype);
if let Err(e) = self.try_fix_column_by_key(&key) {
errors.push((key, e.to_string()));
}
}
if errors.is_empty() {
Ok(())
} else {
Err(Error::CastFailed(errors))
}
}
pub fn get_column(&self, key: &Key) -> Result<&TypedDataArray, Error> {
let idx = self
.index
.get_column_index(key)
.ok_or(Error::MissingField(format!("{key}").into()))?;
self.get_column_by_idx(idx)
}
pub fn get_column_mut(&mut self, key: &Key) -> Result<&mut TypedDataArray, Error> {
let idx = self
.index
.get_column_index(key)
.ok_or(Error::MissingField(format!("{key}").into()))?;
self.get_column_by_idx_mut(idx)
}
pub fn get_column_by_idx(&self, idx: usize) -> Result<&TypedDataArray, Error> {
self.data_frame
.get(idx)
.ok_or_else(|| Error::IndexOutOfRange(idx, self.nrows()))
}
pub fn get_column_by_idx_mut(&mut self, idx: usize) -> Result<&mut TypedDataArray, Error> {
let n_cols = self.ncolumns();
self.data_frame
.get_mut(idx)
.ok_or_else(|| Error::IndexOutOfRange(idx, n_cols))
}
/// Collects the value at row `idx` from every column, in column order.
///
/// A column that has no value at `idx` contributes `DataValue::default()`;
/// as written this function never returns `Err`.
pub fn get_row(&self, idx: usize) -> Result<Vec<DataValue>, Error> {
    let row: Vec<DataValue> = self
        .data_frame
        .iter()
        .map(|column| column.get(idx).unwrap_or_default())
        .collect();
    Ok(row)
}
fn finish(&self) -> Result<Array2<DataValue>, Error> {
let ncols = self.ncolumns();
if ncols == 0 {
return Ok(Array2::default((0, 0)));
}
let nrows = self.nrows();
let mut data = Vec::with_capacity(nrows * ncols);
let mut selected_cols: Vec<Box<dyn Iterator<Item = DataValue>>> = self
.index
.indexes()
.iter()
.map(|col_idx| {
self.get_column_by_idx(*col_idx)
.expect("Cannot get column on index")
.iter_values()
})
.collect::<Vec<_>>();
for _ in 0..nrows {
for col in selected_cols.iter_mut() {
data.push(col.next().unwrap_or(DataValue::Null));
}
}
Array2::from_shape_vec((nrows, ncols), data)
.map_err(|e| Error::UnknownError(format!("finish reshape: {e}")))
}
fn push_row(&mut self, values: Vec<DataValue>) -> Result<(), Error> {
let n_cols = self.ncolumns();
for (idx, value) in values.into_iter().enumerate() {
let current_ptr = self
.data_frame
.get_mut(idx)
.ok_or(Error::IndexOutOfRange(idx, n_cols))?;
current_ptr.push(value)?;
}
Ok(())
}
/// Converts the column stored under `key` to the dtype declared on `key`.
///
/// # Errors
/// Propagates the lookup error when the column is missing and the
/// conversion error when the cast fails.
pub fn try_fix_column_by_key(&mut self, key: &Key) -> Result<(), Error> {
    let target = key.ctype;
    self.get_column_mut(key)?.try_convert_to_dtype(target)?;
    Ok(())
}
pub fn enforce_dtype_for_column(
&mut self,
key: &str,
dtype: crate::DataType,
) -> Result<(), Error> {
if let Some(idx) = self.index.get_column_index_by_name(key) {
let new_key = Key::new(key, dtype);
let col = self.get_column_by_idx_mut(idx)?;
col.try_convert_to_dtype(new_key.ctype)?;
self.index.rename_key(key, new_key)?;
Ok(())
} else {
Err(Error::NotFound(Key::new(key, crate::DataType::Unknown)))
}
}
/// Renames the key `old` to `new` by delegating to the index; column data
/// is not touched here.
pub fn rename_key(&mut self, old: &str, new: Key) -> Result<(), Error> {
self.index.rename_key(old, new)
}
/// Registers `alias` for `key` by delegating to the index.
pub fn add_alias(&mut self, key: &str, alias: &str) -> Result<(), Error> {
self.index.add_alias(key, alias)
}
/// Selects `keys` via `select` and returns one `Vec<D>` per row of the
/// selection, each cell converted with `D::extract`.
pub fn select_transposed_typed<D: Extract>(&self, keys: &[Key]) -> Vec<Vec<D>> {
    let selected = self.select(Some(keys));
    selected
        .rows()
        .into_iter()
        .map(|row| row.iter().map(|value| D::extract(value)).collect::<Vec<D>>())
        .collect()
}
pub fn select_transposed(&self, keys: Option<&[Key]>) -> Result<Array2<DataValue>, Error> {
let keys = keys.unwrap_or_else(|| self.index.get_keys());
let key_indexes = self.index.select(keys);
if key_indexes.is_empty() {
return Ok(Array2::default((0, 0)));
}
let data_vec: Result<Vec<Array1<DataValue>>, Error> = key_indexes
.indexes()
.iter()
.map(|x| self.get_column_by_idx(*x).map(|col| col.as_generic_array()))
.collect();
let data_vec = data_vec?;
let views: Vec<ArrayView1<DataValue>> = data_vec.iter().map(|a| a.view()).collect();
Ok(ndarray::stack(Axis(0), &views)?)
}
/// Returns a freshly allocated generic array with the values of the column
/// stored under `key`, or `None` when the key or column is missing.
#[deprecated(note = "allocates O(n); use get_column() for zero-copy typed access")]
pub fn select_column(&self, key: &Key) -> Option<Array1<DataValue>> {
    let position = self.index.get_column_index(key)?;
    let column = self.get_column_by_idx(position).ok()?;
    Some(column.as_generic_array())
}
/// Invokes `func` once with `keys` and a mutable reference to this frame,
/// returning whatever `func` returns.
pub fn apply_function<F>(&mut self, keys: &[Key], mut func: F) -> Result<(), Error>
where
F: FnMut(&[Key], &mut ColumnFrame) -> Result<(), Error>,
{
func(keys, self)
}
pub fn validate_entry_access(&self, column: &Key, row_index: usize) -> Result<usize, Error> {
if row_index >= self.nrows() {
return Err(Error::IndexOutOfRange(row_index, self.nrows()));
}
let Some(column_index) = self.index.get_column_index(column) else {
return Err(Error::NotFound(column.clone()));
};
Ok(column_index)
}
/// Returns the value at (`column`, `row_index`), or `None` when the access
/// is invalid or the cell is missing. Validation failures are only traced.
pub fn get_by_row_index(&self, column: &Key, row_index: usize) -> Option<DataValue> {
    trace!(
        "Column: {column} row_index: {row_index} data_frame: cols:{}-rows:{}",
        self.ncolumns(),
        self.nrows()
    );
    trace!("{:?}", self.data_frame);
    let column_index = match self.validate_entry_access(column, row_index) {
        Ok(position) => position,
        Err(e) => {
            trace!("Error: {e}");
            return None;
        }
    };
    self.data_frame.get(column_index)?.get(row_index)
}
pub fn set_by_row_index(
&mut self,
column: &Key,
row_index: usize,
value: DataValue,
) -> Result<(), Error> {
let column_index = self.validate_entry_access(column, row_index)?;
let ncols = self.ncolumns();
self.data_frame
.get_mut(column_index)
.ok_or(Error::IndexOutOfRange(column_index, ncols))?
.set(row_index, value)
}
/// Returns the selected columns as a map from key to an owned vector of
/// values. `None` selects all keys of the index.
///
/// Keys unknown to the index are left out. If a resolved position has no
/// column, the key maps to `nrows()` `Null`s. An empty selection gives an
/// empty map.
pub fn select_as_map(&self, keys: Option<&[Key]>) -> HashMap<Key, Vec<DataValue>> {
    let keys = match keys {
        Some(requested) => requested,
        None => self.index.get_keys(),
    };
    let selection = self.index.select(keys);
    if selection.is_empty() {
        return HashMap::default();
    }

    let mut by_key = HashMap::with_capacity(keys.len());
    for key in keys {
        let Some(source_position) = selection.get_column_index(key) else {
            continue;
        };
        let values = match self.data_frame.get(source_position) {
            Some(column) => column.to_vec(),
            None => vec![DataValue::Null; self.nrows()],
        };
        by_key.insert(key.clone(), values);
    }
    by_key
}
/// Returns the selected columns as a row-major `(nrows, keys.len())` array.
///
/// * `None` on a non-empty frame returns the whole frame via `finish`.
/// * An empty selection or empty key list yields a `(0, 0)` array.
/// * A requested key unknown to the index produces a column of `Null`s;
///   columns that run out of values are padded with `Null` as well.
///
/// # Panics
/// When `finish` fails for the whole-frame case, or when a resolved
/// position has no backing column.
pub fn select(&self, keys: Option<&[Key]>) -> Array2<DataValue> {
    let keys = match keys {
        None if !self.is_empty() => {
            return self.finish().expect("BUG: There has to be some data");
        }
        None => self.index.get_keys(),
        Some(requested) => requested,
    };
    let selection = self.index.select(keys);
    if selection.is_empty() || keys.is_empty() {
        return Array2::default((0, 0));
    }

    let row_count = self.nrows();
    let column_count = keys.len();

    // One value cursor per requested key; unknown keys get an empty cursor
    // so every pull falls back to Null.
    let mut cursors: Vec<Box<dyn Iterator<Item = DataValue> + '_>> =
        Vec::with_capacity(column_count);
    for key in keys {
        let cursor: Box<dyn Iterator<Item = DataValue> + '_> =
            match selection.get_column_index(key) {
                Some(col_idx) => self
                    .get_column_by_idx(col_idx)
                    .expect("Cannot get column on index")
                    .iter_values(),
                None => Box::new(std::iter::empty()),
            };
        cursors.push(cursor);
    }

    let mut cells = Vec::with_capacity(row_count * column_count);
    for _ in 0..row_count {
        cells.extend(
            cursors
                .iter_mut()
                .map(|cursor| cursor.next().unwrap_or(DataValue::Null)),
        );
    }

    match Array::from_shape_vec((row_count, column_count), cells) {
        Ok(array) => array,
        Err(_) => Array2::default((0, 0)),
    }
}
/// Returns borrowed columns for the requested keys, in key order, with
/// `None` for keys the index does not know.
///
/// `None` as `keys` on a non-empty frame returns every stored column.
///
/// # Errors
/// * `Error::EmptyData` when the selection or the key list is empty.
/// * Column lookup errors are propagated.
pub fn select_vec_view(
    &self,
    keys: Option<&[Key]>,
) -> Result<Vec<Option<&TypedDataArray>>, Error> {
    let keys = match keys {
        None if !self.is_empty() => {
            return Ok(self.data_frame.iter().map(Some).collect());
        }
        None => self.index.get_keys(),
        Some(requested) => requested,
    };
    let selection = self.index.select(keys);
    if selection.is_empty() || keys.is_empty() {
        return Err(Error::EmptyData);
    }
    keys.iter()
        .map(|key| {
            self.index
                .get_column_index(key)
                .map(|col_idx| self.get_column_by_idx(col_idx))
                .transpose()
        })
        .collect()
}
/// Returns owned clones of the requested columns, in key order.
///
/// `None` as `keys` on a non-empty frame clones every stored column. A key
/// unknown to the selection gets a default-initialized column of
/// `nrows()` entries.
///
/// # Errors
/// * `Error::EmptyData` when the selection or the key list is empty.
/// * Column lookup errors are propagated.
pub fn select_typed_columns(&self, keys: Option<&[Key]>) -> Result<Vec<TypedDataArray>, Error> {
    let keys = match keys {
        None if !self.is_empty() => return Ok(self.data_frame.clone()),
        None => self.index.get_keys(),
        Some(requested) => requested,
    };
    let selection = self.index.select(keys);
    if selection.is_empty() || keys.is_empty() {
        return Err(Error::EmptyData);
    }
    keys.iter()
        .map(|key| match selection.get_column_index(key) {
            Some(col_idx) => self.get_column_by_idx(col_idx).map(|column| column.clone()),
            None => Ok(TypedDataArray::default_init(key, self.nrows())),
        })
        .collect()
}
pub fn select_view(&self, keys: Option<&[Key]>) -> Result<MaybeView<'_>, Error> {
let keys = keys.unwrap_or_else(|| self.index.get_keys());
let indexes = self.index.select(keys);
if indexes.is_empty() || keys.is_empty() {
return Err(Error::EmptyData);
}
let ncols = keys.len();
let mut owned_cols: Vec<Array1<DataValue>> = Vec::with_capacity(ncols);
for col_idx in indexes.indexes() {
owned_cols.push(self.get_column_by_idx(col_idx)?.as_generic_array());
}
let views: Vec<ArrayView1<DataValue>> = owned_cols.iter().map(|a| a.view()).collect();
Ok(MaybeView::Array(ndarray::stack(Axis(0), &views)?))
}
/// Returns the selected columns as an `(nrows, keys.len())` array of `T`,
/// converting each cell with `T::extract`. `None` selects all keys.
///
/// Cells are gathered column by column and the array is created with
/// column-major (`.f()`) layout to match. A key unknown to the selection
/// is filled with `T::extract(&DataValue::Null)`. An empty selection, an
/// empty key list or a shape mismatch yields a `(0, 0)` array.
///
/// # Panics
/// When a resolved position has no backing column.
pub fn select_typed<T: Extract + Clone>(&self, keys: Option<&[Key]>) -> Array2<T> {
    let keys = match keys {
        Some(requested) => requested,
        None => self.index.get_keys(),
    };
    let selection = self.index.select(keys);
    if selection.is_empty() || keys.is_empty() {
        return Array2::from_shape_vec((0, 0), vec![]).unwrap();
    }

    let row_count = self.nrows();
    let column_count = keys.len();
    let null_default = T::extract(&DataValue::Null);
    let mut cells: Vec<T> = Vec::with_capacity(row_count * column_count);
    for key in keys {
        if let Some(col_idx) = selection.get_column_index(key) {
            let column = self
                .get_column_by_idx(col_idx)
                .expect("Cannot get column on index");
            cells.extend(column.iter_values().map(|value| T::extract(&value)));
        } else {
            cells.resize(cells.len() + row_count, null_default.clone());
        }
    }

    match Array2::from_shape_vec((row_count, column_count).f(), cells) {
        Ok(array) => array,
        Err(_) => Array2::from_shape_vec((0, 0), vec![]).unwrap(),
    }
}
/// Adds a new column for `key`, default-initialized to the current row
/// count, and stores `key` in the index.
fn extend_dataframe_for_column(&mut self, key: Key) -> Result<(), Error> {
    let filler = TypedDataArray::default_init(&key, self.nrows());
    self.index.store_key(key);
    self.data_frame.push(filler);
    Ok(())
}
pub fn push<C: CandidateData>(&mut self, row_candidate: C) -> Result<(), Error> {
let num_keys = self.index.len();
let candidate_keys = row_candidate.keys();
let mut arr = Vec::with_capacity(num_keys.max(candidate_keys.len()));
for key in &candidate_keys {
if self.index.get_column_index(key).is_none() {
self.extend_dataframe_for_column(key.clone())?;
}
}
arr.reserve(self.index.len());
for index in self.index.get_keys() {
arr.push(
row_candidate
.get_value_ref(index)
.cloned()
.unwrap_or(DataValue::Null),
);
}
self.push_row(arr)?;
Ok(())
}
/// Removes the columns named by `keys` and returns them as a new frame.
///
/// The removed data is captured with `select_typed_columns` before
/// anything is mutated. The index is rebuilt from the remaining keys.
///
/// # Errors
/// Propagates errors from `select_typed_columns` (e.g. `EmptyData` when
/// none of the keys select anything).
// NOTE(review): `select_typed_columns` pads keys it cannot resolve with a
// default column, while the returned index only holds keys that were
// actually removed; confirm callers never pass a partially unknown key set.
pub fn remove_column(&mut self, keys: &[Key]) -> Result<Self, Error> {
    let removed_data = self.select_typed_columns(Some(keys))?;

    // Resolve positions before the keys disappear from the index.
    let mut doomed: Vec<usize> = keys
        .iter()
        .filter_map(|key| self.index.get_column_index(key))
        .collect();
    doomed.sort_unstable();
    doomed.dedup();

    let mut removed_index = KeyIndex::default();
    for key in keys {
        if let Some((current, _)) = self.index.remove_key(key) {
            removed_index.store_key(current);
        }
    }

    // Highest position first so earlier removals do not shift later ones.
    for position in doomed.iter().rev() {
        self.data_frame.remove(*position);
    }

    self.index = KeyIndex::new(self.index.get_keys().to_vec());
    Ok(Self::new(removed_index, removed_data))
}
/// Shared pre-check for operations that combine `self` with `other`.
///
/// * `self` has no keys: `self` becomes a clone of `other` and `End` is
///   returned.
/// * `other` has no keys: nothing to do, `End` is returned.
/// * Otherwise `Continue` is returned. If `self` has keys but zero rows,
///   its columns are first re-created with `other.nrows()` default entries.
fn check_or_init_frame(&mut self, other: &Self) -> Result<Continue, Error> {
    let outcome = if self.index.is_empty() {
        self.index = other.index.clone();
        self.data_frame = other.data_frame.clone();
        Continue::End
    } else if other.index.is_empty() {
        Continue::End
    } else {
        if self.is_empty() {
            let target_rows = other.nrows();
            let mut fresh = Vec::with_capacity(self.index.len());
            for key in self.index.get_keys() {
                fresh.push(TypedDataArray::default_init(key, target_rows));
            }
            self.data_frame = fresh;
        }
        Continue::Continue
    };
    Ok(outcome)
}
/// Adds a default-initialized column (current row count) for every key of
/// `other` that `self` does not have yet. No values are copied.
fn extend_columns_from_other(&mut self, other: &Self) -> Result<(), Error> {
    // Collected up front: the index is mutated while columns are added.
    let mut absent: Vec<Key> = Vec::new();
    for key in other.index.get_keys() {
        if self.index.get_column_index(key).is_none() {
            absent.push(key.clone());
        }
    }

    let row_count = self.nrows();
    for key in absent {
        self.data_frame
            .push(TypedDataArray::default_init(&key, row_count));
        self.index.store_key(key);
    }
    Ok(())
}
/// Appends the rows of `other` below the rows of `self`, key by key.
///
/// The resulting index holds every key of `self` plus every key of
/// `other` that `self` did not have. For each key the new column is the
/// values from `self` followed by the values from `other`; whichever side
/// lacks the key contributes a `default_init` block of that side's row
/// count. `self` is replaced by the rebuilt frame.
fn try_extend(&mut self, mut other: Self) -> Result<(), Error> {
let mut joined_keys = self.index.clone();
for key in other.keys() {
if self.index.get_column_index(key).is_none() {
joined_keys.store_key(key.clone());
}
}
// Row counts are captured before the columns are moved out below.
let self_nrows = self.nrows();
let other_nrows = other.nrows();
let mut new_data: Vec<TypedDataArray> = Vec::with_capacity(joined_keys.len());
for key in joined_keys.get_keys() {
// `mem::take` moves the column out and leaves a default in its place,
// so no column data is cloned.
let self_col = self
.index
.get_column_index(key)
.map(|i| std::mem::take(&mut self.data_frame[i]));
let other_col = other
.index
.get_column_index(key)
.map(|i| std::mem::take(&mut other.data_frame[i]));
let col = match (self_col, other_col) {
(Some(mut s), Some(o)) => {
s.extend_from(&o);
s
}
// Only `self` has the key: pad with defaults for `other`'s rows.
(Some(mut s), None) => {
let filler = TypedDataArray::default_init(key, other_nrows);
s.extend_from(&filler);
s
}
// Only `other` has the key: defaults for `self`'s rows come first.
(None, Some(o)) => {
let mut base = TypedDataArray::default_init(key, self_nrows);
base.extend_from(&o);
base
}
(None, None) => TypedDataArray::default_init(key, self_nrows + other_nrows),
};
new_data.push(col);
}
*self = ColumnFrame {
index: joined_keys,
data_frame: new_data,
};
Ok(())
}
/// Appends the rows of `other` to `self`.
///
/// After the shared `check_or_init_frame` pre-check, the key order of both
/// indexes is compared with `check_order_of_indexes`; if that check fails
/// the work is delegated to `try_extend`. Otherwise each frame is padded
/// with default columns for the keys only the other one has, and every
/// column of `other` is appended to the same-keyed column of `self`.
// NOTE(review): when `self` has keys but zero rows, `check_or_init_frame`
// first fills it with `other.nrows()` default entries, so the appended
// rows land after those defaults. Confirm this is intended.
pub fn extend(&mut self, mut other: Self) -> Result<(), Error> {
if self.check_or_init_frame(&other)?.should_end() {
return Ok(());
}
if self.index.check_order_of_indexes(&other.index).is_err() {
return self.try_extend(other);
}
trace!(
"Extend columns from other {:?} vs {:?}",
other.index.get_keys(),
self.index.get_keys()
);
// Make both frames carry the same key set before appending.
self.extend_columns_from_other(&other)?;
other.extend_columns_from_other(self)?;
let keys = self.index.get_keys().to_vec();
for key in &keys {
// Both lookups are expected to succeed because of the padding above.
let self_idx = self.index.get_column_index(key).unwrap();
let other_idx = other.index.get_column_index(key).unwrap();
// Move the column out of `other` instead of cloning it.
let other_col = std::mem::take(&mut other.data_frame[other_idx]);
let self_col = &mut self.data_frame[self_idx];
self_col.extend_from(&other_col);
}
Ok(())
}
pub fn replace(&mut self, other: Self) -> Result<(), Error> {
if self.check_or_init_frame(&other)?.should_end() {
return Ok(());
}
if self.nrows() > other.nrows() {
return Err(Error::DataSetSizeDoesntMatch(self.nrows(), other.nrows()));
}
self.index = other.index;
self.data_frame = other.data_frame;
Ok(())
}
/// Joins `right` into `self` on the given `keys`.
///
/// Columns that `right` contributes are added to `self` as default
/// columns, then filled row by row using the row pairs produced by joining
/// an `Index` built over `self` with one built over `right`. Existing rows
/// of `self` are kept; no rows are added or removed here. Each phase is
/// timed and reported through `tracing::debug!`.
pub fn join_by_id_inner(&mut self, right: Self, keys: &[Key]) -> Result<(), Error> {
if self.check_or_init_frame(&right)?.should_end() {
return Ok(());
}
let timer = std::time::Instant::now();
// Must be computed before `self` is extended with `right`'s columns.
// Presumably the keys of `right` that `self` does not have; see
// `KeyIndex::get_complement_keys`.
let new_columns = right.index.get_complement_keys(self.index.get_keys());
self.extend_columns_from_other(&right)?;
tracing::debug!("Extend took {}ns", timer.elapsed().as_nanos());
// (position in self, position in right) for every column to fill.
let column_mappings: Vec<(usize, usize)> = new_columns
.iter()
.filter_map(|key| {
let left_idx = self.index.get_column_index(key)?;
let right_idx = right.index.get_column_index(key)?;
Some((left_idx, right_idx))
})
.collect();
let timer = std::time::Instant::now();
let index = Index::new(keys.to_vec(), self);
tracing::debug!("Left index build took: {}ns", timer.elapsed().as_nanos());
tracing::trace!("Index {index:?}");
let timer = std::time::Instant::now();
let right_index = Index::new(keys.to_vec(), &right);
let joined_idx = index.join(right_index);
tracing::debug!(
"Right index build and join took: {}ns",
timer.elapsed().as_nanos()
);
let timer = std::time::Instant::now();
let joined_idx_len = joined_idx.len();
for (left_col_idx, right_col_idx) in &column_mappings {
let right_col = right.get_column_by_idx(*right_col_idx)?;
let left_col = self.get_column_by_idx_mut(*left_col_idx)?;
for (left_indices, right_indices) in &joined_idx {
// When several right rows match, later ones overwrite earlier ones.
for right_row_idx in right_indices {
let value = right_col.get_or_null(*right_row_idx);
for left_idx in left_indices {
// Errors from `set` are deliberately discarded.
let _ = left_col.set(*left_idx, value.clone());
}
}
}
}
let elapsed = timer.elapsed();
tracing::debug!(
"Filled {} rows in {}ms|{}s",
joined_idx_len,
elapsed.as_millis(),
elapsed.as_secs()
);
Ok(())
}
pub fn add_single_column<K, V>(&mut self, key: K, column: V) -> Result<(), Error>
where
K: Into<Key>,
V: Into<TypedDataArray>,
{
let key = key.into();
let mut column: TypedDataArray = column.into();
if self.index.get_column_index(&key).is_some() {
return Err(Error::ColumnAlreadyExists(key));
}
if self.nrows() != column.len() && !self.is_empty() {
return Err(Error::DataSetSizeDoesntMatch(self.nrows(), column.len()));
}
if self.is_empty() && !column.is_empty() {
let new_len = column.len();
let keys = self.index.get_keys().to_vec();
for (i, existing_col) in self.data_frame.iter_mut().enumerate() {
*existing_col = TypedDataArray::default_init(&keys[i], new_len);
}
}
if matches!(column.data_type(), crate::DataType::Unknown)
&& !matches!(key.ctype, crate::DataType::Unknown)
{
let _ = column.try_convert_to_dtype(key.ctype);
}
self.index.store_key(key);
self.data_frame.push(column);
Ok(())
}
/// Copies the columns of `other` into `self`, matching by key.
///
/// Keys missing from `self` are added first. For every key of `other`,
/// the target column in `self` is overwritten with `other`'s values when
/// the lengths agree, and filled with `Null` when they do not.
///
/// # Errors
/// Propagates errors from the pre-check, from adding columns and from
/// resolving the target column.
pub fn add_columns(&mut self, other: Self) -> Result<(), Error> {
    if self.check_or_init_frame(&other)?.should_end() {
        return Ok(());
    }
    self.extend_columns_from_other(&other)?;
    let nrows = self.nrows();
    for (idx, key) in other.index.get_keys().iter().enumerate() {
        let Some(index) = self.index.get_column_index(key) else {
            continue;
        };
        // `other` and `self` are distinct objects, so the source column
        // can be borrowed instead of cloned.
        let Ok(arr) = other.get_column_by_idx(idx) else {
            continue;
        };
        trace!(
            "Adding column {key:?} at index {idx} vs {index} datasize: self:{} vs other:{}",
            nrows,
            arr.len()
        );
        let dst = self.get_column_by_idx_mut(index)?;
        if arr.len() != nrows {
            dst.fill(DataValue::Null);
        } else {
            dst.assign(arr);
        }
    }
    Ok(())
}
/// Broadcasts the single row of `other` across all rows of `self`.
///
/// For every key of `other` that `self` does not have, a new column is
/// added whose every entry is `other`'s first value for that key. Keys
/// already present in `self` are left untouched.
///
/// # Errors
/// * `Error::CannotBroadcast` when `other` does not have exactly one row.
/// * Column lookup errors from `other` are propagated.
pub fn broadcast(&mut self, other: Self) -> Result<(), Error> {
    if self.check_or_init_frame(&other)?.should_end() {
        return Ok(());
    }
    if other.nrows() != 1 {
        return Err(Error::CannotBroadcast);
    }
    let other_keys: Vec<_> = other
        .index
        .get_keys()
        .iter()
        .filter(|k| self.index.get_column_index(k).is_none())
        .cloned()
        .collect();
    let nrows = self.nrows();
    for key in &other_keys {
        // Do the fallible lookup before touching the index, so an error
        // cannot leave a key registered without a backing column.
        let first = other.get_column(key)?.get_or_null(0);
        let mut new_col = TypedDataArray::default_init(key, nrows);
        new_col.fill(first);
        self.index.store_key(key.clone());
        self.data_frame.push(new_col);
    }
    Ok(())
}
/// Replaces `self` with the cartesian product of its rows and the rows of
/// `other` (`self.nrows() * other.nrows()` rows).
///
/// Keys of `other` are appended to the index; a key that already exists in
/// `self` is stored under a derived name built from the key and its id.
/// Columns of `self` come first, followed by the columns of `other`.
pub fn cartesian_product(&mut self, other: Self) -> Result<(), Error> {
if self.check_or_init_frame(&other)?.should_end() {
return Ok(());
}
let self_nrows = self.nrows();
let other_nrows = other.nrows();
let max_rows = self_nrows * other_nrows;
for other_key in other.keys() {
if self.index.get_column_index(other_key).is_none() {
self.index.store_key(other_key.clone());
} else {
// Name clash: register the column under "<key>-<id>" instead.
self.index.store_key(Key::new(
format!("{}-{}", other_key, other_key.id()).as_str(),
other_key.ctype,
));
}
}
// Take the existing columns out so they can be rebuilt at product size.
let mut df: Vec<TypedDataArray> = Vec::with_capacity(self.index.len());
std::mem::swap(&mut df, &mut self.data_frame);
for col in df.into_iter() {
// Each value of `self` is repeated `other_nrows` times in a row.
let mut new_col = vec![DataValue::Null; max_rows];
for (idx, value) in col.iter_values().enumerate() {
for self_idx in 0..other_nrows {
new_col[self_idx + (idx * other_nrows)] = value.clone();
}
}
self.data_frame
.push(TypedDataArray::new(col.data_type(), new_col));
}
for col in other.data_frame.into_iter() {
// Each column of `other` is tiled once per row of `self`.
let mut new_col: Vec<DataValue> = Vec::with_capacity(max_rows);
let tile = col.to_vec();
for _ in 0..self_nrows {
new_col.extend(tile.iter().cloned());
}
self.data_frame
.push(TypedDataArray::new(col.data_type(), new_col));
}
Ok(())
}
pub fn join(&mut self, right: Self, join_type: &JoinRelation) -> Result<(), Error> {
use JoinBy::*;
match &join_type.join_type {
AddColumns => self.add_columns(right),
Replace => self.replace(right),
Extend => self.extend(right),
Broadcast => self.broadcast(right),
CartesianProduct => self.cartesian_product(right),
JoinById(join) => self.join_by_id_inner(right, &join.keys),
}
}
/// Returns a freshly allocated generic array with the values of the column
/// stored under `key`, or `None` when the key or column is missing.
#[deprecated(note = "allocates O(n); use get_column() for zero-copy typed access")]
pub fn get_single_column(&self, key: &Key) -> Option<Array1<DataValue>> {
    let position = self.index.get_column_index(key)?;
    let column = self.get_column_by_idx(position).ok()?;
    Some(column.as_generic_array())
}
/// Returns the column stored under `key` as an `Array1<T>`, or `None` when
/// the key or column is missing.
pub fn get_single_column_typed<T: Extract>(&self, key: &Key) -> Option<Array1<T>> {
    let position = self.index.get_column_index(key)?;
    let column = self.get_column_by_idx(position).ok()?;
    Some(column.to_typed_array())
}
pub fn sorted(&self, key: &Key) -> Result<sorted_df::SortedDataFrame<'_>, Error> {
let index = self
.index
.get_column_index(key)
.ok_or(Error::NotFound(key.clone()))?;
let column = self.get_column_by_idx(index)?;
let values: Vec<DataValue> = column.to_vec();
let mut data_with_index: Vec<(usize, &DataValue)> = values.iter().enumerate().collect();
tracing::trace!("Sorting by key: {key:?} vals {data_with_index:?}");
data_with_index.sort_by(
|(a_idx, a_val), (b_idx, b_val)| match a_val.partial_cmp(b_val) {
Some(ordering) => ordering.then_with(|| a_idx.cmp(b_idx)),
None => {
let a_null = matches!(a_val, DataValue::Null);
let b_null = matches!(b_val, DataValue::Null);
match (a_null, b_null) {
(true, true) => std::cmp::Ordering::Equal.then_with(|| a_idx.cmp(b_idx)),
(true, false) => std::cmp::Ordering::Greater.then_with(|| a_idx.cmp(b_idx)),
(false, true) => std::cmp::Ordering::Less.then_with(|| a_idx.cmp(b_idx)),
(false, false) => std::cmp::Ordering::Equal.then_with(|| a_idx.cmp(b_idx)),
}
}
},
);
tracing::trace!("Sorted by key: {key:?} vals {data_with_index:?}");
let indicies = data_with_index
.into_iter()
.map(|(idx, _)| idx)
.collect::<Vec<_>>();
Ok(sorted_df::SortedDataFrame::new(self, indicies))
}
/// Returns a new frame containing the rows matched by any rule in
/// `filter`, in ascending row order and without duplicates.
///
/// The index is cloned unchanged. A matched row index that a column cannot
/// provide is filled via `get_or_null`.
///
/// # Errors
/// Propagates errors from evaluating a rule.
pub fn filter(&self, filter: &crate::filter::FilterRules) -> Result<Self, Error> {
    let view = filter_df::ColumnFrameFiltering { column_frame: self };

    let mut matched = Vec::new();
    for rule in &filter.rules {
        let hits = crate::filter::filter_combination(&view, rule)?;
        matched.extend(hits);
    }
    matched.sort_unstable();
    matched.dedup();

    let mut columns: Vec<Vec<DataValue>> = Vec::with_capacity(self.data_frame.len());
    for column in &self.data_frame {
        let kept: Vec<DataValue> = matched
            .iter()
            .map(|&row| column.get_or_null(row))
            .collect();
        columns.push(kept);
    }
    Ok(ColumnFrame::new(self.index.clone(), columns))
}
}
/// Builds a `DataFrame` by forwarding all tokens to `column_frame!` and
/// wrapping the result with `DataFrame::new`.
#[macro_export]
macro_rules! df {
($($everything:tt)*) => {
$crate::DataFrame::new($crate::column_frame!($($everything)*))
};
}
/// Builds a `ColumnFrame` literal from `key => values` pairs.
///
/// Accepted forms:
/// * `"k" => vec![a, b, c]` is rewritten to the bracketed form below;
/// * `"k" => [a, b, c]` gives one column per key with the listed values;
/// * `"k" => value` gives one single-value column per key.
///
/// Keys and values are converted with `.into()`.
#[macro_export]
macro_rules! column_frame {
// `vec![...]` values: strip the `vec!` and reuse the bracketed arm.
($($key:expr => vec![$($value:expr),*]),*) => {
$crate::column_frame!($($key => [$($value),*]),*)
};
// Bracketed value lists: one `Vec<DataValue>` per key.
($($key:expr => [$($value:expr),*]),*) => {
{
let data: ::std::vec::Vec<::std::vec::Vec<$crate::data_value::DataValue>> = vec!($(
vec![$($value.into(),)*],
)*);
let _keys = vec![$($key.into(),)*];
$crate::ColumnFrame::new(
$crate::KeyIndex::new(_keys),
data
)
}
};
// Single values: every key gets a one-element column.
($($key:expr => $value:expr $(,)?)*) => {
{
let _data: ::std::vec::Vec<::std::vec::Vec<$crate::data_value::DataValue>> =
vec![$(vec![$value.into()],)*];
let _keys = vec![$($key.into(),)*];
tracing::trace!("{_keys:?}, {_data:?}");
$crate::ColumnFrame::new(
$crate::KeyIndex::new(_keys),
_data,
)
}
};
}
#[cfg(test)]
mod test {
use crate::{filter::FilterRules, JoinById};
use super::*;
use data_value::stdhashmap;
use rstest::*;
use tracing_test::traced_test;
#[rstest]
#[case(
column_frame! {
"t" => [1751001987000000u64, 1752001987000000u64, 1753001987000000u64],
"b" => [4, 5, 6],
"c" => [7, 8, 9]
},
column_frame! {
"t" => [1752001987000000u64],
"b" => [5],
"c" => [8]
},
FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
)]
#[case(
column_frame! {
"t" => [1751001987000000f64, 1752001987000000f64, 1753001987000000f64],
"b" => [4, 5, 6],
"c" => [7, 8, 9]
},
column_frame! {
"t" => [1752001987000000f64],
"b" => [5],
"c" => [8]
},
FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
)]
#[case(
column_frame! {
"t" => [1751001987000000i64, 1752001987000000i64, 1753001987000000i64],
"b" => [4, 5, 6],
"c" => [7, 8, 9]
},
column_frame! {
"t" => [1752001987000000i64],
"b" => [5],
"c" => [8]
},
FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
)]
#[case(
column_frame! {
"t" => [1751001987000000u64, 1752001987000000u64, 1753001987000000u64],
"b" => [4, 5, 6],
"c" => [7, 8, 9]
},
column_frame! {
"t" => [1751001987000000u64],
"b" => [4],
"c" => [7]
},
FilterRules::try_from("t.to_datetime_us() < '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
)]
#[case(
column_frame! {
"t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
"b" => [4, 5, 6],
"c" => [7, 8, 9]
},
column_frame! {
"t" => ["2025-07-08 18:13:07"],
"b" => [4],
"c" => [7]
},
FilterRules::try_from("t.to_datetime_us() < '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
)]
#[case(
column_frame! {
"t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
"b" => [4, 5, 6],
"c" => [7, 8, 9]
},
column_frame! {
"t" => [],
"b" => [],
"c" => []
},
FilterRules::try_from("t.len() < 10u64").expect("BUG: cannot create filter rules"),
)]
#[case(
column_frame! {
"t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
"b" => [4, 5, 6],
"c" => [7, 8, 9]
},
column_frame! {
"t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
"b" => [4, 5, 6],
"c" => [7, 8, 9]
},
FilterRules::try_from("t.len() > 10u64").expect("BUG: cannot create filter rules"),
)]
#[case(
column_frame! {
"t" => [DataValue::Vec(vec![1.into(), 2.into(), 3.into()]), DataValue::Vec(vec![]), DataValue::Vec(vec![1.into()])],
"b" => [4, 5, 6],
"c" => [7, 8, 9]
},
column_frame! {
"t" => [DataValue::Vec(vec![])],
"b" => [5],
"c" => [ 8]
},
FilterRules::try_from("t.len() == 0u64").expect("BUG: cannot create filter rules"),
)]
#[case(
column_frame! {
"t" => [DataValue::Vec(vec![1.into(), 2.into(), 3.into()]), DataValue::Vec(vec![]), DataValue::Vec(vec![1.into()])],
"b" => [4, 5, 6],
"c" => [7, 8, 9]
},
column_frame! {
"t" => [DataValue::Vec(vec![1.into()])],
"b" => [6],
"c" => [9]
},
FilterRules::try_from("t.len() == 1u64").expect("BUG: cannot create filter rules"),
)]
#[case(
column_frame! {
"a" => [1, 2, 3],
"b" => [4, 5, 6],
"c" => [7, 8, 9]
},
column_frame! {
"a" => [1, 2],
"b" => [4, 5],
"c" => [7, 8]
},
FilterRules::try_from("a <= 2i32").expect("BUG: cannot create filter rules"),
)]
#[case(
column_frame! {
"a" => [1, 2, 3],
"b" => [4, 5, 6],
"c" => [7, 8, 9]
},
column_frame! {
"a" => [2],
"b" => [5],
"c" => [8]
},
FilterRules::try_from("a <= 2i32 && c > 7i32").expect("BUG: cannot create filter rules"),
)]
#[case(
column_frame! {
"a" => [1, 2, 3],
"b" => [4, 5, 6],
"c" => [7, 8, 9]
},
column_frame! {
"a" => [],
"b" => [],
"c" => []
},
FilterRules::try_from("a <= 2i32 && c > 9i32").expect("BUG: cannot create filter rules"),
)]
#[case(
column_frame! {
"a" => [1, 2, 3],
"b" => [4, 5, 6],
"c" => [7, 8, 9]
},
column_frame! {
"a" => [1, 2],
"b" => [4, 5],
"c" => [7, 8]
},
FilterRules::try_from("a <= 2i32 || c > 9i32").expect("BUG: cannot create filter rules"),
)]
#[case(
column_frame! {
"a" => [1, 2, 3],
"b" => [4, 5, 6],
"c" => [7, 8, 9]
},
column_frame! {
"a" => [2],
"b" => [5],
"c" => [8]
},
FilterRules::try_from("a <= 2i32 && (c > 9i32 || b == 5i32)").expect("BUG: cannot create filter rules"),
)]
#[case(
column_frame! {
"a" => ["abcd", "ab", "abcdefg"],
"b" => [4, 5, 6],
"c" => [7, 8, 9]
},
column_frame! {
"a" => ["abcd","abcdefg"],
"b" => [4, 6],
"c" => [7, 9]
},
FilterRules::try_from("a ~= 'abcd.*'").expect("BUG: cannot create filter rules"),
)]
#[case(
column_frame! {
"a" => [1, 2, 3],
"b" => [4, 5, 6],
"c" => [7, 8, 9]
},
column_frame! {
"a" => [1],
"b" => [4],
"c" => [7]
},
FilterRules::try_from("a in [1u32, 1i32]'").expect("BUG: cannot create filter rules"),
)]
#[case(
column_frame! {
"a" => [1, 2, 3],
"b" => [4, 5, 6],
"c" => [7, 8, 9]
},
column_frame! {
"a" => [2, 3],
"b" => [5, 6],
"c" => [8, 9]
},
FilterRules::try_from("a notIn [1u32, 1i32]'").expect("BUG: cannot create filter rules"),
)]
#[case(
column_frame! {
"a" => [1f64, 2f64, 3f64],
"b" => [4, 5, 6],
"c" => [7, 8, 9]
},
column_frame! {
"a" => [1f64, 2f64],
"b" => [4, 5],
"c" => [7, 8]
},
FilterRules::try_from("a < 3f64 || (a < 3f64 && b <= 5i32)").expect("BUG: cannot create filter rules"),
)]
#[case(
column_frame! {
"a" => [1f64, 2f64, 3f64],
"b" => [4i64, 5i64, 6i64],
"c" => [7i64, 8i64, 9i64]
},
column_frame! {
"a" => [1f64, 2f64],
"b" => [4i64, 5i64],
"c" => [7i64, 8i64]
},
FilterRules::try_from("a >= 1f64 && (b <= 5 || c <= 8) && b >= 4").expect("BUG: cannot create filter rules"),
)]
#[traced_test]
fn filter_test(
#[case] df: ColumnFrame,
#[case] expected: ColumnFrame,
#[case] filter: FilterRules,
) {
let filtered = df.filter(&filter).expect("BUG: cannot filter");
assert_eq!(filtered, expected);
}
#[rstest]
#[traced_test]
fn test_macro() {
let df = column_frame! {
"a" => 1,
"b" => 2,
"c" => 3,
"d" => 4,
};
assert_eq!(df.nrows(), 1);
assert_eq!(df.keys(), &["a".into(), "b".into(), "c".into(), "d".into()]);
let f = Array2::from_shape_vec((1, 4), vec![1.into(), 2.into(), 3.into(), 4.into()])
.expect("BUG: cannot create array");
assert_eq!(df.select(None), f);
let df = column_frame! {
"a" => [1, 2, 3],
"b" => [4, 5, 6],
"c" => [7, 8, 9]
};
assert_eq!(df.nrows(), 3);
assert_eq!(df.keys(), &["a".into(), "b".into(), "c".into()]);
let f = Array2::from_shape_vec(
(3, 3),
vec![
1.into(),
4.into(),
7.into(),
2.into(),
5.into(),
8.into(),
3.into(),
6.into(),
9.into(),
],
)
.expect("BUG: cannot create array");
let selected = df.select(None);
trace!("{selected:?}");
assert_eq!(selected, f);
let df1 = df! {
"a" => [1, 2, 3],
"b" => [4, 5, 6],
"c" => [7, 8, 9]
};
let formatted = format!("{}", df);
debug!("{}", formatted);
assert_eq!(df1, crate::DataFrame::from(df));
}
/// Renames keys one by one and compares both the frame and its key list with the expectation.
#[rstest]
#[case(
    column_frame! {
        "a" => [1, 2, 3],
        "b" => [4, 5, 6],
        "c" => [7, 8, 9]
    },
    column_frame! {
        "a_new" => [1, 2, 3],
        "b" => [4, 5, 6],
        "c" => [7, 8, 9]
    },
    vec!["a_new", "b", "c"].into_iter().map(|x| x.into()).collect(),
    vec![("a", "a_new".into())]
)]
#[traced_test]
fn rename_test(
    #[case] df: ColumnFrame,
    #[case] expected: ColumnFrame,
    #[case] keys: Vec<Key>,
    #[case] renames: Vec<(&str, Key)>,
) {
    let mut renamed = df;
    renames.into_iter().for_each(|(from, to)| {
        renamed.rename_key(from, to).expect("BUG: cannot rename key");
    });
    assert_eq!(renamed, expected);
    assert_eq!(renamed.keys(), keys.as_slice());
}
/// After `try_fix_column_by_key` with a typed key, the selected column must equal a
/// column built directly from literals of that type.
#[rstest]
#[case(
    column_frame!("a" => [1, 2, 3]),
    Key::new("a", crate::DataType::I32),
    column_frame!("a" => [1i32, 2i32, 3i32])
)]
#[case(
    column_frame!("a" => [1, 2, 3]),
    Key::new("a", crate::DataType::U32),
    column_frame!("a" => [1u32, 2u32, 3u32])
)]
#[case(
    column_frame!("a" => [1, 2, 3]),
    Key::new("a", crate::DataType::I64),
    column_frame!("a" => [1i64, 2i64, 3i64])
)]
#[case(
    column_frame!("a" => [1, 2, 3]),
    Key::new("a", crate::DataType::U64),
    column_frame!("a" => [1u64, 2u64, 3u64])
)]
#[case(
    column_frame!("a" => [1, 2, 3]),
    Key::new("a", crate::DataType::F64),
    column_frame!("a" => [1f64, 2f64, 3f64])
)]
#[case(
    column_frame!("a" => [1, 2, 3]),
    Key::new("a", crate::DataType::F32),
    column_frame!("a" => [1f32, 2f32, 3f32])
)]
fn test_try_fix_dtype(
    #[case] mut df: ColumnFrame,
    #[case] key: Key,
    #[case] expected: ColumnFrame,
) {
    assert!(df.try_fix_column_by_key(&key).is_ok());
    // Both frames are queried with the same single-key selection.
    let wanted = [key];
    let actual_column = df.select(Some(&wanted[..]));
    let expected_column = expected.select(Some(&wanted[..]));
    assert_eq!(actual_column, expected_column);
}
/// Fixture: a one-row frame converted from a `HashMap` of column name to values,
/// with columns `a` (u32), `b` (i64), `c` (f64) and `d` (u64).
#[fixture]
fn unknown_df() -> ColumnFrame {
    let columns: HashMap<String, Vec<DataValue>> = HashMap::from([
        ("a".into(), vec![1u32.into()]),
        ("b".into(), vec![3i64.into()]),
        ("c".into(), vec![1f64.into()]),
        ("d".into(), vec![1u64.into()]),
    ]);
    columns.into()
}
/// Runs `try_fix_dtype_for_keys` on the `unknown_df` fixture and checks every key's
/// `ctype` against the per-column type given by the case.
#[rstest]
#[case(stdhashmap!(
    "a" => crate::DataType::U32,
    "b" => crate::DataType::I64,
    "c" => crate::DataType::F64,
    "d" => crate::DataType::U64)
)]
fn test_try_fix_dtype_unknown(
    mut unknown_df: ColumnFrame,
    #[case] dtypes: HashMap<String, crate::DataType>,
) {
    // Before fixing: none of the listed keys may report `Unknown`.
    for name in dtypes.keys() {
        let key: &Key = unknown_df
            .keys()
            .iter()
            .find(|candidate| candidate.name() == name)
            .unwrap();
        assert_ne!(key.ctype, crate::DataType::Unknown);
    }
    assert!(unknown_df.try_fix_dtype_for_keys(false).is_ok());
    // After fixing: each key carries the expected type, and fixing again still succeeds.
    for (name, wanted) in dtypes.iter() {
        let key: &Key = unknown_df
            .keys()
            .iter()
            .find(|candidate| candidate.name() == name)
            .unwrap();
        assert_eq!(key.ctype, *wanted);
        assert!(unknown_df.try_fix_dtype_for_keys(false).is_ok());
    }
    assert!(unknown_df.try_fix_dtype_for_keys(true).is_ok());
}
/// A frame whose key declares `F32` must, after `try_fix_dtype`, select the same
/// values as a frame built from `f32` literals.
#[rstest]
#[case(
    column_frame!(Key::new("a", crate::DataType::F32) => [1, 2, 3]),
    Key::new("a", crate::DataType::F32),
    column_frame!("a" => [1f32, 2f32, 3f32])
)]
#[traced_test]
fn test_try_fix(#[case] mut df: ColumnFrame, #[case] key: Key, #[case] expected: ColumnFrame) {
    assert!(df.try_fix_dtype().is_ok());
    let wanted = [key];
    let actual_column = df.select(Some(&wanted[..]));
    let expected_column = expected.select(Some(&wanted[..]));
    assert_eq!(actual_column, expected_column);
}
/// `try_fix_column_by_key` must return an error for a key that is not in the frame.
#[rstest]
#[traced_test]
fn test_not_key_fix() {
    let mut frame = column_frame!("a" => [1]);
    let absent_key = Key::new("b", crate::DataType::I32);
    let outcome = frame.try_fix_column_by_key(&absent_key);
    assert!(outcome.is_err());
}
/// Adds aliases to a frame and checks that selecting through the aliased key names
/// returns the same data as selecting through the frame's own keys.
#[rstest]
#[case(
    column_frame! {
        "a" => [1, 2, 3],
        "b" => [4, 5, 6],
        "c" => [7, 8, 9]
    },
    vec!["a_alias", "b", "c"].into_iter().map(|x| x.into()).collect(),
    vec![("a", "a_alias")]
)]
#[traced_test]
fn alias_test(
    #[case] df: ColumnFrame,
    #[case] keys: Vec<Key>,
    #[case] aliases: Vec<(&str, &str)>,
) {
    let mut df = df;
    for (old, new) in aliases {
        // The panic message names the operation that actually failed (adding an alias).
        df.add_alias(old, new).expect("BUG: cannot add alias");
    }
    let origin_keys = df.keys().to_vec();
    let selected_aliases = df.select(Some(keys.as_slice()));
    let selected = df.select(Some(origin_keys.as_slice()));
    assert_eq!(selected, selected_aliases);
}
/// Mutates every column in place, replacing NaN and infinite `F32`/`F64` values with
/// zero, and compares the frame with one built from the cleaned values.
#[rstest]
#[traced_test]
fn test_mut_view() {
    let keys: Vec<Key> = vec!["a".into(), "b".into()];
    // `keys` is not used again, so it is moved into the index instead of cloned.
    let index = KeyIndex::new(keys);
    let data_frame = vec![
        Array1::from_vec(vec![
            DataValue::from(1f64),
            DataValue::from(2f64),
            DataValue::from(f64::NAN),
        ]),
        Array1::from_vec(vec![
            DataValue::from(4f32),
            DataValue::from(f32::NAN),
            DataValue::from(f32::INFINITY),
        ]),
    ];
    // `index` is cloned here because it is reused for the expected frame below.
    let mut df = ColumnFrame::new(index.clone(), data_frame);
    for col in &mut df.data_frame {
        col.mapv_inplace(|x| match x {
            DataValue::F32(f) if f.is_infinite() || f.is_nan() => DataValue::F32(0f32),
            DataValue::F64(f) if f.is_infinite() || f.is_nan() => DataValue::F64(0f64),
            e => e,
        });
    }
    let expected = ColumnFrame::new(
        index,
        vec![
            Array1::from_vec(vec![
                DataValue::from(1f64),
                DataValue::from(2f64),
                DataValue::from(0f64),
            ]),
            Array1::from_vec(vec![
                DataValue::from(4f32),
                DataValue::from(0f32),
                DataValue::from(0f32),
            ]),
        ],
    );
    assert_eq!(df, expected);
}
/// Single-row frame: checks `get_by_row_index` for a present key, an unknown key and
/// an out-of-range row, plus a two-column `select`.
#[rstest]
#[traced_test]
fn dummy_test() {
    let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
    // `keys` is not used again, so it is moved into the index instead of cloned.
    let index = KeyIndex::new(keys);
    let data_frame = vec![
        Array1::from_vec(vec![DataValue::U32(1)]),
        Array1::from_vec(vec![DataValue::I32(2)]),
        Array1::from_vec(vec![DataValue::I64(3)]),
        Array1::from_vec(vec![DataValue::U64(4)]),
    ];
    let frame = ColumnFrame::new(index, data_frame);
    assert_eq!(
        frame.get_by_row_index(&"a".into(), 0),
        Some(DataValue::U32(1))
    );
    // Unknown key and out-of-range row both yield `None`.
    assert_eq!(frame.get_by_row_index(&"aa".into(), 0), None);
    assert_eq!(frame.get_by_row_index(&"a".into(), 1), None);
    assert_eq!(
        frame.select(Some(&["a".into(), "b".into()])),
        Array2::from_shape_vec((1, 2), vec![DataValue::U32(1), DataValue::I32(2)])
            .expect("BUG: cannot create array")
    );
}
/// Two-row frame: checks `get_by_row_index` lookups and that `select` returns the
/// chosen columns row by row.
#[rstest]
#[traced_test]
fn dummy_test_multiple_rows() {
    let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
    // `keys` is not used again, so it is moved into the index instead of cloned.
    let index = KeyIndex::new(keys);
    let data_frame = vec![
        Array1::from_vec(vec![DataValue::U32(1), DataValue::U32(12)]),
        Array1::from_vec(vec![DataValue::I32(2), DataValue::I32(22)]),
        Array1::from_vec(vec![DataValue::I64(3), DataValue::I64(32)]),
        Array1::from_vec(vec![DataValue::U64(4), DataValue::U64(42)]),
    ];
    let frame = ColumnFrame::new(index, data_frame);
    assert_eq!(
        frame.get_by_row_index(&"a".into(), 0),
        Some(DataValue::U32(1))
    );
    // Unknown key and out-of-range row both yield `None`.
    assert_eq!(frame.get_by_row_index(&"aa".into(), 0), None);
    assert_eq!(frame.get_by_row_index(&"a".into(), 3), None);
    let arr = Array2::from_shape_vec(
        (2, 2),
        vec![
            DataValue::U32(1),
            DataValue::I32(2),
            DataValue::U32(12),
            DataValue::I32(22),
        ],
    )
    .expect("BUG: cannot create array");
    trace!("{arr:?}");
    assert_eq!(frame.select(Some(&["a".into(), "b".into()])), arr);
}
/// Pushes rows into a two-row frame: first a row matching the existing keys, then a
/// row that omits `d` and introduces `e`. The second selection asserts that the
/// missing cells come back as `DataValue::Null`.
#[rstest]
#[traced_test]
fn dummy_test_multiple_rows_push() {
    let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
    // `keys` is not used again, so it is moved into the index instead of cloned.
    let index = KeyIndex::new(keys);
    let data_frame = vec![
        Array1::from_vec(vec![DataValue::U32(1), DataValue::U32(12)]),
        Array1::from_vec(vec![DataValue::I32(2), DataValue::I32(22)]),
        Array1::from_vec(vec![DataValue::I64(3), DataValue::I64(32)]),
        Array1::from_vec(vec![DataValue::U64(4), DataValue::U64(42)]),
    ];
    let mut frame = ColumnFrame::new(index, data_frame);
    // Row with exactly the existing keys.
    assert!(frame
        .push(data_value::stdhashmap!(
            "a" => DataValue::U32(2),
            "b" => DataValue::I32(3),
            "c" => DataValue::I64(4),
            "d" => DataValue::U64(5)
        ))
        .is_ok());
    let arr = Array2::from_shape_vec(
        (3, 2),
        vec![
            DataValue::U32(1),
            DataValue::I32(2),
            DataValue::U32(12),
            DataValue::I32(22),
            DataValue::U32(2),
            DataValue::I32(3),
        ],
    )
    .expect("BUG: cannot create array");
    trace!("{arr:?}");
    assert_eq!(frame.select(Some(&["a".into(), "b".into()])), arr);
    // Row without `d` but with the new key `e`.
    let result = frame.push(data_value::stdhashmap!(
        "a" => DataValue::U32(34),
        "b" => DataValue::I32(44),
        "c" => DataValue::I64(54),
        "e" => DataValue::F32(6f32)
    ));
    assert!(result.is_ok(), "{result:?}");
    let arr = Array2::from_shape_vec(
        (4, 2),
        vec![
            DataValue::U64(4),
            DataValue::Null,
            DataValue::U64(42),
            DataValue::Null,
            DataValue::U64(5),
            DataValue::Null,
            DataValue::Null,
            DataValue::F32(6f32),
        ],
    )
    .expect("BUG: cannot create array");
    trace!("{arr:?}");
    assert_eq!(frame.select(Some(&["d".into(), "e".into()])), arr);
}
/// Compares `select` with the expected array and `select_transposed` with its transpose.
#[rstest]
#[case(
    column_frame! {
        "group_id" => vec![1, 2],
        "feed_tag" => vec![3, 4]
    },
    Some(vec![Key::from("group_id")]),
    ndarray::array!([1.into()], [2.into()])
)]
#[case(
    column_frame! {
        "group_id" => vec![1, 2],
        "feed_tag" => vec![3, 4]
    },
    Some(vec!["group_id".into(), "feed_tag".into()]),
    ndarray::array!([1.into(), 3.into()], [2.into(), 4.into()])
)]
#[case(
    column_frame! {
        "group_id" => vec![1, 2],
        "feed_tag" => vec![3, DataValue::Null]
    },
    Some(vec!["feed_tag".into()]),
    ndarray::array![[3.into()], [DataValue::Null]]
)]
#[case(
    column_frame! {
        "group_id" => vec![1, 2],
        "feed_tag" => vec![1, DataValue::Null]
    },
    Some(vec!["feed_tag2".into()]),
    Array2::<DataValue>::default((0, 0))
)]
#[traced_test]
fn test_select(
    #[case] input: ColumnFrame,
    #[case] keys: Option<Vec<Key>>,
    #[case] expected: Array2<DataValue>,
) {
    trace!("input={input:?}");
    let keys_slice = keys.as_deref();

    // Row-oriented selection.
    let selected = input.select(keys_slice);
    trace!("selected={selected:?}");
    assert_eq!(selected, expected);

    // Transposed selection must succeed and equal the transpose of the expectation.
    let selected = input.select_transposed(keys_slice);
    trace!("selected_transposed={selected:?}");
    assert!(selected.is_ok());
    let transposed = selected.unwrap();
    assert_eq!(transposed, expected.t());
}
/// `select_column` returns the column for a known key and `None` for an unknown one.
#[rstest]
#[case(
    column_frame! {
        "group_id" => vec![1, 2],
        "feed_tag" => vec![3, 4]
    },
    Key::from("group_id"),
    Some(ndarray::array!(1.into(), 2.into()))
)]
#[case(
    column_frame! {
        "group_id" => vec![1, 2, 5, 6],
        "feed_tag" => vec![3, 4, 7, 8]
    },
    Key::from("group_id"),
    Some(ndarray::array!(1.into(), 2.into(), 5.into(), 6.into()))
)]
#[case(
    column_frame! {
        "group_id" => vec![1, 2],
        "feed_tag" => vec![1, 1]
    },
    Key::from("feed_tag1"),
    None
)]
#[traced_test]
#[allow(deprecated)]
fn test_select_column(
    #[case] input: ColumnFrame,
    #[case] key: Key,
    #[case] expected: Option<Array1<DataValue>>,
) {
    let selected = input.select_column(&key);
    trace!("selected={selected:?}");
    if let Some(expected) = expected {
        // A column is expected: it must be present and equal.
        assert!(selected.is_some());
        assert_eq!(selected.expect("BUG: checked above"), expected);
    } else {
        assert!(selected.is_none());
    }
}
/// `add_columns` joins where one side has no rows: a row-less left frame ends up with
/// the right frame's rows, and a row-less right frame contributes `Null` columns.
#[test]
#[traced_test]
fn empty_join_test() {
    let join = JoinRelation::add_columns();

    // Left side: only an empty `group_id` column.
    let mut column_frame = ColumnFrame::default();
    column_frame
        .add_single_column("group_id", Vec::<DataValue>::new())
        .expect("BUG: cannot add column");
    let column_frame2 = column_frame! {
        "group_id" => vec![2, 1, 3],
        "feed_tag" => vec![1, 1, 1],
        "clicks" => vec![100, 10, 10],
        "imps" => vec![1000, 200, 200]
    };
    assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
    let joined = column_frame.join(column_frame2, &join);
    assert!(joined.is_ok(), "{joined:?}");
    trace!("{column_frame:?}");
    let all_columns: Vec<Key> = vec![
        "group_id".into(),
        "feed_tag".into(),
        "clicks".into(),
        "imps".into(),
    ];
    let expected_filled: Array2<DataValue> = ndarray::array!(
        [2.into(), 1.into(), 100.into(), 1000.into()],
        [1.into(), 1.into(), 10.into(), 200.into()],
        [3.into(), 1.into(), 10.into(), 200.into()],
    );
    assert_eq!(column_frame.select(Some(&all_columns[..])), expected_filled);

    // Right side: only an empty `group_id` column, so that column is filled with nulls.
    let mut column_frame2 = column_frame! {
        "feed_tag" => vec![1, 1, 1],
        "clicks" => vec![100, 10, 10],
        "imps" => vec![1000, 200, 200]
    };
    let mut column_frame = ColumnFrame::default();
    column_frame
        .add_single_column("group_id", Array1::from_vec(Vec::<DataValue>::new()))
        .expect("BUG: cannot add column");
    let joined = column_frame2.join(column_frame, &join);
    assert!(joined.is_ok(), "{joined:?}");
    trace!("{column_frame2:?}");
    let expected_nulls: Array2<DataValue> = ndarray::array!(
        [DataValue::Null, 1.into(), 100.into(), 1000.into()],
        [DataValue::Null, 1.into(), 10.into(), 200.into()],
        [DataValue::Null, 1.into(), 10.into(), 200.into()],
    );
    assert_eq!(column_frame2.select(Some(&all_columns[..])), expected_nulls);

    // Right side: an index key `group_id2` but no column data at all.
    let column_frame = ColumnFrame {
        index: KeyIndex::new(vec!["group_id2".into()]),
        ..ColumnFrame::default()
    };
    let joined = column_frame2.join(column_frame, &join);
    assert!(joined.is_ok(), "{joined:?}");
    trace!("{column_frame2:?}");
    let with_group_id2: Vec<Key> = vec![
        "group_id2".into(),
        "feed_tag".into(),
        "clicks".into(),
        "imps".into(),
    ];
    assert_eq!(
        column_frame2.select(Some(&with_group_id2[..])),
        expected_nulls
    );
}
/// Join by `group_id` where the id repeats on both sides; the unmatched left row
/// is expected to carry `Null` in the joined columns.
#[test]
#[traced_test]
fn join_test_multiple() {
    let join_keys = JoinById::new(vec!["group_id".into()]);
    let join = JoinRelation::new(JoinBy::JoinById(join_keys));
    let mut column_frame = column_frame! {
        "group_id" => vec![1, 1, 3]
    };
    let column_frame2 = column_frame! {
        "group_id" => vec![2, 1, 1],
        "clicks" => vec![100, 10, 10],
        "imps" => vec![1000, 200, 200]
    };
    let joined = column_frame.join(column_frame2, &join);
    assert!(joined.is_ok(), "{joined:?}");
    trace!("{column_frame:?}");
    let columns: Vec<Key> = vec!["group_id".into(), "clicks".into(), "imps".into()];
    let expected: Array2<DataValue> = ndarray::array!(
        [1.into(), 10.into(), 200.into()],
        [1.into(), 10.into(), 200.into()],
        [3.into(), DataValue::Null, DataValue::Null],
    );
    assert_eq!(column_frame.select(Some(&columns[..])), expected);
}
/// Join by `group_id` with disjoint ids: every left row keeps its id and gets `Null` clicks.
#[test]
#[traced_test]
fn join_test_no_matches() {
    let join_keys = JoinById::new(vec!["group_id".into()]);
    let join = JoinRelation::new(JoinBy::JoinById(join_keys));
    let mut column_frame = column_frame! {
        "group_id" => vec![DataValue::I32(1), DataValue::I32(2), DataValue::I32(3)]
    };
    let column_frame2 = column_frame! {
        "group_id" => vec![DataValue::I32(4), DataValue::I32(5), DataValue::I32(6)],
        "clicks" => vec![DataValue::I32(100), DataValue::I32(200), DataValue::I32(300)],
    };
    let joined = column_frame.join(column_frame2, &join);
    assert!(joined.is_ok(), "{joined:?}");
    trace!("{column_frame:?}");
    let columns: Vec<Key> = vec!["group_id".into(), "clicks".into()];
    let expected: Array2<DataValue> = ndarray::array!(
        [DataValue::I32(1), DataValue::Null],
        [DataValue::I32(2), DataValue::Null],
        [DataValue::I32(3), DataValue::Null],
    );
    assert_eq!(column_frame.select(Some(&columns[..])), expected);
}
/// Join on (`group_id`, `feed_tag`); the result is checked through both `select`
/// and `select_view(..).row_view()`.
#[test]
#[traced_test]
fn join_test() {
    let join_keys = JoinById::new(vec!["group_id".into(), "feed_tag".into()]);
    let join = JoinRelation::new(JoinBy::JoinById(join_keys));
    let mut column_frame = column_frame! {
        "group_id" => vec![1, 2, 8],
        "feed_tag" => vec![1, 1, 10]
    };
    let column_frame2 = column_frame! {
        "group_id" => vec![2, 1, 3],
        "feed_tag" => vec![1, 1, 1],
        "clicks" => vec![100, 10, 10],
        "imps" => vec![1000, 200, 200]
    };
    // Joining an empty frame must succeed as well.
    assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
    let joined = column_frame.join(column_frame2, &join);
    assert!(joined.is_ok(), "{joined:?}");
    trace!("{column_frame:?}");

    // The row without a partner keeps `Null` in the joined columns.
    let expected: Array2<DataValue> = ndarray::array!(
        [1.into(), 1.into(), 10.into(), 200.into()],
        [2.into(), 1.into(), 100.into(), 1000.into()],
        [8.into(), 10.into(), DataValue::Null, DataValue::Null]
    );
    let columns: Vec<Key> = vec![
        "group_id".into(),
        "feed_tag".into(),
        "clicks".into(),
        "imps".into(),
    ];
    assert_eq!(
        column_frame.select(Some(&columns[..])),
        expected,
        "DF {column_frame:?}"
    );
    let view = column_frame
        .select_view(Some(&[
            "group_id".into(),
            "feed_tag".into(),
            "clicks".into(),
            "imps".into()
        ]))
        .unwrap();
    assert_eq!(view.row_view(), expected.view(), "DF {column_frame:?}");
}
/// Join on (`group_id`, `feed_tag`) where the left frame has an extra `clicked`
/// column; checked through both `select` and `select_view(..).row_view()`.
#[test]
#[traced_test]
fn join_test_with_additional() {
    let join_keys = JoinById::new(vec!["group_id".into(), "feed_tag".into()]);
    let join = JoinRelation::new(JoinBy::JoinById(join_keys));
    let mut column_frame = column_frame! {
        "group_id" => vec![1, 2, 8],
        "feed_tag" => vec![1, 1, 10],
        "clicked" => vec![0, 0, 1]
    };
    let column_frame2 = column_frame! {
        "group_id" => vec![2, 1, 3],
        "feed_tag" => vec![1, 1, 1],
        "clicks" => vec![100, 10, 10],
        "imps" => vec![1000, 200, 200]
    };
    // Joining an empty frame must succeed as well.
    assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
    let joined = column_frame.join(column_frame2, &join);
    assert!(joined.is_ok(), "{joined:?}");
    trace!("{column_frame:?}");

    let expected: Array2<DataValue> = ndarray::array!(
        [1.into(), 1.into(), 10.into(), 200.into(), 0.into()],
        [2.into(), 1.into(), 100.into(), 1000.into(), 0.into()],
        [
            8.into(),
            10.into(),
            DataValue::Null,
            DataValue::Null,
            1.into()
        ]
    );
    let columns: Vec<Key> = vec![
        "group_id".into(),
        "feed_tag".into(),
        "clicks".into(),
        "imps".into(),
        "clicked".into(),
    ];
    assert_eq!(column_frame.select(Some(&columns[..])), expected);
    let view = column_frame
        .select_view(Some(&[
            "group_id".into(),
            "feed_tag".into(),
            "clicks".into(),
            "imps".into(),
            "clicked".into()
        ]))
        .unwrap();
    assert_eq!(view.row_view(), expected.view(), "DF {column_frame:?}");
}
/// Join on (`group_id`, `feed_tag`) against a single-row frame: only the matching
/// left row receives `clicks`/`imps`, the others are expected to hold `Null`.
#[test]
#[traced_test]
fn join_test_with_additional_single() {
    let join_keys = JoinById::new(vec!["group_id".into(), "feed_tag".into()]);
    let join = JoinRelation::new(JoinBy::JoinById(join_keys));
    let mut column_frame = column_frame! {
        "group_id" => vec![1, 2, 8],
        "feed_tag" => vec![1, 1, 10],
        "clicked" => vec![0, 0, 1]
    };
    let column_frame2 = column_frame! {
        "a" => vec![1],
        "group_id" => vec![2],
        "feed_tag" => vec![1],
        "clicks" => vec![10],
        "imps" => vec![200]
    };
    // Joining an empty frame must succeed as well.
    assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
    let joined = column_frame.join(column_frame2, &join);
    assert!(joined.is_ok(), "{joined:?}");
    trace!("{column_frame:?}");

    let columns: Vec<Key> = vec![
        "group_id".into(),
        "feed_tag".into(),
        "clicks".into(),
        "imps".into(),
        "clicked".into(),
    ];
    let expected: Array2<DataValue> = ndarray::array!(
        [
            1.into(),
            1.into(),
            DataValue::Null,
            DataValue::Null,
            0.into(),
        ],
        [2.into(), 1.into(), 10.into(), 200.into(), 0.into()],
        [
            8.into(),
            10.into(),
            DataValue::Null,
            DataValue::Null,
            1.into()
        ]
    );
    assert_eq!(column_frame.select(Some(&columns[..])), expected);
}
#[rstest]
#[traced_test]
fn cartesian_product_join() {
let mut df = column_frame! {
"group_id" => vec![1, 2, 3],
"feed_tag" => vec![1, 2, 3]
};
let df2 = column_frame! {
"zone_id" => vec![111111, 111133],
"zone_avg_ctr" => vec![0.1, 0.001]
};
assert!(df
.join(
ColumnFrame::default(),
&JoinRelation::new(JoinBy::CartesianProduct)
)
.is_ok());
let join = JoinRelation::new(JoinBy::CartesianProduct);
let result = df.join(df2, &join);
assert!(result.is_ok(), "{result:?}");
let selected = df.select(None);
trace!("{selected:?}");
assert_eq!(
selected,
ndarray::array!(
[1.into(), 1.into(), 111111.into(), 0.1.into()],
[1.into(), 1.into(), 111133.into(), 0.001.into()],
[2.into(), 2.into(), 111111.into(), 0.1.into()],
[2.into(), 2.into(), 111133.into(), 0.001.into()],
[3.into(), 3.into(), 111111.into(), 0.1.into()],
[3.into(), 3.into(), 111133.into(), 0.001.into()],
)
);
let df2 = column_frame! {
"zone_id" => vec![111]
};
let result = df.join(df2, &join);
assert!(result.is_ok(), "{result:?}");
let selected = df.select(None);
trace!("{selected:?}");
assert_eq!(
selected,
ndarray::array!(
[1.into(), 1.into(), 111111.into(), 0.1.into(), 111.into()],
[1.into(), 1.into(), 111133.into(), 0.001.into(), 111.into()],
[2.into(), 2.into(), 111111.into(), 0.1.into(), 111.into()],
[2.into(), 2.into(), 111133.into(), 0.001.into(), 111.into()],
[3.into(), 3.into(), 111111.into(), 0.1.into(), 111.into()],
[3.into(), 3.into(), 111133.into(), 0.001.into(), 111.into()],
)
);
}
#[rstest]
#[traced_test]
fn broadcast_join() {
let mut df = column_frame! {
"group_id" => vec![1, 2, 3],
"feed_tag" => vec![1, 2, 3]
};
let df2 = column_frame! {
"zone_id" => vec![111111]
};
assert!(df
.join(
ColumnFrame::default(),
&JoinRelation::new(JoinBy::Broadcast)
)
.is_ok());
let join = JoinRelation::new(JoinBy::Broadcast);
assert!(df.join(df2, &join).is_ok());
let selected = df.select(None);
trace!("{selected:?}");
assert_eq!(
selected,
ndarray::array!(
[1.into(), 1.into(), 111111.into()],
[2.into(), 2.into(), 111111.into()],
[3.into(), 3.into(), 111111.into()]
)
);
}
#[rstest]
#[traced_test]
fn merge_test() {
let mut df = column_frame! {
"group_id" => vec![1, 2, 3],
"feed_tag" => vec![1, 2, 3]
};
let df2 = column_frame! {
"group_id" => vec![11, 21, 31],
"feed_tag" => vec![12, 22, 32]
};
let join = JoinRelation::new(JoinBy::Replace);
assert!(df.join(df2, &join).is_ok());
let selected = df.select(None);
trace!("{selected:?}");
assert_eq!(
selected,
ndarray::array!(
[11.into(), 12.into()],
[21.into(), 22.into()],
[31.into(), 32.into()]
)
);
}
#[rstest]
#[traced_test]
fn extend_test() {
let mut df = column_frame! {
"group_id" => vec![1, 2, 3],
"feed_tag" => vec![1, 2, 3]
};
let df2 = column_frame! {
"group_id" => vec![11, 21, 31],
"feed_tag" => vec![5, 6, 7]
};
assert!(df
.join(ColumnFrame::default(), &JoinRelation::new(JoinBy::Extend))
.is_ok());
let join = JoinRelation::new(JoinBy::Extend);
assert!(df.join(df2, &join).is_ok());
let selected = df.select(Some(&["feed_tag".into(), "group_id".into()]));
trace!("{selected:?}");
assert_eq!(
selected,
ndarray::array!(
[1.into(), 1.into()],
[2.into(), 2.into()],
[3.into(), 3.into()],
[5.into(), 11.into()],
[6.into(), 21.into()],
[7.into(), 31.into()]
)
);
let as_map = df.select_as_map(Some(&["feed_tag".into(), "group_id".into()]));
trace!("{as_map:?}");
assert_eq!(
as_map,
stdhashmap!(
"feed_tag" => vec![1, 2, 3, 5, 6, 7],
"group_id" => vec![1, 2, 3, 11, 21, 31]
)
);
let as_map = df.select_as_map(Some(&["feed_tag1".into()]));
trace!("{as_map:?}");
assert_eq!(as_map, HashMap::default());
}
#[rstest]
#[traced_test]
fn extend_test_with_non_existing_cols() {
let mut df = column_frame! {
"group_id" => vec![1, 2, 3],
"feed_tag" => vec![1, 2, 3]
};
let mut df2 = column_frame! {
"group_id" => vec![11, 21, 31],
"feed_tag" => vec![5, 6, 7],
"clicks" => vec![100, 200, 300],
"impressions" => vec![1000, 2000, 3000]
};
let df_bckp = df.clone();
let join = JoinRelation::new(JoinBy::Extend);
assert!(df.join(df2.clone(), &join).is_ok());
let selected = df.select(None);
trace!("{selected:?}");
assert_eq!(
selected,
ndarray::array!(
[1.into(), 1.into(), DataValue::Null, DataValue::Null],
[2.into(), 2.into(), DataValue::Null, DataValue::Null],
[3.into(), 3.into(), DataValue::Null, DataValue::Null],
[11.into(), 5.into(), 100.into(), 1000.into()],
[21.into(), 6.into(), 200.into(), 2000.into()],
[31.into(), 7.into(), 300.into(), 3000.into()]
)
);
let join = JoinRelation::new(JoinBy::Extend);
let r = df2.join(df_bckp, &join);
assert!(r.is_ok(), "{r:?}");
let selected = df2.select(None);
trace!("{selected:?}");
assert_eq!(
selected,
ndarray::array!(
[11.into(), 5.into(), 100.into(), 1000.into()],
[21.into(), 6.into(), 200.into(), 2000.into()],
[31.into(), 7.into(), 300.into(), 3000.into()],
[1.into(), 1.into(), DataValue::Null, DataValue::Null],
[2.into(), 2.into(), DataValue::Null, DataValue::Null],
[3.into(), 3.into(), DataValue::Null, DataValue::Null]
)
);
}
#[rstest]
#[traced_test]
fn extend_test_with_non_existing_cols_wrong_order() {
let mut df = column_frame! {
"group_id" => vec![1, 2, 3],
"feed_tag" => vec![1, 2, 3]
};
let df2 = column_frame! {
"feed_tag" => vec![5, 6, 7],
"group_id" => vec![11, 21, 31]
};
let join = JoinRelation::new(JoinBy::Extend);
let err = df.join(df2, &join);
assert!(err.is_ok(), "{err:?}");
assert_eq!(df.nrows(), 6);
assert_eq!(
df.select(Some(&["group_id".into(), "feed_tag".into()])),
ndarray::array![
[1.into(), 1.into()],
[2.into(), 2.into()],
[3.into(), 3.into()],
[11.into(), 5.into()],
[21.into(), 6.into()],
[31.into(), 7.into()],
]
);
}
#[rstest]
#[traced_test]
fn extend_test_wrong_order_with_extra_columns() {
let mut df = column_frame! {
"a" => vec![1, 2],
"b" => vec![10, 20]
};
let df2 = column_frame! {
"c" => vec![100, 200],
"a" => vec![3, 4]
};
let join = JoinRelation::new(JoinBy::Extend);
assert!(df.join(df2, &join).is_ok());
assert_eq!(df.nrows(), 4);
assert_eq!(
df.select(Some(&["a".into(), "b".into(), "c".into()])),
ndarray::array![
[1.into(), 10.into(), DataValue::Null],
[2.into(), 20.into(), DataValue::Null],
[3.into(), DataValue::Null, 100.into()],
[4.into(), DataValue::Null, 200.into()],
]
);
}
/// `Replace` join with a frame of a different row count must fail, while replacing
/// with an empty frame must succeed.
#[rstest]
#[traced_test]
fn test_replace_not_compatible() {
    let mut df = column_frame! {
        "group_id" => vec![1, 2, 3],
        "feed_tag" => vec![1, 2, 3]
    };
    let shorter = column_frame! {
        "feed_tag" => vec![5, 6],
        "group_id" => vec![11, 21]
    };
    let join = JoinRelation::new(JoinBy::Replace);

    // Two rows against three rows: rejected.
    let err = df.join(shorter, &join);
    assert!(err.is_err(), "{err:?}");

    // Empty replacement: accepted.
    let err = df.join(ColumnFrame::default(), &join);
    assert!(err.is_ok(), "{err:?}");
}
#[rstest]
#[traced_test]
fn test_different_data() {
let mut df = column_frame! {
"group_id" => vec![1, 2, 3],
"feed_tag" => vec![1, 2, 3]
};
let df2 = column_frame! {
"group_id" => vec![11, 21],
"a" => vec![5, 6]
};
let join = JoinRelation::new(JoinBy::Extend);
let err = df.join(df2, &join);
assert!(err.is_ok(), "{err:?}");
println!("{df:?}");
let expected_df = ColumnFrame::new(
KeyIndex::from(vec!["group_id".into(), "feed_tag".into(), "a".into()]),
vec![
Array1::from_vec(vec![1.into(), 2.into(), 3.into(), 11.into(), 21.into()]),
Array1::from_vec(vec![
1.into(),
2.into(),
3.into(),
DataValue::Null,
DataValue::Null,
]),
Array1::from_vec(vec![
DataValue::Null,
DataValue::Null,
DataValue::Null,
5.into(),
6.into(),
]),
],
);
assert_eq!(df, expected_df)
}
/// JSON round trip for both the `KeyIndex` and the whole `ColumnFrame`.
#[rstest]
#[traced_test]
fn serde_column_frame() {
    let df = column_frame! {
        "group_id" => vec![1u64, 2u64, 3u64],
        "feed_tag" => vec![1u64, 2u64, 3u64]
    };

    // Index alone.
    let key_idx = df.index.clone();
    let index_json = serde_json::to_string(&key_idx).expect("BUG: cannot serialize");
    let restored_index: KeyIndex =
        serde_json::from_str(&index_json).expect("BUG: cannot deserialize");
    assert_eq!(key_idx, restored_index);
    assert!(key_idx.get_key(0).is_some_and(|x| x == "group_id".into()));

    // Whole frame.
    let frame_json = serde_json::to_string(&df).expect("BUG: cannot serialize");
    let restored_frame: ColumnFrame =
        serde_json::from_str(&frame_json).expect("BUG: cannot deserialize");
    assert_eq!(df, restored_frame);
}
#[rstest]
#[traced_test]
fn update_value() {
let mut df = column_frame! {
"group_id" => vec![1, 2, 3],
"feed_tag" => vec![1, 2, 3]
};
let group_id: Key = "group_id".into();
let v = df.get_by_row_index(&group_id, 1);
assert_eq!(v, Some(DataValue::I32(2)));
df.set_by_row_index(&group_id, 1, DataValue::I32(22))
.expect("set must succeed");
let v = df.get_by_row_index(&group_id, 1);
assert_eq!(v, Some(DataValue::I32(22)));
assert!(df
.set_by_row_index(&"group_id2".into(), 1, DataValue::Null)
.is_err());
}
#[rstest]
fn get_single_column_typed_f64() {
let df = column_frame! {
"a" => [1i32, 2i32, 3i32],
"b" => [10u64, 20u64, 30u64]
};
let key: Key = "a".into();
let col = df.get_single_column_typed::<f64>(&key).unwrap();
assert_eq!(col, ndarray::arr1(&[1.0, 2.0, 3.0]));
}
#[rstest]
fn get_single_column_typed_i64() {
let df = column_frame! {
"x" => [10u32, 20u32, 30u32]
};
let key: Key = "x".into();
let col = df.get_single_column_typed::<i64>(&key).unwrap();
assert_eq!(col, ndarray::arr1(&[10i64, 20i64, 30i64]));
}
#[rstest]
fn get_single_column_typed_string() {
let df = column_frame! {
"name" => ["alice", "bob", "carol"]
};
let key: Key = "name".into();
let col = df.get_single_column_typed::<String>(&key).unwrap();
assert_eq!(
col,
ndarray::arr1(&["alice".to_string(), "bob".to_string(), "carol".to_string()])
);
}
#[rstest]
fn get_single_column_typed_bool() {
let df = column_frame! {
"flag" => [1i32, 0i32, 1i32]
};
let key: Key = "flag".into();
let col = df.get_single_column_typed::<bool>(&key).unwrap();
assert_eq!(col, ndarray::arr1(&[true, false, true]));
}
#[rstest]
fn get_single_column_typed_missing_key_returns_none() {
let df = column_frame! {
"a" => [1, 2, 3]
};
let missing: Key = "nonexistent".into();
assert!(df.get_single_column_typed::<f64>(&missing).is_none());
}
#[rstest]
fn get_single_column_typed_numeric_coercion_from_mixed() {
let df = column_frame! {
"vals" => [1.5f64, 2.7f64, 3.9f64]
};
let key: Key = "vals".into();
let col = df.get_single_column_typed::<i32>(&key).unwrap();
assert_eq!(col, ndarray::arr1(&[1i32, 2i32, 3i32]));
}
#[rstest]
fn get_single_column_typed_selects_correct_column() {
let df = column_frame! {
"x" => [1, 2, 3],
"y" => [10, 20, 30],
"z" => [100, 200, 300]
};
let key: Key = "y".into();
let col = df.get_single_column_typed::<i64>(&key).unwrap();
assert_eq!(col, ndarray::arr1(&[10i64, 20i64, 30i64]));
}
#[rstest]
fn get_single_column_typed_u64_identity() {
let df = column_frame! {
"id" => [100u64, 200u64, 300u64]
};
let key: Key = "id".into();
let col = df.get_single_column_typed::<u64>(&key).unwrap();
assert_eq!(col, ndarray::arr1(&[100u64, 200u64, 300u64]));
}
#[rstest]
fn get_single_column_typed_single_row() {
let df = column_frame! {
"x" => [42i32]
};
let key: Key = "x".into();
let col = df.get_single_column_typed::<f64>(&key).unwrap();
assert_eq!(col, ndarray::arr1(&[42.0f64]));
}
#[rstest]
fn get_single_column_typed_empty_frame() {
let df = ColumnFrame::default();
let key: Key = "x".into();
let col = df.get_single_column_typed::<f64>(&key);
assert!(col.is_none());
}
/// `select_typed::<f64>(None)` returns all columns, indexed as `[row, column]`.
#[rstest]
fn select_typed_all_columns() {
    let frame = column_frame! {
        "a" => [1i32, 2i32],
        "b" => [3i32, 4i32]
    };
    let matrix = frame.select_typed::<f64>(None);
    // Shape first, then each cell.
    assert_eq!(matrix.nrows(), 2);
    assert_eq!(matrix.ncols(), 2);
    assert_eq!(matrix[[0, 0]], 1.0);
    assert_eq!(matrix[[0, 1]], 3.0);
    assert_eq!(matrix[[1, 0]], 2.0);
    assert_eq!(matrix[[1, 1]], 4.0);
}
/// `select_typed::<i64>` with keys `a` and `c` returns only those two columns, in that order.
#[rstest]
fn select_typed_subset_of_columns() {
    let frame = column_frame! {
        "a" => [10u64, 20u64],
        "b" => [30u64, 40u64],
        "c" => [50u64, 60u64]
    };
    let wanted: Vec<Key> = vec!["a".into(), "c".into()];
    let matrix = frame.select_typed::<i64>(Some(&wanted));
    // Shape first, then each cell.
    assert_eq!(matrix.nrows(), 2);
    assert_eq!(matrix.ncols(), 2);
    assert_eq!(matrix[[0, 0]], 10i64);
    assert_eq!(matrix[[0, 1]], 50i64);
    assert_eq!(matrix[[1, 0]], 20i64);
    assert_eq!(matrix[[1, 1]], 60i64);
}
/// Selecting only an unknown key gives an array of shape `[0, 0]`.
#[rstest]
fn select_typed_nonexistent_keys_returns_empty() {
    let frame = column_frame! {
        "a" => [1i32, 2i32]
    };
    let unknown: Vec<Key> = vec!["z".into()];
    let matrix = frame.select_typed::<f64>(Some(&unknown));
    assert_eq!(matrix.shape(), &[0, 0]);
}
/// `select_typed::<String>` returns the string cells of a text column.
#[rstest]
fn select_typed_string_extraction() {
    let frame = column_frame! {
        "name" => ["hello", "world"]
    };
    let matrix = frame.select_typed::<String>(None);
    assert_eq!(matrix[[0, 0]], "hello");
    assert_eq!(matrix[[1, 0]], "world");
}
/// `select_typed::<f64>` must agree with `select` followed by a manual `f64::extract` mapping.
#[rstest]
fn select_typed_matches_manual_mapv() {
    let frame = column_frame! {
        "x" => [1i32, 2i32, 3i32],
        "y" => [4i32, 5i32, 6i32]
    };
    let via_select_typed = frame.select_typed::<f64>(None);
    let untyped = frame.select(None);
    let via_mapv = untyped.mapv(|v| f64::extract(&v));
    assert_eq!(via_select_typed, via_mapv);
}
/// `select_vec_view(None)` returns one entry per column, each holding that column's values.
#[rstest]
#[traced_test]
fn select_vec_view_all_columns() {
    let cf = column_frame! {
        "a" => [1i32, 2i32, 3i32],
        "b" => [4i32, 5i32, 6i32]
    };
    let cols = cf.select_vec_view(None).expect("should succeed");
    assert_eq!(cols.len(), 2, "must return one borrow per column");

    let first = cols[0].expect("column 'a' present");
    let second = cols[1].expect("column 'b' present");
    assert_eq!(first.len(), 3, "each column has 3 rows");
    assert_eq!(second.len(), 3);

    let expected_a = vec![
        DataValue::from(1i32),
        DataValue::from(2i32),
        DataValue::from(3i32),
    ];
    assert_eq!(first.to_vec(), expected_a);
    let expected_b = vec![
        DataValue::from(4i32),
        DataValue::from(5i32),
        DataValue::from(6i32),
    ];
    assert_eq!(second.to_vec(), expected_b);
}
/// `select_vec_view` returns columns in the order of the requested keys (`c` before `a`).
#[rstest]
#[traced_test]
fn select_vec_view_subset_in_order() {
    let cf = column_frame! {
        "a" => [10i32, 20i32],
        "b" => [30i32, 40i32],
        "c" => [50i32, 60i32]
    };
    let cols = cf
        .select_vec_view(Some(&["c".into(), "a".into()]))
        .expect("should succeed");
    assert_eq!(cols.len(), 2);

    let leading = cols[0].expect("first col is 'c'").to_vec();
    assert_eq!(leading, vec![DataValue::from(50i32), DataValue::from(60i32)]);
    let trailing = cols[1].expect("second col is 'a'").to_vec();
    assert_eq!(trailing, vec![DataValue::from(10i32), DataValue::from(20i32)]);
}
/// Requesting a single key from `select_vec_view` yields exactly that column.
#[rstest]
#[traced_test]
fn select_vec_view_single_column() {
    let cf = column_frame! {
        "x" => [7i32, 8i32, 9i32],
        "y" => [1i32, 2i32, 3i32]
    };
    let cols = cf
        .select_vec_view(Some(&["x".into()]))
        .expect("should succeed");
    assert_eq!(cols.len(), 1);

    let only = cols[0].expect("column 'x' present").to_vec();
    let expected_x = vec![
        DataValue::from(7i32),
        DataValue::from(8i32),
        DataValue::from(9i32),
    ];
    assert_eq!(only, expected_x);
}
#[rstest]
#[traced_test]
fn select_vec_view_unknown_keys_returns_err() {
let cf = column_frame! {
"a" => [1i32, 2i32]
};
let result = cf.select_vec_view(Some(&["nonexistent".into()]));
assert!(
result.is_err(),
"should error on unknown key, got {result:?}"
);
}
#[rstest]
#[traced_test]
fn select_vec_view_empty_keys_returns_err() {
let cf = column_frame! {
"a" => [1i32, 2i32]
};
let result = cf.select_vec_view(Some(&[]));
assert!(result.is_err(), "empty slice should return an error");
}
// `DataValue::Null` cells must come back from the column view unchanged and in place.
#[rstest]
#[traced_test]
fn select_vec_view_preserves_nulls() {
    let frame = column_frame! {
        "v" => [DataValue::Null, DataValue::from(42i32), DataValue::Null]
    };
    let expected = vec![DataValue::Null, DataValue::from(42i32), DataValue::Null];

    let views = frame.select_vec_view(None).expect("should succeed");
    assert_eq!(views.len(), 1);

    let only = views[0].expect("column 'v' present");
    assert_eq!(only.to_vec(), expected);
}
// With no key filter, the row view of `select_view` must be 3 rows by 2 columns
// for a frame holding two three-element columns.
#[rstest]
#[traced_test]
fn select_view_all_columns_shape() {
    let frame = column_frame! {
        "a" => [1i32, 2i32, 3i32],
        "b" => [4i32, 5i32, 6i32]
    };

    let selection = frame.select_view(None).expect("should succeed");
    let rows = selection.row_view();

    assert_eq!(rows.nrows(), 3, "nrows should be 3");
    assert_eq!(rows.ncols(), 2, "ncols should be 2");
}
#[rstest]
#[traced_test]
fn select_view_subset_shape() {
let cf = column_frame! {
"a" => [10i32, 20i32],
"b" => [30i32, 40i32],
"c" => [50i32, 60i32]
};
let mv = cf
.select_view(Some(&["a".into(), "c".into()]))
.expect("should succeed");
let rv = mv.row_view();
assert_eq!(rv.nrows(), 2);
assert_eq!(rv.ncols(), 2);
}
// The owned copy of `select_view`'s row view must equal what `select` returns
// for the same keys.
#[rstest]
#[traced_test]
fn select_view_data_matches_select() {
    let frame = column_frame! {
        "p" => [1i32, 2i32],
        "q" => [3i32, 4i32]
    };
    let wanted: &[Key] = &["p".into(), "q".into()];

    let selection = frame.select_view(Some(wanted)).expect("should succeed");
    let from_view = selection.row_view().to_owned();
    let from_select = frame.select(Some(wanted));

    assert_eq!(from_view, from_select);
}
#[rstest]
#[traced_test]
fn select_view_unknown_keys_returns_err() {
let cf = column_frame! {
"a" => [1i32, 2i32]
};
let result = cf.select_view(Some(&["does_not_exist".into()]));
assert!(result.is_err(), "unknown key should return an error");
}
#[rstest]
#[traced_test]
fn select_view_empty_keys_returns_err() {
let cf = column_frame! {
"a" => [1i32, 2i32]
};
let result = cf.select_view(Some(&[]));
assert!(result.is_err(), "empty slice should return an error");
}
#[rstest]
#[traced_test]
fn select_view_single_column() {
let cf = column_frame! {
"only" => [5i32, 6i32, 7i32, 8i32]
};
let mv = cf
.select_view(Some(&["only".into()]))
.expect("should succeed");
let rv = mv.row_view();
assert_eq!(rv.nrows(), 4, "four rows after transposing single column");
assert_eq!(rv.ncols(), 1, "one column");
}
// A payload whose `data_frame` is a 2-D `Array2<DataValue>` (the V1 wire shape)
// must deserialize into a `ColumnFrame` with the cells split into per-key columns.
#[test]
fn serde_v1_array2_roundtrip() {
    // Stand-in for the V1 wire shape: same field names as `ColumnFrame`,
    // but the data is one dense 2-D array.
    #[derive(Serialize)]
    struct WireV1 {
        index: KeyIndex,
        data_frame: Array2<DataValue>,
    }

    // Cells listed row by row: (1, "x") then (2, "y").
    let cells = vec![
        DataValue::I32(1),
        DataValue::String("x".into()),
        DataValue::I32(2),
        DataValue::String("y".into()),
    ];
    let legacy = WireV1 {
        index: KeyIndex::new(vec![
            Key::new("a", crate::DataType::I32),
            Key::new("b", crate::DataType::String),
        ]),
        data_frame: Array2::from_shape_vec((2, 2), cells).unwrap(),
    };

    let encoded = rmp_serde::to_vec(&legacy).unwrap();
    let restored: ColumnFrame = rmp_serde::from_slice(&encoded).unwrap();

    assert_eq!(restored.nrows(), 2);
    assert_eq!(restored.ncolumns(), 2);

    // (column key, row, expected cell)
    for (name, row, expected) in [
        ("a", 0, DataValue::I32(1)),
        ("a", 1, DataValue::I32(2)),
        ("b", 0, DataValue::String("x".into())),
    ] {
        assert_eq!(
            restored.get_column(&name.into()).unwrap().get(row),
            Some(expected)
        );
    }
}
// A payload whose `data_frame` is a `Vec<TypedData>` (the V2 wire shape)
// must deserialize into a `ColumnFrame` with the same column contents.
#[test]
fn serde_v2_vec_typed_data_roundtrip() {
    // Stand-in for the V2 wire shape: same field names as `ColumnFrame`,
    // with one `TypedData` per column.
    #[derive(Serialize)]
    struct WireV2 {
        index: KeyIndex,
        data_frame: Vec<TypedData>,
    }

    let legacy = WireV2 {
        index: KeyIndex::new(vec![
            Key::new("c", crate::DataType::F64),
            Key::new("d", crate::DataType::Bool),
        ]),
        data_frame: vec![
            TypedData::from(vec![1.5f64, 2.5]),
            TypedData::from(vec![true, false]),
        ],
    };

    let encoded = rmp_serde::to_vec(&legacy).unwrap();
    let restored: ColumnFrame = rmp_serde::from_slice(&encoded).unwrap();

    assert_eq!(restored.nrows(), 2);
    assert_eq!(restored.ncolumns(), 2);

    // (column key, row, expected cell)
    for (name, row, expected) in [
        ("c", 0, DataValue::F64(1.5)),
        ("d", 1, DataValue::Bool(false)),
    ] {
        assert_eq!(
            restored.get_column(&name.into()).unwrap().get(row),
            Some(expected)
        );
    }
}
// A `ColumnFrame` serialized with its own `Serialize` impl must deserialize
// back with the same dimensions and the same boolean column values.
#[test]
fn serde_v3_current_format_roundtrip() {
    let original = column_frame! {
        "x" => [100i32, 200i32, 300i32],
        "y" => [true, false, true]
    };

    let encoded = rmp_serde::to_vec(&original).unwrap();
    let restored: ColumnFrame = rmp_serde::from_slice(&encoded).unwrap();

    assert_eq!(restored.nrows(), 3);
    assert_eq!(restored.ncolumns(), 2);

    let xs = restored.get_column(&"x".into()).unwrap();
    assert_eq!(xs.len(), 3);

    let ys = restored.get_column(&"y".into()).unwrap();
    for (row, flag) in [(0, true), (1, false)] {
        assert_eq!(ys.get(row), Some(DataValue::Bool(flag)));
    }
}
}