1use ndarray::{concatenate, s, Array, Array1, Array2, ArrayView1, ArrayViewMut2, Axis};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4
5use crate::error::Error;
6use crate::{dataframe::index::Index, CandidateData, JoinBy, JoinRelation, Key};
7use data_value::{DataValue, Extract};
8use tracing::*;
9mod from;
10mod key_index;
11mod ops;
12pub mod sorted_df;
13pub use key_index::KeyIndex;
14pub mod filter_df;
15
/// Table of [`DataValue`] cells paired with the key index that names its columns.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
pub struct ColumnFrame {
    /// Column keys; methods in this file use it to resolve a [`Key`] to a column position.
    pub index: KeyIndex,
    /// Cell storage, addressed as `(row, column)`.
    pub data_frame: Array2<DataValue>,
}
26
/// Tells a caller whether an operation still has work to do after a pre-check.
enum Continue {
    /// The caller should carry on with its own logic.
    Continue,
    /// Nothing is left to do; the caller can return.
    End,
}

impl Continue {
    /// Returns `true` only for [`Continue::End`].
    pub fn should_end(&self) -> bool {
        match self {
            Self::End => true,
            Self::Continue => false,
        }
    }
}
37
38use std::fmt;
39
40impl fmt::Display for ColumnFrame {
41 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42 write!(f, "\n|")?;
44
45 for key in &self.index.keys {
46 write!(f, " {key} |")?;
47 }
48
49 if self.index.is_empty() {
50 writeln!(f, "|")?;
51 }
52
53 if let Some(row) = self.data_frame.axis_iter(Axis(0)).next() {
55 write!(f, "\n|")?;
56 for value in row.iter() {
57 write!(f, " {:10?} |", crate::detect_dtype(value))?;
59 }
60 writeln!(f)?;
61 }
62
63 writeln!(f, "---")?;
64
65 for (n, row) in self.data_frame.axis_iter(Axis(0)).enumerate() {
67 write!(f, "|")?;
68
69 for value in row.iter() {
70 write!(f, " {value} |")?;
72 }
73 writeln!(f)?;
74
75 if n >= 256 {
76 writeln!(f, "... (dataframe is too long)")?;
77 break;
78 }
79 }
80
81 writeln!(f, "---")
82 }
83}
84
85impl ColumnFrame {
86 pub fn new<K: Into<KeyIndex>>(index: K, data_frame: Array2<DataValue>) -> Self {
87 Self {
88 index: index.into(),
89 data_frame,
90 }
91 }
92
    /// Returns the keys held by the index, as reported by `KeyIndex::get_keys`.
    pub fn keys(&self) -> &[Key] {
        self.index.get_keys()
    }
96
97 pub fn len(&self) -> usize {
98 self.data_frame.nrows()
99 }
100
101 pub fn is_empty(&self) -> bool {
102 self.data_frame.nrows() == 0
103 }
104
    /// Currently a no-op: the body is empty, so the frame is left untouched.
    pub fn shrink(&mut self) {
    }
108
109 pub fn try_fix_dtype(&mut self) -> Result<(), Error> {
110 let mut errors = vec![];
111 let keys = self.index.keys.clone();
112 for key in keys {
113 tracing::trace!("key: {key:?}- {:?}", key.ctype);
114 if let Err(e) = self.try_fix_column_by_key(&key) {
115 errors.push((key, e.to_string()));
116 }
117 }
118 if errors.is_empty() {
119 Ok(())
120 } else {
121 Err(Error::CastFailed(errors))
122 }
123 }
124
125 pub fn try_fix_column_by_key(&mut self, key: &Key) -> Result<(), Error> {
126 let idx = self
127 .index
128 .get_column_index(key)
129 .ok_or(Error::MissingField(format!("{key}").into()))?;
130 let mut col = self.data_frame.column_mut(idx);
131
132 col.mapv_inplace(|item| {
133 let x = &item;
134 match key.ctype {
135 crate::DataType::Bool => DataValue::Bool(bool::extract(x)),
136 crate::DataType::U32 => DataValue::U32(u32::extract(x)),
137 crate::DataType::I32 => DataValue::I32(i32::extract(x)),
138 crate::DataType::U64 => DataValue::U64(u64::extract(x)),
139 crate::DataType::I64 => DataValue::I64(i64::extract(x)),
140 crate::DataType::F32 => DataValue::F32(f32::extract(x)),
141 crate::DataType::F64 => DataValue::F64(f64::extract(x)),
142 crate::DataType::U8 => DataValue::U8(u8::extract(x)),
143 crate::DataType::String => DataValue::String(String::extract(x).into()),
144 crate::DataType::Bytes => item,
145 crate::DataType::Map => item,
146 crate::DataType::Vec => item,
147 crate::DataType::Unknown => item,
148 }
149 });
150 Ok(())
151 }
152
    /// Returns a mutable view over the whole value matrix.
    pub fn get_mut_view(&mut self) -> ArrayViewMut2<DataValue> {
        self.data_frame.view_mut()
    }
156
    /// Forwards to `KeyIndex::rename_key`, replacing the key named `old` with `new`.
    ///
    /// # Errors
    /// Whatever `KeyIndex::rename_key` reports.
    pub fn rename_key(&mut self, old: &str, new: Key) -> Result<(), Error> {
        self.index.rename_key(old, new)
    }
160
    /// Forwards to `KeyIndex::add_alias`, registering `alias` for the column `key`.
    ///
    /// # Errors
    /// Whatever `KeyIndex::add_alias` reports.
    pub fn add_alias(&mut self, key: &str, alias: &str) -> Result<(), Error> {
        self.index.add_alias(key, alias)
    }
164
165 pub fn select_transposed_typed<D: Extract>(&self, keys: &[Key]) -> Vec<Vec<D>> {
171 let selected = self.select(Some(keys));
172 let mut result = Vec::with_capacity(selected.nrows());
173 for row in selected.rows() {
174 let mut r = Vec::with_capacity(selected.ncols());
175 for value in row.iter() {
176 r.push(D::extract(value));
177 }
178 result.push(r);
179 }
180 result
181 }
182
183 pub fn select_transposed(&self, keys: Option<&[Key]>) -> Result<Array2<DataValue>, Error> {
189 let keys = keys.unwrap_or_else(|| self.index.get_keys());
190 let key_indexes = self.index.select(keys);
191 if key_indexes.is_empty() {
192 return Ok(Array2::default((0, 0)));
193 }
194 let data_vec: Vec<Array1<DataValue>> = key_indexes
195 .indexes()
196 .iter()
197 .map(|x| self.data_frame.column(*x).to_owned())
198 .collect();
199 to_array2(data_vec)
200 }
201
202 pub fn select_column(&self, key: &Key) -> Option<ArrayView1<DataValue>> {
206 self.index
207 .get_column_index(key)
208 .map(|x| self.data_frame.column(x))
209 }
210
    /// Calls `func` once with `keys` and this frame, returning whatever `func` returns.
    ///
    /// # Errors
    /// Any error produced by `func`.
    pub fn apply_function<F>(&mut self, keys: &[Key], mut func: F) -> Result<(), Error>
    where
        F: FnMut(&[Key], &mut ColumnFrame) -> Result<(), Error>,
    {
        func(keys, self)
    }
217
218 pub fn validate_entry_access(&self, column: &Key, row_index: usize) -> Result<usize, Error> {
223 if row_index >= self.data_frame.nrows() {
224 return Err(Error::IndexOutOfRange(row_index, self.data_frame.nrows()));
225 }
226 let Some(column_index) = self.index.get_column_index(column) else {
227 return Err(Error::NotFound(column.clone()));
228 };
229 Ok(column_index)
230 }
231
232 pub fn get_by_row_index(&self, column: &Key, row_index: usize) -> Option<&DataValue> {
236 trace!(
237 "Column: {column} row_index: {row_index} data_frame: cols:{}-rows:{}",
238 self.data_frame.len(),
239 self.data_frame.nrows()
240 );
241 trace!("{:?}", self.data_frame);
242 match self.validate_entry_access(column, row_index) {
243 Ok(column_index) => self.data_frame.get((row_index, column_index)),
244 Err(e) => {
245 trace!("Error: {e}");
246 None
247 }
248 }
249 }
250
251 pub fn get_mut_by_row_index(
255 &mut self,
256 column: &Key,
257 row_index: usize,
258 ) -> Option<&mut DataValue> {
259 trace!(
260 "Column: {column} row_index: {row_index} data_frame: cols:{}-rows:{}",
261 self.data_frame.len(),
262 self.data_frame.nrows()
263 );
264 trace!("{:?}", self.data_frame);
265 match self.validate_entry_access(column, row_index) {
266 Ok(column_index) => self.data_frame.get_mut((row_index, column_index)),
267 Err(e) => {
268 trace!("Error: {e}");
269 None
270 }
271 }
272 }
273
274 pub fn select_as_map(&self, keys: Option<&[Key]>) -> HashMap<Key, Vec<DataValue>> {
279 let keys = keys.unwrap_or_else(|| self.index.get_keys());
280 let indexes = self.index.select(keys);
281 if indexes.is_empty() {
282 return Default::default();
283 }
284
285 let mut new_data_frame = HashMap::with_capacity(keys.len());
286
287 for key in keys.iter() {
288 if let Some(column_index_in_source) = indexes.get_column_index(key) {
289 let column = self.data_frame.column(column_index_in_source);
290 new_data_frame.insert(key.clone(), column.to_vec());
291 }
292 }
293
294 new_data_frame
295 }
296
297 pub fn select(&self, keys: Option<&[Key]>) -> Array2<DataValue> {
302 let keys = keys.unwrap_or_else(|| self.index.get_keys());
303 let indexes = self.index.select(keys);
304 if indexes.is_empty() {
305 return Array2::default((0, 0));
306 }
307 let mut new_data_frame = Array2::default((self.data_frame.nrows(), keys.len()));
308
309 for (idx, key) in keys.iter().enumerate() {
310 if let Some(column_index_in_source) = indexes.get_column_index(key) {
311 new_data_frame
312 .column_mut(idx)
313 .assign(&self.data_frame.column(column_index_in_source));
314 }
315 }
316
317 new_data_frame
318 }
319
320 fn extend_dataframe_for_column(&mut self, key: Key) -> Result<(), Error> {
321 self.index.store_key(key);
322 let len = self.data_frame.nrows();
323 self.data_frame.push_column(Array1::default(len).view())?;
324 Ok(())
325 }
326
327 pub fn push<C: CandidateData>(&mut self, row_candidate: C) -> Result<(), Error> {
331 let mut arr = vec![];
332 for key in row_candidate.keys() {
333 if self.index.get_column_index(&key).is_none() {
334 self.extend_dataframe_for_column(key)?;
335 }
336 }
337 for index in self.index.get_keys() {
338 if let Some(value) = row_candidate.get_value_ref(index) {
339 arr.push(value.clone());
340 } else {
341 arr.push(DataValue::Null);
342 }
343 }
344 self.data_frame.push_row(Array::from_vec(arr).view())?;
345 Ok(())
346 }
347
348 pub fn remove_column(&mut self, keys: &[Key]) -> Result<Self, Error> {
349 let mut indexes = KeyIndex::default();
351 let data = self.select(Some(keys));
353 for key in keys {
355 if let Some((current, _idx)) = self.index.remove_key(key) {
356 indexes.store_key(current);
357 }
358 }
359 let rest = self.select(None);
361 let keys = self.index.get_keys().to_vec();
362 self.data_frame = rest;
363 self.index = KeyIndex::new(keys);
364
365 Ok(Self::new(indexes, data))
367 }
368
369 fn check_or_init_frame(&mut self, other: &Self) -> Result<Continue, Error> {
370 if self.index.is_empty() {
371 self.index = other.index.clone();
372 self.data_frame = other.data_frame.clone();
373 return Ok(Continue::End);
374 }
375 if other.index.is_empty() {
376 return Ok(Continue::End);
377 }
378 if self.is_empty() {
379 self.data_frame = Array2::default((other.data_frame.nrows(), self.index.len()));
380 }
381
382 Ok(Continue::Continue)
383 }
384
385 fn extend_columns_from_other(&mut self, other: &Self) -> Result<(), Error> {
386 for key in other.index.get_keys() {
387 if self.index.get_column_index(key).is_none() {
388 self.extend_dataframe_for_column(key.clone())?;
389 }
390 }
391 Ok(())
392 }
393
394 fn try_extend(&mut self, other: Self) -> Result<(), Error> {
395 let mut joined_keys = self.index.clone();
396 for key in other.keys() {
398 if self.index.get_column_index(key).is_none() {
399 joined_keys.store_key(key.clone());
400 }
401 }
402
403 let sum_len = self.data_frame.nrows() + other.data_frame.nrows();
404 let mut arr = Array2::default((sum_len, joined_keys.len()));
405 let increment = self.data_frame.nrows();
406 for key in joined_keys.get_keys() {
407 let index_result = joined_keys
408 .get_column_index(key)
409 .expect("BUG: index for this has to be defined");
410 if let Some(index) = self.index.get_column_index(key) {
411 for (idx, value) in self.data_frame.column(index).iter().enumerate() {
412 if let Some(x) = arr.get_mut((idx, index_result)) {
413 *x = value.to_owned();
414 }
415 }
416 }
417
418 if let Some(index) = other.index.get_column_index(key) {
419 for (idx, value) in other.data_frame.column(index).iter().enumerate() {
420 if let Some(x) = arr.get_mut((increment + idx, index_result)) {
421 *x = value.to_owned();
422 }
423 }
424 }
425 }
426 *self = ColumnFrame::new(joined_keys, arr);
427 Ok(())
428 }
429
430 pub fn extend(&mut self, mut other: Self) -> Result<(), Error> {
438 if self.check_or_init_frame(&other)?.should_end() {
439 return Ok(());
440 }
441
442 if self.index.check_order_of_indexes(&other.index).is_err() {
443 return self.try_extend(other);
444 }
445
446 trace!(
447 "Extend columns from other {:?} vs {:?}",
448 other.index.get_keys(),
449 self.index.get_keys()
450 );
451
452 if other.data_frame.ncols() < self.data_frame.ncols() {
453 other.extend_columns_from_other(self)?;
454 } else {
455 self.extend_columns_from_other(&other)?;
456 }
457 self.data_frame = concatenate(Axis(0), &[self.data_frame.view(), other.data_frame.view()])?;
458
459 Ok(())
460 }
461
462 pub fn replace(&mut self, other: Self) -> Result<(), Error> {
468 if self.check_or_init_frame(&other)?.should_end() {
469 return Ok(());
470 }
471
472 if self.data_frame.len() > other.data_frame.len() {
473 return Err(Error::DataSetSizeDoesntMatch(
474 self.data_frame.len(),
475 other.data_frame.len(),
476 ));
477 }
478 self.index = other.index;
479 self.data_frame = other.data_frame;
480
481 Ok(())
482 }
483
    /// Fills this frame with values from `right` for rows whose `keys` values match.
    ///
    /// Columns that only `right` has are added to this frame first. Then, for each row
    /// of `right`, the values of the join `keys` are looked up in an `Index` built over
    /// this frame; on a hit, the row's values for the added columns are written into
    /// the matching row. Rows of `right` without a hit are ignored, and this frame's
    /// row count does not change.
    ///
    /// # Errors
    /// Propagates failures from the pre-check and from adding columns.
    ///
    /// # Panics
    /// Panics via `expect` if a key that was just selected or added cannot be resolved.
    pub fn join_by_id_inner(&mut self, right: Self, keys: &[Key]) -> Result<(), Error> {
        if self.check_or_init_frame(&right)?.should_end() {
            return Ok(());
        }
        // Keys present in `right` but not here — presumably; verify `get_complement_keys`.
        let new_columnns = right.index.get_complement_keys(self.index.get_keys());
        self.extend_columns_from_other(&right)?;
        // Lookup from join-key values to a row position of this frame.
        let index = Index::new(keys.to_vec(), self);
        tracing::trace!("Index {index:?}");
        let right_indexes = right.index.select(keys);

        // Work on a copy so the source data stays intact while rows are filled in.
        let mut new_df = Array2::default((self.len(), self.index.len()));
        new_df.assign(&self.data_frame);
        trace!("Indexes: {right_indexes:?}");
        trace!("New DF: {new_df:?}");
        trace!("Right DF: {:?}", right.data_frame);
        trace!("current {:?}", self.data_frame);
        for c_row in right.data_frame.rows() {
            // Join-key values of this `right` row; a missing cell becomes the default value.
            let values = right_indexes
                .get_keys()
                .iter()
                .map(|k| {
                    c_row
                        .get(
                            right_indexes
                                .get_column_index(k)
                                .expect("BUG: Something is very wrong"),
                        )
                        .cloned()
                        .unwrap_or_default()
                })
                .collect::<Vec<_>>();
            if let Some(row_index) = index.get(&values) {
                trace!("Index: {row_index} {c_row:?} -> {values:?} vs {new_df:?}");
                // Only the columns contributed by `right` are overwritten.
                for complement_key in new_columnns.iter() {
                    let column_index = self
                        .index
                        .get_column_index(complement_key)
                        .expect("BUG: Something is very wrong");
                    if let Some(right_column_index) = right.index.get_column_index(complement_key) {
                        trace!("Filling Index: [{complement_key:?}] ri:{row_index} rci:{right_column_index:?} -> {:?} vs {:?}", c_row[right_column_index], new_df.get_mut((row_index, column_index)));
                        if let Some(v) = new_df.get_mut((row_index, column_index)) {
                            *v = c_row[right_column_index].to_owned();
                        }
                    }
                }
            }
        }

        self.data_frame = new_df;

        Ok(())
    }
543
544 pub fn add_single_column<K: Into<Key>>(
548 &mut self,
549 key: K,
550 column: Array1<DataValue>,
551 ) -> Result<(), Error> {
552 let key = key.into();
553 if self.index.get_column_index(&key).is_some() {
554 return Err(Error::ColumnAlreadyExists(key));
555 }
556 if self.len() != column.len() && !self.is_empty() {
557 return Err(Error::DataSetSizeDoesntMatch(self.len(), column.len()));
558 }
559
560 self.index.store_key(key.clone());
561 let rows = column.len();
562 let column_index = self
563 .index
564 .get_column_index(&key)
565 .ok_or(Error::UnknownError(format!("Column {key} should exists")))?;
566 if self.is_empty() && self.index.len() == 1 {
567 self.data_frame = column.into_shape_clone((rows, 1))?;
568 assert_eq!(self.data_frame.column(column_index).len(), rows);
569 } else if self.is_empty() {
570 self.data_frame = Array2::default((column.len(), self.index.len() - 1));
571 self.data_frame.push_column(column.view())?;
572 assert_eq!(self.data_frame.column(column_index).len(), rows);
573 } else {
574 self.data_frame.push_column(column.view())?;
575 }
576 assert_eq!(self.data_frame.column(column_index).len(), rows);
577
578 Ok(())
579 }
580 pub fn add_columns(&mut self, other: Self) -> Result<(), Error> {
584 if self.check_or_init_frame(&other)?.should_end() {
585 return Ok(());
586 }
587
588 self.extend_columns_from_other(&other)?;
589 for (idx, key) in other.index.get_keys().iter().enumerate() {
590 if let Some(index) = self.index.get_column_index(key) {
591 trace!("Other array = {:?}", other.data_frame.dim());
592 if other.data_frame.dim() == (0, 0) {
593 self.data_frame.column_mut(index).fill(DataValue::Null);
594 continue;
595 }
596 let arr = other.data_frame.column(idx);
597 trace!(
598 "Adding column {key:?} at index {idx} vs {index} datasize: self:{} vs other:{}",
599 self.data_frame.nrows(),
600 arr.len()
601 );
602 if arr.len() != self.data_frame.nrows() {
603 self.data_frame.column_mut(index).fill(DataValue::Null);
604 } else {
605 self.data_frame.column_mut(index).assign(&arr);
606 }
607 }
608 }
609 Ok(())
610 }
611
612 pub fn broadcast(&mut self, other: Self) -> Result<(), Error> {
617 if self.check_or_init_frame(&other)?.should_end() {
618 return Ok(());
619 }
620 if other.data_frame.nrows() != 1 {
621 return Err(Error::CannotBroadcast);
622 }
623 self.extend_columns_from_other(&other)?;
624 let mut new_df = Array2::default((self.len(), self.index.len()));
625 for (idx, key) in self.index.get_keys().iter().enumerate() {
626 if let Some(other_idx) = other.index.get_column_index(key) {
627 new_df
628 .column_mut(idx)
629 .assign(&other.data_frame.column(other_idx));
630 } else {
631 new_df.column_mut(idx).assign(&self.data_frame.column(idx));
632 }
633 }
634 self.data_frame = new_df;
635 Ok(())
636 }
637
638 pub fn cartesian_product(&mut self, other: Self) -> Result<(), Error> {
643 if self.check_or_init_frame(&other)?.should_end() {
644 return Ok(());
645 }
646 for other_key in other.keys() {
649 if self.index.get_column_index(other_key).is_none() {
650 self.index.store_key(other_key.clone());
651 } else {
652 self.index.store_key(Key::new(
653 format!("{}-{}", other_key, other_key.id()).as_str(),
654 other_key.ctype,
655 ));
656 }
657 }
658 let max_rows = self.len() * other.len();
659 let ncols = self.index.len();
660 let mut new_df = Array2::default((max_rows, ncols));
662
663 let mut cur_idx = 0;
664 for cur_row in self.data_frame.rows() {
665 for other_row in other.data_frame.rows() {
666 new_df
667 .slice_mut(s![cur_idx, ..])
668 .assign(&concatenate(Axis(0), &[cur_row, other_row])?);
669 cur_idx += 1;
670 }
671 }
672 self.data_frame = new_df;
673 Ok(())
674 }
675
676 pub fn join(&mut self, right: Self, join_type: &JoinRelation) -> Result<(), Error> {
684 use JoinBy::*;
685 match &join_type.join_type {
686 AddColumns => self.add_columns(right),
687 Replace => self.replace(right),
688 Extend => self.extend(right),
689 Broadcast => self.broadcast(right),
690 CartesianProduct => self.cartesian_product(right),
691 JoinById(join) => self.join_by_id_inner(right, &join.keys),
692 }
693 }
694
695 pub fn get_single_column(&self, key: &Key) -> Option<ArrayView1<DataValue>> {
696 self.index
697 .get_column_index(key)
698 .map(|x| self.data_frame.column(x))
699 }
700
701 pub fn sorted(&self, key: &Key) -> Result<sorted_df::SortedDataFrame<'_>, Error> {
702 let index = self
703 .index
704 .get_column_index(key)
705 .ok_or(Error::NotFound(key.clone()))?;
706 let column = self.data_frame.column(index);
707 let mut data_with_index = column.iter().enumerate().collect::<Vec<_>>();
708 tracing::trace!("Sorting by key: {key:?} vals {data_with_index:?}");
709 data_with_index.sort_by(|(a_idx, a_val), (b_idx, b_val)| {
710 a_val
711 .partial_cmp(b_val)
712 .unwrap_or(std::cmp::Ordering::Equal)
713 .then_with(|| a_idx.cmp(b_idx))
714 });
715
716 tracing::trace!("Sorted by key: {key:?} vals {data_with_index:?}");
717 let indicies = data_with_index
718 .into_iter()
719 .map(|(idx, _)| idx)
720 .collect::<Vec<_>>();
721
722 Ok(sorted_df::SortedDataFrame::new(self, indicies))
723 }
724
725 pub fn filter(&self, filter: &crate::filter::FilterRules) -> Result<Self, Error> {
726 let mut final_indices = Vec::new();
727 let filter_df = filter_df::ColumnFrameFiltering { column_frame: self };
728 for rule in &filter.rules {
729 final_indices.extend(crate::filter::filter_combination(&filter_df, rule)?);
730 }
731
732 final_indices.sort_unstable();
733 final_indices.dedup();
734 let mut new_df = ColumnFrame::new(
735 self.index.clone(),
736 Array2::default((final_indices.len(), self.index.len())),
737 );
738 final_indices
739 .iter()
740 .enumerate()
741 .for_each(|(cur_idx, row_idx)| {
742 new_df
743 .data_frame
744 .slice_mut(s![cur_idx, ..])
745 .assign(&self.data_frame.slice(s![*row_idx, ..]));
746 });
747
748 Ok(new_df)
749 }
750}
751
752pub fn to_array2<T: Clone>(source: Vec<Array1<T>>) -> Result<Array2<T>, Error> {
753 let width = source.len();
754 let flattened: Array1<T> = source.into_iter().flat_map(|row| row.to_vec()).collect();
755 let height = flattened.len() / width;
756 Ok(flattened.into_shape_with_order((width, height))?)
757}
/// Builds a `DataFrame` by passing all tokens to [`column_frame!`] and wrapping the
/// resulting frame with `DataFrame::new`.
#[macro_export]
macro_rules! df {
    ($($everything:tt)*) => {
        $crate::DataFrame::new($crate::column_frame!($($everything)*))
    };
}
764
/// Builds a [`ColumnFrame`] from `key => value` pairs.
///
/// * `key => [v1, v2, ...]` (or `vec![...]`): each key becomes a column holding the
///   listed values.
/// * `key => value`: produces a single row with one value per key.
///
/// Keys and values are converted with `.into()`.
#[macro_export]
macro_rules! column_frame {
    // Every pair followed by a comma: re-dispatch without the final comma.
    ($($key:expr => $value:expr,)+) => { $crate::column_frame!($($key => $value),+) };
    // `vec![...]` values are handled like bracketed lists.
    ($($key:expr => vec![$($value:expr),*]),*) => {
        $crate::column_frame!($($key => [$($value),*]),*)
    };
    // List form: one array row per key, then the axes are swapped so keys become columns.
    ($($key:expr => [$($value:expr),*]),*) => {
        {
            let data = ::ndarray::arr2(&[$(
                [$($value.into(),)*],
            )*]);

            let _keys = vec![$($key.into(),)*];

            $crate::ColumnFrame::new(
                $crate::KeyIndex::new(_keys),
                data.reversed_axes()
            )
        }
    };
    // Scalar form: a single row.
    ($($key:expr => $value:expr),*) => {
        {
            let _data = ::ndarray::arr2(&[[$($value.into(),)*]]);
            let _keys = vec![$($key.into(),)*];

            $crate::ColumnFrame::new(
                $crate::KeyIndex::new(_keys),
                _data,
            )
        }
    };
}
801
802#[cfg(test)]
803mod test {
804 use crate::{filter::FilterRules, JoinById};
805
806 use super::*;
807 use data_value::stdhashmap;
808 use ndarray::ArrayView;
809 use rstest::*;
810 use tracing_test::traced_test;
811
    // Each case is (input frame, expected frame, filter expression).
    #[rstest]
    // Datetime equality on u64 microsecond timestamps.
    #[case(
        column_frame! {
            "t" => [1751001987000000u64, 1752001987000000u64, 1753001987000000u64],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "t" => [1752001987000000u64],
            "b" => [5],
            "c" => [8]
        },
        FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
    )]
    // Same, with f64 timestamps.
    #[case(
        column_frame! {
            "t" => [1751001987000000f64, 1752001987000000f64, 1753001987000000f64],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "t" => [1752001987000000f64],
            "b" => [5],
            "c" => [8]
        },
        FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
    )]
    // Same, with i64 timestamps.
    #[case(
        column_frame! {
            "t" => [1751001987000000i64, 1752001987000000i64, 1753001987000000i64],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "t" => [1752001987000000i64],
            "b" => [5],
            "c" => [8]
        },
        FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
    )]
    // Datetime "less than" on u64 timestamps.
    #[case(
        column_frame! {
            "t" => [1751001987000000u64, 1752001987000000u64, 1753001987000000u64],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "t" => [1751001987000000u64],
            "b" => [4],
            "c" => [7]
        },
        FilterRules::try_from("t.to_datetime_us() < '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
    )]
    // Datetime "less than" on string timestamps.
    #[case(
        column_frame! {
            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "t" => ["2025-07-08 18:13:07"],
            "b" => [4],
            "c" => [7]
        },
        FilterRules::try_from("t.to_datetime_us() < '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
    )]
    // String length below 10: no row matches.
    #[case(
        column_frame! {
            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "t" => [],
            "b" => [],
            "c" => []
        },
        FilterRules::try_from("t.len() < 10u64").expect("BUG: cannot create filter rules"),
    )]
    // String length above 10: every row matches.
    #[case(
        column_frame! {
            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        FilterRules::try_from("t.len() > 10u64").expect("BUG: cannot create filter rules"),
    )]
    // Vec length equal to 0.
    #[case(
        column_frame! {
            "t" => [DataValue::Vec(vec![1.into(), 2.into(), 3.into()]), DataValue::Vec(vec![]), DataValue::Vec(vec![1.into()])],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "t" => [DataValue::Vec(vec![])],
            "b" => [5],
            "c" => [ 8]
        },
        FilterRules::try_from("t.len() == 0u64").expect("BUG: cannot create filter rules"),
    )]
    // Vec length equal to 1.
    #[case(
        column_frame! {
            "t" => [DataValue::Vec(vec![1.into(), 2.into(), 3.into()]), DataValue::Vec(vec![]), DataValue::Vec(vec![1.into()])],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "t" => [DataValue::Vec(vec![1.into()])],
            "b" => [6],
            "c" => [9]
        },
        FilterRules::try_from("t.len() == 1u64").expect("BUG: cannot create filter rules"),
    )]
    // Single numeric comparison.
    #[case(
        column_frame! {
            "a" => [1, 2, 3],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "a" => [1, 2],
            "b" => [4, 5],
            "c" => [7, 8]
        },
        FilterRules::try_from("a <= 2i32").expect("BUG: cannot create filter rules"),
    )]
    // Conjunction of two comparisons.
    #[case(
        column_frame! {
            "a" => [1, 2, 3],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "a" => [2],
            "b" => [5],
            "c" => [8]
        },
        FilterRules::try_from("a <= 2i32 && c > 7i32").expect("BUG: cannot create filter rules"),
    )]
    // Conjunction that matches nothing.
    #[case(
        column_frame! {
            "a" => [1, 2, 3],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "a" => [],
            "b" => [],
            "c" => []
        },
        FilterRules::try_from("a <= 2i32 && c > 9i32").expect("BUG: cannot create filter rules"),
    )]
    // Disjunction where only the left side matches.
    #[case(
        column_frame! {
            "a" => [1, 2, 3],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "a" => [1, 2],
            "b" => [4, 5],
            "c" => [7, 8]
        },
        FilterRules::try_from("a <= 2i32 || c > 9i32").expect("BUG: cannot create filter rules"),
    )]
    // Conjunction with a parenthesised disjunction.
    #[case(
        column_frame! {
            "a" => [1, 2, 3],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "a" => [2],
            "b" => [5],
            "c" => [8]
        },
        FilterRules::try_from("a <= 2i32 && (c > 9i32 || b == 5i32)").expect("BUG: cannot create filter rules"),
    )]
    // Regex match on strings.
    #[case(
        column_frame! {
            "a" => ["abcd", "ab", "abcdefg"],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "a" => ["abcd","abcdefg"],
            "b" => [4, 6],
            "c" => [7, 9]
        },
        FilterRules::try_from("a ~= 'abcd.*'").expect("BUG: cannot create filter rules"),
    )]
    // Membership in a list.
    #[case(
        column_frame! {
            "a" => [1, 2, 3],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "a" => [1],
            "b" => [4],
            "c" => [7]
        },
        FilterRules::try_from("a in [1u32, 1i32]'").expect("BUG: cannot create filter rules"),
    )]
    // Exclusion from a list.
    #[case(
        column_frame! {
            "a" => [1, 2, 3],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "a" => [2, 3],
            "b" => [5, 6],
            "c" => [8, 9]
        },
        FilterRules::try_from("a notIn [1u32, 1i32]'").expect("BUG: cannot create filter rules"),
    )]
    // Disjunction whose right side is a subset of the left side.
    #[case(
        column_frame! {
            "a" => [1f64, 2f64, 3f64],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "a" => [1f64, 2f64],
            "b" => [4, 5],
            "c" => [7, 8]
        },
        FilterRules::try_from("a < 3f64 || (a < 3f64 && b <= 5i32)").expect("BUG: cannot create filter rules"),
    )]
    // Literals without a type suffix compared against i64 columns.
    #[case(
        column_frame! {
            "a" => [1f64, 2f64, 3f64],
            "b" => [4i64, 5i64, 6i64],
            "c" => [7i64, 8i64, 9i64]
        },
        column_frame! {
            "a" => [1f64, 2f64],
            "b" => [4i64, 5i64],
            "c" => [7i64, 8i64]
        },
        FilterRules::try_from("a >= 1f64 && (b <= 5 || c <= 8) && b >= 4").expect("BUG: cannot create filter rules"),
    )]
    #[traced_test]
    fn filter_test(
        #[case] df: ColumnFrame,
        #[case] expected: ColumnFrame,
        #[case] filter: FilterRules,
    ) {
        // The filtered frame must equal the expected frame, index included.
        let filtered = df.filter(&filter).expect("BUG: cannot filter");
        assert_eq!(filtered, expected);
    }
