1use ndarray::{concatenate, s, Array, Array1, Array2, ArrayView1, ArrayViewMut2, Axis};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4
5use crate::error::Error;
6use crate::{dataframe::index::Index, CandidateData, JoinBy, JoinRelation, Key};
7use data_value::{DataValue, Extract};
8use tracing::*;
9mod from;
10mod key_index;
11mod ops;
12pub mod sorted_df;
13pub use key_index::KeyIndex;
14pub mod filter_df;
15
/// A column-oriented table: a [`KeyIndex`] naming the columns plus a 2-D array of cells.
///
/// Rows of `data_frame` are records (`len()` reports `nrows()`); each key's column position in
/// `data_frame` is obtained through `index.get_column_index`.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
pub struct ColumnFrame {
    /// Column keys and the mapping from key to column position in `data_frame`.
    pub index: KeyIndex,
    /// Cell storage shaped `(rows, columns)`.
    pub data_frame: Array2<DataValue>,
}
26
/// Outcome of the shared join preamble (`check_or_init_frame`): either the caller still has
/// work to do, or the frame is already final and the caller can return.
enum Continue {
    /// Both frames have columns; the join operation should run.
    Continue,
    /// Nothing more to do; the caller should return immediately.
    End,
}

impl Continue {
    /// Returns `true` when the caller should stop and return early.
    pub fn should_end(&self) -> bool {
        match self {
            Continue::End => true,
            Continue::Continue => false,
        }
    }
}
37
38use std::fmt;
39
40impl fmt::Display for ColumnFrame {
41 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42 write!(f, "\n|")?;
44
45 for key in &self.index.keys {
46 write!(f, " {key} |")?;
47 }
48
49 if self.index.is_empty() {
50 writeln!(f, "|")?;
51 }
52
53 if let Some(row) = self.data_frame.axis_iter(Axis(0)).next() {
55 write!(f, "\n|")?;
56 for value in row.iter() {
57 write!(f, " {:10?} |", crate::detect_dtype(value))?;
59 }
60 writeln!(f)?;
61 }
62
63 writeln!(f, "---")?;
64
65 for (n, row) in self.data_frame.axis_iter(Axis(0)).enumerate() {
67 write!(f, "|")?;
68
69 for value in row.iter() {
70 write!(f, " {value} |")?;
72 }
73 writeln!(f)?;
74
75 if n >= 256 {
76 writeln!(f, "... (dataframe is too long)")?;
77 break;
78 }
79 }
80
81 writeln!(f, "---")
82 }
83}
84
85impl ColumnFrame {
86 pub fn new<K: Into<KeyIndex>>(index: K, data_frame: Array2<DataValue>) -> Self {
87 Self {
88 index: index.into(),
89 data_frame,
90 }
91 }
92
93 pub fn keys(&self) -> &[Key] {
94 self.index.get_keys()
95 }
96
97 pub fn len(&self) -> usize {
98 self.data_frame.nrows()
99 }
100
101 pub fn is_empty(&self) -> bool {
102 self.data_frame.nrows() == 0
103 }
104
    /// Presumably meant to release spare capacity (judging only by the name).
    ///
    /// NOTE(review): the body is empty, so this is currently a no-op and leaves both `index`
    /// and `data_frame` untouched. Confirm whether callers expect it to actually shrink anything.
    pub fn shrink(&mut self) {
    }
108
109 pub fn try_fix_dtype_for_keys(&mut self, force: bool) -> Result<(), Error> {
113 let mut keys = self.index.keys.clone();
114 for key in keys.iter_mut() {
115 if !force && matches!(key.ctype, crate::DataType::Unknown) {
116 let column = self
117 .get_single_column(key)
118 .ok_or_else(|| Error::EmptyData)?;
119 let dtype = crate::detect_dtype(column.get(0).ok_or_else(|| Error::EmptyData)?);
120 key.ctype = dtype;
121 } else if force {
122 let column = self
123 .get_single_column(key)
124 .ok_or_else(|| Error::EmptyData)?;
125 let dtype = crate::detect_dtype(column.get(0).ok_or_else(|| Error::EmptyData)?);
126 key.ctype = dtype;
127 }
128 }
129 self.index.keys = keys;
130
131 Ok(())
132 }
133 pub fn try_fix_dtype(&mut self) -> Result<(), Error> {
134 let mut errors = vec![];
135 let keys = self.index.keys.clone();
136 for key in keys {
137 tracing::trace!("key: {key:?}- {:?}", key.ctype);
138 if let Err(e) = self.try_fix_column_by_key(&key) {
139 errors.push((key, e.to_string()));
140 }
141 }
142 if errors.is_empty() {
143 Ok(())
144 } else {
145 Err(Error::CastFailed(errors))
146 }
147 }
148
149 pub fn try_fix_column_by_key(&mut self, key: &Key) -> Result<(), Error> {
150 let idx = self
151 .index
152 .get_column_index(key)
153 .ok_or(Error::MissingField(format!("{key}").into()))?;
154 let mut col = self.data_frame.column_mut(idx);
155
156 col.mapv_inplace(|item| {
157 let x = &item;
158 match key.ctype {
159 crate::DataType::Bool => DataValue::Bool(bool::extract(x)),
160 crate::DataType::U32 => DataValue::U32(u32::extract(x)),
161 crate::DataType::I32 => DataValue::I32(i32::extract(x)),
162 crate::DataType::U64 => DataValue::U64(u64::extract(x)),
163 crate::DataType::I64 => DataValue::I64(i64::extract(x)),
164 crate::DataType::F32 => DataValue::F32(f32::extract(x)),
165 crate::DataType::F64 => DataValue::F64(f64::extract(x)),
166 crate::DataType::U8 => DataValue::U8(u8::extract(x)),
167 crate::DataType::String => DataValue::String(String::extract(x).into()),
168 crate::DataType::Bytes => item,
169 crate::DataType::Map => item,
170 crate::DataType::Vec => item,
171 crate::DataType::Unknown => item,
172 }
173 });
174 Ok(())
175 }
176
177 pub fn get_mut_view(&mut self) -> ArrayViewMut2<'_, DataValue> {
178 self.data_frame.view_mut()
179 }
180
181 pub fn rename_key(&mut self, old: &str, new: Key) -> Result<(), Error> {
182 self.index.rename_key(old, new)
183 }
184
185 pub fn add_alias(&mut self, key: &str, alias: &str) -> Result<(), Error> {
186 self.index.add_alias(key, alias)
187 }
188
189 pub fn select_transposed_typed<D: Extract>(&self, keys: &[Key]) -> Vec<Vec<D>> {
195 let selected = self.select(Some(keys));
196 let mut result = Vec::with_capacity(selected.nrows());
197 for row in selected.rows() {
198 let mut r = Vec::with_capacity(selected.ncols());
199 for value in row.iter() {
200 r.push(D::extract(value));
201 }
202 result.push(r);
203 }
204 result
205 }
206
207 pub fn select_transposed(&self, keys: Option<&[Key]>) -> Result<Array2<DataValue>, Error> {
213 let keys = keys.unwrap_or_else(|| self.index.get_keys());
214 let key_indexes = self.index.select(keys);
215 if key_indexes.is_empty() {
216 return Ok(Array2::default((0, 0)));
217 }
218 let data_vec: Vec<Array1<DataValue>> = key_indexes
219 .indexes()
220 .iter()
221 .map(|x| self.data_frame.column(*x).to_owned())
222 .collect();
223 to_array2(data_vec)
224 }
225
226 pub fn select_column(&self, key: &Key) -> Option<ArrayView1<'_, DataValue>> {
230 self.index
231 .get_column_index(key)
232 .map(|x| self.data_frame.column(x))
233 }
234
235 pub fn apply_function<F>(&mut self, keys: &[Key], mut func: F) -> Result<(), Error>
236 where
237 F: FnMut(&[Key], &mut ColumnFrame) -> Result<(), Error>,
238 {
239 func(keys, self)
240 }
241
242 pub fn validate_entry_access(&self, column: &Key, row_index: usize) -> Result<usize, Error> {
247 if row_index >= self.data_frame.nrows() {
248 return Err(Error::IndexOutOfRange(row_index, self.data_frame.nrows()));
249 }
250 let Some(column_index) = self.index.get_column_index(column) else {
251 return Err(Error::NotFound(column.clone()));
252 };
253 Ok(column_index)
254 }
255
256 pub fn get_by_row_index(&self, column: &Key, row_index: usize) -> Option<&DataValue> {
260 trace!(
261 "Column: {column} row_index: {row_index} data_frame: cols:{}-rows:{}",
262 self.data_frame.len(),
263 self.data_frame.nrows()
264 );
265 trace!("{:?}", self.data_frame);
266 match self.validate_entry_access(column, row_index) {
267 Ok(column_index) => self.data_frame.get((row_index, column_index)),
268 Err(e) => {
269 trace!("Error: {e}");
270 None
271 }
272 }
273 }
274
275 pub fn get_mut_by_row_index(
279 &mut self,
280 column: &Key,
281 row_index: usize,
282 ) -> Option<&mut DataValue> {
283 trace!(
284 "Column: {column} row_index: {row_index} data_frame: cols:{}-rows:{}",
285 self.data_frame.len(),
286 self.data_frame.nrows()
287 );
288 trace!("{:?}", self.data_frame);
289 match self.validate_entry_access(column, row_index) {
290 Ok(column_index) => self.data_frame.get_mut((row_index, column_index)),
291 Err(e) => {
292 trace!("Error: {e}");
293 None
294 }
295 }
296 }
297
298 pub fn select_as_map(&self, keys: Option<&[Key]>) -> HashMap<Key, Vec<DataValue>> {
303 let keys = keys.unwrap_or_else(|| self.index.get_keys());
304 let indexes = self.index.select(keys);
305 if indexes.is_empty() {
306 return Default::default();
307 }
308
309 let mut new_data_frame = HashMap::with_capacity(keys.len());
310
311 for key in keys.iter() {
312 if let Some(column_index_in_source) = indexes.get_column_index(key) {
313 let column = self.data_frame.column(column_index_in_source);
314 new_data_frame.insert(key.clone(), column.to_vec());
315 }
316 }
317
318 new_data_frame
319 }
320
321 pub fn select(&self, keys: Option<&[Key]>) -> Array2<DataValue> {
326 let keys = keys.unwrap_or_else(|| self.index.get_keys());
327 let indexes = self.index.select(keys);
328 if indexes.is_empty() {
329 return Array2::default((0, 0));
330 }
331 let mut new_data_frame = Array2::default((self.data_frame.nrows(), keys.len()));
332
333 for (idx, key) in keys.iter().enumerate() {
334 if let Some(column_index_in_source) = indexes.get_column_index(key) {
335 new_data_frame
336 .column_mut(idx)
337 .assign(&self.data_frame.column(column_index_in_source));
338 }
339 }
340
341 new_data_frame
342 }
343
344 fn extend_dataframe_for_column(&mut self, key: Key) -> Result<(), Error> {
345 self.index.store_key(key);
346 let len = self.data_frame.nrows();
347 self.data_frame.push_column(Array1::default(len).view())?;
348 Ok(())
349 }
350
351 pub fn push<C: CandidateData>(&mut self, row_candidate: C) -> Result<(), Error> {
355 let mut arr = vec![];
356 for key in row_candidate.keys() {
357 if self.index.get_column_index(&key).is_none() {
358 self.extend_dataframe_for_column(key)?;
359 }
360 }
361 for index in self.index.get_keys() {
362 if let Some(value) = row_candidate.get_value_ref(index) {
363 arr.push(value.clone());
364 } else {
365 arr.push(DataValue::Null);
366 }
367 }
368 self.data_frame.push_row(Array::from_vec(arr).view())?;
369 Ok(())
370 }
371
372 pub fn remove_column(&mut self, keys: &[Key]) -> Result<Self, Error> {
373 let mut indexes = KeyIndex::default();
375 let data = self.select(Some(keys));
377 for key in keys {
379 if let Some((current, _idx)) = self.index.remove_key(key) {
380 indexes.store_key(current);
381 }
382 }
383 let rest = self.select(Some(self.keys()));
385 let keys = self.index.get_keys().to_vec();
386 self.data_frame = rest;
387 self.index = KeyIndex::new(keys);
388
389 Ok(Self::new(indexes, data))
391 }
392
393 fn check_or_init_frame(&mut self, other: &Self) -> Result<Continue, Error> {
394 if self.index.is_empty() {
395 self.index = other.index.clone();
396 self.data_frame = other.data_frame.clone();
397 return Ok(Continue::End);
398 }
399 if other.index.is_empty() {
400 return Ok(Continue::End);
401 }
402 if self.is_empty() {
403 self.data_frame = Array2::default((other.data_frame.nrows(), self.index.len()));
404 }
405
406 Ok(Continue::Continue)
407 }
408
409 fn extend_columns_from_other(&mut self, other: &Self) -> Result<(), Error> {
410 for key in other.index.get_keys() {
411 if self.index.get_column_index(key).is_none() {
412 self.extend_dataframe_for_column(key.clone())?;
413 }
414 }
415 Ok(())
416 }
417
418 fn try_extend(&mut self, other: Self) -> Result<(), Error> {
419 let mut joined_keys = self.index.clone();
420 for key in other.keys() {
422 if self.index.get_column_index(key).is_none() {
423 joined_keys.store_key(key.clone());
424 }
425 }
426
427 let sum_len = self.data_frame.nrows() + other.data_frame.nrows();
428 let mut arr = Array2::default((sum_len, joined_keys.len()));
429 let increment = self.data_frame.nrows();
430 for key in joined_keys.get_keys() {
431 let index_result = joined_keys
432 .get_column_index(key)
433 .expect("BUG: index for this has to be defined");
434 if let Some(index) = self.index.get_column_index(key) {
435 for (idx, value) in self.data_frame.column(index).iter().enumerate() {
436 if let Some(x) = arr.get_mut((idx, index_result)) {
437 *x = value.to_owned();
438 }
439 }
440 }
441
442 if let Some(index) = other.index.get_column_index(key) {
443 for (idx, value) in other.data_frame.column(index).iter().enumerate() {
444 if let Some(x) = arr.get_mut((increment + idx, index_result)) {
445 *x = value.to_owned();
446 }
447 }
448 }
449 }
450 *self = ColumnFrame::new(joined_keys, arr);
451 Ok(())
452 }
453
454 pub fn extend(&mut self, mut other: Self) -> Result<(), Error> {
462 if self.check_or_init_frame(&other)?.should_end() {
463 return Ok(());
464 }
465
466 if self.index.check_order_of_indexes(&other.index).is_err() {
467 return self.try_extend(other);
468 }
469
470 trace!(
471 "Extend columns from other {:?} vs {:?}",
472 other.index.get_keys(),
473 self.index.get_keys()
474 );
475
476 if other.data_frame.ncols() < self.data_frame.ncols() {
477 other.extend_columns_from_other(self)?;
478 } else {
479 self.extend_columns_from_other(&other)?;
480 }
481 self.data_frame = concatenate(Axis(0), &[self.data_frame.view(), other.data_frame.view()])?;
482
483 Ok(())
484 }
485
486 pub fn replace(&mut self, other: Self) -> Result<(), Error> {
492 if self.check_or_init_frame(&other)?.should_end() {
493 return Ok(());
494 }
495
496 if self.data_frame.len() > other.data_frame.len() {
497 return Err(Error::DataSetSizeDoesntMatch(
498 self.data_frame.len(),
499 other.data_frame.len(),
500 ));
501 }
502 self.index = other.index;
503 self.data_frame = other.data_frame;
504
505 Ok(())
506 }
507
508 pub fn join_by_id_inner(&mut self, right: Self, keys: &[Key]) -> Result<(), Error> {
511 if self.check_or_init_frame(&right)?.should_end() {
512 return Ok(());
513 }
514 let new_columnns = right.index.get_complement_keys(self.index.get_keys());
515 self.extend_columns_from_other(&right)?;
517 let index = Index::new(keys.to_vec(), self);
519 tracing::trace!("Index {index:?}");
520 let right_indexes = right.index.select(keys);
521
522 let mut new_df = Array2::default((self.len(), self.index.len()));
523 new_df.assign(&self.data_frame);
524 trace!("Indexes: {right_indexes:?}");
525 trace!("New DF: {new_df:?}");
526 trace!("Right DF: {:?}", right.data_frame);
527 trace!("current {:?}", self.data_frame);
528 for c_row in right.data_frame.rows() {
532 let values = right_indexes
533 .get_keys()
534 .iter()
535 .map(|k| {
536 c_row
537 .get(
538 right_indexes
539 .get_column_index(k)
540 .expect("BUG: Something is very wrong"),
541 )
542 .cloned()
543 .unwrap_or_default()
544 })
545 .collect::<Vec<_>>();
546 if let Some(row_index) = index.get(&values) {
547 trace!("Index: {row_index} {c_row:?} -> {values:?} vs {new_df:?}");
548 for complement_key in new_columnns.iter() {
549 let column_index = self
550 .index
551 .get_column_index(complement_key)
552 .expect("BUG: Something is very wrong");
553 if let Some(right_column_index) = right.index.get_column_index(complement_key) {
554 trace!("Filling Index: [{complement_key:?}] ri:{row_index} rci:{right_column_index:?} -> {:?} vs {:?}", c_row[right_column_index], new_df.get_mut((row_index, column_index)));
555 if let Some(v) = new_df.get_mut((row_index, column_index)) {
556 *v = c_row[right_column_index].to_owned();
557 }
558 }
559 }
560 }
561 }
562
563 self.data_frame = new_df;
564
565 Ok(())
566 }
567
568 pub fn add_single_column<K: Into<Key>>(
572 &mut self,
573 key: K,
574 column: Array1<DataValue>,
575 ) -> Result<(), Error> {
576 let key = key.into();
577 if self.index.get_column_index(&key).is_some() {
578 return Err(Error::ColumnAlreadyExists(key));
579 }
580 if self.len() != column.len() && !self.is_empty() {
581 return Err(Error::DataSetSizeDoesntMatch(self.len(), column.len()));
582 }
583
584 self.index.store_key(key.clone());
585 let rows = column.len();
586 let column_index = self
587 .index
588 .get_column_index(&key)
589 .ok_or(Error::UnknownError(format!("Column {key} should exists")))?;
590 if self.is_empty() && self.index.len() == 1 {
591 self.data_frame = column.into_shape_clone((rows, 1))?;
592 assert_eq!(self.data_frame.column(column_index).len(), rows);
593 } else if self.is_empty() {
594 self.data_frame = Array2::default((column.len(), self.index.len() - 1));
595 self.data_frame.push_column(column.view())?;
596 assert_eq!(self.data_frame.column(column_index).len(), rows);
597 } else {
598 self.data_frame.push_column(column.view())?;
599 }
600 assert_eq!(self.data_frame.column(column_index).len(), rows);
601
602 Ok(())
603 }
604 pub fn add_columns(&mut self, other: Self) -> Result<(), Error> {
608 if self.check_or_init_frame(&other)?.should_end() {
609 return Ok(());
610 }
611
612 self.extend_columns_from_other(&other)?;
613 for (idx, key) in other.index.get_keys().iter().enumerate() {
614 if let Some(index) = self.index.get_column_index(key) {
615 trace!("Other array = {:?}", other.data_frame.dim());
616 if other.data_frame.dim() == (0, 0) {
617 self.data_frame.column_mut(index).fill(DataValue::Null);
618 continue;
619 }
620 let arr = other.data_frame.column(idx);
621 trace!(
622 "Adding column {key:?} at index {idx} vs {index} datasize: self:{} vs other:{}",
623 self.data_frame.nrows(),
624 arr.len()
625 );
626 if arr.len() != self.data_frame.nrows() {
627 self.data_frame.column_mut(index).fill(DataValue::Null);
628 } else {
629 self.data_frame.column_mut(index).assign(&arr);
630 }
631 }
632 }
633 Ok(())
634 }
635
636 pub fn broadcast(&mut self, other: Self) -> Result<(), Error> {
641 if self.check_or_init_frame(&other)?.should_end() {
642 return Ok(());
643 }
644 if other.data_frame.nrows() != 1 {
645 return Err(Error::CannotBroadcast);
646 }
647 self.extend_columns_from_other(&other)?;
648 let mut new_df = Array2::default((self.len(), self.index.len()));
649 for (idx, key) in self.index.get_keys().iter().enumerate() {
650 if let Some(other_idx) = other.index.get_column_index(key) {
651 new_df
652 .column_mut(idx)
653 .assign(&other.data_frame.column(other_idx));
654 } else {
655 new_df.column_mut(idx).assign(&self.data_frame.column(idx));
656 }
657 }
658 self.data_frame = new_df;
659 Ok(())
660 }
661
662 pub fn cartesian_product(&mut self, other: Self) -> Result<(), Error> {
667 if self.check_or_init_frame(&other)?.should_end() {
668 return Ok(());
669 }
670 for other_key in other.keys() {
673 if self.index.get_column_index(other_key).is_none() {
674 self.index.store_key(other_key.clone());
675 } else {
676 self.index.store_key(Key::new(
677 format!("{}-{}", other_key, other_key.id()).as_str(),
678 other_key.ctype,
679 ));
680 }
681 }
682 let max_rows = self.len() * other.len();
683 let ncols = self.index.len();
684 let mut new_df = Array2::default((max_rows, ncols));
686
687 let mut cur_idx = 0;
688 for cur_row in self.data_frame.rows() {
689 for other_row in other.data_frame.rows() {
690 new_df
691 .slice_mut(s![cur_idx, ..])
692 .assign(&concatenate(Axis(0), &[cur_row, other_row])?);
693 cur_idx += 1;
694 }
695 }
696 self.data_frame = new_df;
697 Ok(())
698 }
699
700 pub fn join(&mut self, right: Self, join_type: &JoinRelation) -> Result<(), Error> {
708 use JoinBy::*;
709 match &join_type.join_type {
710 AddColumns => self.add_columns(right),
711 Replace => self.replace(right),
712 Extend => self.extend(right),
713 Broadcast => self.broadcast(right),
714 CartesianProduct => self.cartesian_product(right),
715 JoinById(join) => self.join_by_id_inner(right, &join.keys),
716 }
717 }
718
719 pub fn get_single_column(&self, key: &Key) -> Option<ArrayView1<'_, DataValue>> {
720 self.index
721 .get_column_index(key)
722 .map(|x| self.data_frame.column(x))
723 }
724
725 pub fn sorted(&self, key: &Key) -> Result<sorted_df::SortedDataFrame<'_>, Error> {
726 let index = self
727 .index
728 .get_column_index(key)
729 .ok_or(Error::NotFound(key.clone()))?;
730 let column = self.data_frame.column(index);
731 let mut data_with_index = column.iter().enumerate().collect::<Vec<_>>();
732 tracing::trace!("Sorting by key: {key:?} vals {data_with_index:?}");
733 data_with_index.sort_by(|(a_idx, a_val), (b_idx, b_val)| {
734 a_val
735 .partial_cmp(b_val)
736 .unwrap_or(std::cmp::Ordering::Equal)
737 .then_with(|| a_idx.cmp(b_idx))
738 });
739
740 tracing::trace!("Sorted by key: {key:?} vals {data_with_index:?}");
741 let indicies = data_with_index
742 .into_iter()
743 .map(|(idx, _)| idx)
744 .collect::<Vec<_>>();
745
746 Ok(sorted_df::SortedDataFrame::new(self, indicies))
747 }
748
749 pub fn filter(&self, filter: &crate::filter::FilterRules) -> Result<Self, Error> {
750 let mut final_indices = Vec::new();
751 let filter_df = filter_df::ColumnFrameFiltering { column_frame: self };
752 for rule in &filter.rules {
753 final_indices.extend(crate::filter::filter_combination(&filter_df, rule)?);
754 }
755
756 final_indices.sort_unstable();
757 final_indices.dedup();
758 let mut new_df = ColumnFrame::new(
759 self.index.clone(),
760 Array2::default((final_indices.len(), self.index.len())),
761 );
762 final_indices
763 .iter()
764 .enumerate()
765 .for_each(|(cur_idx, row_idx)| {
766 new_df
767 .data_frame
768 .slice_mut(s![cur_idx, ..])
769 .assign(&self.data_frame.slice(s![*row_idx, ..]));
770 });
771
772 Ok(new_df)
773 }
774}
775
776pub fn to_array2<T: Clone>(source: Vec<Array1<T>>) -> Result<Array2<T>, Error> {
777 let width = source.len();
778 let flattened: Array1<T> = source.into_iter().flat_map(|row| row.to_vec()).collect();
779 let height = flattened.len() / width;
780 Ok(flattened.into_shape_with_order((width, height))?)
781}
/// Builds a `DataFrame` from the same syntax accepted by `column_frame!`.
///
/// All tokens are forwarded to `column_frame!`, and the resulting `ColumnFrame` is wrapped with
/// `DataFrame::new`.
#[macro_export]
macro_rules! df {
    ($($everything:tt)*) => {
        $crate::DataFrame::new($crate::column_frame!($($everything)*))
    };
}
788
/// Builds a `ColumnFrame` from `key => values` pairs.
///
/// Accepted forms, each with an optional trailing comma:
/// - `key => [v1, v2, ...]` or `key => vec![v1, v2, ...]`: one column per key and one row per
///   value. Every key must list the same number of values, because the lists are laid out with
///   `ndarray::arr2`.
/// - `key => value`: a single-row frame with one column per key.
///
/// Keys are converted with `.into()` into index keys (for example from `&str` or a `Key`), and
/// values with `.into()` into `DataValue`.
///
/// Each rule accepts the trailing comma directly instead of stripping it and re-invoking the
/// macro. Re-invoked tokens are already-parsed expressions and can no longer match the
/// `[...]` / `vec![...]` patterns. With re-invocation, `"a" => [1, 2],` would be treated as a
/// single value.
#[macro_export]
macro_rules! column_frame {
    // `vec![...]` lists are rewritten into the `[...]` form handled by the next rule.
    ($($key:expr => vec![$($value:expr),*]),+ $(,)?) => {
        $crate::column_frame!($($key => [$($value),*]),+)
    };
    ($($key:expr => [$($value:expr),*]),+ $(,)?) => {
        {
            // Each key's list becomes one row of `data`; reversing the axes turns those rows
            // into the columns of a (rows, keys) frame.
            let data = ::ndarray::arr2(&[$(
                [$($value.into(),)*],
            )*]);

            let _keys = vec![$($key.into(),)*];

            $crate::ColumnFrame::new(
                $crate::KeyIndex::new(_keys),
                data.reversed_axes()
            )
        }
    };
    ($($key:expr => $value:expr),+ $(,)?) => {
        {
            // A single row holding one value per key.
            let _data = ::ndarray::arr2(&[[$($value.into(),)*]]);
            let _keys = vec![$($key.into(),)*];

            $crate::ColumnFrame::new(
                $crate::KeyIndex::new(_keys),
                _data,
            )
        }
    };
}
825
826#[cfg(test)]
827mod test {
828 use crate::{filter::FilterRules, JoinById};
829
830 use super::*;
831 use data_value::stdhashmap;
832 use ndarray::ArrayView;
833 use rstest::*;
834 use tracing_test::traced_test;
835
836 #[rstest]
837 #[case(
838 column_frame! {
839 "t" => [1751001987000000u64, 1752001987000000u64, 1753001987000000u64],
840 "b" => [4, 5, 6],
841 "c" => [7, 8, 9]
842 },
843 column_frame! {
844 "t" => [1752001987000000u64],
845 "b" => [5],
846 "c" => [8]
847 },
848 FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
849 )]
850 #[case(
851 column_frame! {
852 "t" => [1751001987000000f64, 1752001987000000f64, 1753001987000000f64],
853 "b" => [4, 5, 6],
854 "c" => [7, 8, 9]
855 },
856 column_frame! {
857 "t" => [1752001987000000f64],
858 "b" => [5],
859 "c" => [8]
860 },
861 FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
862 )]
863 #[case(
864 column_frame! {
865 "t" => [1751001987000000i64, 1752001987000000i64, 1753001987000000i64],
866 "b" => [4, 5, 6],
867 "c" => [7, 8, 9]
868 },
869 column_frame! {
870 "t" => [1752001987000000i64],
871 "b" => [5],
872 "c" => [8]
873 },
874 FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
875 )]
876 #[case(
877 column_frame! {
878 "t" => [1751001987000000u64, 1752001987000000u64, 1753001987000000u64],
879 "b" => [4, 5, 6],
880 "c" => [7, 8, 9]
881 },
882 column_frame! {
883 "t" => [1751001987000000u64],
884 "b" => [4],
885 "c" => [7]
886 },
887 FilterRules::try_from("t.to_datetime_us() < '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
888 )]
889 #[case(
890 column_frame! {
891 "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
892 "b" => [4, 5, 6],
893 "c" => [7, 8, 9]
894 },
895 column_frame! {
896 "t" => ["2025-07-08 18:13:07"],
897 "b" => [4],
898 "c" => [7]
899 },
900 FilterRules::try_from("t.to_datetime_us() < '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
901 )]
902 #[case(
903 column_frame! {
904 "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
905 "b" => [4, 5, 6],
906 "c" => [7, 8, 9]
907 },
908 column_frame! {
909 "t" => [],
910 "b" => [],
911 "c" => []
912 },
913 FilterRules::try_from("t.len() < 10u64").expect("BUG: cannot create filter rules"),
914 )]
915 #[case(
916 column_frame! {
917 "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
918 "b" => [4, 5, 6],
919 "c" => [7, 8, 9]
920 },
921 column_frame! {
922 "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
923 "b" => [4, 5, 6],
924 "c" => [7, 8, 9]
925 },
926 FilterRules::try_from("t.len() > 10u64").expect("BUG: cannot create filter rules"),
927 )]
928 #[case(
929 column_frame! {
930 "t" => [DataValue::Vec(vec![1.into(), 2.into(), 3.into()]), DataValue::Vec(vec![]), DataValue::Vec(vec![1.into()])],
931 "b" => [4, 5, 6],
932 "c" => [7, 8, 9]
933 },
934 column_frame! {
935 "t" => [DataValue::Vec(vec![])],
936 "b" => [5],
937 "c" => [ 8]
938 },
939 FilterRules::try_from("t.len() == 0u64").expect("BUG: cannot create filter rules"),
940 )]
941 #[case(
942 column_frame! {
943 "t" => [DataValue::Vec(vec![1.into(), 2.into(), 3.into()]), DataValue::Vec(vec![]), DataValue::Vec(vec![1.into()])],
944 "b" => [4, 5, 6],
945 "c" => [7, 8, 9]
946 },
947 column_frame! {
948 "t" => [DataValue::Vec(vec![1.into()])],
949 "b" => [6],
950 "c" => [9]
951 },
952 FilterRules::try_from("t.len() == 1u64").expect("BUG: cannot create filter rules"),
953 )]
954 #[case(
955 column_frame! {
956 "a" => [1, 2, 3],
957 "b" => [4, 5, 6],
958 "c" => [7, 8, 9]
959 },
960 column_frame! {
961 "a" => [1, 2],
962 "b" => [4, 5],
963 "c" => [7, 8]
964 },
965 FilterRules::try_from("a <= 2i32").expect("BUG: cannot create filter rules"),
966 )]
967 #[case(
968 column_frame! {
969 "a" => [1, 2, 3],
970 "b" => [4, 5, 6],
971 "c" => [7, 8, 9]
972 },
973 column_frame! {
974 "a" => [2],
975 "b" => [5],
976 "c" => [8]
977 },
978 FilterRules::try_from("a <= 2i32 && c > 7i32").expect("BUG: cannot create filter rules"),
979 )]
980 #[case(
981 column_frame! {
982 "a" => [1, 2, 3],
983 "b" => [4, 5, 6],
984 "c" => [7, 8, 9]
985 },
986 column_frame! {
987 "a" => [],
988 "b" => [],
989 "c" => []
990 },
991 FilterRules::try_from("a <= 2i32 && c > 9i32").expect("BUG: cannot create filter rules"),
992 )]
993 #[case(
994 column_frame! {
995 "a" => [1, 2, 3],
996 "b" => [4, 5, 6],
997 "c" => [7, 8, 9]
998 },
999 column_frame! {
1000 "a" => [1, 2],
1001 "b" => [4, 5],
1002 "c" => [7, 8]
1003 },
1004 FilterRules::try_from("a <= 2i32 || c > 9i32").expect("BUG: cannot create filter rules"),
1005 )]
1006 #[case(
1007 column_frame! {
1008 "a" => [1, 2, 3],
1009 "b" => [4, 5, 6],
1010 "c" => [7, 8, 9]
1011 },
1012 column_frame! {
1013 "a" => [2],
1014 "b" => [5],
1015 "c" => [8]
1016 },
1017 FilterRules::try_from("a <= 2i32 && (c > 9i32 || b == 5i32)").expect("BUG: cannot create filter rules"),
1018 )]
1019 #[case(
1020 column_frame! {
1021 "a" => ["abcd", "ab", "abcdefg"],
1022 "b" => [4, 5, 6],
1023 "c" => [7, 8, 9]
1024 },
1025 column_frame! {
1026 "a" => ["abcd","abcdefg"],
1027 "b" => [4, 6],
1028 "c" => [7, 9]
1029 },
1030 FilterRules::try_from("a ~= 'abcd.*'").expect("BUG: cannot create filter rules"),
1031 )]
1032 #[case(
1033 column_frame! {
1034 "a" => [1, 2, 3],
1035 "b" => [4, 5, 6],
1036 "c" => [7, 8, 9]
1037 },
1038 column_frame! {
1039 "a" => [1],
1040 "b" => [4],
1041 "c" => [7]
1042 },
1043 FilterRules::try_from("a in [1u32, 1i32]'").expect("BUG: cannot create filter rules"),
1044 )]
1045 #[case(
1046 column_frame! {
1047 "a" => [1, 2, 3],
1048 "b" => [4, 5, 6],
1049 "c" => [7, 8, 9]
1050 },
1051 column_frame! {
1052 "a" => [2, 3],
1053 "b" => [5, 6],
1054 "c" => [8, 9]
1055 },
1056 FilterRules::try_from("a notIn [1u32, 1i32]'").expect("BUG: cannot create filter rules"),
1057 )]
1058 #[case(
1059 column_frame! {
1060 "a" => [1f64, 2f64, 3f64],
1061 "b" => [4, 5, 6],
1062 "c" => [7, 8, 9]
1063 },
1064 column_frame! {
1065 "a" => [1f64, 2f64],
1066 "b" => [4, 5],
1067 "c" => [7, 8]
1068 },
1069 FilterRules::try_from("a < 3f64 || (a < 3f64 && b <= 5i32)").expect("BUG: cannot create filter rules"),
1070 )]
1071 #[case(
1072 column_frame! {
1073 "a" => [1f64, 2f64, 3f64],
1074 "b" => [4i64, 5i64, 6i64],
1075 "c" => [7i64, 8i64, 9i64]
1076 },
1077 column_frame! {
1078 "a" => [1f64, 2f64],
1079 "b" => [4i64, 5i64],
1080 "c" => [7i64, 8i64]
1081 },
1082 FilterRules::try_from("a >= 1f64 && (b <= 5 || c <= 8) && b >= 4").expect("BUG: cannot create filter rules"),
1083 )]
1084 #[traced_test]
1085 fn filter_test(
1086 #[case] df: ColumnFrame,
1087 #[case] expected: ColumnFrame,
1088 #[case] filter: FilterRules,
1089 ) {
1090 let filtered = df.filter(&filter).expect("BUG: cannot filter");
1091 assert_eq!(filtered, expected);
1092 }
1093
1094 #[rstest]
1095 #[traced_test]
1096 fn test_macro() {
1097 let df = column_frame! {
1098 "a" => 1,
1099 "b" => 2,
1100 "c" => 3,
1101 "d" => 4,
1102 };
1103
1104 assert_eq!(df.len(), 1);
1105 assert_eq!(df.keys(), &["a".into(), "b".into(), "c".into(), "d".into()]);
1106 let f = Array2::from_shape_vec((1, 4), vec![1.into(), 2.into(), 3.into(), 4.into()])
1107 .expect("BUG: cannot create array");
1108 assert_eq!(df.select(None), f);
1109
1110 let df = column_frame! {
1111 "a" => [1, 2, 3],
1112 "b" => [4, 5, 6],
1113 "c" => [7, 8, 9]
1114 };
1115
1116 assert_eq!(df.len(), 3);
1117 assert_eq!(df.keys(), &["a".into(), "b".into(), "c".into()]);
1118 let f = Array2::from_shape_vec(
1119 (3, 3),
1120 vec![
1121 1.into(),
1122 4.into(),
1123 7.into(),
1124 2.into(),
1125 5.into(),
1126 8.into(),
1127 3.into(),
1128 6.into(),
1129 9.into(),
1130 ],
1131 )
1132 .expect("BUG: cannot create array");
1133 let selected = df.select(None);
1134 trace!("{selected:?}");
1135 assert_eq!(selected, f);
1136
1137 let df1 = df! {
1138 "a" => [1, 2, 3],
1139 "b" => [4, 5, 6],
1140 "c" => [7, 8, 9]
1141 };
1142
1143 let formatted = format!("{}", df);
1145 debug!("{}", formatted);
1146
1147 assert_eq!(df1, crate::DataFrame::from(df));
1148 }
1149
1150 #[rstest]
1151 #[case(
1152 column_frame! {
1153 "a" => [1, 2, 3],
1154 "b" => [4, 5, 6],
1155 "c" => [7, 8, 9]
1156 },
1157 column_frame! {
1158 "a_new" => [1, 2, 3],
1159 "b" => [4, 5, 6],
1160 "c" => [7, 8, 9]
1161 },
1162 vec!["a_new", "b", "c"].into_iter().map(|x| x.into()).collect(),
1163 vec![("a", "a_new".into())]
1164 )]
1165 #[traced_test]
1166 fn rename_test(
1167 #[case] df: ColumnFrame,
1168 #[case] expected: ColumnFrame,
1169 #[case] keys: Vec<Key>,
1170 #[case] renames: Vec<(&str, Key)>,
1171 ) {
1172 let mut df = df;
1173 for (old, new) in renames {
1174 df.rename_key(old, new).expect("BUG: cannot rename key");
1175 }
1176 assert_eq!(df, expected);
1177 assert_eq!(df.keys(), keys.as_slice());
1178 }
1179
1180 #[rstest]
1181 #[case(
1182 column_frame!("a" => [1, 2, 3]),
1183 Key::new("a", crate::DataType::I32),
1184 column_frame!("a" => [1i32, 2i32, 3i32])
1185 )]
1186 #[case(
1187 column_frame!("a" => [1, 2, 3]),
1188 Key::new("a", crate::DataType::U32),
1189 column_frame!("a" => [1u32, 2u32, 3u32])
1190 )]
1191 #[case(
1192 column_frame!("a" => [1, 2, 3]),
1193 Key::new("a", crate::DataType::I64),
1194 column_frame!("a" => [1i64, 2i64, 3i64])
1195 )]
1196 #[case(
1197 column_frame!("a" => [1, 2, 3]),
1198 Key::new("a", crate::DataType::U64),
1199 column_frame!("a" => [1u64, 2u64, 3u64])
1200 )]
1201 #[case(
1202 column_frame!("a" => [1, 2, 3]),
1203 Key::new("a", crate::DataType::F64),
1204 column_frame!("a" => [1f64, 2f64, 3f64])
1205 )]
1206 #[case(
1207 column_frame!("a" => [1, 2, 3]),
1208 Key::new("a", crate::DataType::F32),
1209 column_frame!("a" => [1f32, 2f32, 3f32])
1210 )]
1211 fn test_try_fix_dtype(
1217 #[case] mut df: ColumnFrame,
1218 #[case] key: Key,
1219 #[case] expected: ColumnFrame,
1220 ) {
1221 assert!(df.try_fix_column_by_key(&key).is_ok());
1222 assert_eq!(
1223 df.select(Some(&[key.clone()])),
1224 expected.select(Some(&[key.clone()]))
1225 );
1226 }
1227
1228 #[fixture]
1229 fn unknown_df() -> ColumnFrame {
1230 let mut hm: HashMap<String, Vec<DataValue>> = HashMap::new();
1231
1232 hm.insert("a".into(), vec![1u32.into()]);
1233 hm.insert("b".into(), vec![3i64.into()]);
1234 hm.insert("c".into(), vec![1f64.into()]);
1235 hm.insert("d".into(), vec![1u64.into()]);
1236
1237 hm.into()
1238 }
1239 #[rstest]
1240 #[case(stdhashmap!(
1241 "a" => crate::DataType::U32,
1242 "b" => crate::DataType::I64,
1243 "c" => crate::DataType::F64,
1244 "d" => crate::DataType::U64)
1245 )]
1246 fn test_try_fix_dtype_unknown(
1247 mut unknown_df: ColumnFrame,
1248 #[case] dtypes: HashMap<String, crate::DataType>,
1249 ) {
1250 for dtype in dtypes.iter() {
1251 let t: &Key = unknown_df
1252 .keys()
1253 .iter()
1254 .find(|x| x.name() == dtype.0)
1255 .unwrap();
1256 assert_eq!(t.ctype, crate::DataType::Unknown);
1257 }
1258 assert!(unknown_df.try_fix_dtype_for_keys(false).is_ok());
1259 for dtype in dtypes.iter() {
1260 let t: &Key = unknown_df
1261 .keys()
1262 .iter()
1263 .find(|x| x.name() == dtype.0)
1264 .unwrap();
1265 assert_eq!(t.ctype, *dtype.1);
1266 assert!(unknown_df.try_fix_dtype_for_keys(false).is_ok());
1267 }
1268 assert!(unknown_df.try_fix_dtype_for_keys(true).is_ok());
1269 }
1270
1271 #[rstest]
1272 #[case(
1273 column_frame!(Key::new("a", crate::DataType::F32) => [1, 2, 3]),
1274 Key::new("a", crate::DataType::F32),
1275 column_frame!("a" => [1f32, 2f32, 3f32])
1276 )]
1277 #[traced_test]
1278 fn test_try_fix(#[case] mut df: ColumnFrame, #[case] key: Key, #[case] expected: ColumnFrame) {
1279 assert!(df.try_fix_dtype().is_ok());
1280 assert_eq!(
1281 df.select(Some(&[key.clone()])),
1282 expected.select(Some(&[key]))
1283 )
1284 }
1285
1286 #[rstest]
1287 #[traced_test]
1288 fn test_not_key_fix() {
1289 let mut cf = column_frame!("a" => [1]);
1290 let non_existing = Key::new("b", crate::DataType::I32);
1291 assert!(cf.try_fix_column_by_key(&non_existing).is_err());
1292 }
1293
1294 #[rstest]
1295 #[case(
1296 column_frame! {
1297 "a" => [1, 2, 3],
1298 "b" => [4, 5, 6],
1299 "c" => [7, 8, 9]
1300 },
1301 vec!["a_alias", "b", "c"].into_iter().map(|x| x.into()).collect(),
1302 vec![("a", "a_alias")]
1303 )]
1304 #[traced_test]
1305 fn alias_test(
1306 #[case] df: ColumnFrame,
1307 #[case] keys: Vec<Key>,
1308 #[case] aliases: Vec<(&str, &str)>,
1309 ) {
1310 let mut df = df;
1311 for (old, new) in aliases {
1312 df.add_alias(old, new).expect("BUG: cannot rename key");
1313 }
1314 let origin_keys = df.keys().to_vec();
1315 let selected_aliases = df.select(Some(keys.as_slice()));
1316 let selected = df.select(Some(origin_keys.as_slice()));
1317 assert_eq!(selected, selected_aliases);
1318 }
1319
1320 #[rstest]
1321 #[traced_test]
1322 fn test_mut_view() {
1323 let data = vec![
1324 DataValue::from(1f64),
1325 DataValue::from(4f32),
1326 DataValue::from(2f64),
1327 DataValue::from(f32::NAN),
1328 DataValue::from(f64::NAN),
1329 DataValue::from(f32::INFINITY),
1330 ];
1331 let keys: Vec<Key> = vec!["a".into(), "b".into()];
1332
1333 let index = KeyIndex::new(keys.clone());
1334 let df = Array2::from_shape_vec((3, keys.len()), data).expect("BUG: cannot create array");
1335 let mut df = ColumnFrame::new(index.clone(), df);
1336 df.get_mut_view().mapv_inplace(|x| match x {
1337 DataValue::F32(f) if f.is_infinite() || f.is_nan() => DataValue::F32(0f32),
1338 DataValue::F64(f) if f.is_infinite() || f.is_nan() => DataValue::F64(0f64),
1339 e => e,
1340 });
1341 let data = vec![
1342 DataValue::from(1f64),
1343 DataValue::from(4f32),
1344 DataValue::from(2f64),
1345 DataValue::from(0f32),
1346 DataValue::from(0f64),
1347 DataValue::from(0f32),
1348 ];
1349 let expected = ColumnFrame::new(
1350 index,
1351 Array2::from_shape_vec((3, keys.len()), data).expect("BUG: cannot create ndarray"),
1352 );
1353 assert_eq!(df, expected);
1354 }
1355
1356 #[rstest]
1357 #[traced_test]
1358 fn dummy_test() {
1359 let data = vec![
1360 DataValue::U32(1),
1361 DataValue::I32(2),
1362 DataValue::I64(3),
1363 DataValue::U64(4),
1364 ];
1365
1366 let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1367
1368 let index = KeyIndex::new(keys.clone());
1369 let mut data_frame = Array2::default((1, keys.len()));
1370 for (idx, entry) in data.iter().enumerate() {
1371 data_frame
1372 .column_mut(idx)
1373 .assign(&ArrayView::from(&[entry.clone()]));
1374 }
1375
1376 let frame = ColumnFrame::new(index, data_frame);
1377 assert_eq!(
1378 frame.get_by_row_index(&"a".into(), 0),
1379 Some(&DataValue::U32(1))
1380 );
1381 assert_eq!(frame.get_by_row_index(&"aa".into(), 0), None);
1382 assert_eq!(frame.get_by_row_index(&"a".into(), 1), None);
1383 assert_eq!(
1384 frame.select(Some(&["a".into(), "b".into()])),
1385 Array2::from_shape_vec((1, 2), vec![DataValue::U32(1), DataValue::I32(2)])
1386 .expect("BUG: cannot create array")
1387 );
1388 }
1389
1390 #[rstest]
1391 #[traced_test]
1392 fn dummy_test_multiple_rows() {
1393 let data = vec![
1394 DataValue::U32(1),
1395 DataValue::I32(2),
1396 DataValue::I64(3),
1397 DataValue::U64(4),
1398 DataValue::U32(12),
1399 DataValue::I32(22),
1400 DataValue::I64(32),
1401 DataValue::U64(42),
1402 ];
1403
1404 let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1405
1406 let index = KeyIndex::new(keys.clone());
1407 let data_frame =
1408 Array2::from_shape_vec((2, keys.len()), data).expect("BUG: cannot create array");
1409
1410 let frame = ColumnFrame::new(index, data_frame);
1411 assert_eq!(
1412 frame.get_by_row_index(&"a".into(), 0),
1413 Some(&DataValue::U32(1))
1414 );
1415 assert_eq!(frame.get_by_row_index(&"aa".into(), 0), None);
1416 assert_eq!(frame.get_by_row_index(&"a".into(), 3), None);
1417 let arr = Array2::from_shape_vec(
1418 (2, 2),
1419 vec![
1420 DataValue::U32(1),
1421 DataValue::I32(2),
1422 DataValue::U32(12),
1423 DataValue::I32(22),
1424 ],
1425 )
1426 .expect("BUG: cannot create array");
1427 trace!("{arr:?}");
1428 assert_eq!(frame.select(Some(&["a".into(), "b".into()])), arr);
1429 }
1430
1431 #[rstest]
1432 #[traced_test]
1433 fn dummy_test_multiple_rows_push() {
1434 let data = vec![
1435 DataValue::U32(1),
1436 DataValue::I32(2),
1437 DataValue::I64(3),
1438 DataValue::U64(4),
1439 DataValue::U32(12),
1440 DataValue::I32(22),
1441 DataValue::I64(32),
1442 DataValue::U64(42),
1443 ];
1444 let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1445
1446 let index = KeyIndex::new(keys.clone());
1447 let data_frame =
1448 Array2::from_shape_vec((2, keys.len()), data).expect("BUG: cannot create array");
1449
1450 let mut frame = ColumnFrame::new(index, data_frame);
1451 assert!(frame
1452 .push(data_value::stdhashmap!(
1453 "a" => DataValue::U32(2),
1454 "b" => DataValue::I32(3),
1455 "c" => DataValue::I64(4),
1456 "d" => DataValue::U64(5)
1457 ))
1458 .is_ok());
1459 let arr = Array2::from_shape_vec(
1460 (3, 2),
1461 vec![
1462 DataValue::U32(1),
1463 DataValue::I32(2),
1464 DataValue::U32(12),
1465 DataValue::I32(22),
1466 DataValue::U32(2),
1467 DataValue::I32(3),
1468 ],
1469 )
1470 .expect("BUG: cannot create array");
1471 trace!("{arr:?}");
1472 assert_eq!(frame.select(Some(&["a".into(), "b".into()])), arr);
1473 let result = frame.push(data_value::stdhashmap!(
1474 "a" => DataValue::U32(34),
1475 "b" => DataValue::I32(44),
1476 "c" => DataValue::I64(54),
1477 "e" => DataValue::F32(6f32)
1478 ));
1479 assert!(result.is_ok(), "{result:?}");
1480 let arr = Array2::from_shape_vec(
1481 (4, 2),
1482 vec![
1483 DataValue::U64(4),
1484 DataValue::Null,
1485 DataValue::U64(42),
1486 DataValue::Null,
1487 DataValue::U64(5),
1488 DataValue::Null,
1489 DataValue::Null,
1490 DataValue::F32(6f32),
1491 ],
1492 )
1493 .expect("BUG: cannot create array");
1494 trace!("{arr:?}");
1495 assert_eq!(frame.select(Some(&["d".into(), "e".into()])), arr);
1496 }
1497
1498 #[rstest]
1499 #[case(
1500 column_frame! {
1501 "group_id" => vec![1, 2],
1502 "feed_tag" => vec![3, 4]
1503 },
1504 Some(vec![Key::from("group_id")]),
1505 ndarray::array!([1.into()], [2.into()])
1506 )]
1507 #[case(
1508 column_frame! {
1509 "group_id" => vec![1, 2],
1510 "feed_tag" => vec![3, 4]
1511 },
1512 Some(vec!["group_id".into(), "feed_tag".into()]),
1513 ndarray::array!([1.into(), 3.into()], [2.into(), 4.into()])
1514 )]
1515 #[case(
1516 column_frame! {
1517 "group_id" => vec![1, 2],
1518 "feed_tag" => vec![3, DataValue::Null]
1519 },
1520 Some(vec!["feed_tag".into()]),
1521 ndarray::array![[3.into()], [DataValue::Null]]
1522 )]
1523 #[case(
1524 column_frame! {
1525 "group_id" => vec![1, 2],
1526 "feed_tag" => vec![1, DataValue::Null]
1527 },
1528 Some(vec!["feed_tag2".into()]),
1529 Array2::<DataValue>::default((0, 0))
1530 )]
1531 #[traced_test]
1532 fn test_select(
1533 #[case] input: ColumnFrame,
1534 #[case] keys: Option<Vec<Key>>,
1535 #[case] expected: Array2<DataValue>,
1536 ) {
1537 trace!("input={input:?}");
1538 let keys_slice = keys.as_deref();
1539 let selected = input.select(keys_slice);
1540 trace!("selected={selected:?}");
1541 assert_eq!(selected, expected);
1542 let selected = input.select_transposed(keys_slice);
1543 trace!("selected_transposed={selected:?}");
1544 assert!(selected.is_ok());
1545 assert_eq!(selected.unwrap(), expected.t());
1546 }
1547
1548 #[rstest]
1549 #[case(
1550 column_frame! {
1551 "group_id" => vec![1, 2],
1552 "feed_tag" => vec![3, 4]
1553 },
1554 Key::from("group_id"),
1555 Some(ndarray::array!(1.into(), 2.into()))
1556 )]
1557 #[case(
1558 column_frame! {
1559 "group_id" => vec![1, 2, 5, 6],
1560 "feed_tag" => vec![3, 4, 7, 8]
1561 },
1562 Key::from("group_id"),
1563 Some(ndarray::array!(1.into(), 2.into(), 5.into(), 6.into()))
1564 )]
1565 #[case(
1566 column_frame! {
1567 "group_id" => vec![1, 2],
1568 "feed_tag" => vec![1, 1]
1569 },
1570 Key::from("feed_tag1"),
1571 None
1572 )]
1573 #[traced_test]
1574 fn test_select_column(
1575 #[case] input: ColumnFrame,
1576 #[case] key: Key,
1577 #[case] expected: Option<Array1<DataValue>>,
1578 ) {
1579 let selected = input.select_column(&key);
1580 trace!("selected={selected:?}");
1581 match expected {
1582 Some(expected) => {
1583 assert!(selected.is_some());
1584 assert_eq!(selected.expect("BUG: checked above"), expected);
1585 }
1586 None => assert!(selected.is_none()),
1587 }
1588 }
1589
1590 #[test]
1591 #[traced_test]
1592 fn empty_join_test() {
1593 let join = JoinRelation::add_columns();
1594 let mut column_frame = ColumnFrame::default();
1595 column_frame
1596 .add_single_column("group_id", Array1::from_vec(vec![]))
1597 .expect("BUG: cannot add column");
1598 let column_frame2 = column_frame! {
1599 "group_id" => vec![2, 1, 3],
1600 "feed_tag" => vec![1, 1, 1],
1601 "clicks" => vec![100, 10, 10],
1602 "imps" => vec![1000, 200, 200]
1603 };
1604 assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
1605
1606 let joined = column_frame.join(column_frame2, &join);
1607 assert!(joined.is_ok(), "{joined:?}");
1608
1609 trace!("{column_frame:?}");
1610 assert_eq!(
1611 column_frame.select(Some(&[
1612 "group_id".into(),
1613 "feed_tag".into(),
1614 "clicks".into(),
1615 "imps".into()
1616 ])),
1617 ndarray::array!(
1618 [2.into(), 1.into(), 100.into(), 1000.into()],
1619 [1.into(), 1.into(), 10.into(), 200.into()],
1620 [3.into(), 1.into(), 10.into(), 200.into()],
1621 )
1622 );
1623
1624 let mut column_frame2 = column_frame! {
1625 "feed_tag" => vec![1, 1, 1],
1626 "clicks" => vec![100, 10, 10],
1627 "imps" => vec![1000, 200, 200]
1628 };
1629 let mut column_frame = ColumnFrame::default();
1630 column_frame
1631 .add_single_column("group_id", Array1::from_vec(vec![]))
1632 .expect("BUG: cannot add column");
1633 let joined = column_frame2.join(column_frame, &join);
1634 assert!(joined.is_ok(), "{joined:?}");
1635
1636 trace!("{column_frame2:?}");
1637 assert_eq!(
1638 column_frame2.select(Some(&[
1639 "group_id".into(),
1640 "feed_tag".into(),
1641 "clicks".into(),
1642 "imps".into()
1643 ])),
1644 ndarray::array!(
1645 [DataValue::Null, 1.into(), 100.into(), 1000.into()],
1646 [DataValue::Null, 1.into(), 10.into(), 200.into()],
1647 [DataValue::Null, 1.into(), 10.into(), 200.into()],
1648 )
1649 );
1650
1651 let mut column_frame = ColumnFrame::default();
1652 column_frame.index = KeyIndex::new(vec!["group_id2".into()]);
1653 let joined = column_frame2.join(column_frame, &join);
1654 assert!(joined.is_ok(), "{joined:?}");
1655
1656 trace!("{column_frame2:?}");
1657 assert_eq!(
1658 column_frame2.select(Some(&[
1659 "group_id2".into(),
1660 "feed_tag".into(),
1661 "clicks".into(),
1662 "imps".into()
1663 ])),
1664 ndarray::array!(
1665 [DataValue::Null, 1.into(), 100.into(), 1000.into()],
1666 [DataValue::Null, 1.into(), 10.into(), 200.into()],
1667 [DataValue::Null, 1.into(), 10.into(), 200.into()],
1668 )
1669 );
1670 }
1671
1672 #[test]
1673 #[traced_test]
1674 fn join_test() {
1675 let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
1676 "group_id".into(),
1677 "feed_tag".into(),
1678 ])));
1679 let mut column_frame = column_frame! {
1680 "group_id" => vec![1, 2, 8],
1681 "feed_tag" => vec![1, 1, 10]
1682 };
1683 let column_frame2 = column_frame! {
1684 "group_id" => vec![2, 1, 3],
1685 "feed_tag" => vec![1, 1, 1],
1686 "clicks" => vec![100, 10, 10],
1687 "imps" => vec![1000, 200, 200]
1688 };
1689 assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
1690
1691 let joined = column_frame.join(column_frame2, &join);
1692 assert!(joined.is_ok(), "{joined:?}");
1693
1694 trace!("{column_frame:?}");
1695 assert_eq!(
1696 column_frame.select(Some(&[
1697 "group_id".into(),
1698 "feed_tag".into(),
1699 "clicks".into(),
1700 "imps".into()
1701 ])),
1702 ndarray::array!(
1703 [1.into(), 1.into(), 10.into(), 200.into()],
1704 [2.into(), 1.into(), 100.into(), 1000.into()],
1705 [8.into(), 10.into(), DataValue::Null, DataValue::Null]
1706 )
1707 )
1708 }
1709
1710 #[test]
1711 #[traced_test]
1712 fn join_test_with_additional() {
1713 let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
1714 "group_id".into(),
1715 "feed_tag".into(),
1716 ])));
1717 let mut column_frame = column_frame! {
1718 "group_id" => vec![1, 2, 8],
1719 "feed_tag" => vec![1, 1, 10],
1720 "clicked" => vec![0, 0, 1]
1721 };
1722 let column_frame2 = column_frame! {
1723 "group_id" => vec![2, 1, 3],
1724 "feed_tag" => vec![1, 1, 1],
1725 "clicks" => vec![100, 10, 10],
1726 "imps" => vec![1000, 200, 200]
1727 };
1728 assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
1729
1730 let joined = column_frame.join(column_frame2, &join);
1731 assert!(joined.is_ok(), "{joined:?}");
1732
1733 trace!("{column_frame:?}");
1734 assert_eq!(
1735 column_frame.select(Some(&[
1736 "group_id".into(),
1737 "feed_tag".into(),
1738 "clicks".into(),
1739 "imps".into(),
1740 "clicked".into()
1741 ])),
1742 ndarray::array!(
1743 [1.into(), 1.into(), 10.into(), 200.into(), 0.into()],
1744 [2.into(), 1.into(), 100.into(), 1000.into(), 0.into()],
1745 [
1746 8.into(),
1747 10.into(),
1748 DataValue::Null,
1749 DataValue::Null,
1750 1.into()
1751 ]
1752 )
1753 )
1754 }
1755
1756 #[test]
1757 #[traced_test]
1758 fn join_test_with_additional_single() {
1759 let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
1760 "group_id".into(),
1761 "feed_tag".into(),
1762 ])));
1763 let mut column_frame = column_frame! {
1764 "group_id" => vec![1, 2, 8],
1765 "feed_tag" => vec![1, 1, 10],
1766 "clicked" => vec![0, 0, 1]
1767 };
1768 let column_frame2 = column_frame! {
1769 "a" => vec![1],
1770 "group_id" => vec![2],
1771 "feed_tag" => vec![1],
1772 "clicks" => vec![10],
1773 "imps" => vec![200]
1774 };
1775 assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
1776
1777 let joined = column_frame.join(column_frame2, &join);
1778 assert!(joined.is_ok(), "{joined:?}");
1779
1780 trace!("{column_frame:?}");
1781 assert_eq!(
1782 column_frame.select(Some(&[
1783 "group_id".into(),
1784 "feed_tag".into(),
1785 "clicks".into(),
1786 "imps".into(),
1787 "clicked".into()
1788 ])),
1789 ndarray::array!(
1790 [
1791 1.into(),
1792 1.into(),
1793 DataValue::Null,
1794 DataValue::Null,
1795 0.into(),
1796 ],
1797 [2.into(), 1.into(), 10.into(), 200.into(), 0.into()],
1798 [
1799 8.into(),
1800 10.into(),
1801 DataValue::Null,
1802 DataValue::Null,
1803 1.into()
1804 ]
1805 )
1806 )
1807 }
1808
1809 #[rstest]
1810 #[traced_test]
1811 fn cartesian_product_join() {
1812 let mut df = column_frame! {
1813 "group_id" => vec![1, 2, 3],
1814 "feed_tag" => vec![1, 2, 3]
1815 };
1816 let df2 = column_frame! {
1817 "zone_id" => vec![111111, 111133],
1818 "zone_avg_ctr" => vec![0.1, 0.001]
1819 };
1820 assert!(df
1821 .join(
1822 ColumnFrame::default(),
1823 &JoinRelation::new(JoinBy::CartesianProduct)
1824 )
1825 .is_ok());
1826 let join = JoinRelation::new(JoinBy::CartesianProduct);
1827 let result = df.join(df2, &join);
1828 assert!(result.is_ok(), "{result:?}");
1829 let selected = df.select(None);
1830 trace!("{selected:?}");
1831 assert_eq!(
1832 selected,
1833 ndarray::array!(
1834 [1.into(), 1.into(), 111111.into(), 0.1.into()],
1835 [1.into(), 1.into(), 111133.into(), 0.001.into()],
1836 [2.into(), 2.into(), 111111.into(), 0.1.into()],
1837 [2.into(), 2.into(), 111133.into(), 0.001.into()],
1838 [3.into(), 3.into(), 111111.into(), 0.1.into()],
1839 [3.into(), 3.into(), 111133.into(), 0.001.into()],
1840 )
1841 );
1842
1843 let df2 = column_frame! {
1844 "zone_id" => vec![111]
1845 };
1846 let result = df.join(df2, &join);
1847 assert!(result.is_ok(), "{result:?}");
1848 let selected = df.select(None);
1849 trace!("{selected:?}");
1850 assert_eq!(
1851 selected,
1852 ndarray::array!(
1853 [1.into(), 1.into(), 111111.into(), 0.1.into(), 111.into()],
1854 [1.into(), 1.into(), 111133.into(), 0.001.into(), 111.into()],
1855 [2.into(), 2.into(), 111111.into(), 0.1.into(), 111.into()],
1856 [2.into(), 2.into(), 111133.into(), 0.001.into(), 111.into()],
1857 [3.into(), 3.into(), 111111.into(), 0.1.into(), 111.into()],
1858 [3.into(), 3.into(), 111133.into(), 0.001.into(), 111.into()],
1859 )
1860 );
1861 }
1862
1863 #[rstest]
1864 #[traced_test]
1865 fn broadcast_join() {
1866 let mut df = column_frame! {
1867 "group_id" => vec![1, 2, 3],
1868 "feed_tag" => vec![1, 2, 3]
1869 };
1870 let df2 = column_frame! {
1871 "zone_id" => vec![111111]
1872 };
1873 assert!(df
1874 .join(
1875 ColumnFrame::default(),
1876 &JoinRelation::new(JoinBy::Broadcast)
1877 )
1878 .is_ok());
1879 let join = JoinRelation::new(JoinBy::Broadcast);
1880 assert!(df.join(df2, &join).is_ok());
1881 let selected = df.select(None);
1882 trace!("{selected:?}");
1883 assert_eq!(
1884 selected,
1885 ndarray::array!(
1886 [1.into(), 1.into(), 111111.into()],
1887 [2.into(), 2.into(), 111111.into()],
1888 [3.into(), 3.into(), 111111.into()]
1889 )
1890 );
1891 }
1892 #[rstest]
1893 #[traced_test]
1894 fn merge_test() {
1895 let mut df = column_frame! {
1896 "group_id" => vec![1, 2, 3],
1897 "feed_tag" => vec![1, 2, 3]
1898 };
1899 let df2 = column_frame! {
1900 "group_id" => vec![11, 21, 31],
1901 "feed_tag" => vec![12, 22, 32]
1902 };
1903
1904 let join = JoinRelation::new(JoinBy::Replace);
1905 assert!(df.join(df2, &join).is_ok());
1906 let selected = df.select(None);
1907 trace!("{selected:?}");
1908 assert_eq!(
1909 selected,
1910 ndarray::array!(
1911 [11.into(), 12.into()],
1912 [21.into(), 22.into()],
1913 [31.into(), 32.into()]
1914 )
1915 );
1916 }
1917
1918 #[rstest]
1919 #[traced_test]
1920 fn extend_test() {
1921 let mut df = column_frame! {
1922 "group_id" => vec![1, 2, 3],
1923 "feed_tag" => vec![1, 2, 3]
1924 };
1925 let df2 = column_frame! {
1926 "group_id" => vec![11, 21, 31],
1927 "feed_tag" => vec![5, 6, 7]
1928 };
1929 assert!(df
1930 .join(ColumnFrame::default(), &JoinRelation::new(JoinBy::Extend))
1931 .is_ok());
1932
1933 let join = JoinRelation::new(JoinBy::Extend);
1934 assert!(df.join(df2, &join).is_ok());
1935 let selected = df.select(Some(&["feed_tag".into(), "group_id".into()]));
1936 trace!("{selected:?}");
1937 assert_eq!(
1938 selected,
1939 ndarray::array!(
1940 [1.into(), 1.into()],
1941 [2.into(), 2.into()],
1942 [3.into(), 3.into()],
1943 [5.into(), 11.into()],
1944 [6.into(), 21.into()],
1945 [7.into(), 31.into()]
1946 )
1947 );
1948 let as_map = df.select_as_map(Some(&["feed_tag".into(), "group_id".into()]));
1949 trace!("{as_map:?}");
1950 assert_eq!(
1951 as_map,
1952 stdhashmap!(
1953 "feed_tag" => vec![1, 2, 3, 5, 6, 7],
1954 "group_id" => vec![1, 2, 3, 11, 21, 31]
1955 )
1956 );
1957
1958 let as_map = df.select_as_map(Some(&["feed_tag1".into()]));
1959 trace!("{as_map:?}");
1960 assert_eq!(as_map, HashMap::default());
1961 }
1962
1963 #[rstest]
1964 #[traced_test]
1965 fn extend_test_with_non_existing_cols() {
1966 let mut df = column_frame! {
1967 "group_id" => vec![1, 2, 3],
1968 "feed_tag" => vec![1, 2, 3]
1969 };
1970 let mut df2 = column_frame! {
1971 "group_id" => vec![11, 21, 31],
1972 "feed_tag" => vec![5, 6, 7],
1973 "clicks" => vec![100, 200, 300],
1974 "impressions" => vec![1000, 2000, 3000]
1975 };
1976 let df_bckp = df.clone();
1977 let join = JoinRelation::new(JoinBy::Extend);
1978 assert!(df.join(df2.clone(), &join).is_ok());
1979 let selected = df.select(None);
1980 trace!("{selected:?}");
1981 assert_eq!(
1982 selected,
1983 ndarray::array!(
1984 [1.into(), 1.into(), DataValue::Null, DataValue::Null],
1985 [2.into(), 2.into(), DataValue::Null, DataValue::Null],
1986 [3.into(), 3.into(), DataValue::Null, DataValue::Null],
1987 [11.into(), 5.into(), 100.into(), 1000.into()],
1988 [21.into(), 6.into(), 200.into(), 2000.into()],
1989 [31.into(), 7.into(), 300.into(), 3000.into()]
1990 )
1991 );
1992 let join = JoinRelation::new(JoinBy::Extend);
1993 let r = df2.join(df_bckp, &join);
1994 assert!(r.is_ok(), "{r:?}");
1995 let selected = df2.select(None);
1996 trace!("{selected:?}");
1997 assert_eq!(
1998 selected,
1999 ndarray::array!(
2000 [11.into(), 5.into(), 100.into(), 1000.into()],
2001 [21.into(), 6.into(), 200.into(), 2000.into()],
2002 [31.into(), 7.into(), 300.into(), 3000.into()],
2003 [1.into(), 1.into(), DataValue::Null, DataValue::Null],
2004 [2.into(), 2.into(), DataValue::Null, DataValue::Null],
2005 [3.into(), 3.into(), DataValue::Null, DataValue::Null]
2006 )
2007 );
2008 }
2009
2010 #[rstest]
2011 #[traced_test]
2012 fn extend_test_with_non_existing_cols_wrong_order() {
2013 let mut df = column_frame! {
2014 "group_id" => vec![1, 2, 3],
2015 "feed_tag" => vec![1, 2, 3]
2016 };
2017 let df2 = column_frame! {
2018 "feed_tag" => vec![5, 6, 7],
2019 "group_id" => vec![11, 21, 31]
2020 };
2021 let join = JoinRelation::new(JoinBy::Extend);
2022 let err = df.join(df2, &join);
2023 assert!(err.is_ok(), "{err:?}");
2024 }
2025
2026 #[rstest]
2027 #[traced_test]
2028 fn test_replace_not_compatible() {
2029 let mut df = column_frame! {
2030 "group_id" => vec![1, 2, 3],
2031 "feed_tag" => vec![1, 2, 3]
2032 };
2033 let df2 = column_frame! {
2034 "feed_tag" => vec![5, 6],
2035 "group_id" => vec![11, 21]
2036 };
2037 let join = JoinRelation::new(JoinBy::Replace);
2038 let err = df.join(df2, &join);
2039 assert!(err.is_err(), "{err:?}");
2040 let empty = ColumnFrame::default();
2041 let err = df.join(empty, &join);
2042 assert!(err.is_ok(), "{err:?}");
2043 }
2044
2045 #[rstest]
2046 #[traced_test]
2047 fn test_different_data() {
2048 let mut df = column_frame! {
2049 "group_id" => vec![1, 2, 3],
2050 "feed_tag" => vec![1, 2, 3]
2051 };
2052 let df2 = column_frame! {
2053 "group_id" => vec![11, 21],
2054 "a" => vec![5, 6]
2055 };
2056 let join = JoinRelation::new(JoinBy::Extend);
2057 let err = df.join(df2, &join);
2058 assert!(err.is_ok(), "{err:?}");
2059 println!("{df:?}");
2060 let expected_df = ColumnFrame::new(
2061 KeyIndex::from(vec!["group_id".into(), "feed_tag".into(), "a".into()]),
2062 ndarray::array!(
2063 [1.into(), 1.into(), DataValue::Null],
2064 [2.into(), 2.into(), DataValue::Null],
2065 [3.into(), 3.into(), DataValue::Null],
2066 [11.into(), DataValue::Null, 5.into()],
2067 [21.into(), DataValue::Null, 6.into()]
2068 ),
2069 );
2070 assert_eq!(df, expected_df)
2071 }
2072
2073 #[rstest]
2074 #[traced_test]
2075 fn serde_column_frame() {
2076 let df = column_frame! {
2077 "group_id" => vec![1u64, 2u64, 3u64],
2078 "feed_tag" => vec![1u64, 2u64, 3u64]
2079 };
2080 let key_idx = df.index.clone();
2081 let serialized = serde_json::to_string(&key_idx).expect("BUG: cannot serialize");
2082 let deserialized: KeyIndex =
2083 serde_json::from_str(&serialized).expect("BUG: cannot deserialize");
2084 assert_eq!(key_idx, deserialized);
2085 assert!(key_idx.get_key(0).is_some_and(|x| x == "group_id".into()));
2086 let serialized = serde_json::to_string(&df).expect("BUG: cannot serialize");
2087 let deserialized: ColumnFrame =
2088 serde_json::from_str(&serialized).expect("BUG: cannot deserialize");
2089 assert_eq!(df, deserialized);
2090 }
2091
2092 #[rstest]
2093 #[traced_test]
2094 fn update_value() {
2095 let mut df = column_frame! {
2096 "group_id" => vec![1, 2, 3],
2097 "feed_tag" => vec![1, 2, 3]
2098 };
2099 let group_id: Key = "group_id".into();
2100 let v = df.get_mut_by_row_index(&group_id, 1);
2101 assert!(v.is_some());
2102 let v = v.unwrap();
2103 assert_eq!(v, &DataValue::I32(2));
2104 *v = DataValue::U64(22);
2105 let v = df.get_by_row_index(&group_id, 1);
2106 assert!(v.is_some());
2107 let v = v.unwrap();
2108 assert_eq!(v, &DataValue::U64(22));
2109
2110 assert!(df.get_mut_by_row_index(&"group_id2".into(), 1).is_none());
2111 }
2112}