1use ndarray::{concatenate, s, Array, Array1, Array2, ArrayView1, ArrayViewMut2, Axis};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4
5use crate::error::Error;
6use crate::{dataframe::index::Index, CandidateData, JoinBy, JoinRelation, Key};
7use data_value::{DataValue, Extract};
8use tracing::*;
9mod from;
10mod key_index;
11mod ops;
12pub mod sorted_df;
13pub use key_index::KeyIndex;
14pub mod filter_df;
15
/// Column-keyed table of `DataValue` cells.
///
/// Rows of `data_frame` are records; each column position is looked up through `index`
/// (see `KeyIndex::get_column_index`).
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
pub struct ColumnFrame {
    /// Column keys (and aliases) mapped to column positions in `data_frame`.
    pub index: KeyIndex,
    /// Cell storage with shape `(rows, columns)`.
    pub data_frame: Array2<DataValue>,
}
26
/// Result of the shared pre-merge check (`check_or_init_frame`): either the caller still
/// has merging work to do (`Continue`) or the operation is already complete (`End`).
enum Continue {
    Continue,
    End,
}

impl Continue {
    /// Reports whether the caller should stop; only `Continue::End` answers `true`.
    pub fn should_end(&self) -> bool {
        match self {
            Continue::End => true,
            Continue::Continue => false,
        }
    }
}
37
38use std::fmt;
39
40impl fmt::Display for ColumnFrame {
41 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42 write!(f, "\n|")?;
44
45 for key in &self.index.keys {
46 write!(f, " {key} |")?;
47 }
48
49 if self.index.is_empty() {
50 writeln!(f, "|")?;
51 }
52
53 if let Some(row) = self.data_frame.axis_iter(Axis(0)).next() {
55 write!(f, "\n|")?;
56 for value in row.iter() {
57 write!(f, " {:10?} |", crate::detect_dtype(value))?;
59 }
60 writeln!(f)?;
61 }
62
63 writeln!(f, "---")?;
64
65 for (n, row) in self.data_frame.axis_iter(Axis(0)).enumerate() {
67 write!(f, "|")?;
68
69 for value in row.iter() {
70 write!(f, " {value} |")?;
72 }
73 writeln!(f)?;
74
75 if n >= 256 {
76 writeln!(f, "... (dataframe is too long)")?;
77 break;
78 }
79 }
80
81 writeln!(f, "---")
82 }
83}
84
85impl ColumnFrame {
86 pub fn new<K: Into<KeyIndex>>(index: K, data_frame: Array2<DataValue>) -> Self {
87 Self {
88 index: index.into(),
89 data_frame,
90 }
91 }
92
93 pub fn keys(&self) -> &[Key] {
94 self.index.get_keys()
95 }
96
97 pub fn len(&self) -> usize {
98 self.data_frame.nrows()
99 }
100
101 pub fn is_empty(&self) -> bool {
102 self.data_frame.nrows() == 0
103 }
104
105 pub fn shrink(&mut self) {
106 }
108
109 pub fn get_mut_view(&mut self) -> ArrayViewMut2<DataValue> {
110 self.data_frame.view_mut()
111 }
112
113 pub fn rename_key(&mut self, old: &str, new: Key) -> Result<(), Error> {
114 self.index.rename_key(old, new)
115 }
116
117 pub fn add_alias(&mut self, key: &str, alias: &str) -> Result<(), Error> {
118 self.index.add_alias(key, alias)
119 }
120
121 pub fn select_transposed_typed<D: Extract>(&self, keys: &[Key]) -> Vec<Vec<D>> {
127 let selected = self.select(Some(keys));
128 let mut result = Vec::with_capacity(selected.nrows());
129 for row in selected.rows() {
130 let mut r = Vec::with_capacity(selected.ncols());
131 for value in row.iter() {
132 r.push(D::extract(value));
133 }
134 result.push(r);
135 }
136 result
137 }
138
139 pub fn select_transposed(&self, keys: Option<&[Key]>) -> Result<Array2<DataValue>, Error> {
145 let keys = keys.unwrap_or_else(|| self.index.get_keys());
146 let key_indexes = self.index.select(keys);
147 if key_indexes.is_empty() {
148 return Ok(Array2::default((0, 0)));
149 }
150 let data_vec: Vec<Array1<DataValue>> = key_indexes
151 .indexes()
152 .iter()
153 .map(|x| self.data_frame.column(*x).to_owned())
154 .collect();
155 to_array2(data_vec)
156 }
157
158 pub fn select_column(&self, key: &Key) -> Option<ArrayView1<DataValue>> {
162 self.index
163 .get_column_index(key)
164 .map(|x| self.data_frame.column(x))
165 }
166
167 pub fn apply_function<F>(&mut self, keys: &[Key], mut func: F) -> Result<(), Error>
168 where
169 F: FnMut(&[Key], &mut ColumnFrame) -> Result<(), Error>,
170 {
171 func(keys, self)
172 }
173
174 pub fn validate_entry_access(&self, column: &Key, row_index: usize) -> Result<usize, Error> {
179 if row_index >= self.data_frame.nrows() {
180 return Err(Error::IndexOutOfRange(row_index, self.data_frame.nrows()));
181 }
182 let Some(column_index) = self.index.get_column_index(column) else {
183 return Err(Error::NotFound(column.clone()));
184 };
185 Ok(column_index)
186 }
187
188 pub fn get_by_row_index(&self, column: &Key, row_index: usize) -> Option<&DataValue> {
192 trace!(
193 "Column: {column} row_index: {row_index} data_frame: cols:{}-rows:{}",
194 self.data_frame.len(),
195 self.data_frame.nrows()
196 );
197 trace!("{:?}", self.data_frame);
198 match self.validate_entry_access(column, row_index) {
199 Ok(column_index) => self.data_frame.get((row_index, column_index)),
200 Err(e) => {
201 trace!("Error: {e}");
202 None
203 }
204 }
205 }
206
207 pub fn get_mut_by_row_index(
211 &mut self,
212 column: &Key,
213 row_index: usize,
214 ) -> Option<&mut DataValue> {
215 trace!(
216 "Column: {column} row_index: {row_index} data_frame: cols:{}-rows:{}",
217 self.data_frame.len(),
218 self.data_frame.nrows()
219 );
220 trace!("{:?}", self.data_frame);
221 match self.validate_entry_access(column, row_index) {
222 Ok(column_index) => self.data_frame.get_mut((row_index, column_index)),
223 Err(e) => {
224 trace!("Error: {e}");
225 None
226 }
227 }
228 }
229
230 pub fn select_as_map(&self, keys: Option<&[Key]>) -> HashMap<Key, Vec<DataValue>> {
235 let keys = keys.unwrap_or_else(|| self.index.get_keys());
236 let indexes = self.index.select(keys);
237 if indexes.is_empty() {
238 return Default::default();
239 }
240
241 let mut new_data_frame = HashMap::with_capacity(keys.len());
242
243 for key in keys.iter() {
244 if let Some(column_index_in_source) = indexes.get_column_index(key) {
245 let column = self.data_frame.column(column_index_in_source);
246 new_data_frame.insert(key.clone(), column.to_vec());
247 }
248 }
249
250 new_data_frame
251 }
252
253 pub fn select(&self, keys: Option<&[Key]>) -> Array2<DataValue> {
258 let keys = keys.unwrap_or_else(|| self.index.get_keys());
259 let indexes = self.index.select(keys);
260 if indexes.is_empty() {
261 return Array2::default((0, 0));
262 }
263 let mut new_data_frame = Array2::default((self.data_frame.nrows(), keys.len()));
264
265 for (idx, key) in keys.iter().enumerate() {
266 if let Some(column_index_in_source) = indexes.get_column_index(key) {
267 new_data_frame
268 .column_mut(idx)
269 .assign(&self.data_frame.column(column_index_in_source));
270 }
271 }
272
273 new_data_frame
274 }
275
276 fn extend_dataframe_for_column(&mut self, key: Key) -> Result<(), Error> {
277 self.index.store_key(key);
278 let len = self.data_frame.nrows();
279 self.data_frame.push_column(Array1::default(len).view())?;
280 Ok(())
281 }
282
283 pub fn push<C: CandidateData>(&mut self, row_candidate: C) -> Result<(), Error> {
287 let mut arr = vec![];
288 for key in row_candidate.keys() {
289 if self.index.get_column_index(&key).is_none() {
290 self.extend_dataframe_for_column(key)?;
291 }
292 }
293 for index in self.index.get_keys() {
294 if let Some(value) = row_candidate.get_value_ref(index) {
295 arr.push(value.clone());
296 } else {
297 arr.push(DataValue::Null);
298 }
299 }
300 self.data_frame.push_row(Array::from_vec(arr).view())?;
301 Ok(())
302 }
303
304 pub fn remove_column(&mut self, keys: &[Key]) -> Result<Self, Error> {
305 let mut indexes = KeyIndex::default();
307 let data = self.select(Some(keys));
309 for key in keys {
311 if let Some((current, _idx)) = self.index.remove_key(key) {
312 indexes.store_key(current);
313 }
314 }
315 let rest = self.select(None);
317 let keys = self.index.get_keys().to_vec();
318 self.data_frame = rest;
319 self.index = KeyIndex::new(keys);
320
321 Ok(Self::new(indexes, data))
323 }
324
325 fn check_or_init_frame(&mut self, other: &Self) -> Result<Continue, Error> {
326 if self.index.is_empty() {
327 self.index = other.index.clone();
328 self.data_frame = other.data_frame.clone();
329 return Ok(Continue::End);
330 }
331 if other.index.is_empty() {
332 return Ok(Continue::End);
333 }
334 if self.is_empty() {
335 self.data_frame = Array2::default((other.data_frame.nrows(), self.index.len()));
336 }
337
338 Ok(Continue::Continue)
339 }
340
341 fn extend_columns_from_other(&mut self, other: &Self) -> Result<(), Error> {
342 for key in other.index.get_keys() {
343 if self.index.get_column_index(key).is_none() {
344 self.extend_dataframe_for_column(key.clone())?;
345 }
346 }
347 Ok(())
348 }
349
350 fn try_extend(&mut self, other: Self) -> Result<(), Error> {
351 let mut joined_keys = self.index.clone();
352 for key in other.keys() {
354 if self.index.get_column_index(key).is_none() {
355 joined_keys.store_key(key.clone());
356 }
357 }
358
359 let sum_len = self.data_frame.nrows() + other.data_frame.nrows();
360 let mut arr = Array2::default((sum_len, joined_keys.len()));
361 let increment = self.data_frame.nrows();
362 for key in joined_keys.get_keys() {
363 let index_result = joined_keys
364 .get_column_index(key)
365 .expect("BUG: index for this has to be defined");
366 if let Some(index) = self.index.get_column_index(key) {
367 for (idx, value) in self.data_frame.column(index).iter().enumerate() {
368 if let Some(x) = arr.get_mut((idx, index_result)) {
369 *x = value.to_owned();
370 }
371 }
372 }
373
374 if let Some(index) = other.index.get_column_index(key) {
375 for (idx, value) in other.data_frame.column(index).iter().enumerate() {
376 if let Some(x) = arr.get_mut((increment + idx, index_result)) {
377 *x = value.to_owned();
378 }
379 }
380 }
381 }
382 *self = ColumnFrame::new(joined_keys, arr);
383 Ok(())
384 }
385
386 pub fn extend(&mut self, mut other: Self) -> Result<(), Error> {
394 if self.check_or_init_frame(&other)?.should_end() {
395 return Ok(());
396 }
397
398 if self.index.check_order_of_indexes(&other.index).is_err() {
399 return self.try_extend(other);
400 }
401
402 trace!(
403 "Extend columns from other {:?} vs {:?}",
404 other.index.get_keys(),
405 self.index.get_keys()
406 );
407
408 if other.data_frame.ncols() < self.data_frame.ncols() {
409 other.extend_columns_from_other(self)?;
410 } else {
411 self.extend_columns_from_other(&other)?;
412 }
413 self.data_frame = concatenate(Axis(0), &[self.data_frame.view(), other.data_frame.view()])?;
414
415 Ok(())
416 }
417
418 pub fn replace(&mut self, other: Self) -> Result<(), Error> {
424 if self.check_or_init_frame(&other)?.should_end() {
425 return Ok(());
426 }
427
428 if self.data_frame.len() > other.data_frame.len() {
429 return Err(Error::DataSetSizeDoesntMatch(
430 self.data_frame.len(),
431 other.data_frame.len(),
432 ));
433 }
434 self.index = other.index;
435 self.data_frame = other.data_frame;
436
437 Ok(())
438 }
439
440 pub fn join_by_id_inner(&mut self, right: Self, keys: &[Key]) -> Result<(), Error> {
443 if self.check_or_init_frame(&right)?.should_end() {
444 return Ok(());
445 }
446 let new_columnns = right.index.get_complement_keys(self.index.get_keys());
447 self.extend_columns_from_other(&right)?;
449 let index = Index::new(keys.to_vec(), self);
451 tracing::trace!("Index {index:?}");
452 let right_indexes = right.index.select(keys);
453
454 let mut new_df = Array2::default((self.len(), self.index.len()));
455 new_df.assign(&self.data_frame);
456 trace!("Indexes: {right_indexes:?}");
457 trace!("New DF: {new_df:?}");
458 trace!("Right DF: {:?}", right.data_frame);
459 trace!("current {:?}", self.data_frame);
460 for c_row in right.data_frame.rows() {
464 let values = right_indexes
465 .get_keys()
466 .iter()
467 .map(|k| {
468 c_row
469 .get(
470 right_indexes
471 .get_column_index(k)
472 .expect("BUG: Something is very wrong"),
473 )
474 .cloned()
475 .unwrap_or_default()
476 })
477 .collect::<Vec<_>>();
478 if let Some(row_index) = index.get(&values) {
479 trace!("Index: {row_index} {c_row:?} -> {values:?} vs {new_df:?}");
480 for complement_key in new_columnns.iter() {
481 let column_index = self
482 .index
483 .get_column_index(complement_key)
484 .expect("BUG: Something is very wrong");
485 if let Some(right_column_index) = right.index.get_column_index(complement_key) {
486 trace!("Filling Index: [{complement_key:?}] ri:{row_index} rci:{right_column_index:?} -> {:?} vs {:?}", c_row[right_column_index], new_df.get_mut((row_index, column_index)));
487 if let Some(v) = new_df.get_mut((row_index, column_index)) {
488 *v = c_row[right_column_index].to_owned();
489 }
490 }
491 }
492 }
493 }
494
495 self.data_frame = new_df;
496
497 Ok(())
498 }
499
500 pub fn add_single_column<K: Into<Key>>(
504 &mut self,
505 key: K,
506 column: Array1<DataValue>,
507 ) -> Result<(), Error> {
508 let key = key.into();
509 if self.index.get_column_index(&key).is_some() {
510 return Err(Error::ColumnAlreadyExists(key));
511 }
512 if self.len() != column.len() && !self.is_empty() {
513 return Err(Error::DataSetSizeDoesntMatch(self.len(), column.len()));
514 }
515
516 self.index.store_key(key.clone());
517 let rows = column.len();
518 let column_index = self
519 .index
520 .get_column_index(&key)
521 .ok_or(Error::UnknownError(format!("Column {key} should exists")))?;
522 if self.is_empty() && self.index.len() == 1 {
523 self.data_frame = column.into_shape_clone((rows, 1))?;
524 assert_eq!(self.data_frame.column(column_index).len(), rows);
525 } else if self.is_empty() {
526 self.data_frame = Array2::default((column.len(), self.index.len() - 1));
527 self.data_frame.push_column(column.view())?;
528 assert_eq!(self.data_frame.column(column_index).len(), rows);
529 } else {
530 self.data_frame.push_column(column.view())?;
531 }
532 assert_eq!(self.data_frame.column(column_index).len(), rows);
533
534 Ok(())
535 }
536 pub fn add_columns(&mut self, other: Self) -> Result<(), Error> {
540 if self.check_or_init_frame(&other)?.should_end() {
541 return Ok(());
542 }
543
544 self.extend_columns_from_other(&other)?;
545 for (idx, key) in other.index.get_keys().iter().enumerate() {
546 if let Some(index) = self.index.get_column_index(key) {
547 let arr = other.data_frame.column(idx);
548 trace!(
549 "Adding column {key:?} at index {idx} vs {index} datasize: self:{} vs other:{}",
550 self.data_frame.nrows(),
551 arr.len()
552 );
553 if arr.len() != self.data_frame.nrows() {
554 self.data_frame.column_mut(index).fill(DataValue::Null);
555 } else {
556 self.data_frame.column_mut(index).assign(&arr);
557 }
558 }
559 }
560 Ok(())
561 }
562
563 pub fn broadcast(&mut self, other: Self) -> Result<(), Error> {
568 if self.check_or_init_frame(&other)?.should_end() {
569 return Ok(());
570 }
571 if other.data_frame.nrows() != 1 {
572 return Err(Error::CannotBroadcast);
573 }
574 self.extend_columns_from_other(&other)?;
575 let mut new_df = Array2::default((self.len(), self.index.len()));
576 for (idx, key) in self.index.get_keys().iter().enumerate() {
577 if let Some(other_idx) = other.index.get_column_index(key) {
578 new_df
579 .column_mut(idx)
580 .assign(&other.data_frame.column(other_idx));
581 } else {
582 new_df.column_mut(idx).assign(&self.data_frame.column(idx));
583 }
584 }
585 self.data_frame = new_df;
586 Ok(())
587 }
588
589 pub fn cartesian_product(&mut self, other: Self) -> Result<(), Error> {
594 if self.check_or_init_frame(&other)?.should_end() {
595 return Ok(());
596 }
597 for other_key in other.keys() {
600 if self.index.get_column_index(other_key).is_none() {
601 self.index.store_key(other_key.clone());
602 } else {
603 self.index.store_key(Key::new(
604 format!("{}-{}", other_key, other_key.id()).as_str(),
605 other_key.ctype,
606 ));
607 }
608 }
609 let max_rows = self.len() * other.len();
610 let ncols = self.index.len();
611 let mut new_df = Array2::default((max_rows, ncols));
613
614 let mut cur_idx = 0;
615 for cur_row in self.data_frame.rows() {
616 for other_row in other.data_frame.rows() {
617 new_df
618 .slice_mut(s![cur_idx, ..])
619 .assign(&concatenate(Axis(0), &[cur_row, other_row])?);
620 cur_idx += 1;
621 }
622 }
623 self.data_frame = new_df;
624 Ok(())
625 }
626
627 pub fn join(&mut self, right: Self, join_type: &JoinRelation) -> Result<(), Error> {
635 use JoinBy::*;
636 match &join_type.join_type {
637 AddColumns => self.add_columns(right),
638 Replace => self.replace(right),
639 Extend => self.extend(right),
640 Broadcast => self.broadcast(right),
641 CartesianProduct => self.cartesian_product(right),
642 JoinById(join) => self.join_by_id_inner(right, &join.keys),
643 }
644 }
645
646 pub fn get_single_column(&self, key: &Key) -> Option<ArrayView1<DataValue>> {
647 self.index
648 .get_column_index(key)
649 .map(|x| self.data_frame.column(x))
650 }
651
652 pub fn sorted(&self, key: &Key) -> Result<sorted_df::SortedDataFrame<'_>, Error> {
653 let index = self
654 .index
655 .get_column_index(key)
656 .ok_or(Error::NotFound(key.clone()))?;
657 let column = self.data_frame.column(index);
658 let mut data_with_index = column.iter().enumerate().collect::<Vec<_>>();
659 tracing::trace!("Sorting by key: {key:?} vals {data_with_index:?}");
660 let middle = data_with_index.len() / 2;
661 data_with_index.select_nth_unstable_by(middle, |(a_idx, a_val), (b_idx, b_val)| {
662 a_val
663 .partial_cmp(b_val)
664 .unwrap_or(std::cmp::Ordering::Equal)
665 .then_with(|| a_idx.cmp(b_idx))
666 });
667 tracing::trace!("Sorted by key: {key:?} vals {data_with_index:?}");
668 let indicies = data_with_index
669 .into_iter()
670 .map(|(idx, _)| idx)
671 .collect::<Vec<_>>();
672
673 Ok(sorted_df::SortedDataFrame::new(self, indicies))
674 }
675
676 pub fn filter(&self, filter: &crate::filter::FilterRules) -> Result<Self, Error> {
677 let mut final_indices = Vec::new();
678 for rule in &filter.rules {
679 final_indices.extend(filter_df::filter_combination(self, rule)?);
680 }
681
682 final_indices.sort_unstable();
683 final_indices.dedup();
684 let mut new_df = ColumnFrame::new(
685 self.index.clone(),
686 Array2::default((final_indices.len(), self.index.len())),
687 );
688 final_indices
689 .iter()
690 .enumerate()
691 .for_each(|(cur_idx, row_idx)| {
692 new_df
693 .data_frame
694 .slice_mut(s![cur_idx, ..])
695 .assign(&self.data_frame.slice(s![*row_idx, ..]));
696 });
697
698 Ok(new_df)
699 }
700}
701
702pub fn to_array2<T: Clone>(source: Vec<Array1<T>>) -> Result<Array2<T>, Error> {
703 let width = source.len();
704 let flattened: Array1<T> = source.into_iter().flat_map(|row| row.to_vec()).collect();
705 let height = flattened.len() / width;
706 Ok(flattened.into_shape_with_order((width, height))?)
707}
/// Builds a `crate::DataFrame` from `column_frame!` input.
///
/// The tokens are forwarded unchanged to `column_frame!`, and the resulting frame is
/// wrapped with `DataFrame::new`.
#[macro_export]
macro_rules! df {
    ($($everything:tt)*) => {
        $crate::DataFrame::new($crate::column_frame!($($everything)*))
    };
}
714
/// Builds a `ColumnFrame` from `key => values` pairs.
///
/// Accepted shapes (each may end with a trailing comma):
/// - `key => [v1, v2, ...]` — one column per key, one row per value;
/// - `key => vec![v1, v2, ...]` — same as the array form;
/// - `key => value` — a single-row frame.
///
/// Keys and values are converted with `.into()`. In the column forms, every key must list
/// the same number of values.
#[macro_export]
macro_rules! column_frame {
    // `vec![...]` columns are rewritten to the array form. Trailing commas are accepted
    // directly by each arm: re-forwarding a captured `$value:expr` would make it an opaque
    // fragment that can no longer match the literal `[` / `vec!` patterns.
    ($($key:expr => vec![$($value:expr),*]),+ $(,)?) => {
        $crate::column_frame!($($key => [$($value),*]),*)
    };
    // Column form: `arr2` builds a (keys, values) array and `reversed_axes` turns it
    // into (rows, columns).
    ($($key:expr => [$($value:expr),*]),* $(,)?) => {
        {
            let data = ::ndarray::arr2(&[$(
                [$($value.into(),)*],
            )*]);

            let _keys = vec![$($key.into(),)*];

            $crate::ColumnFrame::new(
                $crate::KeyIndex::new(_keys),
                data.reversed_axes()
            )
        }
    };
    // Scalar form: one row holding one value per key.
    ($($key:expr => $value:expr),* $(,)?) => {
        {
            let _data = ::ndarray::arr2(&[[$($value.into(),)*]]);
            let _keys = vec![$($key.into(),)*];

            $crate::ColumnFrame::new(
                $crate::KeyIndex::new(_keys),
                _data,
            )
        }
    };
}
751
752#[cfg(test)]
753mod test {
754 use crate::{filter::FilterRules, JoinById};
755
756 use super::*;
757 use data_value::stdhashmap;
758 use ndarray::ArrayView;
759 use rstest::*;
760 use tracing_test::traced_test;
761
762 #[rstest]
763 #[case(
764 column_frame! {
765 "t" => [1751001987000000u64, 1752001987000000u64, 1753001987000000u64],
766 "b" => [4, 5, 6],
767 "c" => [7, 8, 9]
768 },
769 column_frame! {
770 "t" => [1752001987000000u64],
771 "b" => [5],
772 "c" => [8]
773 },
774 FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
775 )]
776 #[case(
777 column_frame! {
778 "t" => [1751001987000000f64, 1752001987000000f64, 1753001987000000f64],
779 "b" => [4, 5, 6],
780 "c" => [7, 8, 9]
781 },
782 column_frame! {
783 "t" => [1752001987000000f64],
784 "b" => [5],
785 "c" => [8]
786 },
787 FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
788 )]
789 #[case(
790 column_frame! {
791 "t" => [1751001987000000i64, 1752001987000000i64, 1753001987000000i64],
792 "b" => [4, 5, 6],
793 "c" => [7, 8, 9]
794 },
795 column_frame! {
796 "t" => [1752001987000000i64],
797 "b" => [5],
798 "c" => [8]
799 },
800 FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
801 )]
802 #[case(
803 column_frame! {
804 "t" => [1751001987000000u64, 1752001987000000u64, 1753001987000000u64],
805 "b" => [4, 5, 6],
806 "c" => [7, 8, 9]
807 },
808 column_frame! {
809 "t" => [1751001987000000u64],
810 "b" => [4],
811 "c" => [7]
812 },
813 FilterRules::try_from("t.to_datetime_us() < '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
814 )]
815 #[case(
816 column_frame! {
817 "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
818 "b" => [4, 5, 6],
819 "c" => [7, 8, 9]
820 },
821 column_frame! {
822 "t" => ["2025-07-08 18:13:07"],
823 "b" => [4],
824 "c" => [7]
825 },
826 FilterRules::try_from("t.to_datetime_us() < '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
827 )]
828 #[case(
829 column_frame! {
830 "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
831 "b" => [4, 5, 6],
832 "c" => [7, 8, 9]
833 },
834 column_frame! {
835 "t" => [],
836 "b" => [],
837 "c" => []
838 },
839 FilterRules::try_from("t.len() < 10u64").expect("BUG: cannot create filter rules"),
840 )]
841 #[case(
842 column_frame! {
843 "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
844 "b" => [4, 5, 6],
845 "c" => [7, 8, 9]
846 },
847 column_frame! {
848 "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
849 "b" => [4, 5, 6],
850 "c" => [7, 8, 9]
851 },
852 FilterRules::try_from("t.len() > 10u64").expect("BUG: cannot create filter rules"),
853 )]
854 #[case(
855 column_frame! {
856 "t" => [DataValue::Vec(vec![1.into(), 2.into(), 3.into()]), DataValue::Vec(vec![]), DataValue::Vec(vec![1.into()])],
857 "b" => [4, 5, 6],
858 "c" => [7, 8, 9]
859 },
860 column_frame! {
861 "t" => [DataValue::Vec(vec![])],
862 "b" => [5],
863 "c" => [ 8]
864 },
865 FilterRules::try_from("t.len() == 0u64").expect("BUG: cannot create filter rules"),
866 )]
867 #[case(
868 column_frame! {
869 "t" => [DataValue::Vec(vec![1.into(), 2.into(), 3.into()]), DataValue::Vec(vec![]), DataValue::Vec(vec![1.into()])],
870 "b" => [4, 5, 6],
871 "c" => [7, 8, 9]
872 },
873 column_frame! {
874 "t" => [DataValue::Vec(vec![1.into()])],
875 "b" => [6],
876 "c" => [9]
877 },
878 FilterRules::try_from("t.len() == 1u64").expect("BUG: cannot create filter rules"),
879 )]
880 #[case(
881 column_frame! {
882 "a" => [1, 2, 3],
883 "b" => [4, 5, 6],
884 "c" => [7, 8, 9]
885 },
886 column_frame! {
887 "a" => [1, 2],
888 "b" => [4, 5],
889 "c" => [7, 8]
890 },
891 FilterRules::try_from("a <= 2i32").expect("BUG: cannot create filter rules"),
892 )]
893 #[case(
894 column_frame! {
895 "a" => [1, 2, 3],
896 "b" => [4, 5, 6],
897 "c" => [7, 8, 9]
898 },
899 column_frame! {
900 "a" => [2],
901 "b" => [5],
902 "c" => [8]
903 },
904 FilterRules::try_from("a <= 2i32 && c > 7i32").expect("BUG: cannot create filter rules"),
905 )]
906 #[case(
907 column_frame! {
908 "a" => [1, 2, 3],
909 "b" => [4, 5, 6],
910 "c" => [7, 8, 9]
911 },
912 column_frame! {
913 "a" => [],
914 "b" => [],
915 "c" => []
916 },
917 FilterRules::try_from("a <= 2i32 && c > 9i32").expect("BUG: cannot create filter rules"),
918 )]
919 #[case(
920 column_frame! {
921 "a" => [1, 2, 3],
922 "b" => [4, 5, 6],
923 "c" => [7, 8, 9]
924 },
925 column_frame! {
926 "a" => [1, 2],
927 "b" => [4, 5],
928 "c" => [7, 8]
929 },
930 FilterRules::try_from("a <= 2i32 || c > 9i32").expect("BUG: cannot create filter rules"),
931 )]
932 #[case(
933 column_frame! {
934 "a" => [1, 2, 3],
935 "b" => [4, 5, 6],
936 "c" => [7, 8, 9]
937 },
938 column_frame! {
939 "a" => [2],
940 "b" => [5],
941 "c" => [8]
942 },
943 FilterRules::try_from("a <= 2i32 && (c > 9i32 || b == 5i32)").expect("BUG: cannot create filter rules"),
944 )]
945 #[case(
946 column_frame! {
947 "a" => ["abcd", "ab", "abcdefg"],
948 "b" => [4, 5, 6],
949 "c" => [7, 8, 9]
950 },
951 column_frame! {
952 "a" => ["abcd","abcdefg"],
953 "b" => [4, 6],
954 "c" => [7, 9]
955 },
956 FilterRules::try_from("a ~= 'abcd.*'").expect("BUG: cannot create filter rules"),
957 )]
958 #[case(
959 column_frame! {
960 "a" => [1, 2, 3],
961 "b" => [4, 5, 6],
962 "c" => [7, 8, 9]
963 },
964 column_frame! {
965 "a" => [1],
966 "b" => [4],
967 "c" => [7]
968 },
969 FilterRules::try_from("a in [1u32, 1i32]'").expect("BUG: cannot create filter rules"),
970 )]
971 #[case(
972 column_frame! {
973 "a" => [1, 2, 3],
974 "b" => [4, 5, 6],
975 "c" => [7, 8, 9]
976 },
977 column_frame! {
978 "a" => [2, 3],
979 "b" => [5, 6],
980 "c" => [8, 9]
981 },
982 FilterRules::try_from("a notIn [1u32, 1i32]'").expect("BUG: cannot create filter rules"),
983 )]
984 #[case(
985 column_frame! {
986 "a" => [1f64, 2f64, 3f64],
987 "b" => [4, 5, 6],
988 "c" => [7, 8, 9]
989 },
990 column_frame! {
991 "a" => [1f64, 2f64],
992 "b" => [4, 5],
993 "c" => [7, 8]
994 },
995 FilterRules::try_from("a < 3f64 || (a < 3f64 && b <= 5i32)").expect("BUG: cannot create filter rules"),
996 )]
997 #[case(
998 column_frame! {
999 "a" => [1f64, 2f64, 3f64],
1000 "b" => [4i64, 5i64, 6i64],
1001 "c" => [7i64, 8i64, 9i64]
1002 },
1003 column_frame! {
1004 "a" => [1f64, 2f64],
1005 "b" => [4i64, 5i64],
1006 "c" => [7i64, 8i64]
1007 },
1008 FilterRules::try_from("a >= 1f64 && (b <= 5 || c <= 8) && b >= 4").expect("BUG: cannot create filter rules"),
1009 )]
1010 #[traced_test]
1011 fn filter_test(
1012 #[case] df: ColumnFrame,
1013 #[case] expected: ColumnFrame,
1014 #[case] filter: FilterRules,
1015 ) {
1016 let filtered = df.filter(&filter).expect("BUG: cannot filter");
1017 assert_eq!(filtered, expected);
1018 }
1019
1020 #[rstest]
1021 #[traced_test]
1022 fn test_macro() {
1023 let df = column_frame! {
1024 "a" => 1,
1025 "b" => 2,
1026 "c" => 3,
1027 "d" => 4,
1028 };
1029
1030 assert_eq!(df.len(), 1);
1031 assert_eq!(df.keys(), &["a".into(), "b".into(), "c".into(), "d".into()]);
1032 let f = Array2::from_shape_vec((1, 4), vec![1.into(), 2.into(), 3.into(), 4.into()])
1033 .expect("BUG: cannot create array");
1034 assert_eq!(df.select(None), f);
1035
1036 let df = column_frame! {
1037 "a" => [1, 2, 3],
1038 "b" => [4, 5, 6],
1039 "c" => [7, 8, 9]
1040 };
1041
1042 assert_eq!(df.len(), 3);
1043 assert_eq!(df.keys(), &["a".into(), "b".into(), "c".into()]);
1044 let f = Array2::from_shape_vec(
1045 (3, 3),
1046 vec![
1047 1.into(),
1048 4.into(),
1049 7.into(),
1050 2.into(),
1051 5.into(),
1052 8.into(),
1053 3.into(),
1054 6.into(),
1055 9.into(),
1056 ],
1057 )
1058 .expect("BUG: cannot create array");
1059 let selected = df.select(None);
1060 trace!("{selected:?}");
1061 assert_eq!(selected, f);
1062
1063 let df1 = df! {
1064 "a" => [1, 2, 3],
1065 "b" => [4, 5, 6],
1066 "c" => [7, 8, 9]
1067 };
1068
1069 let formatted = format!("{}", df);
1071 debug!("{}", formatted);
1072
1073 assert_eq!(df1, crate::DataFrame::from(df));
1074 }
1075
1076 #[rstest]
1077 #[case(
1078 column_frame! {
1079 "a" => [1, 2, 3],
1080 "b" => [4, 5, 6],
1081 "c" => [7, 8, 9]
1082 },
1083 column_frame! {
1084 "a_new" => [1, 2, 3],
1085 "b" => [4, 5, 6],
1086 "c" => [7, 8, 9]
1087 },
1088 vec!["a_new", "b", "c"].into_iter().map(|x| x.into()).collect(),
1089 vec![("a", "a_new".into())]
1090 )]
1091 #[traced_test]
1092 fn rename_test(
1093 #[case] df: ColumnFrame,
1094 #[case] expected: ColumnFrame,
1095 #[case] keys: Vec<Key>,
1096 #[case] renames: Vec<(&str, Key)>,
1097 ) {
1098 let mut df = df;
1099 for (old, new) in renames {
1100 df.rename_key(old, new).expect("BUG: cannot rename key");
1101 }
1102 assert_eq!(df, expected);
1103 assert_eq!(df.keys(), keys.as_slice());
1104 }
1105
1106 #[rstest]
1107 #[case(
1108 column_frame! {
1109 "a" => [1, 2, 3],
1110 "b" => [4, 5, 6],
1111 "c" => [7, 8, 9]
1112 },
1113 vec!["a_alias", "b", "c"].into_iter().map(|x| x.into()).collect(),
1114 vec![("a", "a_alias")]
1115 )]
1116 #[traced_test]
1117 fn alias_test(
1118 #[case] df: ColumnFrame,
1119 #[case] keys: Vec<Key>,
1120 #[case] aliases: Vec<(&str, &str)>,
1121 ) {
1122 let mut df = df;
1123 for (old, new) in aliases {
1124 df.add_alias(old, new).expect("BUG: cannot rename key");
1125 }
1126 let origin_keys = df.keys().to_vec();
1127 let selected_aliases = df.select(Some(keys.as_slice()));
1128 let selected = df.select(Some(origin_keys.as_slice()));
1129 assert_eq!(selected, selected_aliases);
1130 }
1131
1132 #[rstest]
1133 #[traced_test]
1134 fn test_mut_view() {
1135 let data = vec![
1136 DataValue::from(1f64),
1137 DataValue::from(4f32),
1138 DataValue::from(2f64),
1139 DataValue::from(f32::NAN),
1140 DataValue::from(f64::NAN),
1141 DataValue::from(f32::INFINITY),
1142 ];
1143 let keys: Vec<Key> = vec!["a".into(), "b".into()];
1144
1145 let index = KeyIndex::new(keys.clone());
1146 let df = Array2::from_shape_vec((3, keys.len()), data).expect("BUG: cannot create array");
1147 let mut df = ColumnFrame::new(index.clone(), df);
1148 df.get_mut_view().mapv_inplace(|x| match x {
1149 DataValue::F32(f) if f.is_infinite() || f.is_nan() => DataValue::F32(0f32),
1150 DataValue::F64(f) if f.is_infinite() || f.is_nan() => DataValue::F64(0f64),
1151 e => e,
1152 });
1153 let data = vec![
1154 DataValue::from(1f64),
1155 DataValue::from(4f32),
1156 DataValue::from(2f64),
1157 DataValue::from(0f32),
1158 DataValue::from(0f64),
1159 DataValue::from(0f32),
1160 ];
1161 let expected = ColumnFrame::new(
1162 index,
1163 Array2::from_shape_vec((3, keys.len()), data).expect("BUG: cannot create ndarray"),
1164 );
1165 assert_eq!(df, expected);
1166 }
1167
1168 #[rstest]
1169 #[traced_test]
1170 fn dummy_test() {
1171 let data = vec![
1172 DataValue::U32(1),
1173 DataValue::I32(2),
1174 DataValue::I64(3),
1175 DataValue::U64(4),
1176 ];
1177
1178 let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1179
1180 let index = KeyIndex::new(keys.clone());
1181 let mut data_frame = Array2::default((1, keys.len()));
1182 for (idx, entry) in data.iter().enumerate() {
1183 data_frame
1184 .column_mut(idx)
1185 .assign(&ArrayView::from(&[entry.clone()]));
1186 }
1187
1188 let frame = ColumnFrame::new(index, data_frame);
1189 assert_eq!(
1190 frame.get_by_row_index(&"a".into(), 0),
1191 Some(&DataValue::U32(1))
1192 );
1193 assert_eq!(frame.get_by_row_index(&"aa".into(), 0), None);
1194 assert_eq!(frame.get_by_row_index(&"a".into(), 1), None);
1195 assert_eq!(
1196 frame.select(Some(&["a".into(), "b".into()])),
1197 Array2::from_shape_vec((1, 2), vec![DataValue::U32(1), DataValue::I32(2)])
1198 .expect("BUG: cannot create array")
1199 );
1200 }
1201
1202 #[rstest]
1203 #[traced_test]
1204 fn dummy_test_multiple_rows() {
1205 let data = vec![
1206 DataValue::U32(1),
1207 DataValue::I32(2),
1208 DataValue::I64(3),
1209 DataValue::U64(4),
1210 DataValue::U32(12),
1211 DataValue::I32(22),
1212 DataValue::I64(32),
1213 DataValue::U64(42),
1214 ];
1215
1216 let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1217
1218 let index = KeyIndex::new(keys.clone());
1219 let data_frame =
1220 Array2::from_shape_vec((2, keys.len()), data).expect("BUG: cannot create array");
1221
1222 let frame = ColumnFrame::new(index, data_frame);
1223 assert_eq!(
1224 frame.get_by_row_index(&"a".into(), 0),
1225 Some(&DataValue::U32(1))
1226 );
1227 assert_eq!(frame.get_by_row_index(&"aa".into(), 0), None);
1228 assert_eq!(frame.get_by_row_index(&"a".into(), 3), None);
1229 let arr = Array2::from_shape_vec(
1230 (2, 2),
1231 vec![
1232 DataValue::U32(1),
1233 DataValue::I32(2),
1234 DataValue::U32(12),
1235 DataValue::I32(22),
1236 ],
1237 )
1238 .expect("BUG: cannot create array");
1239 trace!("{arr:?}");
1240 assert_eq!(frame.select(Some(&["a".into(), "b".into()])), arr);
1241 }
1242
1243 #[rstest]
1244 #[traced_test]
1245 fn dummy_test_multiple_rows_push() {
1246 let data = vec![
1247 DataValue::U32(1),
1248 DataValue::I32(2),
1249 DataValue::I64(3),
1250 DataValue::U64(4),
1251 DataValue::U32(12),
1252 DataValue::I32(22),
1253 DataValue::I64(32),
1254 DataValue::U64(42),
1255 ];
1256 let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1257
1258 let index = KeyIndex::new(keys.clone());
1259 let data_frame =
1260 Array2::from_shape_vec((2, keys.len()), data).expect("BUG: cannot create array");
1261
1262 let mut frame = ColumnFrame::new(index, data_frame);
1263 assert!(frame
1264 .push(data_value::stdhashmap!(
1265 "a" => DataValue::U32(2),
1266 "b" => DataValue::I32(3),
1267 "c" => DataValue::I64(4),
1268 "d" => DataValue::U64(5)
1269 ))
1270 .is_ok());
1271 let arr = Array2::from_shape_vec(
1272 (3, 2),
1273 vec![
1274 DataValue::U32(1),
1275 DataValue::I32(2),
1276 DataValue::U32(12),
1277 DataValue::I32(22),
1278 DataValue::U32(2),
1279 DataValue::I32(3),
1280 ],
1281 )
1282 .expect("BUG: cannot create array");
1283 trace!("{arr:?}");
1284 assert_eq!(frame.select(Some(&["a".into(), "b".into()])), arr);
1285 let result = frame.push(data_value::stdhashmap!(
1286 "a" => DataValue::U32(34),
1287 "b" => DataValue::I32(44),
1288 "c" => DataValue::I64(54),
1289 "e" => DataValue::F32(6f32)
1290 ));
1291 assert!(result.is_ok(), "{result:?}");
1292 let arr = Array2::from_shape_vec(
1293 (4, 2),
1294 vec![
1295 DataValue::U64(4),
1296 DataValue::Null,
1297 DataValue::U64(42),
1298 DataValue::Null,
1299 DataValue::U64(5),
1300 DataValue::Null,
1301 DataValue::Null,
1302 DataValue::F32(6f32),
1303 ],
1304 )
1305 .expect("BUG: cannot create array");
1306 trace!("{arr:?}");
1307 assert_eq!(frame.select(Some(&["d".into(), "e".into()])), arr);
1308 }
1309
1310 #[rstest]
1311 #[case(
1312 column_frame! {
1313 "group_id" => vec![1, 2],
1314 "feed_tag" => vec![3, 4]
1315 },
1316 Some(vec![Key::from("group_id")]),
1317 ndarray::array!([1.into()], [2.into()])
1318 )]
1319 #[case(
1320 column_frame! {
1321 "group_id" => vec![1, 2],
1322 "feed_tag" => vec![3, 4]
1323 },
1324 Some(vec!["group_id".into(), "feed_tag".into()]),
1325 ndarray::array!([1.into(), 3.into()], [2.into(), 4.into()])
1326 )]
1327 #[case(
1328 column_frame! {
1329 "group_id" => vec![1, 2],
1330 "feed_tag" => vec![3, DataValue::Null]
1331 },
1332 Some(vec!["feed_tag".into()]),
1333 ndarray::array![[3.into()], [DataValue::Null]]
1334 )]
1335 #[case(
1336 column_frame! {
1337 "group_id" => vec![1, 2],
1338 "feed_tag" => vec![1, DataValue::Null]
1339 },
1340 Some(vec!["feed_tag2".into()]),
1341 Array2::<DataValue>::default((0, 0))
1342 )]
1343 #[traced_test]
1344 fn test_select(
1345 #[case] input: ColumnFrame,
1346 #[case] keys: Option<Vec<Key>>,
1347 #[case] expected: Array2<DataValue>,
1348 ) {
1349 trace!("input={input:?}");
1350 let keys_slice = keys.as_deref();
1351 let selected = input.select(keys_slice);
1352 trace!("selected={selected:?}");
1353 assert_eq!(selected, expected);
1354 let selected = input.select_transposed(keys_slice);
1355 trace!("selected_transposed={selected:?}");
1356 assert!(selected.is_ok());
1357 assert_eq!(selected.unwrap(), expected.t());
1358 }
1359
1360 #[rstest]
1361 #[case(
1362 column_frame! {
1363 "group_id" => vec![1, 2],
1364 "feed_tag" => vec![3, 4]
1365 },
1366 Key::from("group_id"),
1367 Some(ndarray::array!(1.into(), 2.into()))
1368 )]
1369 #[case(
1370 column_frame! {
1371 "group_id" => vec![1, 2, 5, 6],
1372 "feed_tag" => vec![3, 4, 7, 8]
1373 },
1374 Key::from("group_id"),
1375 Some(ndarray::array!(1.into(), 2.into(), 5.into(), 6.into()))
1376 )]
1377 #[case(
1378 column_frame! {
1379 "group_id" => vec![1, 2],
1380 "feed_tag" => vec![1, 1]
1381 },
1382 Key::from("feed_tag1"),
1383 None
1384 )]
1385 #[traced_test]
1386 fn test_select_column(
1387 #[case] input: ColumnFrame,
1388 #[case] key: Key,
1389 #[case] expected: Option<Array1<DataValue>>,
1390 ) {
1391 let selected = input.select_column(&key);
1392 trace!("selected={selected:?}");
1393 match expected {
1394 Some(expected) => {
1395 assert!(selected.is_some());
1396 assert_eq!(selected.expect("BUG: checked above"), expected);
1397 }
1398 None => assert!(selected.is_none()),
1399 }
1400 }
1401
#[test]
#[traced_test]
fn empty_join_test() {
    let join = JoinRelation::add_columns();
    // Builds a frame that declares a `group_id` column but holds no rows.
    let empty_group_ids = || {
        let mut frame = ColumnFrame::default();
        frame
            .add_single_column("group_id", Array1::from_vec(vec![]))
            .expect("BUG: cannot add column");
        frame
    };
    let cols: [Key; 4] = [
        "group_id".into(),
        "feed_tag".into(),
        "clicks".into(),
        "imps".into(),
    ];

    // Empty left side: after the join it holds the right side's rows as-is.
    let mut column_frame = empty_group_ids();
    let column_frame2 = column_frame! {
        "group_id" => vec![2, 1, 3],
        "feed_tag" => vec![1, 1, 1],
        "clicks" => vec![100, 10, 10],
        "imps" => vec![1000, 200, 200]
    };
    assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());

    let joined = column_frame.join(column_frame2, &join);
    assert!(joined.is_ok(), "{joined:?}");

    trace!("{column_frame:?}");
    let expected: Array2<DataValue> = ndarray::array!(
        [2.into(), 1.into(), 100.into(), 1000.into()],
        [1.into(), 1.into(), 10.into(), 200.into()],
        [3.into(), 1.into(), 10.into(), 200.into()]
    );
    assert_eq!(column_frame.select(Some(&cols)), expected);

    // Empty right side: the left rows survive and gain a Null `group_id`.
    let mut column_frame2 = column_frame! {
        "feed_tag" => vec![1, 1, 1],
        "clicks" => vec![100, 10, 10],
        "imps" => vec![1000, 200, 200]
    };
    let joined = column_frame2.join(empty_group_ids(), &join);
    assert!(joined.is_ok(), "{joined:?}");

    trace!("{column_frame2:?}");
    let expected: Array2<DataValue> = ndarray::array!(
        [DataValue::Null, 1.into(), 100.into(), 1000.into()],
        [DataValue::Null, 1.into(), 10.into(), 200.into()],
        [DataValue::Null, 1.into(), 10.into(), 200.into()]
    );
    assert_eq!(column_frame2.select(Some(&cols)), expected);
}
1463
#[test]
#[traced_test]
fn join_test() {
    // Left: (group_id, feed_tag) pairs; (8, 10) has no partner on the right.
    let mut column_frame = column_frame! {
        "group_id" => vec![1, 2, 8],
        "feed_tag" => vec![1, 1, 10]
    };
    // Right: metrics keyed by the same pair; (3, 1) has no partner on the left.
    let column_frame2 = column_frame! {
        "group_id" => vec![2, 1, 3],
        "feed_tag" => vec![1, 1, 1],
        "clicks" => vec![100, 10, 10],
        "imps" => vec![1000, 200, 200]
    };
    let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
        "group_id".into(),
        "feed_tag".into(),
    ])));

    // Joining against an empty frame must succeed.
    assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());

    let joined = column_frame.join(column_frame2, &join);
    assert!(joined.is_ok(), "{joined:?}");

    trace!("{column_frame:?}");
    // Expected: left row order kept, unmatched left row gets Null metrics,
    // unmatched right row does not appear.
    let selected = column_frame.select(Some(&[
        "group_id".into(),
        "feed_tag".into(),
        "clicks".into(),
        "imps".into(),
    ]));
    let expected: Array2<DataValue> = ndarray::array!(
        [1.into(), 1.into(), 10.into(), 200.into()],
        [2.into(), 1.into(), 100.into(), 1000.into()],
        [8.into(), 10.into(), DataValue::Null, DataValue::Null]
    );
    assert_eq!(selected, expected);
}
1501
#[test]
#[traced_test]
fn join_test_with_additional() {
    // Left carries an extra non-key column (`clicked`) that must survive the join.
    let mut column_frame = column_frame! {
        "group_id" => vec![1, 2, 8],
        "feed_tag" => vec![1, 1, 10],
        "clicked" => vec![0, 0, 1]
    };
    let column_frame2 = column_frame! {
        "group_id" => vec![2, 1, 3],
        "feed_tag" => vec![1, 1, 1],
        "clicks" => vec![100, 10, 10],
        "imps" => vec![1000, 200, 200]
    };
    let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
        "group_id".into(),
        "feed_tag".into(),
    ])));

    // Joining against an empty frame must succeed.
    assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());

    let joined = column_frame.join(column_frame2, &join);
    assert!(joined.is_ok(), "{joined:?}");

    trace!("{column_frame:?}");
    let selected = column_frame.select(Some(&[
        "group_id".into(),
        "feed_tag".into(),
        "clicks".into(),
        "imps".into(),
        "clicked".into(),
    ]));
    let expected: Array2<DataValue> = ndarray::array!(
        [1.into(), 1.into(), 10.into(), 200.into(), 0.into()],
        [2.into(), 1.into(), 100.into(), 1000.into(), 0.into()],
        [8.into(), 10.into(), DataValue::Null, DataValue::Null, 1.into()]
    );
    assert_eq!(selected, expected);
}
1547
#[test]
#[traced_test]
fn join_test_with_additional_single() {
    let mut column_frame = column_frame! {
        "group_id" => vec![1, 2, 8],
        "feed_tag" => vec![1, 1, 10],
        "clicked" => vec![0, 0, 1]
    };
    // A single right-hand row matching only (2, 1); its extra `a` column is not
    // part of the selection checked below.
    let column_frame2 = column_frame! {
        "a" => vec![1],
        "group_id" => vec![2],
        "feed_tag" => vec![1],
        "clicks" => vec![10],
        "imps" => vec![200]
    };
    let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
        "group_id".into(),
        "feed_tag".into(),
    ])));

    // Joining against an empty frame must succeed.
    assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());

    let joined = column_frame.join(column_frame2, &join);
    assert!(joined.is_ok(), "{joined:?}");

    trace!("{column_frame:?}");
    let selected = column_frame.select(Some(&[
        "group_id".into(),
        "feed_tag".into(),
        "clicks".into(),
        "imps".into(),
        "clicked".into(),
    ]));
    let expected: Array2<DataValue> = ndarray::array!(
        [1.into(), 1.into(), DataValue::Null, DataValue::Null, 0.into()],
        [2.into(), 1.into(), 10.into(), 200.into(), 0.into()],
        [8.into(), 10.into(), DataValue::Null, DataValue::Null, 1.into()]
    );
    assert_eq!(selected, expected);
}
1600
1601 #[rstest]
1602 #[traced_test]
1603 fn cartesian_product_join() {
1604 let mut df = column_frame! {
1605 "group_id" => vec![1, 2, 3],
1606 "feed_tag" => vec![1, 2, 3]
1607 };
1608 let df2 = column_frame! {
1609 "zone_id" => vec![111111, 111133],
1610 "zone_avg_ctr" => vec![0.1, 0.001]
1611 };
1612 assert!(df
1613 .join(
1614 ColumnFrame::default(),
1615 &JoinRelation::new(JoinBy::CartesianProduct)
1616 )
1617 .is_ok());
1618 let join = JoinRelation::new(JoinBy::CartesianProduct);
1619 let result = df.join(df2, &join);
1620 assert!(result.is_ok(), "{result:?}");
1621 let selected = df.select(None);
1622 trace!("{selected:?}");
1623 assert_eq!(
1624 selected,
1625 ndarray::array!(
1626 [1.into(), 1.into(), 111111.into(), 0.1.into()],
1627 [1.into(), 1.into(), 111133.into(), 0.001.into()],
1628 [2.into(), 2.into(), 111111.into(), 0.1.into()],
1629 [2.into(), 2.into(), 111133.into(), 0.001.into()],
1630 [3.into(), 3.into(), 111111.into(), 0.1.into()],
1631 [3.into(), 3.into(), 111133.into(), 0.001.into()],
1632 )
1633 );
1634
1635 let df2 = column_frame! {
1636 "zone_id" => vec![111]
1637 };
1638 let result = df.join(df2, &join);
1639 assert!(result.is_ok(), "{result:?}");
1640 let selected = df.select(None);
1641 trace!("{selected:?}");
1642 assert_eq!(
1643 selected,
1644 ndarray::array!(
1645 [1.into(), 1.into(), 111111.into(), 0.1.into(), 111.into()],
1646 [1.into(), 1.into(), 111133.into(), 0.001.into(), 111.into()],
1647 [2.into(), 2.into(), 111111.into(), 0.1.into(), 111.into()],
1648 [2.into(), 2.into(), 111133.into(), 0.001.into(), 111.into()],
1649 [3.into(), 3.into(), 111111.into(), 0.1.into(), 111.into()],
1650 [3.into(), 3.into(), 111133.into(), 0.001.into(), 111.into()],
1651 )
1652 );
1653 }
1654
1655 #[rstest]
1656 #[traced_test]
1657 fn broadcast_join() {
1658 let mut df = column_frame! {
1659 "group_id" => vec![1, 2, 3],
1660 "feed_tag" => vec![1, 2, 3]
1661 };
1662 let df2 = column_frame! {
1663 "zone_id" => vec![111111]
1664 };
1665 assert!(df
1666 .join(
1667 ColumnFrame::default(),
1668 &JoinRelation::new(JoinBy::Broadcast)
1669 )
1670 .is_ok());
1671 let join = JoinRelation::new(JoinBy::Broadcast);
1672 assert!(df.join(df2, &join).is_ok());
1673 let selected = df.select(None);
1674 trace!("{selected:?}");
1675 assert_eq!(
1676 selected,
1677 ndarray::array!(
1678 [1.into(), 1.into(), 111111.into()],
1679 [2.into(), 2.into(), 111111.into()],
1680 [3.into(), 3.into(), 111111.into()]
1681 )
1682 );
1683 }
1684 #[rstest]
1685 #[traced_test]
1686 fn merge_test() {
1687 let mut df = column_frame! {
1688 "group_id" => vec![1, 2, 3],
1689 "feed_tag" => vec![1, 2, 3]
1690 };
1691 let df2 = column_frame! {
1692 "group_id" => vec![11, 21, 31],
1693 "feed_tag" => vec![12, 22, 32]
1694 };
1695
1696 let join = JoinRelation::new(JoinBy::Replace);
1697 assert!(df.join(df2, &join).is_ok());
1698 let selected = df.select(None);
1699 trace!("{selected:?}");
1700 assert_eq!(
1701 selected,
1702 ndarray::array!(
1703 [11.into(), 12.into()],
1704 [21.into(), 22.into()],
1705 [31.into(), 32.into()]
1706 )
1707 );
1708 }
1709
1710 #[rstest]
1711 #[traced_test]
1712 fn extend_test() {
1713 let mut df = column_frame! {
1714 "group_id" => vec![1, 2, 3],
1715 "feed_tag" => vec![1, 2, 3]
1716 };
1717 let df2 = column_frame! {
1718 "group_id" => vec![11, 21, 31],
1719 "feed_tag" => vec![5, 6, 7]
1720 };
1721 assert!(df
1722 .join(ColumnFrame::default(), &JoinRelation::new(JoinBy::Extend))
1723 .is_ok());
1724
1725 let join = JoinRelation::new(JoinBy::Extend);
1726 assert!(df.join(df2, &join).is_ok());
1727 let selected = df.select(Some(&["feed_tag".into(), "group_id".into()]));
1728 trace!("{selected:?}");
1729 assert_eq!(
1730 selected,
1731 ndarray::array!(
1732 [1.into(), 1.into()],
1733 [2.into(), 2.into()],
1734 [3.into(), 3.into()],
1735 [5.into(), 11.into()],
1736 [6.into(), 21.into()],
1737 [7.into(), 31.into()]
1738 )
1739 );
1740 let as_map = df.select_as_map(Some(&["feed_tag".into(), "group_id".into()]));
1741 trace!("{as_map:?}");
1742 assert_eq!(
1743 as_map,
1744 stdhashmap!(
1745 "feed_tag" => vec![1, 2, 3, 5, 6, 7],
1746 "group_id" => vec![1, 2, 3, 11, 21, 31]
1747 )
1748 );
1749
1750 let as_map = df.select_as_map(Some(&["feed_tag1".into()]));
1751 trace!("{as_map:?}");
1752 assert_eq!(as_map, HashMap::default());
1753 }
1754
1755 #[rstest]
1756 #[traced_test]
1757 fn extend_test_with_non_existing_cols() {
1758 let mut df = column_frame! {
1759 "group_id" => vec![1, 2, 3],
1760 "feed_tag" => vec![1, 2, 3]
1761 };
1762 let mut df2 = column_frame! {
1763 "group_id" => vec![11, 21, 31],
1764 "feed_tag" => vec![5, 6, 7],
1765 "clicks" => vec![100, 200, 300],
1766 "impressions" => vec![1000, 2000, 3000]
1767 };
1768 let df_bckp = df.clone();
1769 let join = JoinRelation::new(JoinBy::Extend);
1770 assert!(df.join(df2.clone(), &join).is_ok());
1771 let selected = df.select(None);
1772 trace!("{selected:?}");
1773 assert_eq!(
1774 selected,
1775 ndarray::array!(
1776 [1.into(), 1.into(), DataValue::Null, DataValue::Null],
1777 [2.into(), 2.into(), DataValue::Null, DataValue::Null],
1778 [3.into(), 3.into(), DataValue::Null, DataValue::Null],
1779 [11.into(), 5.into(), 100.into(), 1000.into()],
1780 [21.into(), 6.into(), 200.into(), 2000.into()],
1781 [31.into(), 7.into(), 300.into(), 3000.into()]
1782 )
1783 );
1784 let join = JoinRelation::new(JoinBy::Extend);
1785 let r = df2.join(df_bckp, &join);
1786 assert!(r.is_ok(), "{r:?}");
1787 let selected = df2.select(None);
1788 trace!("{selected:?}");
1789 assert_eq!(
1790 selected,
1791 ndarray::array!(
1792 [11.into(), 5.into(), 100.into(), 1000.into()],
1793 [21.into(), 6.into(), 200.into(), 2000.into()],
1794 [31.into(), 7.into(), 300.into(), 3000.into()],
1795 [1.into(), 1.into(), DataValue::Null, DataValue::Null],
1796 [2.into(), 2.into(), DataValue::Null, DataValue::Null],
1797 [3.into(), 3.into(), DataValue::Null, DataValue::Null]
1798 )
1799 );
1800 }
1801
1802 #[rstest]
1803 #[traced_test]
1804 fn extend_test_with_non_existing_cols_wrong_order() {
1805 let mut df = column_frame! {
1806 "group_id" => vec![1, 2, 3],
1807 "feed_tag" => vec![1, 2, 3]
1808 };
1809 let df2 = column_frame! {
1810 "feed_tag" => vec![5, 6, 7],
1811 "group_id" => vec![11, 21, 31]
1812 };
1813 let join = JoinRelation::new(JoinBy::Extend);
1814 let err = df.join(df2, &join);
1815 assert!(err.is_ok(), "{err:?}");
1816 }
1817
1818 #[rstest]
1819 #[traced_test]
1820 fn test_replace_not_compatible() {
1821 let mut df = column_frame! {
1822 "group_id" => vec![1, 2, 3],
1823 "feed_tag" => vec![1, 2, 3]
1824 };
1825 let df2 = column_frame! {
1826 "feed_tag" => vec![5, 6],
1827 "group_id" => vec![11, 21]
1828 };
1829 let join = JoinRelation::new(JoinBy::Replace);
1830 let err = df.join(df2, &join);
1831 assert!(err.is_err(), "{err:?}");
1832 let empty = ColumnFrame::default();
1833 let err = df.join(empty, &join);
1834 assert!(err.is_ok(), "{err:?}");
1835 }
1836
1837 #[rstest]
1838 #[traced_test]
1839 fn test_different_data() {
1840 let mut df = column_frame! {
1841 "group_id" => vec![1, 2, 3],
1842 "feed_tag" => vec![1, 2, 3]
1843 };
1844 let df2 = column_frame! {
1845 "group_id" => vec![11, 21],
1846 "a" => vec![5, 6]
1847 };
1848 let join = JoinRelation::new(JoinBy::Extend);
1849 let err = df.join(df2, &join);
1850 assert!(err.is_ok(), "{err:?}");
1851 println!("{df:?}");
1852 let expected_df = ColumnFrame::new(
1853 KeyIndex::from(vec!["group_id".into(), "feed_tag".into(), "a".into()]),
1854 ndarray::array!(
1855 [1.into(), 1.into(), DataValue::Null],
1856 [2.into(), 2.into(), DataValue::Null],
1857 [3.into(), 3.into(), DataValue::Null],
1858 [11.into(), DataValue::Null, 5.into()],
1859 [21.into(), DataValue::Null, 6.into()]
1860 ),
1861 );
1862 assert_eq!(df, expected_df)
1863 }
1864
1865 #[rstest]
1866 #[traced_test]
1867 fn serde_column_frame() {
1868 let df = column_frame! {
1869 "group_id" => vec![1u64, 2u64, 3u64],
1870 "feed_tag" => vec![1u64, 2u64, 3u64]
1871 };
1872 let key_idx = df.index.clone();
1873 let serialized = serde_json::to_string(&key_idx).expect("BUG: cannot serialize");
1874 let deserialized: KeyIndex =
1875 serde_json::from_str(&serialized).expect("BUG: cannot deserialize");
1876 assert_eq!(key_idx, deserialized);
1877 assert!(key_idx.get_key(0).is_some_and(|x| x == "group_id".into()));
1878 let serialized = serde_json::to_string(&df).expect("BUG: cannot serialize");
1879 let deserialized: ColumnFrame =
1880 serde_json::from_str(&serialized).expect("BUG: cannot deserialize");
1881 assert_eq!(df, deserialized);
1882 }
1883
1884 #[rstest]
1885 #[traced_test]
1886 fn update_value() {
1887 let mut df = column_frame! {
1888 "group_id" => vec![1, 2, 3],
1889 "feed_tag" => vec![1, 2, 3]
1890 };
1891 let group_id: Key = "group_id".into();
1892 let v = df.get_mut_by_row_index(&group_id, 1);
1893 assert!(v.is_some());
1894 let v = v.unwrap();
1895 assert_eq!(v, &DataValue::I32(2));
1896 *v = DataValue::U64(22);
1897 let v = df.get_by_row_index(&group_id, 1);
1898 assert!(v.is_some());
1899 let v = v.unwrap();
1900 assert_eq!(v, &DataValue::U64(22));
1901
1902 assert!(df.get_mut_by_row_index(&"group_id2".into(), 1).is_none());
1903 }
1904}