1069
    // Exercises the scalar and list forms of `column_frame!` and checks that `df!`
    // matches a `DataFrame` built from the same frame.
    #[rstest]
    #[traced_test]
    fn test_macro() {
        // Scalar form: one row, four columns.
        let df = column_frame! {
            "a" => 1,
            "b" => 2,
            "c" => 3,
            "d" => 4,
        };

        assert_eq!(df.len(), 1);
        assert_eq!(df.keys(), &["a".into(), "b".into(), "c".into(), "d".into()]);
        let f = Array2::from_shape_vec((1, 4), vec![1.into(), 2.into(), 3.into(), 4.into()])
            .expect("BUG: cannot create array");
        assert_eq!(df.select(None), f);

        // List form: three rows, each key is a column.
        let df = column_frame! {
            "a" => [1, 2, 3],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        };

        assert_eq!(df.len(), 3);
        assert_eq!(df.keys(), &["a".into(), "b".into(), "c".into()]);
        // Expected values listed row by row.
        let f = Array2::from_shape_vec(
            (3, 3),
            vec![
                1.into(),
                4.into(),
                7.into(),
                2.into(),
                5.into(),
                8.into(),
                3.into(),
                6.into(),
                9.into(),
            ],
        )
        .expect("BUG: cannot create array");
        let selected = df.select(None);
        trace!("{selected:?}");
        assert_eq!(selected, f);

        let df1 = df! {
            "a" => [1, 2, 3],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        };

        // Smoke-checks the `Display` implementation.
        let formatted = format!("{}", df);
        debug!("{}", formatted);

        assert_eq!(df1, crate::DataFrame::from(df));
    }
