use ndarray::{concatenate, Array, Array1, Array2, ArrayView1, Axis};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use crate::error::Error;
use crate::{dataframe::index::Index, CandidateData, JoinBy, JoinRelation, Key};
use data_value::{DataValue, Extract};
use tracing::*;
mod from;
mod key_index;
mod ops;
pub use key_index::KeyIndex;
/// Column-oriented table: a 2-D array of `DataValue` cells whose columns are named by a
/// `KeyIndex`.
///
/// Rows run along axis 0 and columns along axis 1; a key is turned into a column
/// position through `index`.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
pub struct ColumnFrame {
/// Maps column keys (and any aliases added via `add_alias`) to column positions in
/// `data_frame`.
index: KeyIndex,
/// Cell storage with shape `(rows, columns)`.
data_frame: Array2<DataValue>,
}
/// Outcome of the shared pre-check performed before combining two frames: either the
/// caller still has to do the actual merge, or the result is already final.
enum Continue {
    /// Both frames have columns; the caller must perform the merge.
    Continue,
    /// Nothing left to do; the caller should return immediately.
    End,
}
impl Continue {
    /// Returns `true` when the caller should stop and return early.
    pub fn should_end(&self) -> bool {
        match self {
            Continue::End => true,
            Continue::Continue => false,
        }
    }
}
use std::fmt;
impl fmt::Display for ColumnFrame {
    /// Renders the frame as a pipe-delimited text table.
    ///
    /// Layout:
    /// - a header line with the key names;
    /// - when there is at least one row, a line with the detected dtype of each cell of
    ///   the first row;
    /// - a `---` separator;
    /// - at most `MAX_DISPLAY_ROWS` data rows, followed by a `... (dataframe is too long)`
    ///   marker only when rows were actually omitted;
    /// - a closing `---`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        const MAX_DISPLAY_ROWS: usize = 256;
        write!(f, "\n|")?;
        for key in &self.index.keys {
            write!(f, " {} |", key)?;
        }
        if self.index.is_empty() {
            write!(f, "|")?;
        }
        // Always terminate the header line so that, for a frame without rows, the
        // separator does not end up glued to it (`| a |---`).
        writeln!(f)?;
        if let Some(row) = self.data_frame.axis_iter(Axis(0)).next() {
            write!(f, "|")?;
            for value in row.iter() {
                write!(f, " {:10?} |", crate::detect_dtype(value))?;
            }
            writeln!(f)?;
        }
        writeln!(f, "---")?;
        for row in self.data_frame.axis_iter(Axis(0)).take(MAX_DISPLAY_ROWS) {
            write!(f, "|")?;
            for value in row.iter() {
                write!(f, " {} |", value)?;
            }
            writeln!(f)?;
        }
        // Only announce truncation when some rows were really skipped.
        if self.data_frame.nrows() > MAX_DISPLAY_ROWS {
            writeln!(f, "... (dataframe is too long)")?;
        }
        writeln!(f, "---")
    }
}
impl ColumnFrame {
/// Creates a frame from a key index and a 2-D array whose columns line up with the keys.
pub fn new<K: Into<KeyIndex>>(index: K, data_frame: Array2<DataValue>) -> Self {
    let index: KeyIndex = index.into();
    Self { index, data_frame }
}
/// Column keys in index order.
pub fn keys(&self) -> &[Key] {
    let index = &self.index;
    index.get_keys()
}
/// Number of rows in the frame.
pub fn len(&self) -> usize {
    self.data_frame.shape()[0]
}
/// `true` when the frame has no rows (it may still have columns).
pub fn is_empty(&self) -> bool {
    self.len() == 0
}
/// Kept for API compatibility; currently performs no work.
pub fn shrink(&mut self) {}
/// Renames column `old` to `new`; errors are reported by the key index.
pub fn rename_key(&mut self, old: &str, new: Key) -> Result<(), Error> {
    let index = &mut self.index;
    index.rename_key(old, new)
}
/// Registers `alias` as an additional name for column `key`; errors are reported by the
/// key index.
pub fn add_alias(&mut self, key: &str, alias: &str) -> Result<(), Error> {
    let index = &mut self.index;
    index.add_alias(key, alias)
}
/// Selects `keys` (see `select`) and converts every cell with `D::extract`.
///
/// Returns one inner `Vec` per data row, with cells ordered like `keys`.
/// NOTE(review): despite the name, the outer `Vec` is indexed by row, not by column
/// (unlike `select_transposed`) — confirm this is what callers expect.
pub fn select_transposed_typed<D: Extract>(&self, keys: &[Key]) -> Vec<Vec<D>> {
    let selected = self.select(Some(keys));
    selected
        .rows()
        .into_iter()
        .map(|row| row.iter().map(|value| D::extract(value)).collect())
        .collect()
}
/// Returns the selected columns as the *rows* of a new array, i.e. with shape
/// `(selected columns, rows)`.
///
/// `keys == None` selects every column. An empty `(0, 0)` array is returned when no
/// requested key resolves.
///
/// # Errors
/// Propagates errors from `to_array2`.
pub fn select_transposed(&self, keys: Option<&[Key]>) -> Result<Array2<DataValue>, Error> {
    let requested = match keys {
        Some(requested) => requested,
        None => self.index.get_keys(),
    };
    let selection = self.index.select(requested);
    if selection.is_empty() {
        return Ok(Array2::default((0, 0)));
    }
    let mut columns: Vec<Array1<DataValue>> = Vec::new();
    for position in selection.indexes().iter() {
        columns.push(self.data_frame.column(*position).to_owned());
    }
    to_array2(columns)
}
/// Returns a read-only view of the column named `key`, or `None` if the key is unknown.
pub fn select_column(&self, key: &Key) -> Option<ArrayView1<DataValue>> {
    let position = self.index.get_column_index(key)?;
    Some(self.data_frame.column(position))
}
pub fn apply_function<F>(&mut self, keys: &[Key], mut func: F) -> Result<(), Error>
where
F: FnMut(&[Key], &mut ColumnFrame) -> Result<(), Error>,
{
func(keys, self)
}
pub fn validate_entry_access(&self, column: &Key, row_index: usize) -> Result<usize, Error> {
if row_index >= self.data_frame.nrows() {
return Err(Error::IndexOutOfRange(row_index, self.data_frame.nrows()));
}
let Some(column_index) = self.index.get_column_index(column) else {
return Err(Error::NotFound(column.clone()));
};
Ok(column_index)
}
/// Returns a reference to the cell of `column` at `row_index`.
///
/// Returns `None` when the row is out of range or the column is unknown; the reason is
/// logged at `trace` level.
pub fn get_by_row_index(&self, column: &Key, row_index: usize) -> Option<&DataValue> {
    // Report the real column count: `len()` on an ndarray is the total number of cells.
    trace!(
        "Column: {column} row_index: {row_index} data_frame: cols:{}-rows:{}",
        self.data_frame.ncols(),
        self.data_frame.nrows()
    );
    trace!("{:?}", self.data_frame);
    match self.validate_entry_access(column, row_index) {
        Ok(column_index) => self.data_frame.get((row_index, column_index)),
        Err(e) => {
            trace!("Error: {e}");
            None
        }
    }
}
/// Returns a mutable reference to the cell of `column` at `row_index`.
///
/// Returns `None` when the row is out of range or the column is unknown; the reason is
/// logged at `trace` level.
pub fn get_mut_by_row_index(
    &mut self,
    column: &Key,
    row_index: usize,
) -> Option<&mut DataValue> {
    // Report the real column count: `len()` on an ndarray is the total number of cells.
    trace!(
        "Column: {column} row_index: {row_index} data_frame: cols:{}-rows:{}",
        self.data_frame.ncols(),
        self.data_frame.nrows()
    );
    trace!("{:?}", self.data_frame);
    match self.validate_entry_access(column, row_index) {
        Ok(column_index) => self.data_frame.get_mut((row_index, column_index)),
        Err(e) => {
            trace!("Error: {e}");
            None
        }
    }
}
/// Returns the requested columns as a map from key to the column's values (copied).
///
/// `keys == None` selects every column. Requested keys that do not resolve are omitted;
/// if none resolve, the map is empty.
pub fn select_as_map(&self, keys: Option<&[Key]>) -> HashMap<Key, Vec<DataValue>> {
    let requested = match keys {
        Some(requested) => requested,
        None => self.index.get_keys(),
    };
    let selection = self.index.select(requested);
    if selection.is_empty() {
        return HashMap::new();
    }
    requested
        .iter()
        .filter_map(|key| {
            selection
                .get_column_index(key)
                .map(|position| (key.clone(), self.data_frame.column(position).to_vec()))
        })
        .collect()
}
/// Copies the requested columns into a new `(rows, keys.len())` array, in `keys` order.
///
/// `keys == None` selects every column. Columns for keys that do not resolve keep
/// `DataValue::default()`. If no key resolves at all, an empty `(0, 0)` array is returned.
pub fn select(&self, keys: Option<&[Key]>) -> Array2<DataValue> {
    let requested = match keys {
        Some(requested) => requested,
        None => self.index.get_keys(),
    };
    let selection = self.index.select(requested);
    if selection.is_empty() {
        return Array2::default((0, 0));
    }
    let mut output: Array2<DataValue> =
        Array2::default((self.data_frame.nrows(), requested.len()));
    for (position, key) in requested.iter().enumerate() {
        let Some(source) = selection.get_column_index(key) else {
            continue;
        };
        output
            .column_mut(position)
            .assign(&self.data_frame.column(source));
    }
    output
}
/// Appends a new column named `key`, filled with `DataValue::default()` for every existing
/// row.
///
/// The column is pushed onto the array *before* the key is registered. If the push fails,
/// the index therefore never references a column that does not exist.
///
/// # Errors
/// Propagates the shape error from `push_column`.
fn extend_dataframe_for_column(&mut self, key: Key) -> Result<(), Error> {
    let rows = self.data_frame.nrows();
    self.data_frame.push_column(Array1::default(rows).view())?;
    self.index.store_key(key);
    Ok(())
}
pub fn push<C: CandidateData>(&mut self, row_candidate: C) -> Result<(), Error> {
let mut arr = vec![];
for key in row_candidate.keys() {
if self.index.get_column_index(&key).is_none() {
self.extend_dataframe_for_column(key)?;
}
}
for index in self.index.get_keys() {
if let Some(value) = row_candidate.get_value_ref(index) {
arr.push(value.clone());
} else {
arr.push(DataValue::Null);
}
}
self.data_frame.push_row(Array::from_vec(arr).view())?;
Ok(())
}
pub fn remove_column(&mut self, keys: &[Key]) -> Result<Self, Error> {
let mut indexes = KeyIndex::default();
let data = self.select(Some(keys));
for key in keys {
if let Some((current, _idx)) = self.index.remove_key(key) {
indexes.store_key(current);
}
}
let rest = self.select(None);
let keys = self.index.get_keys().to_vec();
self.data_frame = rest;
self.index = KeyIndex::new(keys);
Ok(Self::new(indexes, data))
}
/// Handles the trivial cases shared by the combining operations.
///
/// If `self` has no columns it becomes a copy of `other`. If `other` has no columns there
/// is nothing to merge. Both cases yield `Continue::End`; otherwise `Continue::Continue`.
fn check_or_init_frame(&mut self, other: &Self) -> Result<Continue, Error> {
    match (self.index.is_empty(), other.index.is_empty()) {
        (true, _) => {
            self.index = other.index.clone();
            self.data_frame = other.data_frame.clone();
            Ok(Continue::End)
        }
        (false, true) => Ok(Continue::End),
        (false, false) => Ok(Continue::Continue),
    }
}
/// Adds (default-filled) columns to `self` for every key of `other` that `self` lacks,
/// in `other`'s key order.
///
/// # Errors
/// Propagates the first error from adding a column.
fn extend_columns_from_other(&mut self, other: &Self) -> Result<(), Error> {
    other.index.get_keys().iter().try_for_each(|key| {
        if self.index.get_column_index(key).is_some() {
            Ok(())
        } else {
            self.extend_dataframe_for_column(key.clone())
        }
    })
}
/// Appends the rows of `other` below the rows of `self`.
///
/// Steps:
/// 1. The key order of the two frames is validated with `check_order_of_indexes`.
/// 2. Both frames are widened to the union of their keys; cells missing on either side
///    get `DataValue::default()`.
/// 3. `other`'s columns are re-ordered to `self`'s layout before concatenation.
///
/// Step 3 ensures every value lands under the column with the same key. Previously only
/// one side was widened and the concatenation was positional, so e.g. self `[a, b]` +
/// other `[b]` put `b`'s values under `a`.
///
/// # Errors
/// Propagates errors from the order check, from widening, and from `concatenate`.
pub fn extend(&mut self, mut other: Self) -> Result<(), Error> {
    if self.check_or_init_frame(&other)?.should_end() {
        return Ok(());
    }
    self.index.check_order_of_indexes(&other.index)?;
    trace!(
        "Extend columns from other {:?} vs {:?}",
        other.index.get_keys(),
        self.index.get_keys()
    );
    self.extend_columns_from_other(&other)?;
    other.extend_columns_from_other(self)?;
    // Align `other` to `self`'s column order so rows are joined by key, not by position.
    let aligned = other.select(Some(self.index.get_keys()));
    self.data_frame = concatenate(Axis(0), &[self.data_frame.view(), aligned.view()])?;
    Ok(())
}
pub fn replace(&mut self, other: Self) -> Result<(), Error> {
if self.check_or_init_frame(&other)?.should_end() {
return Ok(());
}
if self.data_frame.len() > other.data_frame.len() {
return Err(Error::DataSetSizeDoesntMatch(
self.data_frame.len(),
other.data_frame.len(),
));
}
self.index = other.index;
self.data_frame = other.data_frame;
Ok(())
}
pub fn join_by_id_inner(&mut self, right: Self, keys: &[Key]) -> Result<(), Error> {
if self.check_or_init_frame(&right)?.should_end() {
return Ok(());
}
self.index.is_extendable(&right.index)?;
self.extend_columns_from_other(&right)?;
let index = Index::new(keys.to_vec(), self);
tracing::trace!("Index {index:?}");
let indexes = self.index.select(keys);
let mut new_df = Array2::default((self.len(), self.index.len()));
new_df.assign(&self.data_frame);
trace!("Indexes: {indexes:?}");
trace!("New DF: {new_df:?}");
trace!("Right DF: {:?}", right.data_frame);
trace!("current {:?}", self.data_frame);
for c_row in right.data_frame.rows() {
let row = c_row.to_vec();
let values = indexes
.get_keys()
.iter()
.map(|k| {
row.get(
indexes
.get_column_index(k)
.expect("BUG: Something is very wrong"),
)
.cloned()
.unwrap_or_default()
})
.collect::<Vec<_>>();
if let Some(index) = index.get(&values) {
trace!("Index: {index} {c_row:?} -> {values:?} vs {new_df:?}");
new_df.row_mut(index).assign(&c_row);
}
}
self.data_frame = new_df;
Ok(())
}
pub fn add_single_column<K: Into<Key>>(
&mut self,
key: K,
column: Array1<DataValue>,
) -> Result<(), Error> {
let key = key.into();
if self.index.get_column_index(&key).is_some() {
return Err(Error::ColumnAlreadyExists(key));
}
if self.len() != column.len() && !self.is_empty() {
return Err(Error::DataSetSizeDoesntMatch(self.len(), column.len()));
}
self.index.store_key(key.clone());
let column_index = self
.index
.get_column_index(&key)
.ok_or(Error::UnknownError(format!("Column {} should exists", key)))?;
if self.is_empty() {
self.data_frame = Array2::default((column.len(), 1));
} else {
self.extend_dataframe_for_column(key.clone())?;
}
self.data_frame.column_mut(column_index).assign(&column);
Ok(())
}
pub fn add_columns(&mut self, other: Self) -> Result<(), Error> {
if self.check_or_init_frame(&other)?.should_end() {
return Ok(());
}
self.extend_columns_from_other(&other)?;
for (idx, key) in other.index.get_keys().iter().enumerate() {
if let Some(index) = self.index.get_column_index(key) {
self.data_frame
.column_mut(index)
.assign(&other.data_frame.column(idx));
}
}
Ok(())
}
pub fn broadcast(&mut self, other: Self) -> Result<(), Error> {
if self.check_or_init_frame(&other)?.should_end() {
return Ok(());
}
if other.data_frame.nrows() != 1 {
return Err(Error::CannotBroadcast);
}
self.extend_columns_from_other(&other)?;
let mut new_df = Array2::default((self.len(), self.index.len()));
for (idx, key) in self.index.get_keys().iter().enumerate() {
if let Some(other_idx) = other.index.get_column_index(key) {
new_df
.column_mut(idx)
.assign(&other.data_frame.column(other_idx));
} else {
new_df.column_mut(idx).assign(&self.data_frame.column(idx));
}
}
self.data_frame = new_df;
Ok(())
}
/// Replaces this frame with the cartesian product of its rows and `other`'s rows.
///
/// The result has `self.len() * other.len()` rows: `other`'s rows form the outer loop and
/// `self`'s rows the inner loop. Keys of `other` that `self` lacks are appended to the
/// index, and each output row is produced by merging the two rows' candidates.
///
/// # Errors
/// Propagates the shape error if the produced cell count does not equal
/// `rows × columns`.
pub fn cartesian_product(&mut self, other: Self) -> Result<(), Error> {
    if self.check_or_init_frame(&other)?.should_end() {
        return Ok(());
    }
    let mut new_index = self.index.clone();
    for other_key in other.keys() {
        if self.index.get_column_index(other_key).is_none() {
            new_index.store_key(other_key.clone());
        }
    }
    let max_rows = self.len() * other.len();
    // `rows` is the flat, row-major cell buffer handed to `from_shape_vec`: reserve room
    // for every cell instead of just `other.len()` elements.
    let mut rows = Vec::with_capacity(max_rows * new_index.len());
    for other_row in other.data_frame.rows() {
        let other_candidate = other.index.get_as_candidate(other_row);
        for row in self.data_frame.rows() {
            let mut candidate = self.index.get_as_candidate(row);
            candidate.merge(&other_candidate);
            rows.extend(new_index.to_vec_row(candidate));
        }
    }
    trace!(
        "Rows len: {}, max_rows: {}, new_index_len: {}, current_index len: {}",
        rows.len(),
        max_rows,
        new_index.len(),
        self.index.len()
    );
    self.data_frame = Array2::from_shape_vec((max_rows, new_index.len()), rows)?;
    self.index = new_index;
    Ok(())
}
pub fn join(&mut self, right: Self, join_type: &JoinRelation) -> Result<(), Error> {
use JoinBy::*;
match &join_type.join_type {
AddColumns => self.add_columns(right),
Replace => self.replace(right),
Extend => self.extend(right),
Broadcast => self.broadcast(right),
CartesianProduct => self.cartesian_product(right),
JoinById(join) => self.join_by_id_inner(right, &join.keys),
}
}
/// Returns a read-only view of the column named `key`, or `None` if the key is unknown.
/// Equivalent to `select_column`.
pub fn get_single_column(&self, key: &Key) -> Option<ArrayView1<DataValue>> {
    self.select_column(key)
}
}
pub fn to_array2<T: Clone>(source: Vec<Array1<T>>) -> Result<Array2<T>, Error> {
let width = source.len();
let flattened: Array1<T> = source.into_iter().flat_map(|row| row.to_vec()).collect();
let height = flattened.len() / width;
Ok(flattened.into_shape_with_order((width, height))?)
}
/// Builds a `DataFrame` from the same `key => value(s)` syntax accepted by
/// `column_frame!`, by wrapping the resulting `ColumnFrame` in `DataFrame::new`.
#[macro_export]
macro_rules! dataframe {
    ($($tokens:tt)*) => {
        $crate::DataFrame::new($crate::column_frame!($($tokens)*))
    };
}
/// Builds a `ColumnFrame` from `key => value(s)` pairs; every key and value is converted
/// with `.into()`.
///
/// Supported shapes:
/// - `key => [v1, v2, ...]` or `key => vec![v1, v2, ...]`: each key names a column and the
///   bracketed values are its rows. The per-key lists form the rows of an `arr2` literal,
///   which is then transposed with `reversed_axes()`, so every key must have the same
///   number of values.
/// - `key => value`: a single row with one scalar per key.
///
/// NOTE(review): the trailing-comma arm forwards values as opaque `expr` fragments, which
/// can no longer match the `[...]`/`vec![...]` arms. A trailing comma after array-valued
/// entries therefore presumably falls through to the scalar arm — confirm before relying
/// on it.
#[macro_export]
macro_rules! column_frame {
// Trailing-comma form: drop the comma and re-dispatch.
($($key:expr => $value:expr,)+) => { $crate::column_frame!($($key => $value),+) };
// `vec![...]` values are rewritten into the array-literal form below.
($($key:expr => vec![$($value:expr),*]),*) => {
$crate::column_frame!($($key => [$($value),*]),*)
};
// Array values: one `arr2` row per key, transposed so that keys become columns.
($($key:expr => [$($value:expr),*]),*) => {
{
let data = ::ndarray::arr2(&[$(
[$($value.into(),)*],
)*]);
let _keys = vec![$($key.into(),)*];
$crate::ColumnFrame::new(
$crate::KeyIndex::new(_keys),
data.reversed_axes()
)
}
};
// Scalar values: a single-row frame.
($($key:expr => $value:expr),*) => {
{
let _data = ::ndarray::arr2(&[[$($value.into(),)*]]);
let _keys = vec![$($key.into(),)*];
$crate::ColumnFrame::new(
$crate::KeyIndex::new(_keys),
_data,
)
}
};
}
// Unit tests for `ColumnFrame`: the construction macros, key renaming/aliasing, the
// selection helpers, every `JoinBy` strategy, serde round-tripping and in-place updates.
#[cfg(test)]
mod test {
use crate::JoinById;
use super::*;
use data_value::stdhashmap;
use ndarray::ArrayView;
use rstest::*;
use tracing_test::traced_test;
/// `column_frame!` builds single-row (scalar) and multi-row (array) frames, and
/// `dataframe!` wraps the same input in a `DataFrame`.
#[rstest]
#[traced_test]
fn test_macro() {
let df = column_frame! {
"a" => 1,
"b" => 2,
"c" => 3,
"d" => 4,
};
assert_eq!(df.len(), 1);
assert_eq!(df.keys(), &["a".into(), "b".into(), "c".into(), "d".into()]);
let f = Array2::from_shape_vec((1, 4), vec![1.into(), 2.into(), 3.into(), 4.into()])
.expect("BUG: cannot create array");
assert_eq!(df.select(None), f);
let df = column_frame! {
"a" => [1, 2, 3],
"b" => [4, 5, 6],
"c" => [7, 8, 9]
};
assert_eq!(df.len(), 3);
assert_eq!(df.keys(), &["a".into(), "b".into(), "c".into()]);
// Expected layout is row-major: each row holds one value per key.
let f = Array2::from_shape_vec(
(3, 3),
vec![
1.into(),
4.into(),
7.into(),
2.into(),
5.into(),
8.into(),
3.into(),
6.into(),
9.into(),
],
)
.expect("BUG: cannot create array");
let selected = df.select(None);
trace!("{selected:?}");
assert_eq!(selected, f);
let df1 = dataframe! {
"a" => [1, 2, 3],
"b" => [4, 5, 6],
"c" => [7, 8, 9]
};
let formatted = format!("{}", df);
debug!("{}", formatted);
assert_eq!(df1, crate::DataFrame::from(df));
}
/// Renaming `a` to `a_new` must yield a frame equal to one built with the new key.
#[rstest]
#[case(
column_frame! {
"a" => [1, 2, 3],
"b" => [4, 5, 6],
"c" => [7, 8, 9]
},
column_frame! {
"a_new" => [1, 2, 3],
"b" => [4, 5, 6],
"c" => [7, 8, 9]
},
vec!["a_new", "b", "c"].into_iter().map(|x| x.into()).collect(),
vec![("a", "a_new".into())]
)]
#[traced_test]
fn rename_test(
#[case] df: ColumnFrame,
#[case] expected: ColumnFrame,
#[case] keys: Vec<Key>,
#[case] renames: Vec<(&str, Key)>,
) {
let mut df = df;
for (old, new) in renames {
df.rename_key(old, new).expect("BUG: cannot rename key");
}
assert_eq!(df, expected);
assert_eq!(df.keys(), keys.as_slice());
}
/// Selecting through an alias returns the same data as selecting by the original keys.
#[rstest]
#[case(
column_frame! {
"a" => [1, 2, 3],
"b" => [4, 5, 6],
"c" => [7, 8, 9]
},
vec!["a_alias", "b", "c"].into_iter().map(|x| x.into()).collect(),
vec![("a", "a_alias")]
)]
#[traced_test]
fn alias_test(
#[case] df: ColumnFrame,
#[case] keys: Vec<Key>,
#[case] aliases: Vec<(&str, &str)>,
) {
let mut df = df;
for (old, new) in aliases {
df.add_alias(old, new).expect("BUG: cannot rename key");
}
let origin_keys = df.keys().to_vec();
let selected_aliases = df.select(Some(keys.as_slice()));
let selected = df.select(Some(origin_keys.as_slice()));
assert_eq!(selected, selected_aliases);
}
/// Hand-built single-row frame: lookups by key/row and a two-column selection.
#[rstest]
#[traced_test]
fn dummy_test() {
let data = vec![
DataValue::U32(1),
DataValue::I32(2),
DataValue::I64(3),
DataValue::U64(4),
];
let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
let index = KeyIndex::new(keys.clone());
let mut data_frame = Array2::default((1, keys.len()));
for (idx, entry) in data.iter().enumerate() {
data_frame
.column_mut(idx)
.assign(&ArrayView::from(&[entry.clone()]));
}
let frame = ColumnFrame::new(index, data_frame);
assert_eq!(
frame.get_by_row_index(&"a".into(), 0),
Some(&DataValue::U32(1))
);
assert_eq!(frame.get_by_row_index(&"aa".into(), 0), None);
assert_eq!(frame.get_by_row_index(&"a".into(), 1), None);
assert_eq!(
frame.select(Some(&["a".into(), "b".into()])),
Array2::from_shape_vec((1, 2), vec![DataValue::U32(1), DataValue::I32(2)])
.expect("BUG: cannot create array")
);
}
/// Two-row frame: unknown keys and out-of-range rows yield `None`.
#[rstest]
#[traced_test]
fn dummy_test_multiple_rows() {
let data = vec![
DataValue::U32(1),
DataValue::I32(2),
DataValue::I64(3),
DataValue::U64(4),
DataValue::U32(12),
DataValue::I32(22),
DataValue::I64(32),
DataValue::U64(42),
];
let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
let index = KeyIndex::new(keys.clone());
let data_frame =
Array2::from_shape_vec((2, keys.len()), data).expect("BUG: cannot create array");
let frame = ColumnFrame::new(index, data_frame);
assert_eq!(
frame.get_by_row_index(&"a".into(), 0),
Some(&DataValue::U32(1))
);
assert_eq!(frame.get_by_row_index(&"aa".into(), 0), None);
assert_eq!(frame.get_by_row_index(&"a".into(), 3), None);
let arr = Array2::from_shape_vec(
(2, 2),
vec![
DataValue::U32(1),
DataValue::I32(2),
DataValue::U32(12),
DataValue::I32(22),
],
)
.expect("BUG: cannot create array");
trace!("{arr:?}");
assert_eq!(frame.select(Some(&["a".into(), "b".into()])), arr);
}
/// `push` appends rows; a new key (`e`) creates a column, and cells without a value
/// (old rows for `e`, the new row for `d`) are expected to be Null.
#[rstest]
#[traced_test]
fn dummy_test_multiple_rows_push() {
let data = vec![
DataValue::U32(1),
DataValue::I32(2),
DataValue::I64(3),
DataValue::U64(4),
DataValue::U32(12),
DataValue::I32(22),
DataValue::I64(32),
DataValue::U64(42),
];
let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
let index = KeyIndex::new(keys.clone());
let data_frame =
Array2::from_shape_vec((2, keys.len()), data).expect("BUG: cannot create array");
let mut frame = ColumnFrame::new(index, data_frame);
assert!(frame
.push(data_value::stdhashmap!(
"a" => DataValue::U32(2),
"b" => DataValue::I32(3),
"c" => DataValue::I64(4),
"d" => DataValue::U64(5)
))
.is_ok());
let arr = Array2::from_shape_vec(
(3, 2),
vec![
DataValue::U32(1),
DataValue::I32(2),
DataValue::U32(12),
DataValue::I32(22),
DataValue::U32(2),
DataValue::I32(3),
],
)
.expect("BUG: cannot create array");
trace!("{arr:?}");
assert_eq!(frame.select(Some(&["a".into(), "b".into()])), arr);
let result = frame.push(data_value::stdhashmap!(
"a" => DataValue::U32(34),
"b" => DataValue::I32(44),
"c" => DataValue::I64(54),
"e" => DataValue::F32(6f32)
));
assert!(result.is_ok(), "{result:?}");
let arr = Array2::from_shape_vec(
(4, 2),
vec![
DataValue::U64(4),
DataValue::Null,
DataValue::U64(42),
DataValue::Null,
DataValue::U64(5),
DataValue::Null,
DataValue::Null,
DataValue::F32(6f32),
],
)
.expect("BUG: cannot create array");
trace!("{arr:?}");
assert_eq!(frame.select(Some(&["d".into(), "e".into()])), arr);
}
/// `select` returns rows × keys and `select_transposed` must return its transpose; an
/// unknown key yields an empty array.
#[rstest]
#[case(
column_frame! {
"group_id" => vec![1, 2],
"feed_tag" => vec![3, 4]
},
Some(vec![Key::from("group_id")]),
ndarray::array!([1.into()], [2.into()])
)]
#[case(
column_frame! {
"group_id" => vec![1, 2],
"feed_tag" => vec![3, 4]
},
Some(vec!["group_id".into(), "feed_tag".into()]),
ndarray::array!([1.into(), 3.into()], [2.into(), 4.into()])
)]
#[case(
column_frame! {
"group_id" => vec![1, 2],
"feed_tag" => vec![3, DataValue::Null]
},
Some(vec!["feed_tag".into()]),
ndarray::array![[3.into()], [DataValue::Null]]
)]
#[case(
column_frame! {
"group_id" => vec![1, 2],
"feed_tag" => vec![1, DataValue::Null]
},
Some(vec!["feed_tag2".into()]),
Array2::<DataValue>::default((0, 0))
)]
#[traced_test]
fn test_select(
#[case] input: ColumnFrame,
#[case] keys: Option<Vec<Key>>,
#[case] expected: Array2<DataValue>,
) {
trace!("input={input:?}");
let keys_slice = keys.as_deref();
let selected = input.select(keys_slice);
trace!("selected={selected:?}");
assert_eq!(selected, expected);
let selected = input.select_transposed(keys_slice);
trace!("selected_transposed={selected:?}");
assert!(selected.is_ok());
assert_eq!(selected.unwrap(), expected.t());
}
/// `select_column` returns the column view for known keys and `None` otherwise.
#[rstest]
#[case(
column_frame! {
"group_id" => vec![1, 2],
"feed_tag" => vec![3, 4]
},
Key::from("group_id"),
Some(ndarray::array!(1.into(), 2.into()))
)]
#[case(
column_frame! {
"group_id" => vec![1, 2, 5, 6],
"feed_tag" => vec![3, 4, 7, 8]
},
Key::from("group_id"),
Some(ndarray::array!(1.into(), 2.into(), 5.into(), 6.into()))
)]
#[case(
column_frame! {
"group_id" => vec![1, 2],
"feed_tag" => vec![1, 1]
},
Key::from("feed_tag1"),
None
)]
#[traced_test]
fn test_select_column(
#[case] input: ColumnFrame,
#[case] key: Key,
#[case] expected: Option<Array1<DataValue>>,
) {
let selected = input.select_column(&key);
trace!("selected={selected:?}");
match expected {
Some(expected) => {
assert!(selected.is_some());
assert_eq!(selected.expect("BUG: checked above"), expected);
}
None => assert!(selected.is_none()),
}
}
/// `JoinById` on (group_id, feed_tag): matching rows receive clicks/imps, unmatched rows
/// keep Null; joining an empty frame is a no-op.
#[test]
#[traced_test]
fn join_test() {
let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
"group_id".into(),
"feed_tag".into(),
])));
let mut column_frame = column_frame! {
"group_id" => vec![1, 2, 8],
"feed_tag" => vec![1, 1, 10]
};
let column_frame2 = column_frame! {
"group_id" => vec![2, 1, 3],
"feed_tag" => vec![1, 1, 1],
"clicks" => vec![100, 10, 10],
"imps" => vec![1000, 200, 200]
};
assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
let joined = column_frame.join(column_frame2, &join);
assert!(joined.is_ok(), "{joined:?}");
trace!("{column_frame:?}");
assert_eq!(
column_frame.select(Some(&[
"group_id".into(),
"feed_tag".into(),
"clicks".into(),
"imps".into()
])),
ndarray::array!(
[1.into(), 1.into(), 10.into(), 200.into()],
[2.into(), 1.into(), 100.into(), 1000.into()],
[8.into(), 10.into(), DataValue::Null, DataValue::Null]
)
)
}
/// Every row of `df2` is combined with every row of `df`; `df`'s rows vary fastest.
#[rstest]
#[traced_test]
fn cartesian_product_join() {
let mut df = column_frame! {
"group_id" => vec![1, 2, 3],
"feed_tag" => vec![1, 2, 3]
};
let df2 = column_frame! {
"zone_id" => vec![111111, 111133],
"zone_avg_ctr" => vec![0.1, 0.001]
};
assert!(df
.join(
ColumnFrame::default(),
&JoinRelation::new(JoinBy::CartesianProduct)
)
.is_ok());
let join = JoinRelation::new(JoinBy::CartesianProduct);
let result = df.join(df2, &join);
assert!(result.is_ok(), "{result:?}");
let selected = df.select(None);
trace!("{selected:?}");
assert_eq!(
selected,
ndarray::array!(
[1.into(), 1.into(), 111111.into(), 0.1.into()],
[2.into(), 2.into(), 111111.into(), 0.1.into()],
[3.into(), 3.into(), 111111.into(), 0.1.into()],
[1.into(), 1.into(), 111133.into(), 0.001.into()],
[2.into(), 2.into(), 111133.into(), 0.001.into()],
[3.into(), 3.into(), 111133.into(), 0.001.into()]
)
)
}
/// A single-row frame is broadcast across all rows.
#[rstest]
#[traced_test]
fn broadcast_join() {
let mut df = column_frame! {
"group_id" => vec![1, 2, 3],
"feed_tag" => vec![1, 2, 3]
};
let df2 = column_frame! {
"zone_id" => vec![111111]
};
assert!(df
.join(
ColumnFrame::default(),
&JoinRelation::new(JoinBy::Broadcast)
)
.is_ok());
let join = JoinRelation::new(JoinBy::Broadcast);
assert!(df.join(df2, &join).is_ok());
let selected = df.select(None);
trace!("{selected:?}");
assert_eq!(
selected,
ndarray::array!(
[1.into(), 1.into(), 111111.into()],
[2.into(), 2.into(), 111111.into()],
[3.into(), 3.into(), 111111.into()]
)
);
}
/// `Replace` swaps in the other frame's data.
#[rstest]
#[traced_test]
fn merge_test() {
let mut df = column_frame! {
"group_id" => vec![1, 2, 3],
"feed_tag" => vec![1, 2, 3]
};
let df2 = column_frame! {
"group_id" => vec![11, 21, 31],
"feed_tag" => vec![12, 22, 32]
};
let join = JoinRelation::new(JoinBy::Replace);
assert!(df.join(df2, &join).is_ok());
let selected = df.select(None);
trace!("{selected:?}");
assert_eq!(
selected,
ndarray::array!(
[11.into(), 12.into()],
[21.into(), 22.into()],
[31.into(), 32.into()]
)
);
}
/// `Extend` appends rows; also exercises `select_as_map`, including an unknown key.
#[rstest]
#[traced_test]
fn extend_test() {
let mut df = column_frame! {
"group_id" => vec![1, 2, 3],
"feed_tag" => vec![1, 2, 3]
};
let df2 = column_frame! {
"group_id" => vec![11, 21, 31],
"feed_tag" => vec![5, 6, 7]
};
assert!(df
.join(ColumnFrame::default(), &JoinRelation::new(JoinBy::Extend))
.is_ok());
let join = JoinRelation::new(JoinBy::Extend);
assert!(df.join(df2, &join).is_ok());
let selected = df.select(Some(&["feed_tag".into(), "group_id".into()]));
trace!("{selected:?}");
assert_eq!(
selected,
ndarray::array!(
[1.into(), 1.into()],
[2.into(), 2.into()],
[3.into(), 3.into()],
[5.into(), 11.into()],
[6.into(), 21.into()],
[7.into(), 31.into()]
)
);
let as_map = df.select_as_map(Some(&["feed_tag".into(), "group_id".into()]));
trace!("{as_map:?}");
assert_eq!(
as_map,
stdhashmap!(
"feed_tag" => vec![1, 2, 3, 5, 6, 7],
"group_id" => vec![1, 2, 3, 11, 21, 31]
)
);
let as_map = df.select_as_map(Some(&["feed_tag1".into()]));
trace!("{as_map:?}");
assert_eq!(as_map, HashMap::default());
}
/// Extending in either direction fills columns missing on one side with Null.
#[rstest]
#[traced_test]
fn extend_test_with_non_existing_cols() {
let mut df = column_frame! {
"group_id" => vec![1, 2, 3],
"feed_tag" => vec![1, 2, 3]
};
let mut df2 = column_frame! {
"group_id" => vec![11, 21, 31],
"feed_tag" => vec![5, 6, 7],
"clicks" => vec![100, 200, 300],
"impressions" => vec![1000, 2000, 3000]
};
let df_bckp = df.clone();
let join = JoinRelation::new(JoinBy::Extend);
assert!(df.join(df2.clone(), &join).is_ok());
let selected = df.select(None);
trace!("{selected:?}");
assert_eq!(
selected,
ndarray::array!(
[1.into(), 1.into(), DataValue::Null, DataValue::Null],
[2.into(), 2.into(), DataValue::Null, DataValue::Null],
[3.into(), 3.into(), DataValue::Null, DataValue::Null],
[11.into(), 5.into(), 100.into(), 1000.into()],
[21.into(), 6.into(), 200.into(), 2000.into()],
[31.into(), 7.into(), 300.into(), 3000.into()]
)
);
let join = JoinRelation::new(JoinBy::Extend);
let r = df2.join(df_bckp, &join);
assert!(r.is_ok(), "{r:?}");
let selected = df2.select(None);
trace!("{selected:?}");
assert_eq!(
selected,
ndarray::array!(
[11.into(), 5.into(), 100.into(), 1000.into()],
[21.into(), 6.into(), 200.into(), 2000.into()],
[31.into(), 7.into(), 300.into(), 3000.into()],
[1.into(), 1.into(), DataValue::Null, DataValue::Null],
[2.into(), 2.into(), DataValue::Null, DataValue::Null],
[3.into(), 3.into(), DataValue::Null, DataValue::Null]
)
);
}
/// Frames whose shared keys appear in a different order cannot be extended.
#[rstest]
#[traced_test]
fn extend_test_with_non_existing_cols_wrong_order() {
let mut df = column_frame! {
"group_id" => vec![1, 2, 3],
"feed_tag" => vec![1, 2, 3]
};
let df2 = column_frame! {
"feed_tag" => vec![5, 6, 7],
"group_id" => vec![11, 21, 31]
};
let join = JoinRelation::new(JoinBy::Extend);
let err = df.join(df2, &join);
assert!(err.is_err(), "{err:?}");
}
/// `Replace` rejects a smaller frame but accepts an empty one as a no-op.
#[rstest]
#[traced_test]
fn test_replace_not_compatible() {
let mut df = column_frame! {
"group_id" => vec![1, 2, 3],
"feed_tag" => vec![1, 2, 3]
};
let df2 = column_frame! {
"feed_tag" => vec![5, 6],
"group_id" => vec![11, 21]
};
let join = JoinRelation::new(JoinBy::Replace);
let err = df.join(df2, &join);
assert!(err.is_err(), "{err:?}");
let empty = ColumnFrame::default();
let err = df.join(empty, &join);
assert!(err.is_ok(), "{err:?}");
}
/// `KeyIndex` and `ColumnFrame` round-trip through JSON.
#[rstest]
#[traced_test]
fn serde_column_frame() {
let df = column_frame! {
"group_id" => vec![1u64, 2u64, 3u64],
"feed_tag" => vec![1u64, 2u64, 3u64]
};
let key_idx = df.index.clone();
let serialized = serde_json::to_string(&key_idx).expect("BUG: cannot serialize");
let deserialized: KeyIndex =
serde_json::from_str(&serialized).expect("BUG: cannot deserialize");
assert_eq!(key_idx, deserialized);
assert!(key_idx.get_key(0).is_some_and(|x| x == "group_id".into()));
let serialized = serde_json::to_string(&df).expect("BUG: cannot serialize");
let deserialized: ColumnFrame =
serde_json::from_str(&serialized).expect("BUG: cannot deserialize");
assert_eq!(df, deserialized);
}
/// A write through `get_mut_by_row_index` is visible via `get_by_row_index`; unknown keys
/// yield `None`.
#[rstest]
#[traced_test]
fn update_value() {
let mut df = column_frame! {
"group_id" => vec![1, 2, 3],
"feed_tag" => vec![1, 2, 3]
};
let group_id: Key = "group_id".into();
let v = df.get_mut_by_row_index(&group_id, 1);
assert!(v.is_some());
let v = v.unwrap();
assert_eq!(v, &DataValue::I32(2));
*v = DataValue::U64(22);
let v = df.get_by_row_index(&group_id, 1);
assert!(v.is_some());
let v = v.unwrap();
assert_eq!(v, &DataValue::U64(22));
assert!(df.get_mut_by_row_index(&"group_id2".into(), 1).is_none());
}
}