1125
    // Renaming a key must produce a frame equal to one built with the new key name.
    #[rstest]
    #[case(
        column_frame! {
            "a" => [1, 2, 3],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        column_frame! {
            "a_new" => [1, 2, 3],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        vec!["a_new", "b", "c"].into_iter().map(|x| x.into()).collect(),
        vec![("a", "a_new".into())]
    )]
    #[traced_test]
    fn rename_test(
        #[case] df: ColumnFrame,
        #[case] expected: ColumnFrame,
        #[case] keys: Vec<Key>,
        #[case] renames: Vec<(&str, Key)>,
    ) {
        let mut df = df;
        for (old, new) in renames {
            df.rename_key(old, new).expect("BUG: cannot rename key");
        }
        assert_eq!(df, expected);
        assert_eq!(df.keys(), keys.as_slice());
    }
1155
    // `try_fix_column_by_key` must convert the column to the type carried by the key.
    #[rstest]
    #[case(
        column_frame!("a" => [1, 2, 3]),
        Key::new("a", crate::DataType::I32),
        column_frame!("a" => [1i32, 2i32, 3i32])
    )]
    #[case(
        column_frame!("a" => [1, 2, 3]),
        Key::new("a", crate::DataType::U32),
        column_frame!("a" => [1u32, 2u32, 3u32])
    )]
    #[case(
        column_frame!("a" => [1, 2, 3]),
        Key::new("a", crate::DataType::I64),
        column_frame!("a" => [1i64, 2i64, 3i64])
    )]
    #[case(
        column_frame!("a" => [1, 2, 3]),
        Key::new("a", crate::DataType::U64),
        column_frame!("a" => [1u64, 2u64, 3u64])
    )]
    #[case(
        column_frame!("a" => [1, 2, 3]),
        Key::new("a", crate::DataType::F64),
        column_frame!("a" => [1f64, 2f64, 3f64])
    )]
    #[case(
        column_frame!("a" => [1, 2, 3]),
        Key::new("a", crate::DataType::F32),
        column_frame!("a" => [1f32, 2f32, 3f32])
    )]
    fn test_try_fix_dtype(
        #[case] mut df: ColumnFrame,
        #[case] key: Key,
        #[case] expected: ColumnFrame,
    ) {
        assert!(df.try_fix_column_by_key(&key).is_ok());
        // Only the selected values are compared, not the indexes.
        assert_eq!(
            df.select(Some(&[key.clone()])),
            expected.select(Some(&[key.clone()]))
        );
    }
    // `try_fix_dtype` must apply the type stored in the frame's own key.
    #[rstest]
    #[case(
        column_frame!(Key::new("a", crate::DataType::F32) => [1, 2, 3]),
        Key::new("a", crate::DataType::F32),
        column_frame!("a" => [1f32, 2f32, 3f32])
    )]
    #[traced_test]
    fn test_try_fix(#[case] mut df: ColumnFrame, #[case] key: Key, #[case] expected: ColumnFrame) {
        assert!(df.try_fix_dtype().is_ok());
        assert_eq!(
            df.select(Some(&[key.clone()])),
            expected.select(Some(&[key]))
        )
    }
1217
    // Fixing a column that does not exist must return an error.
    #[rstest]
    #[traced_test]
    fn test_not_key_fix() {
        let mut cf = column_frame!("a" => [1]);
        let non_existing = Key::new("b", crate::DataType::I32);
        assert!(cf.try_fix_column_by_key(&non_existing).is_err());
    }
1225
    // Selecting through an alias must return the same data as selecting the original key.
    #[rstest]
    #[case(
        column_frame! {
            "a" => [1, 2, 3],
            "b" => [4, 5, 6],
            "c" => [7, 8, 9]
        },
        vec!["a_alias", "b", "c"].into_iter().map(|x| x.into()).collect(),
        vec![("a", "a_alias")]
    )]
    #[traced_test]
    fn alias_test(
        #[case] df: ColumnFrame,
        #[case] keys: Vec<Key>,
        #[case] aliases: Vec<(&str, &str)>,
    ) {
        let mut df = df;
        for (old, new) in aliases {
            df.add_alias(old, new).expect("BUG: cannot rename key");
        }
        let origin_keys = df.keys().to_vec();
        let selected_aliases = df.select(Some(keys.as_slice()));
        let selected = df.select(Some(origin_keys.as_slice()));
        assert_eq!(selected, selected_aliases);
    }
1251
1252 #[rstest]
1253 #[traced_test]
1254 fn test_mut_view() {
1255 let data = vec![
1256 DataValue::from(1f64),
1257 DataValue::from(4f32),
1258 DataValue::from(2f64),
1259 DataValue::from(f32::NAN),
1260 DataValue::from(f64::NAN),
1261 DataValue::from(f32::INFINITY),
1262 ];
1263 let keys: Vec<Key> = vec!["a".into(), "b".into()];
1264
1265 let index = KeyIndex::new(keys.clone());
1266 let df = Array2::from_shape_vec((3, keys.len()), data).expect("BUG: cannot create array");
1267 let mut df = ColumnFrame::new(index.clone(), df);
1268 df.get_mut_view().mapv_inplace(|x| match x {
1269 DataValue::F32(f) if f.is_infinite() || f.is_nan() => DataValue::F32(0f32),
1270 DataValue::F64(f) if f.is_infinite() || f.is_nan() => DataValue::F64(0f64),
1271 e => e,
1272 });
1273 let data = vec![
1274 DataValue::from(1f64),
1275 DataValue::from(4f32),
1276 DataValue::from(2f64),
1277 DataValue::from(0f32),
1278 DataValue::from(0f64),
1279 DataValue::from(0f32),
1280 ];
1281 let expected = ColumnFrame::new(
1282 index,
1283 Array2::from_shape_vec((3, keys.len()), data).expect("BUG: cannot create ndarray"),
1284 );
1285 assert_eq!(df, expected);
1286 }
1287
1288 #[rstest]
1289 #[traced_test]
1290 fn dummy_test() {
1291 let data = vec![
1292 DataValue::U32(1),
1293 DataValue::I32(2),
1294 DataValue::I64(3),
1295 DataValue::U64(4),
1296 ];
1297
1298 let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1299
1300 let index = KeyIndex::new(keys.clone());
1301 let mut data_frame = Array2::default((1, keys.len()));
1302 for (idx, entry) in data.iter().enumerate() {
1303 data_frame
1304 .column_mut(idx)
1305 .assign(&ArrayView::from(&[entry.clone()]));
1306 }
1307
1308 let frame = ColumnFrame::new(index, data_frame);
1309 assert_eq!(
1310 frame.get_by_row_index(&"a".into(), 0),
1311 Some(&DataValue::U32(1))
1312 );
1313 assert_eq!(frame.get_by_row_index(&"aa".into(), 0), None);
1314 assert_eq!(frame.get_by_row_index(&"a".into(), 1), None);
1315 assert_eq!(
1316 frame.select(Some(&["a".into(), "b".into()])),
1317 Array2::from_shape_vec((1, 2), vec![DataValue::U32(1), DataValue::I32(2)])
1318 .expect("BUG: cannot create array")
1319 );
1320 }
1321
1322 #[rstest]
1323 #[traced_test]
1324 fn dummy_test_multiple_rows() {
1325 let data = vec![
1326 DataValue::U32(1),
1327 DataValue::I32(2),
1328 DataValue::I64(3),
1329 DataValue::U64(4),
1330 DataValue::U32(12),
1331 DataValue::I32(22),
1332 DataValue::I64(32),
1333 DataValue::U64(42),
1334 ];
1335
1336 let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1337
1338 let index = KeyIndex::new(keys.clone());
1339 let data_frame =
1340 Array2::from_shape_vec((2, keys.len()), data).expect("BUG: cannot create array");
1341
1342 let frame = ColumnFrame::new(index, data_frame);
1343 assert_eq!(
1344 frame.get_by_row_index(&"a".into(), 0),
1345 Some(&DataValue::U32(1))
1346 );
1347 assert_eq!(frame.get_by_row_index(&"aa".into(), 0), None);
1348 assert_eq!(frame.get_by_row_index(&"a".into(), 3), None);
1349 let arr = Array2::from_shape_vec(
1350 (2, 2),
1351 vec![
1352 DataValue::U32(1),
1353 DataValue::I32(2),
1354 DataValue::U32(12),
1355 DataValue::I32(22),
1356 ],
1357 )
1358 .expect("BUG: cannot create array");
1359 trace!("{arr:?}");
1360 assert_eq!(frame.select(Some(&["a".into(), "b".into()])), arr);
1361 }
1362
1363 #[rstest]
1364 #[traced_test]
1365 fn dummy_test_multiple_rows_push() {
1366 let data = vec![
1367 DataValue::U32(1),
1368 DataValue::I32(2),
1369 DataValue::I64(3),
1370 DataValue::U64(4),
1371 DataValue::U32(12),
1372 DataValue::I32(22),
1373 DataValue::I64(32),
1374 DataValue::U64(42),
1375 ];
1376 let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1377
1378 let index = KeyIndex::new(keys.clone());
1379 let data_frame =
1380 Array2::from_shape_vec((2, keys.len()), data).expect("BUG: cannot create array");
1381
1382 let mut frame = ColumnFrame::new(index, data_frame);
1383 assert!(frame
1384 .push(data_value::stdhashmap!(
1385 "a" => DataValue::U32(2),
1386 "b" => DataValue::I32(3),
1387 "c" => DataValue::I64(4),
1388 "d" => DataValue::U64(5)
1389 ))
1390 .is_ok());
1391 let arr = Array2::from_shape_vec(
1392 (3, 2),
1393 vec![
1394 DataValue::U32(1),
1395 DataValue::I32(2),
1396 DataValue::U32(12),
1397 DataValue::I32(22),
1398 DataValue::U32(2),
1399 DataValue::I32(3),
1400 ],
1401 )
1402 .expect("BUG: cannot create array");
1403 trace!("{arr:?}");
1404 assert_eq!(frame.select(Some(&["a".into(), "b".into()])), arr);
1405 let result = frame.push(data_value::stdhashmap!(
1406 "a" => DataValue::U32(34),
1407 "b" => DataValue::I32(44),
1408 "c" => DataValue::I64(54),
1409 "e" => DataValue::F32(6f32)
1410 ));
1411 assert!(result.is_ok(), "{result:?}");
1412 let arr = Array2::from_shape_vec(
1413 (4, 2),
1414 vec![
1415 DataValue::U64(4),
1416 DataValue::Null,
1417 DataValue::U64(42),
1418 DataValue::Null,
1419 DataValue::U64(5),
1420 DataValue::Null,
1421 DataValue::Null,
1422 DataValue::F32(6f32),
1423 ],
1424 )
1425 .expect("BUG: cannot create array");
1426 trace!("{arr:?}");
1427 assert_eq!(frame.select(Some(&["d".into(), "e".into()])), arr);
1428 }
1429
1430 #[rstest]
1431 #[case(
1432 column_frame! {
1433 "group_id" => vec![1, 2],
1434 "feed_tag" => vec![3, 4]
1435 },
1436 Some(vec![Key::from("group_id")]),
1437 ndarray::array!([1.into()], [2.into()])
1438 )]
1439 #[case(
1440 column_frame! {
1441 "group_id" => vec![1, 2],
1442 "feed_tag" => vec![3, 4]
1443 },
1444 Some(vec!["group_id".into(), "feed_tag".into()]),
1445 ndarray::array!([1.into(), 3.into()], [2.into(), 4.into()])
1446 )]
1447 #[case(
1448 column_frame! {
1449 "group_id" => vec![1, 2],
1450 "feed_tag" => vec![3, DataValue::Null]
1451 },
1452 Some(vec!["feed_tag".into()]),
1453 ndarray::array![[3.into()], [DataValue::Null]]
1454 )]
1455 #[case(
1456 column_frame! {
1457 "group_id" => vec![1, 2],
1458 "feed_tag" => vec![1, DataValue::Null]
1459 },
1460 Some(vec!["feed_tag2".into()]),
1461 Array2::<DataValue>::default((0, 0))
1462 )]
1463 #[traced_test]
1464 fn test_select(
1465 #[case] input: ColumnFrame,
1466 #[case] keys: Option<Vec<Key>>,
1467 #[case] expected: Array2<DataValue>,
1468 ) {
1469 trace!("input={input:?}");
1470 let keys_slice = keys.as_deref();
1471 let selected = input.select(keys_slice);
1472 trace!("selected={selected:?}");
1473 assert_eq!(selected, expected);
1474 let selected = input.select_transposed(keys_slice);
1475 trace!("selected_transposed={selected:?}");
1476 assert!(selected.is_ok());
1477 assert_eq!(selected.unwrap(), expected.t());
1478 }
1479
1480 #[rstest]
1481 #[case(
1482 column_frame! {
1483 "group_id" => vec![1, 2],
1484 "feed_tag" => vec![3, 4]
1485 },
1486 Key::from("group_id"),
1487 Some(ndarray::array!(1.into(), 2.into()))
1488 )]
1489 #[case(
1490 column_frame! {
1491 "group_id" => vec![1, 2, 5, 6],
1492 "feed_tag" => vec![3, 4, 7, 8]
1493 },
1494 Key::from("group_id"),
1495 Some(ndarray::array!(1.into(), 2.into(), 5.into(), 6.into()))
1496 )]
1497 #[case(
1498 column_frame! {
1499 "group_id" => vec![1, 2],
1500 "feed_tag" => vec![1, 1]
1501 },
1502 Key::from("feed_tag1"),
1503 None
1504 )]
1505 #[traced_test]
1506 fn test_select_column(
1507 #[case] input: ColumnFrame,
1508 #[case] key: Key,
1509 #[case] expected: Option<Array1<DataValue>>,
1510 ) {
1511 let selected = input.select_column(&key);
1512 trace!("selected={selected:?}");
1513 match expected {
1514 Some(expected) => {
1515 assert!(selected.is_some());
1516 assert_eq!(selected.expect("BUG: checked above"), expected);
1517 }
1518 None => assert!(selected.is_none()),
1519 }
1520 }
1521
/// Exercises the `add_columns` join relation when one side has no rows:
/// an empty frame joined with a populated one, a populated frame joined with
/// an empty one, and a populated frame joined with a frame that only has an
/// index entry.
#[test]
#[traced_test]
fn empty_join_test() {
    let join = JoinRelation::add_columns();

    // Left side: a single `group_id` column without any rows.
    let mut column_frame = ColumnFrame::default();
    column_frame
        .add_single_column("group_id", Array1::from_vec(vec![]))
        .expect("BUG: cannot add column");
    let column_frame2 = column_frame! {
        "group_id" => vec![2, 1, 3],
        "feed_tag" => vec![1, 1, 1],
        "clicks" => vec![100, 10, 10],
        "imps" => vec![1000, 200, 200]
    };
    let joined_with_default = column_frame.join(ColumnFrame::default(), &join);
    assert!(joined_with_default.is_ok());

    let joined = column_frame.join(column_frame2, &join);
    assert!(joined.is_ok(), "{joined:?}");

    trace!("{column_frame:?}");
    let all_keys: Vec<Key> = vec![
        "group_id".into(),
        "feed_tag".into(),
        "clicks".into(),
        "imps".into(),
    ];
    let expected_full = ndarray::array![
        [2.into(), 1.into(), 100.into(), 1000.into()],
        [1.into(), 1.into(), 10.into(), 200.into()],
        [3.into(), 1.into(), 10.into(), 200.into()],
    ];
    assert_eq!(column_frame.select(Some(all_keys.as_slice())), expected_full);

    // Populated frame on the left, row-less `group_id` frame on the right.
    let mut column_frame2 = column_frame! {
        "feed_tag" => vec![1, 1, 1],
        "clicks" => vec![100, 10, 10],
        "imps" => vec![1000, 200, 200]
    };
    let mut column_frame = ColumnFrame::default();
    column_frame
        .add_single_column("group_id", Array1::from_vec(vec![]))
        .expect("BUG: cannot add column");
    let joined = column_frame2.join(column_frame, &join);
    assert!(joined.is_ok(), "{joined:?}");

    trace!("{column_frame2:?}");
    let expected_null_first = ndarray::array![
        [DataValue::Null, 1.into(), 100.into(), 1000.into()],
        [DataValue::Null, 1.into(), 10.into(), 200.into()],
        [DataValue::Null, 1.into(), 10.into(), 200.into()],
    ];
    assert_eq!(
        column_frame2.select(Some(all_keys.as_slice())),
        expected_null_first
    );

    // Right side carries only an index entry (`group_id2`) and no data.
    let mut column_frame = ColumnFrame::default();
    column_frame.index = KeyIndex::new(vec!["group_id2".into()]);
    let joined = column_frame2.join(column_frame, &join);
    assert!(joined.is_ok(), "{joined:?}");

    trace!("{column_frame2:?}");
    let keys_with_group_id2: Vec<Key> = vec![
        "group_id2".into(),
        "feed_tag".into(),
        "clicks".into(),
        "imps".into(),
    ];
    let expected_group_id2 = ndarray::array![
        [DataValue::Null, 1.into(), 100.into(), 1000.into()],
        [DataValue::Null, 1.into(), 10.into(), 200.into()],
        [DataValue::Null, 1.into(), 10.into(), 200.into()],
    ];
    assert_eq!(
        column_frame2.select(Some(keys_with_group_id2.as_slice())),
        expected_group_id2
    );
}
1603
/// Joins by id on (`group_id`, `feed_tag`): matching rows receive `clicks`
/// and `imps` from the right frame, the unmatched left row gets `Null`s.
#[test]
#[traced_test]
fn join_test() {
    let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
        "group_id".into(),
        "feed_tag".into(),
    ])));
    let mut column_frame = column_frame! {
        "group_id" => vec![1, 2, 8],
        "feed_tag" => vec![1, 1, 10]
    };
    let column_frame2 = column_frame! {
        "group_id" => vec![2, 1, 3],
        "feed_tag" => vec![1, 1, 1],
        "clicks" => vec![100, 10, 10],
        "imps" => vec![1000, 200, 200]
    };

    // Joining with an empty frame must succeed as well.
    let joined_with_default = column_frame.join(ColumnFrame::default(), &join);
    assert!(joined_with_default.is_ok());

    let joined = column_frame.join(column_frame2, &join);
    assert!(joined.is_ok(), "{joined:?}");

    trace!("{column_frame:?}");
    let selected_keys: Vec<Key> = vec![
        "group_id".into(),
        "feed_tag".into(),
        "clicks".into(),
        "imps".into(),
    ];
    let expected = ndarray::array![
        [1.into(), 1.into(), 10.into(), 200.into()],
        [2.into(), 1.into(), 100.into(), 1000.into()],
        [8.into(), 10.into(), DataValue::Null, DataValue::Null]
    ];
    assert_eq!(column_frame.select(Some(selected_keys.as_slice())), expected);
}
1641
/// Join by id where the left frame carries an extra `clicked` column that is
/// expected to keep its values after the join.
#[test]
#[traced_test]
fn join_test_with_additional() {
    let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
        "group_id".into(),
        "feed_tag".into(),
    ])));
    let mut column_frame = column_frame! {
        "group_id" => vec![1, 2, 8],
        "feed_tag" => vec![1, 1, 10],
        "clicked" => vec![0, 0, 1]
    };
    let column_frame2 = column_frame! {
        "group_id" => vec![2, 1, 3],
        "feed_tag" => vec![1, 1, 1],
        "clicks" => vec![100, 10, 10],
        "imps" => vec![1000, 200, 200]
    };

    // Joining with an empty frame must succeed as well.
    let joined_with_default = column_frame.join(ColumnFrame::default(), &join);
    assert!(joined_with_default.is_ok());

    let joined = column_frame.join(column_frame2, &join);
    assert!(joined.is_ok(), "{joined:?}");

    trace!("{column_frame:?}");
    let selected_keys: Vec<Key> = vec![
        "group_id".into(),
        "feed_tag".into(),
        "clicks".into(),
        "imps".into(),
        "clicked".into(),
    ];
    let expected = ndarray::array![
        [1.into(), 1.into(), 10.into(), 200.into(), 0.into()],
        [2.into(), 1.into(), 100.into(), 1000.into(), 0.into()],
        [
            8.into(),
            10.into(),
            DataValue::Null,
            DataValue::Null,
            1.into()
        ]
    ];
    assert_eq!(column_frame.select(Some(selected_keys.as_slice())), expected);
}
1687
/// Join by id against a single-row right frame: only the left row with the
/// matching (`group_id`, `feed_tag`) pair receives `clicks` and `imps`.
#[test]
#[traced_test]
fn join_test_with_additional_single() {
    let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
        "group_id".into(),
        "feed_tag".into(),
    ])));
    let mut column_frame = column_frame! {
        "group_id" => vec![1, 2, 8],
        "feed_tag" => vec![1, 1, 10],
        "clicked" => vec![0, 0, 1]
    };
    let column_frame2 = column_frame! {
        "a" => vec![1],
        "group_id" => vec![2],
        "feed_tag" => vec![1],
        "clicks" => vec![10],
        "imps" => vec![200]
    };

    // Joining with an empty frame must succeed as well.
    let joined_with_default = column_frame.join(ColumnFrame::default(), &join);
    assert!(joined_with_default.is_ok());

    let joined = column_frame.join(column_frame2, &join);
    assert!(joined.is_ok(), "{joined:?}");

    trace!("{column_frame:?}");
    let selected_keys: Vec<Key> = vec![
        "group_id".into(),
        "feed_tag".into(),
        "clicks".into(),
        "imps".into(),
        "clicked".into(),
    ];
    let expected = ndarray::array![
        [
            1.into(),
            1.into(),
            DataValue::Null,
            DataValue::Null,
            0.into(),
        ],
        [2.into(), 1.into(), 10.into(), 200.into(), 0.into()],
        [
            8.into(),
            10.into(),
            DataValue::Null,
            DataValue::Null,
            1.into()
        ]
    ];
    assert_eq!(column_frame.select(Some(selected_keys.as_slice())), expected);
}
1740
1741 #[rstest]
1742 #[traced_test]
1743 fn cartesian_product_join() {
1744 let mut df = column_frame! {
1745 "group_id" => vec![1, 2, 3],
1746 "feed_tag" => vec![1, 2, 3]
1747 };
1748 let df2 = column_frame! {
1749 "zone_id" => vec![111111, 111133],
1750 "zone_avg_ctr" => vec![0.1, 0.001]
1751 };
1752 assert!(df
1753 .join(
1754 ColumnFrame::default(),
1755 &JoinRelation::new(JoinBy::CartesianProduct)
1756 )
1757 .is_ok());
1758 let join = JoinRelation::new(JoinBy::CartesianProduct);
1759 let result = df.join(df2, &join);
1760 assert!(result.is_ok(), "{result:?}");
1761 let selected = df.select(None);
1762 trace!("{selected:?}");
1763 assert_eq!(
1764 selected,
1765 ndarray::array!(
1766 [1.into(), 1.into(), 111111.into(), 0.1.into()],
1767 [1.into(), 1.into(), 111133.into(), 0.001.into()],
1768 [2.into(), 2.into(), 111111.into(), 0.1.into()],
1769 [2.into(), 2.into(), 111133.into(), 0.001.into()],
1770 [3.into(), 3.into(), 111111.into(), 0.1.into()],
1771 [3.into(), 3.into(), 111133.into(), 0.001.into()],
1772 )
1773 );
1774
1775 let df2 = column_frame! {
1776 "zone_id" => vec![111]
1777 };
1778 let result = df.join(df2, &join);
1779 assert!(result.is_ok(), "{result:?}");
1780 let selected = df.select(None);
1781 trace!("{selected:?}");
1782 assert_eq!(
1783 selected,
1784 ndarray::array!(
1785 [1.into(), 1.into(), 111111.into(), 0.1.into(), 111.into()],
1786 [1.into(), 1.into(), 111133.into(), 0.001.into(), 111.into()],
1787 [2.into(), 2.into(), 111111.into(), 0.1.into(), 111.into()],
1788 [2.into(), 2.into(), 111133.into(), 0.001.into(), 111.into()],
1789 [3.into(), 3.into(), 111111.into(), 0.1.into(), 111.into()],
1790 [3.into(), 3.into(), 111133.into(), 0.001.into(), 111.into()],
1791 )
1792 );
1793 }
1794
1795 #[rstest]
1796 #[traced_test]
1797 fn broadcast_join() {
1798 let mut df = column_frame! {
1799 "group_id" => vec![1, 2, 3],
1800 "feed_tag" => vec![1, 2, 3]
1801 };
1802 let df2 = column_frame! {
1803 "zone_id" => vec![111111]
1804 };
1805 assert!(df
1806 .join(
1807 ColumnFrame::default(),
1808 &JoinRelation::new(JoinBy::Broadcast)
1809 )
1810 .is_ok());
1811 let join = JoinRelation::new(JoinBy::Broadcast);
1812 assert!(df.join(df2, &join).is_ok());
1813 let selected = df.select(None);
1814 trace!("{selected:?}");
1815 assert_eq!(
1816 selected,
1817 ndarray::array!(
1818 [1.into(), 1.into(), 111111.into()],
1819 [2.into(), 2.into(), 111111.into()],
1820 [3.into(), 3.into(), 111111.into()]
1821 )
1822 );
1823 }
1824 #[rstest]
1825 #[traced_test]
1826 fn merge_test() {
1827 let mut df = column_frame! {
1828 "group_id" => vec![1, 2, 3],
1829 "feed_tag" => vec![1, 2, 3]
1830 };
1831 let df2 = column_frame! {
1832 "group_id" => vec![11, 21, 31],
1833 "feed_tag" => vec![12, 22, 32]
1834 };
1835
1836 let join = JoinRelation::new(JoinBy::Replace);
1837 assert!(df.join(df2, &join).is_ok());
1838 let selected = df.select(None);
1839 trace!("{selected:?}");
1840 assert_eq!(
1841 selected,
1842 ndarray::array!(
1843 [11.into(), 12.into()],
1844 [21.into(), 22.into()],
1845 [31.into(), 32.into()]
1846 )
1847 );
1848 }
1849
1850 #[rstest]
1851 #[traced_test]
1852 fn extend_test() {
1853 let mut df = column_frame! {
1854 "group_id" => vec![1, 2, 3],
1855 "feed_tag" => vec![1, 2, 3]
1856 };
1857 let df2 = column_frame! {
1858 "group_id" => vec![11, 21, 31],
1859 "feed_tag" => vec![5, 6, 7]
1860 };
1861 assert!(df
1862 .join(ColumnFrame::default(), &JoinRelation::new(JoinBy::Extend))
1863 .is_ok());
1864
1865 let join = JoinRelation::new(JoinBy::Extend);
1866 assert!(df.join(df2, &join).is_ok());
1867 let selected = df.select(Some(&["feed_tag".into(), "group_id".into()]));
1868 trace!("{selected:?}");
1869 assert_eq!(
1870 selected,
1871 ndarray::array!(
1872 [1.into(), 1.into()],
1873 [2.into(), 2.into()],
1874 [3.into(), 3.into()],
1875 [5.into(), 11.into()],
1876 [6.into(), 21.into()],
1877 [7.into(), 31.into()]
1878 )
1879 );
1880 let as_map = df.select_as_map(Some(&["feed_tag".into(), "group_id".into()]));
1881 trace!("{as_map:?}");
1882 assert_eq!(
1883 as_map,
1884 stdhashmap!(
1885 "feed_tag" => vec![1, 2, 3, 5, 6, 7],
1886 "group_id" => vec![1, 2, 3, 11, 21, 31]
1887 )
1888 );
1889
1890 let as_map = df.select_as_map(Some(&["feed_tag1".into()]));
1891 trace!("{as_map:?}");
1892 assert_eq!(as_map, HashMap::default());
1893 }
1894
1895 #[rstest]
1896 #[traced_test]
1897 fn extend_test_with_non_existing_cols() {
1898 let mut df = column_frame! {
1899 "group_id" => vec![1, 2, 3],
1900 "feed_tag" => vec![1, 2, 3]
1901 };
1902 let mut df2 = column_frame! {
1903 "group_id" => vec![11, 21, 31],
1904 "feed_tag" => vec![5, 6, 7],
1905 "clicks" => vec![100, 200, 300],
1906 "impressions" => vec![1000, 2000, 3000]
1907 };
1908 let df_bckp = df.clone();
1909 let join = JoinRelation::new(JoinBy::Extend);
1910 assert!(df.join(df2.clone(), &join).is_ok());
1911 let selected = df.select(None);
1912 trace!("{selected:?}");
1913 assert_eq!(
1914 selected,
1915 ndarray::array!(
1916 [1.into(), 1.into(), DataValue::Null, DataValue::Null],
1917 [2.into(), 2.into(), DataValue::Null, DataValue::Null],
1918 [3.into(), 3.into(), DataValue::Null, DataValue::Null],
1919 [11.into(), 5.into(), 100.into(), 1000.into()],
1920 [21.into(), 6.into(), 200.into(), 2000.into()],
1921 [31.into(), 7.into(), 300.into(), 3000.into()]
1922 )
1923 );
1924 let join = JoinRelation::new(JoinBy::Extend);
1925 let r = df2.join(df_bckp, &join);
1926 assert!(r.is_ok(), "{r:?}");
1927 let selected = df2.select(None);
1928 trace!("{selected:?}");
1929 assert_eq!(
1930 selected,
1931 ndarray::array!(
1932 [11.into(), 5.into(), 100.into(), 1000.into()],
1933 [21.into(), 6.into(), 200.into(), 2000.into()],
1934 [31.into(), 7.into(), 300.into(), 3000.into()],
1935 [1.into(), 1.into(), DataValue::Null, DataValue::Null],
1936 [2.into(), 2.into(), DataValue::Null, DataValue::Null],
1937 [3.into(), 3.into(), DataValue::Null, DataValue::Null]
1938 )
1939 );
1940 }
1941
1942 #[rstest]
1943 #[traced_test]
1944 fn extend_test_with_non_existing_cols_wrong_order() {
1945 let mut df = column_frame! {
1946 "group_id" => vec![1, 2, 3],
1947 "feed_tag" => vec![1, 2, 3]
1948 };
1949 let df2 = column_frame! {
1950 "feed_tag" => vec![5, 6, 7],
1951 "group_id" => vec![11, 21, 31]
1952 };
1953 let join = JoinRelation::new(JoinBy::Extend);
1954 let err = df.join(df2, &join);
1955 assert!(err.is_ok(), "{err:?}");
1956 }
1957
1958 #[rstest]
1959 #[traced_test]
1960 fn test_replace_not_compatible() {
1961 let mut df = column_frame! {
1962 "group_id" => vec![1, 2, 3],
1963 "feed_tag" => vec![1, 2, 3]
1964 };
1965 let df2 = column_frame! {
1966 "feed_tag" => vec![5, 6],
1967 "group_id" => vec![11, 21]
1968 };
1969 let join = JoinRelation::new(JoinBy::Replace);
1970 let err = df.join(df2, &join);
1971 assert!(err.is_err(), "{err:?}");
1972 let empty = ColumnFrame::default();
1973 let err = df.join(empty, &join);
1974 assert!(err.is_ok(), "{err:?}");
1975 }
1976
1977 #[rstest]
1978 #[traced_test]
1979 fn test_different_data() {
1980 let mut df = column_frame! {
1981 "group_id" => vec![1, 2, 3],
1982 "feed_tag" => vec![1, 2, 3]
1983 };
1984 let df2 = column_frame! {
1985 "group_id" => vec![11, 21],
1986 "a" => vec![5, 6]
1987 };
1988 let join = JoinRelation::new(JoinBy::Extend);
1989 let err = df.join(df2, &join);
1990 assert!(err.is_ok(), "{err:?}");
1991 println!("{df:?}");
1992 let expected_df = ColumnFrame::new(
1993 KeyIndex::from(vec!["group_id".into(), "feed_tag".into(), "a".into()]),
1994 ndarray::array!(
1995 [1.into(), 1.into(), DataValue::Null],
1996 [2.into(), 2.into(), DataValue::Null],
1997 [3.into(), 3.into(), DataValue::Null],
1998 [11.into(), DataValue::Null, 5.into()],
1999 [21.into(), DataValue::Null, 6.into()]
2000 ),
2001 );
2002 assert_eq!(df, expected_df)
2003 }
2004
2005 #[rstest]
2006 #[traced_test]
2007 fn serde_column_frame() {
2008 let df = column_frame! {
2009 "group_id" => vec![1u64, 2u64, 3u64],
2010 "feed_tag" => vec![1u64, 2u64, 3u64]
2011 };
2012 let key_idx = df.index.clone();
2013 let serialized = serde_json::to_string(&key_idx).expect("BUG: cannot serialize");
2014 let deserialized: KeyIndex =
2015 serde_json::from_str(&serialized).expect("BUG: cannot deserialize");
2016 assert_eq!(key_idx, deserialized);
2017 assert!(key_idx.get_key(0).is_some_and(|x| x == "group_id".into()));
2018 let serialized = serde_json::to_string(&df).expect("BUG: cannot serialize");
2019 let deserialized: ColumnFrame =
2020 serde_json::from_str(&serialized).expect("BUG: cannot deserialize");
2021 assert_eq!(df, deserialized);
2022 }
2023
2024 #[rstest]
2025 #[traced_test]
2026 fn update_value() {
2027 let mut df = column_frame! {
2028 "group_id" => vec![1, 2, 3],
2029 "feed_tag" => vec![1, 2, 3]
2030 };
2031 let group_id: Key = "group_id".into();
2032 let v = df.get_mut_by_row_index(&group_id, 1);
2033 assert!(v.is_some());
2034 let v = v.unwrap();
2035 assert_eq!(v, &DataValue::I32(2));
2036 *v = DataValue::U64(22);
2037 let v = df.get_by_row_index(&group_id, 1);
2038 assert!(v.is_some());
2039 let v = v.unwrap();
2040 assert_eq!(v, &DataValue::U64(22));
2041
2042 assert!(df.get_mut_by_row_index(&"group_id2".into(), 1).is_none());
2043 }
2044}