1use ndarray::{concatenate, s, Array, Array1, Array2, ArrayView1, ArrayViewMut2, Axis};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4
5use crate::error::Error;
6use crate::{dataframe::index::Index, CandidateData, JoinBy, JoinRelation, Key};
7use data_value::{DataValue, Extract};
8use tracing::*;
9mod from;
10mod key_index;
11mod ops;
12pub mod sorted_df;
13pub use key_index::KeyIndex;
14pub mod filter_df;
15
16#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
22pub struct ColumnFrame {
23 pub index: KeyIndex,
24 pub data_frame: Array2<DataValue>,
25}
26
/// Outcome of the shared pre-check performed before combining two frames.
enum Continue {
    /// The caller should carry on with the rest of the operation.
    Continue,
    /// The pre-check already produced the final state; the caller should return.
    End,
}

impl Continue {
    /// Returns `true` when the caller should stop and return early.
    pub fn should_end(&self) -> bool {
        match self {
            Self::Continue => false,
            Self::End => true,
        }
    }
}
37
38use std::fmt;
39
40impl fmt::Display for ColumnFrame {
41 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42 write!(f, "\n|")?;
44
45 for key in &self.index.keys {
46 write!(f, " {key} |")?;
47 }
48
49 if self.index.is_empty() {
50 writeln!(f, "|")?;
51 }
52
53 if let Some(row) = self.data_frame.axis_iter(Axis(0)).next() {
55 write!(f, "\n|")?;
56 for value in row.iter() {
57 write!(f, " {:10?} |", crate::detect_dtype(value))?;
59 }
60 writeln!(f)?;
61 }
62
63 writeln!(f, "---")?;
64
65 for (n, row) in self.data_frame.axis_iter(Axis(0)).enumerate() {
67 write!(f, "|")?;
68
69 for value in row.iter() {
70 write!(f, " {value} |")?;
72 }
73 writeln!(f)?;
74
75 if n >= 256 {
76 writeln!(f, "... (dataframe is too long)")?;
77 break;
78 }
79 }
80
81 writeln!(f, "---")
82 }
83}
84pub fn convert_data_value(item: DataValue, dtype: crate::DataType) -> DataValue {
85 let x = &item;
86 match dtype {
87 crate::DataType::Bool => DataValue::Bool(bool::extract(x)),
88 crate::DataType::U32 => DataValue::U32(u32::extract(x)),
89 crate::DataType::I32 => DataValue::I32(i32::extract(x)),
90 crate::DataType::U64 => DataValue::U64(u64::extract(x)),
91 crate::DataType::I64 => DataValue::I64(i64::extract(x)),
92 crate::DataType::F32 => DataValue::F32(f32::extract(x)),
93 crate::DataType::U128 => DataValue::U128(u128::extract(x)),
94 crate::DataType::I128 => DataValue::I128(i128::extract(x)),
95 crate::DataType::F64 => DataValue::F64(f64::extract(x)),
96 crate::DataType::U8 => DataValue::U8(u8::extract(x)),
97 crate::DataType::String => DataValue::String(String::extract(x).into()),
98 crate::DataType::Bytes => item,
99 crate::DataType::Map => item,
100 crate::DataType::Vec => item,
101 crate::DataType::Unknown => {
102 if matches!(item, DataValue::Null) {
103 return item;
104 }
105 let dtype = crate::detect_dtype(&item);
106 if matches!(dtype, crate::DataType::Unknown) {
108 tracing::error!("Unknown datatype {dtype:?} - {item:?}");
109 return item;
110 }
111 convert_data_value(item, dtype)
112 }
113 }
114}
115pub fn convert_dv_to_dtype(key: &Key, item: DataValue) -> DataValue {
116 convert_data_value(item, key.ctype)
117}
118impl ColumnFrame {
119 pub fn new<K: Into<KeyIndex>>(index: K, data_frame: Array2<DataValue>) -> Self {
120 Self {
121 index: index.into(),
122 data_frame,
123 }
124 }
125
126 pub fn keys(&self) -> &[Key] {
127 self.index.get_keys()
128 }
129
130 pub fn len(&self) -> usize {
131 self.data_frame.nrows()
132 }
133
134 pub fn is_empty(&self) -> bool {
135 self.data_frame.nrows() == 0
136 }
137
138 pub fn shrink(&mut self) {
139 let shape = self.data_frame.shape();
140 if shape[0] > 0 && shape[1] > 0 {
141 let mut new_data = Vec::with_capacity(shape[0] * shape[1]);
142 for elem in self.data_frame.iter() {
143 new_data.push(elem.clone());
144 }
145 if let Ok(arr) = Array2::from_shape_vec((shape[0], shape[1]), new_data) {
146 self.data_frame = arr;
147 }
148 }
149 }
150
151 pub fn try_fix_dtype_for_keys(&mut self, force: bool) -> Result<(), Error> {
155 for i in 0..self.index.keys.len() {
156 let should_fix = force || matches!(self.index.keys[i].ctype, crate::DataType::Unknown);
157
158 if should_fix {
159 let column = self
160 .get_single_column(&self.index.keys[i])
161 .ok_or_else(|| Error::EmptyData)?;
162 let dtype = crate::detect_dtype(column.get(0).ok_or_else(|| Error::EmptyData)?);
163 self.index.keys[i].ctype = dtype;
164 }
165 }
166
167 Ok(())
168 }
169 pub fn try_fix_dtype(&mut self) -> Result<(), Error> {
170 let mut errors = vec![];
171 let keys = self.index.keys.clone();
172 for key in keys {
173 tracing::trace!("key: {key:?}- {:?}", key.ctype);
174 if let Err(e) = self.try_fix_column_by_key(&key) {
175 errors.push((key, e.to_string()));
176 }
177 }
178 if errors.is_empty() {
179 Ok(())
180 } else {
181 Err(Error::CastFailed(errors))
182 }
183 }
184
185 pub fn try_fix_column_by_key(&mut self, key: &Key) -> Result<(), Error> {
186 let idx = self
187 .index
188 .get_column_index(key)
189 .ok_or(Error::MissingField(format!("{key}").into()))?;
190 let mut col = self.data_frame.column_mut(idx);
191
192 col.mapv_inplace(|item| convert_dv_to_dtype(key, item));
193 Ok(())
194 }
195
196 pub fn enforce_dtype_for_column(
197 &mut self,
198 key: &str,
199 dtype: crate::DataType,
200 ) -> Result<(), Error> {
201 if let Some(idx) = self.index.get_column_index_by_name(key) {
202 let new_key = Key::new(key, dtype);
203 let mut col = self.data_frame.column_mut(idx);
204
205 col.mapv_inplace(|item| convert_dv_to_dtype(&new_key, item));
206 self.index.rename_key(key, new_key)?;
207 Ok(())
208 } else {
209 Err(Error::NotFound(Key::new(key, crate::DataType::Unknown)))
210 }
211 }
212
213 pub fn get_mut_view(&mut self) -> ArrayViewMut2<'_, DataValue> {
214 self.data_frame.view_mut()
215 }
216
217 pub fn rename_key(&mut self, old: &str, new: Key) -> Result<(), Error> {
218 self.index.rename_key(old, new)
219 }
220
221 pub fn add_alias(&mut self, key: &str, alias: &str) -> Result<(), Error> {
222 self.index.add_alias(key, alias)
223 }
224
225 pub fn select_transposed_typed<D: Extract>(&self, keys: &[Key]) -> Vec<Vec<D>> {
231 let selected = self.select(Some(keys));
232 let mut result = Vec::with_capacity(selected.nrows());
233 for row in selected.rows() {
234 let mut r = Vec::with_capacity(selected.ncols());
235 for value in row.iter() {
236 r.push(D::extract(value));
237 }
238 result.push(r);
239 }
240 result
241 }
242
243 pub fn select_transposed(&self, keys: Option<&[Key]>) -> Result<Array2<DataValue>, Error> {
251 let keys = keys.unwrap_or_else(|| self.index.get_keys());
252 let key_indexes = self.index.select(keys);
253 if key_indexes.is_empty() {
254 return Ok(Array2::default((0, 0)));
255 }
256 let data_vec: Vec<Array1<DataValue>> = key_indexes
257 .indexes()
258 .iter()
259 .map(|x| self.data_frame.column(*x).to_owned())
260 .collect();
261 to_array2(data_vec)
262 }
263
264 pub fn select_column(&self, key: &Key) -> Option<ArrayView1<'_, DataValue>> {
268 self.index
269 .get_column_index(key)
270 .map(|x| self.data_frame.column(x))
271 }
272
273 pub fn apply_function<F>(&mut self, keys: &[Key], mut func: F) -> Result<(), Error>
274 where
275 F: FnMut(&[Key], &mut ColumnFrame) -> Result<(), Error>,
276 {
277 func(keys, self)
278 }
279
280 pub fn validate_entry_access(&self, column: &Key, row_index: usize) -> Result<usize, Error> {
285 if row_index >= self.data_frame.nrows() {
286 return Err(Error::IndexOutOfRange(row_index, self.data_frame.nrows()));
287 }
288 let Some(column_index) = self.index.get_column_index(column) else {
289 return Err(Error::NotFound(column.clone()));
290 };
291 Ok(column_index)
292 }
293
294 pub fn get_by_row_index(&self, column: &Key, row_index: usize) -> Option<&DataValue> {
298 trace!(
299 "Column: {column} row_index: {row_index} data_frame: cols:{}-rows:{}",
300 self.data_frame.len(),
301 self.data_frame.nrows()
302 );
303 trace!("{:?}", self.data_frame);
304 match self.validate_entry_access(column, row_index) {
305 Ok(column_index) => self.data_frame.get((row_index, column_index)),
306 Err(e) => {
307 trace!("Error: {e}");
308 None
309 }
310 }
311 }
312
313 pub fn get_mut_by_row_index(
317 &mut self,
318 column: &Key,
319 row_index: usize,
320 ) -> Option<&mut DataValue> {
321 trace!(
322 "Column: {column} row_index: {row_index} data_frame: cols:{}-rows:{}",
323 self.data_frame.len(),
324 self.data_frame.nrows()
325 );
326 trace!("{:?}", self.data_frame);
327 match self.validate_entry_access(column, row_index) {
328 Ok(column_index) => self.data_frame.get_mut((row_index, column_index)),
329 Err(e) => {
330 trace!("Error: {e}");
331 None
332 }
333 }
334 }
335
336 pub fn select_as_map(&self, keys: Option<&[Key]>) -> HashMap<Key, Vec<DataValue>> {
341 let keys = keys.unwrap_or_else(|| self.index.get_keys());
342 let indexes = self.index.select(keys);
343 if indexes.is_empty() {
344 return Default::default();
345 }
346
347 let mut new_data_frame = HashMap::with_capacity(keys.len());
348
349 for key in keys.iter() {
350 if let Some(column_index_in_source) = indexes.get_column_index(key) {
351 let column = self.data_frame.column(column_index_in_source);
352 new_data_frame.insert(key.clone(), column.to_vec());
353 }
354 }
355
356 new_data_frame
357 }
358
359 pub fn select(&self, keys: Option<&[Key]>) -> Array2<DataValue> {
367 let keys = keys.unwrap_or_else(|| self.index.get_keys());
368 let indexes = self.index.select(keys);
369 if indexes.is_empty() || keys.is_empty() {
370 return Array2::default((0, 0));
371 }
372
373 let nrows = self.data_frame.nrows();
374 let ncols = keys.len();
375
376 let mut data = Vec::with_capacity(nrows * ncols);
378
379 let col_mappings: Vec<(usize, usize)> = keys
381 .iter()
382 .enumerate()
383 .filter_map(|(dst_idx, key)| {
384 indexes
385 .get_column_index(key)
386 .map(|src_idx| (dst_idx, src_idx))
387 })
388 .collect();
389
390 data.resize(nrows * ncols, DataValue::Null);
392
393 for row_idx in 0..nrows {
395 for (dst_col, src_col) in &col_mappings {
396 let dst_idx = row_idx * ncols + dst_col;
397 data[dst_idx] = self.data_frame[(row_idx, *src_col)].clone();
398 }
399 }
400
401 Array2::from_shape_vec((nrows, ncols), data).unwrap_or_else(|_| Array2::default((0, 0)))
402 }
403
404 fn extend_dataframe_for_column(&mut self, key: Key) -> Result<(), Error> {
405 self.index.store_key(key);
406 let len = self.data_frame.nrows();
407 self.data_frame.push_column(Array1::default(len).view())?;
408 Ok(())
409 }
410
411 pub fn push<C: CandidateData>(&mut self, row_candidate: C) -> Result<(), Error> {
417 let num_keys = self.index.len();
419 let candidate_keys = row_candidate.keys();
420 let mut arr = Vec::with_capacity(num_keys.max(candidate_keys.len()));
421
422 for key in &candidate_keys {
424 if self.index.get_column_index(key).is_none() {
425 self.extend_dataframe_for_column(key.clone())?;
426 }
427 }
428
429 arr.reserve(self.index.len());
431 for index in self.index.get_keys() {
432 arr.push(
433 row_candidate
434 .get_value_ref(index)
435 .cloned()
436 .unwrap_or(DataValue::Null),
437 );
438 }
439
440 self.data_frame.push_row(Array::from_vec(arr).view())?;
441 Ok(())
442 }
443
444 pub fn remove_column(&mut self, keys: &[Key]) -> Result<Self, Error> {
445 let mut indexes = KeyIndex::default();
447 let data = self.select(Some(keys));
449 for key in keys {
451 if let Some((current, _idx)) = self.index.remove_key(key) {
452 indexes.store_key(current);
453 }
454 }
455 let rest = self.select(Some(self.keys()));
457 let keys = self.index.get_keys().to_vec();
458 self.data_frame = rest;
459 self.index = KeyIndex::new(keys);
460
461 Ok(Self::new(indexes, data))
463 }
464
465 fn check_or_init_frame(&mut self, other: &Self) -> Result<Continue, Error> {
466 if self.index.is_empty() {
467 self.index = other.index.clone();
468 self.data_frame = other.data_frame.clone();
469 return Ok(Continue::End);
470 }
471 if other.index.is_empty() {
472 return Ok(Continue::End);
473 }
474 if self.is_empty() {
475 self.data_frame = Array2::default((other.data_frame.nrows(), self.index.len()));
476 }
477
478 Ok(Continue::Continue)
479 }
480
481 fn extend_columns_from_other(&mut self, other: &Self) -> Result<(), Error> {
482 let missing_keys: Vec<Key> = other
483 .index
484 .get_keys()
485 .iter()
486 .filter(|key| self.index.get_column_index(key).is_none())
487 .cloned()
488 .collect();
489
490 if missing_keys.is_empty() {
491 return Ok(());
492 }
493
494 for key in missing_keys {
495 self.index.store_key(key);
496 }
497
498 let nrows = self.data_frame.nrows();
499 let new_cols = self.index.len() - self.data_frame.ncols();
500
501 if new_cols > 0 {
502 let new_data = Array2::default((nrows, new_cols));
504 self.data_frame = concatenate(Axis(1), &[self.data_frame.view(), new_data.view()])?;
505 }
506
507 Ok(())
508 }
509
510 fn try_extend(&mut self, other: Self) -> Result<(), Error> {
511 let mut joined_keys = self.index.clone();
512 for key in other.keys() {
513 if self.index.get_column_index(key).is_none() {
514 joined_keys.store_key(key.clone());
515 }
516 }
517
518 let sum_len = self.data_frame.nrows() + other.data_frame.nrows();
519 let mut arr = Array2::default((sum_len, joined_keys.len()));
520 let increment = self.data_frame.nrows();
521
522 for key in joined_keys.get_keys() {
523 let index_result = joined_keys.get_column_index(key).ok_or_else(|| {
524 Error::UnknownError(format!(
525 "Index lookup failed for key '{}' in try_extend",
526 key.name()
527 ))
528 })?;
529
530 let mut col = arr.column_mut(index_result);
531
532 if let Some(index) = self.index.get_column_index(key) {
533 let src_col = self.data_frame.column(index);
534 col.slice_mut(s![..increment]).assign(&src_col);
535 }
536
537 if let Some(index) = other.index.get_column_index(key) {
538 let src_col = other.data_frame.column(index);
539 col.slice_mut(s![increment..]).assign(&src_col);
540 }
541 }
542
543 *self = ColumnFrame::new(joined_keys, arr);
544 Ok(())
545 }
546
547 pub fn extend(&mut self, mut other: Self) -> Result<(), Error> {
555 if self.check_or_init_frame(&other)?.should_end() {
556 return Ok(());
557 }
558
559 if self.index.check_order_of_indexes(&other.index).is_err() {
560 return self.try_extend(other);
561 }
562
563 trace!(
564 "Extend columns from other {:?} vs {:?}",
565 other.index.get_keys(),
566 self.index.get_keys()
567 );
568
569 if other.data_frame.ncols() < self.data_frame.ncols() {
570 other.extend_columns_from_other(self)?;
571 } else {
572 self.extend_columns_from_other(&other)?;
573 }
574 self.data_frame = concatenate(Axis(0), &[self.data_frame.view(), other.data_frame.view()])?;
575
576 Ok(())
577 }
578
579 pub fn replace(&mut self, other: Self) -> Result<(), Error> {
585 if self.check_or_init_frame(&other)?.should_end() {
586 return Ok(());
587 }
588
589 if self.data_frame.len() > other.data_frame.len() {
590 return Err(Error::DataSetSizeDoesntMatch(
591 self.data_frame.len(),
592 other.data_frame.len(),
593 ));
594 }
595 self.index = other.index;
596 self.data_frame = other.data_frame;
597
598 Ok(())
599 }
600
601 pub fn join_by_id_inner(&mut self, right: Self, keys: &[Key]) -> Result<(), Error> {
605 if self.check_or_init_frame(&right)?.should_end() {
606 return Ok(());
607 }
608
609 let timer = std::time::Instant::now();
610 let new_columns = right.index.get_complement_keys(self.index.get_keys());
611
612 self.extend_columns_from_other(&right)?;
614 tracing::debug!("Extend took {}ns", timer.elapsed().as_nanos());
615
616 let column_mappings: Vec<(usize, usize)> = new_columns
618 .iter()
619 .filter_map(|key| {
620 let left_idx = self.index.get_column_index(key)?;
621 let right_idx = right.index.get_column_index(key)?;
622 Some((left_idx, right_idx))
623 })
624 .collect();
625
626 let timer = std::time::Instant::now();
628 let index = Index::new(keys.to_vec(), self);
629 tracing::debug!("Left index build took: {}ns", timer.elapsed().as_nanos());
630 tracing::trace!("Index {index:?}");
631
632 let timer = std::time::Instant::now();
633 let right_index = Index::new(keys.to_vec(), &right);
634 let joined_idx = index.join(right_index);
635 tracing::debug!(
636 "Right index build and join took: {}ns",
637 timer.elapsed().as_nanos()
638 );
639
640 let timer = std::time::Instant::now();
643 let joined_idx_len = joined_idx.len();
644
645 for (left_col_idx, right_col_idx) in &column_mappings {
646 let mut left_col = self.data_frame.column_mut(*left_col_idx);
647 let right_col = right.data_frame.column(*right_col_idx);
648
649 for (left_indices, right_indices) in &joined_idx {
650 for right_row_idx in right_indices {
651 for left_idx in left_indices {
652 left_col[*left_idx] = right_col[*right_row_idx].clone();
653 }
654 }
655 }
656 }
657
658 let elapsed = timer.elapsed();
659 tracing::debug!(
660 "Filled {} rows in {}ms|{}s",
661 joined_idx_len,
662 elapsed.as_millis(),
663 elapsed.as_secs()
664 );
665
666 Ok(())
667 }
668
669 pub fn add_single_column<K: Into<Key>>(
673 &mut self,
674 key: K,
675 column: Array1<DataValue>,
676 ) -> Result<(), Error> {
677 let key = key.into();
678 if self.index.get_column_index(&key).is_some() {
679 return Err(Error::ColumnAlreadyExists(key));
680 }
681 if self.len() != column.len() && !self.is_empty() {
682 return Err(Error::DataSetSizeDoesntMatch(self.len(), column.len()));
683 }
684
685 self.index.store_key(key.clone());
686 let rows = column.len();
687 let column_index = self
688 .index
689 .get_column_index(&key)
690 .ok_or(Error::UnknownError(format!("Column {key} should exists")))?;
691 if self.is_empty() && self.index.len() == 1 {
692 self.data_frame = column.into_shape_clone((rows, 1))?;
693 assert_eq!(self.data_frame.column(column_index).len(), rows);
694 } else if self.is_empty() {
695 self.data_frame = Array2::default((column.len(), self.index.len() - 1));
696 self.data_frame.push_column(column.view())?;
697 assert_eq!(self.data_frame.column(column_index).len(), rows);
698 } else {
699 self.data_frame.push_column(column.view())?;
700 }
701 assert_eq!(self.data_frame.column(column_index).len(), rows);
702
703 Ok(())
704 }
705 pub fn add_columns(&mut self, other: Self) -> Result<(), Error> {
709 if self.check_or_init_frame(&other)?.should_end() {
710 return Ok(());
711 }
712
713 self.extend_columns_from_other(&other)?;
714 for (idx, key) in other.index.get_keys().iter().enumerate() {
715 if let Some(index) = self.index.get_column_index(key) {
716 trace!("Other array = {:?}", other.data_frame.dim());
717 if other.data_frame.dim() == (0, 0) {
718 self.data_frame.column_mut(index).fill(DataValue::Null);
719 continue;
720 }
721 let arr = other.data_frame.column(idx);
722 trace!(
723 "Adding column {key:?} at index {idx} vs {index} datasize: self:{} vs other:{}",
724 self.data_frame.nrows(),
725 arr.len()
726 );
727 if arr.len() != self.data_frame.nrows() {
728 self.data_frame.column_mut(index).fill(DataValue::Null);
729 } else {
730 self.data_frame.column_mut(index).assign(&arr);
731 }
732 }
733 }
734 Ok(())
735 }
736
737 pub fn broadcast(&mut self, other: Self) -> Result<(), Error> {
742 if self.check_or_init_frame(&other)?.should_end() {
743 return Ok(());
744 }
745 if other.data_frame.nrows() != 1 {
746 return Err(Error::CannotBroadcast);
747 }
748
749 let all_keys = self.index.get_keys().to_vec();
751 let other_keys: Vec<_> = other
752 .index
753 .get_keys()
754 .iter()
755 .filter(|k| self.index.get_column_index(k).is_none())
756 .cloned()
757 .collect();
758
759 for key in &other_keys {
761 self.index.store_key(key.clone());
762 }
763
764 let nrows = self.len();
765 let ncols = self.index.len();
766 let ncols_old = all_keys.len();
767
768 let mut data = Vec::with_capacity(nrows * ncols);
770
771 for row_idx in 0..nrows {
773 for col_idx in 0..ncols_old {
775 data.push(self.data_frame[(row_idx, col_idx)].clone());
776 }
777 for key in &other_keys {
779 if let Some(other_idx) = other.index.get_column_index(key) {
780 data.push(other.data_frame[(0, other_idx)].clone());
781 }
782 }
783 }
784
785 self.data_frame = Array2::from_shape_vec((nrows, ncols), data)
786 .map_err(|e| Error::UnknownError(format!("Broadcast reshape failed: {e}")))?;
787
788 Ok(())
789 }
790
791 pub fn cartesian_product(&mut self, other: Self) -> Result<(), Error> {
796 if self.check_or_init_frame(&other)?.should_end() {
797 return Ok(());
798 }
799 for other_key in other.keys() {
802 if self.index.get_column_index(other_key).is_none() {
803 self.index.store_key(other_key.clone());
804 } else {
805 self.index.store_key(Key::new(
806 format!("{}-{}", other_key, other_key.id()).as_str(),
807 other_key.ctype,
808 ));
809 }
810 }
811 let max_rows = self.len() * other.len();
812 let ncols = self.index.len();
813 let mut new_df = Array2::default((max_rows, ncols));
815
816 let mut cur_idx = 0;
817 for cur_row in self.data_frame.rows() {
818 for other_row in other.data_frame.rows() {
819 new_df
820 .slice_mut(s![cur_idx, ..])
821 .assign(&concatenate(Axis(0), &[cur_row, other_row])?);
822 cur_idx += 1;
823 }
824 }
825 self.data_frame = new_df;
826 Ok(())
827 }
828
829 pub fn join(&mut self, right: Self, join_type: &JoinRelation) -> Result<(), Error> {
837 use JoinBy::*;
838 match &join_type.join_type {
839 AddColumns => self.add_columns(right),
840 Replace => self.replace(right),
841 Extend => self.extend(right),
842 Broadcast => self.broadcast(right),
843 CartesianProduct => self.cartesian_product(right),
844 JoinById(join) => self.join_by_id_inner(right, &join.keys),
845 }
846 }
847
848 pub fn get_single_column(&self, key: &Key) -> Option<ArrayView1<'_, DataValue>> {
849 self.index
850 .get_column_index(key)
851 .map(|x| self.data_frame.column(x))
852 }
853
854 pub fn sorted(&self, key: &Key) -> Result<sorted_df::SortedDataFrame<'_>, Error> {
855 let index = self
856 .index
857 .get_column_index(key)
858 .ok_or(Error::NotFound(key.clone()))?;
859 let column = self.data_frame.column(index);
860 let mut data_with_index = column.iter().enumerate().collect::<Vec<_>>();
861 tracing::trace!("Sorting by key: {key:?} vals {data_with_index:?}");
862 data_with_index.sort_by(
863 |(a_idx, a_val), (b_idx, b_val)| match a_val.partial_cmp(b_val) {
864 Some(ordering) => ordering.then_with(|| a_idx.cmp(b_idx)),
865 None => {
866 let a_null = matches!(a_val, DataValue::Null);
867 let b_null = matches!(b_val, DataValue::Null);
868 match (a_null, b_null) {
869 (true, true) => std::cmp::Ordering::Equal.then_with(|| a_idx.cmp(b_idx)),
870 (true, false) => std::cmp::Ordering::Greater.then_with(|| a_idx.cmp(b_idx)),
871 (false, true) => std::cmp::Ordering::Less.then_with(|| a_idx.cmp(b_idx)),
872 (false, false) => std::cmp::Ordering::Equal.then_with(|| a_idx.cmp(b_idx)),
873 }
874 }
875 },
876 );
877
878 tracing::trace!("Sorted by key: {key:?} vals {data_with_index:?}");
879 let indicies = data_with_index
880 .into_iter()
881 .map(|(idx, _)| idx)
882 .collect::<Vec<_>>();
883
884 Ok(sorted_df::SortedDataFrame::new(self, indicies))
885 }
886
887 pub fn filter(&self, filter: &crate::filter::FilterRules) -> Result<Self, Error> {
888 let mut final_indices = Vec::new();
889 let filter_df = filter_df::ColumnFrameFiltering { column_frame: self };
890 for rule in &filter.rules {
891 final_indices.extend(crate::filter::filter_combination(&filter_df, rule)?);
892 }
893
894 final_indices.sort_unstable();
895 final_indices.dedup();
896 let mut new_df = ColumnFrame::new(
897 self.index.clone(),
898 Array2::default((final_indices.len(), self.index.len())),
899 );
900 final_indices
901 .iter()
902 .enumerate()
903 .for_each(|(cur_idx, row_idx)| {
904 new_df
905 .data_frame
906 .slice_mut(s![cur_idx, ..])
907 .assign(&self.data_frame.slice(s![*row_idx, ..]));
908 });
909
910 Ok(new_df)
911 }
912}
913
914pub fn to_array2<T: Clone>(source: Vec<Array1<T>>) -> Result<Array2<T>, Error> {
915 let width = source.len();
916 let flattened: Array1<T> = source.into_iter().flat_map(|row| row.to_vec()).collect();
917 let height = flattened.len() / width;
918 Ok(flattened.into_shape_with_order((width, height))?)
919}
/// Builds a `DataFrame` from the same `key => values` syntax accepted by `column_frame!`.
#[macro_export]
macro_rules! df {
    ($($tokens:tt)*) => {
        $crate::DataFrame::new($crate::column_frame!($($tokens)*))
    };
}
926
/// Builds a `ColumnFrame` from `key => values` pairs.
///
/// Accepted forms:
/// * `column_frame!()` — an empty (default) frame;
/// * `"a" => [1, 2, 3], "b" => [4, 5, 6]` — one column per key, one row per element;
/// * `"a" => vec![1, 2, 3], ...` — same as the array form;
/// * `"a" => 1, "b" => 2` — a single row;
/// * any non-empty form above with a trailing comma.
#[macro_export]
macro_rules! column_frame {
    // Without this arm an empty invocation matched the `vec!` arm with zero repetitions and
    // re-expanded to itself until the recursion limit was hit.
    () => {
        $crate::ColumnFrame::default()
    };
    ($($name:expr => $cells:expr,)+) => { $crate::column_frame!($($name => $cells),+) };
    ($($name:expr => vec![$($cell:expr),*]),*) => {
        $crate::column_frame!($($name => [$($cell),*]),*)
    };
    ($($name:expr => [$($cell:expr),*]),*) => {{
        // Each key's values form one row of the literal; reversing the axes turns the
        // rows into columns.
        let values_by_key = ::ndarray::arr2(&[$(
            [$($cell.into(),)*],
        )*]);
        let keys = vec![$($name.into(),)*];

        $crate::ColumnFrame::new(
            $crate::KeyIndex::new(keys),
            values_by_key.reversed_axes()
        )
    }};
    ($($name:expr => $cell:expr),*) => {{
        let single_row = ::ndarray::arr2(&[[$($cell.into(),)*]]);
        let keys = vec![$($name.into(),)*];

        $crate::ColumnFrame::new(
            $crate::KeyIndex::new(keys),
            single_row,
        )
    }};
}
963
964#[cfg(test)]
965mod test {
966 use crate::{filter::FilterRules, JoinById};
967
968 use super::*;
969 use data_value::stdhashmap;
970 use ndarray::ArrayView;
971 use rstest::*;
972 use tracing_test::traced_test;
973
974 #[rstest]
975 #[case(
976 column_frame! {
977 "t" => [1751001987000000u64, 1752001987000000u64, 1753001987000000u64],
978 "b" => [4, 5, 6],
979 "c" => [7, 8, 9]
980 },
981 column_frame! {
982 "t" => [1752001987000000u64],
983 "b" => [5],
984 "c" => [8]
985 },
986 FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
987 )]
988 #[case(
989 column_frame! {
990 "t" => [1751001987000000f64, 1752001987000000f64, 1753001987000000f64],
991 "b" => [4, 5, 6],
992 "c" => [7, 8, 9]
993 },
994 column_frame! {
995 "t" => [1752001987000000f64],
996 "b" => [5],
997 "c" => [8]
998 },
999 FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
1000 )]
1001 #[case(
1002 column_frame! {
1003 "t" => [1751001987000000i64, 1752001987000000i64, 1753001987000000i64],
1004 "b" => [4, 5, 6],
1005 "c" => [7, 8, 9]
1006 },
1007 column_frame! {
1008 "t" => [1752001987000000i64],
1009 "b" => [5],
1010 "c" => [8]
1011 },
1012 FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
1013 )]
1014 #[case(
1015 column_frame! {
1016 "t" => [1751001987000000u64, 1752001987000000u64, 1753001987000000u64],
1017 "b" => [4, 5, 6],
1018 "c" => [7, 8, 9]
1019 },
1020 column_frame! {
1021 "t" => [1751001987000000u64],
1022 "b" => [4],
1023 "c" => [7]
1024 },
1025 FilterRules::try_from("t.to_datetime_us() < '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
1026 )]
1027 #[case(
1028 column_frame! {
1029 "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
1030 "b" => [4, 5, 6],
1031 "c" => [7, 8, 9]
1032 },
1033 column_frame! {
1034 "t" => ["2025-07-08 18:13:07"],
1035 "b" => [4],
1036 "c" => [7]
1037 },
1038 FilterRules::try_from("t.to_datetime_us() < '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
1039 )]
1040 #[case(
1041 column_frame! {
1042 "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
1043 "b" => [4, 5, 6],
1044 "c" => [7, 8, 9]
1045 },
1046 column_frame! {
1047 "t" => [],
1048 "b" => [],
1049 "c" => []
1050 },
1051 FilterRules::try_from("t.len() < 10u64").expect("BUG: cannot create filter rules"),
1052 )]
1053 #[case(
1054 column_frame! {
1055 "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
1056 "b" => [4, 5, 6],
1057 "c" => [7, 8, 9]
1058 },
1059 column_frame! {
1060 "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
1061 "b" => [4, 5, 6],
1062 "c" => [7, 8, 9]
1063 },
1064 FilterRules::try_from("t.len() > 10u64").expect("BUG: cannot create filter rules"),
1065 )]
1066 #[case(
1067 column_frame! {
1068 "t" => [DataValue::Vec(vec![1.into(), 2.into(), 3.into()]), DataValue::Vec(vec![]), DataValue::Vec(vec![1.into()])],
1069 "b" => [4, 5, 6],
1070 "c" => [7, 8, 9]
1071 },
1072 column_frame! {
1073 "t" => [DataValue::Vec(vec![])],
1074 "b" => [5],
1075 "c" => [ 8]
1076 },
1077 FilterRules::try_from("t.len() == 0u64").expect("BUG: cannot create filter rules"),
1078 )]
1079 #[case(
1080 column_frame! {
1081 "t" => [DataValue::Vec(vec![1.into(), 2.into(), 3.into()]), DataValue::Vec(vec![]), DataValue::Vec(vec![1.into()])],
1082 "b" => [4, 5, 6],
1083 "c" => [7, 8, 9]
1084 },
1085 column_frame! {
1086 "t" => [DataValue::Vec(vec![1.into()])],
1087 "b" => [6],
1088 "c" => [9]
1089 },
1090 FilterRules::try_from("t.len() == 1u64").expect("BUG: cannot create filter rules"),
1091 )]
1092 #[case(
1093 column_frame! {
1094 "a" => [1, 2, 3],
1095 "b" => [4, 5, 6],
1096 "c" => [7, 8, 9]
1097 },
1098 column_frame! {
1099 "a" => [1, 2],
1100 "b" => [4, 5],
1101 "c" => [7, 8]
1102 },
1103 FilterRules::try_from("a <= 2i32").expect("BUG: cannot create filter rules"),
1104 )]
1105 #[case(
1106 column_frame! {
1107 "a" => [1, 2, 3],
1108 "b" => [4, 5, 6],
1109 "c" => [7, 8, 9]
1110 },
1111 column_frame! {
1112 "a" => [2],
1113 "b" => [5],
1114 "c" => [8]
1115 },
1116 FilterRules::try_from("a <= 2i32 && c > 7i32").expect("BUG: cannot create filter rules"),
1117 )]
1118 #[case(
1119 column_frame! {
1120 "a" => [1, 2, 3],
1121 "b" => [4, 5, 6],
1122 "c" => [7, 8, 9]
1123 },
1124 column_frame! {
1125 "a" => [],
1126 "b" => [],
1127 "c" => []
1128 },
1129 FilterRules::try_from("a <= 2i32 && c > 9i32").expect("BUG: cannot create filter rules"),
1130 )]
1131 #[case(
1132 column_frame! {
1133 "a" => [1, 2, 3],
1134 "b" => [4, 5, 6],
1135 "c" => [7, 8, 9]
1136 },
1137 column_frame! {
1138 "a" => [1, 2],
1139 "b" => [4, 5],
1140 "c" => [7, 8]
1141 },
1142 FilterRules::try_from("a <= 2i32 || c > 9i32").expect("BUG: cannot create filter rules"),
1143 )]
1144 #[case(
1145 column_frame! {
1146 "a" => [1, 2, 3],
1147 "b" => [4, 5, 6],
1148 "c" => [7, 8, 9]
1149 },
1150 column_frame! {
1151 "a" => [2],
1152 "b" => [5],
1153 "c" => [8]
1154 },
1155 FilterRules::try_from("a <= 2i32 && (c > 9i32 || b == 5i32)").expect("BUG: cannot create filter rules"),
1156 )]
1157 #[case(
1158 column_frame! {
1159 "a" => ["abcd", "ab", "abcdefg"],
1160 "b" => [4, 5, 6],
1161 "c" => [7, 8, 9]
1162 },
1163 column_frame! {
1164 "a" => ["abcd","abcdefg"],
1165 "b" => [4, 6],
1166 "c" => [7, 9]
1167 },
1168 FilterRules::try_from("a ~= 'abcd.*'").expect("BUG: cannot create filter rules"),
1169 )]
1170 #[case(
1171 column_frame! {
1172 "a" => [1, 2, 3],
1173 "b" => [4, 5, 6],
1174 "c" => [7, 8, 9]
1175 },
1176 column_frame! {
1177 "a" => [1],
1178 "b" => [4],
1179 "c" => [7]
1180 },
1181 FilterRules::try_from("a in [1u32, 1i32]'").expect("BUG: cannot create filter rules"),
1182 )]
1183 #[case(
1184 column_frame! {
1185 "a" => [1, 2, 3],
1186 "b" => [4, 5, 6],
1187 "c" => [7, 8, 9]
1188 },
1189 column_frame! {
1190 "a" => [2, 3],
1191 "b" => [5, 6],
1192 "c" => [8, 9]
1193 },
1194 FilterRules::try_from("a notIn [1u32, 1i32]'").expect("BUG: cannot create filter rules"),
1195 )]
1196 #[case(
1197 column_frame! {
1198 "a" => [1f64, 2f64, 3f64],
1199 "b" => [4, 5, 6],
1200 "c" => [7, 8, 9]
1201 },
1202 column_frame! {
1203 "a" => [1f64, 2f64],
1204 "b" => [4, 5],
1205 "c" => [7, 8]
1206 },
1207 FilterRules::try_from("a < 3f64 || (a < 3f64 && b <= 5i32)").expect("BUG: cannot create filter rules"),
1208 )]
1209 #[case(
1210 column_frame! {
1211 "a" => [1f64, 2f64, 3f64],
1212 "b" => [4i64, 5i64, 6i64],
1213 "c" => [7i64, 8i64, 9i64]
1214 },
1215 column_frame! {
1216 "a" => [1f64, 2f64],
1217 "b" => [4i64, 5i64],
1218 "c" => [7i64, 8i64]
1219 },
1220 FilterRules::try_from("a >= 1f64 && (b <= 5 || c <= 8) && b >= 4").expect("BUG: cannot create filter rules"),
1221 )]
1222 #[traced_test]
1223 fn filter_test(
1224 #[case] df: ColumnFrame,
1225 #[case] expected: ColumnFrame,
1226 #[case] filter: FilterRules,
1227 ) {
1228 let filtered = df.filter(&filter).expect("BUG: cannot filter");
1229 assert_eq!(filtered, expected);
1230 }
1231
1232 #[rstest]
1233 #[traced_test]
1234 fn test_macro() {
1235 let df = column_frame! {
1236 "a" => 1,
1237 "b" => 2,
1238 "c" => 3,
1239 "d" => 4,
1240 };
1241
1242 assert_eq!(df.len(), 1);
1243 assert_eq!(df.keys(), &["a".into(), "b".into(), "c".into(), "d".into()]);
1244 let f = Array2::from_shape_vec((1, 4), vec![1.into(), 2.into(), 3.into(), 4.into()])
1245 .expect("BUG: cannot create array");
1246 assert_eq!(df.select(None), f);
1247
1248 let df = column_frame! {
1249 "a" => [1, 2, 3],
1250 "b" => [4, 5, 6],
1251 "c" => [7, 8, 9]
1252 };
1253
1254 assert_eq!(df.len(), 3);
1255 assert_eq!(df.keys(), &["a".into(), "b".into(), "c".into()]);
1256 let f = Array2::from_shape_vec(
1257 (3, 3),
1258 vec![
1259 1.into(),
1260 4.into(),
1261 7.into(),
1262 2.into(),
1263 5.into(),
1264 8.into(),
1265 3.into(),
1266 6.into(),
1267 9.into(),
1268 ],
1269 )
1270 .expect("BUG: cannot create array");
1271 let selected = df.select(None);
1272 trace!("{selected:?}");
1273 assert_eq!(selected, f);
1274
1275 let df1 = df! {
1276 "a" => [1, 2, 3],
1277 "b" => [4, 5, 6],
1278 "c" => [7, 8, 9]
1279 };
1280
1281 let formatted = format!("{}", df);
1283 debug!("{}", formatted);
1284
1285 assert_eq!(df1, crate::DataFrame::from(df));
1286 }
1287
1288 #[rstest]
1289 #[case(
1290 column_frame! {
1291 "a" => [1, 2, 3],
1292 "b" => [4, 5, 6],
1293 "c" => [7, 8, 9]
1294 },
1295 column_frame! {
1296 "a_new" => [1, 2, 3],
1297 "b" => [4, 5, 6],
1298 "c" => [7, 8, 9]
1299 },
1300 vec!["a_new", "b", "c"].into_iter().map(|x| x.into()).collect(),
1301 vec![("a", "a_new".into())]
1302 )]
1303 #[traced_test]
1304 fn rename_test(
1305 #[case] df: ColumnFrame,
1306 #[case] expected: ColumnFrame,
1307 #[case] keys: Vec<Key>,
1308 #[case] renames: Vec<(&str, Key)>,
1309 ) {
1310 let mut df = df;
1311 for (old, new) in renames {
1312 df.rename_key(old, new).expect("BUG: cannot rename key");
1313 }
1314 assert_eq!(df, expected);
1315 assert_eq!(df.keys(), keys.as_slice());
1316 }
1317
1318 #[rstest]
1319 #[case(
1320 column_frame!("a" => [1, 2, 3]),
1321 Key::new("a", crate::DataType::I32),
1322 column_frame!("a" => [1i32, 2i32, 3i32])
1323 )]
1324 #[case(
1325 column_frame!("a" => [1, 2, 3]),
1326 Key::new("a", crate::DataType::U32),
1327 column_frame!("a" => [1u32, 2u32, 3u32])
1328 )]
1329 #[case(
1330 column_frame!("a" => [1, 2, 3]),
1331 Key::new("a", crate::DataType::I64),
1332 column_frame!("a" => [1i64, 2i64, 3i64])
1333 )]
1334 #[case(
1335 column_frame!("a" => [1, 2, 3]),
1336 Key::new("a", crate::DataType::U64),
1337 column_frame!("a" => [1u64, 2u64, 3u64])
1338 )]
1339 #[case(
1340 column_frame!("a" => [1, 2, 3]),
1341 Key::new("a", crate::DataType::F64),
1342 column_frame!("a" => [1f64, 2f64, 3f64])
1343 )]
1344 #[case(
1345 column_frame!("a" => [1, 2, 3]),
1346 Key::new("a", crate::DataType::F32),
1347 column_frame!("a" => [1f32, 2f32, 3f32])
1348 )]
1349 fn test_try_fix_dtype(
1355 #[case] mut df: ColumnFrame,
1356 #[case] key: Key,
1357 #[case] expected: ColumnFrame,
1358 ) {
1359 assert!(df.try_fix_column_by_key(&key).is_ok());
1360 assert_eq!(
1361 df.select(Some(&[key.clone()])),
1362 expected.select(Some(&[key.clone()]))
1363 );
1364 }
1365
1366 #[fixture]
1367 fn unknown_df() -> ColumnFrame {
1368 let mut hm: HashMap<String, Vec<DataValue>> = HashMap::new();
1369
1370 hm.insert("a".into(), vec![1u32.into()]);
1371 hm.insert("b".into(), vec![3i64.into()]);
1372 hm.insert("c".into(), vec![1f64.into()]);
1373 hm.insert("d".into(), vec![1u64.into()]);
1374
1375 hm.into()
1376 }
1377 #[rstest]
1378 #[case(stdhashmap!(
1379 "a" => crate::DataType::U32,
1380 "b" => crate::DataType::I64,
1381 "c" => crate::DataType::F64,
1382 "d" => crate::DataType::U64)
1383 )]
1384 fn test_try_fix_dtype_unknown(
1385 mut unknown_df: ColumnFrame,
1386 #[case] dtypes: HashMap<String, crate::DataType>,
1387 ) {
1388 for dtype in dtypes.iter() {
1389 let t: &Key = unknown_df
1390 .keys()
1391 .iter()
1392 .find(|x| x.name() == dtype.0)
1393 .unwrap();
1394 assert_ne!(t.ctype, crate::DataType::Unknown);
1395 }
1396 assert!(unknown_df.try_fix_dtype_for_keys(false).is_ok());
1397 for dtype in dtypes.iter() {
1398 let t: &Key = unknown_df
1399 .keys()
1400 .iter()
1401 .find(|x| x.name() == dtype.0)
1402 .unwrap();
1403 assert_eq!(t.ctype, *dtype.1);
1404 assert!(unknown_df.try_fix_dtype_for_keys(false).is_ok());
1405 }
1406 assert!(unknown_df.try_fix_dtype_for_keys(true).is_ok());
1407 }
1408
1409 #[rstest]
1410 #[case(
1411 column_frame!(Key::new("a", crate::DataType::F32) => [1, 2, 3]),
1412 Key::new("a", crate::DataType::F32),
1413 column_frame!("a" => [1f32, 2f32, 3f32])
1414 )]
1415 #[traced_test]
1416 fn test_try_fix(#[case] mut df: ColumnFrame, #[case] key: Key, #[case] expected: ColumnFrame) {
1417 assert!(df.try_fix_dtype().is_ok());
1418 assert_eq!(
1419 df.select(Some(&[key.clone()])),
1420 expected.select(Some(&[key]))
1421 )
1422 }
1423
1424 #[rstest]
1425 #[traced_test]
1426 fn test_not_key_fix() {
1427 let mut cf = column_frame!("a" => [1]);
1428 let non_existing = Key::new("b", crate::DataType::I32);
1429 assert!(cf.try_fix_column_by_key(&non_existing).is_err());
1430 }
1431
1432 #[rstest]
1433 #[case(
1434 column_frame! {
1435 "a" => [1, 2, 3],
1436 "b" => [4, 5, 6],
1437 "c" => [7, 8, 9]
1438 },
1439 vec!["a_alias", "b", "c"].into_iter().map(|x| x.into()).collect(),
1440 vec![("a", "a_alias")]
1441 )]
1442 #[traced_test]
1443 fn alias_test(
1444 #[case] df: ColumnFrame,
1445 #[case] keys: Vec<Key>,
1446 #[case] aliases: Vec<(&str, &str)>,
1447 ) {
1448 let mut df = df;
1449 for (old, new) in aliases {
1450 df.add_alias(old, new).expect("BUG: cannot rename key");
1451 }
1452 let origin_keys = df.keys().to_vec();
1453 let selected_aliases = df.select(Some(keys.as_slice()));
1454 let selected = df.select(Some(origin_keys.as_slice()));
1455 assert_eq!(selected, selected_aliases);
1456 }
1457
1458 #[rstest]
1459 #[traced_test]
1460 fn test_mut_view() {
1461 let data = vec![
1462 DataValue::from(1f64),
1463 DataValue::from(4f32),
1464 DataValue::from(2f64),
1465 DataValue::from(f32::NAN),
1466 DataValue::from(f64::NAN),
1467 DataValue::from(f32::INFINITY),
1468 ];
1469 let keys: Vec<Key> = vec!["a".into(), "b".into()];
1470
1471 let index = KeyIndex::new(keys.clone());
1472 let df = Array2::from_shape_vec((3, keys.len()), data).expect("BUG: cannot create array");
1473 let mut df = ColumnFrame::new(index.clone(), df);
1474 df.get_mut_view().mapv_inplace(|x| match x {
1475 DataValue::F32(f) if f.is_infinite() || f.is_nan() => DataValue::F32(0f32),
1476 DataValue::F64(f) if f.is_infinite() || f.is_nan() => DataValue::F64(0f64),
1477 e => e,
1478 });
1479 let data = vec![
1480 DataValue::from(1f64),
1481 DataValue::from(4f32),
1482 DataValue::from(2f64),
1483 DataValue::from(0f32),
1484 DataValue::from(0f64),
1485 DataValue::from(0f32),
1486 ];
1487 let expected = ColumnFrame::new(
1488 index,
1489 Array2::from_shape_vec((3, keys.len()), data).expect("BUG: cannot create ndarray"),
1490 );
1491 assert_eq!(df, expected);
1492 }
1493
1494 #[rstest]
1495 #[traced_test]
1496 fn dummy_test() {
1497 let data = vec![
1498 DataValue::U32(1),
1499 DataValue::I32(2),
1500 DataValue::I64(3),
1501 DataValue::U64(4),
1502 ];
1503
1504 let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1505
1506 let index = KeyIndex::new(keys.clone());
1507 let mut data_frame = Array2::default((1, keys.len()));
1508 for (idx, entry) in data.iter().enumerate() {
1509 data_frame
1510 .column_mut(idx)
1511 .assign(&ArrayView::from(&[entry.clone()]));
1512 }
1513
1514 let frame = ColumnFrame::new(index, data_frame);
1515 assert_eq!(
1516 frame.get_by_row_index(&"a".into(), 0),
1517 Some(&DataValue::U32(1))
1518 );
1519 assert_eq!(frame.get_by_row_index(&"aa".into(), 0), None);
1520 assert_eq!(frame.get_by_row_index(&"a".into(), 1), None);
1521 assert_eq!(
1522 frame.select(Some(&["a".into(), "b".into()])),
1523 Array2::from_shape_vec((1, 2), vec![DataValue::U32(1), DataValue::I32(2)])
1524 .expect("BUG: cannot create array")
1525 );
1526 }
1527
1528 #[rstest]
1529 #[traced_test]
1530 fn dummy_test_multiple_rows() {
1531 let data = vec![
1532 DataValue::U32(1),
1533 DataValue::I32(2),
1534 DataValue::I64(3),
1535 DataValue::U64(4),
1536 DataValue::U32(12),
1537 DataValue::I32(22),
1538 DataValue::I64(32),
1539 DataValue::U64(42),
1540 ];
1541
1542 let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1543
1544 let index = KeyIndex::new(keys.clone());
1545 let data_frame =
1546 Array2::from_shape_vec((2, keys.len()), data).expect("BUG: cannot create array");
1547
1548 let frame = ColumnFrame::new(index, data_frame);
1549 assert_eq!(
1550 frame.get_by_row_index(&"a".into(), 0),
1551 Some(&DataValue::U32(1))
1552 );
1553 assert_eq!(frame.get_by_row_index(&"aa".into(), 0), None);
1554 assert_eq!(frame.get_by_row_index(&"a".into(), 3), None);
1555 let arr = Array2::from_shape_vec(
1556 (2, 2),
1557 vec![
1558 DataValue::U32(1),
1559 DataValue::I32(2),
1560 DataValue::U32(12),
1561 DataValue::I32(22),
1562 ],
1563 )
1564 .expect("BUG: cannot create array");
1565 trace!("{arr:?}");
1566 assert_eq!(frame.select(Some(&["a".into(), "b".into()])), arr);
1567 }
1568
1569 #[rstest]
1570 #[traced_test]
1571 fn dummy_test_multiple_rows_push() {
1572 let data = vec![
1573 DataValue::U32(1),
1574 DataValue::I32(2),
1575 DataValue::I64(3),
1576 DataValue::U64(4),
1577 DataValue::U32(12),
1578 DataValue::I32(22),
1579 DataValue::I64(32),
1580 DataValue::U64(42),
1581 ];
1582 let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1583
1584 let index = KeyIndex::new(keys.clone());
1585 let data_frame =
1586 Array2::from_shape_vec((2, keys.len()), data).expect("BUG: cannot create array");
1587
1588 let mut frame = ColumnFrame::new(index, data_frame);
1589 assert!(frame
1590 .push(data_value::stdhashmap!(
1591 "a" => DataValue::U32(2),
1592 "b" => DataValue::I32(3),
1593 "c" => DataValue::I64(4),
1594 "d" => DataValue::U64(5)
1595 ))
1596 .is_ok());
1597 let arr = Array2::from_shape_vec(
1598 (3, 2),
1599 vec![
1600 DataValue::U32(1),
1601 DataValue::I32(2),
1602 DataValue::U32(12),
1603 DataValue::I32(22),
1604 DataValue::U32(2),
1605 DataValue::I32(3),
1606 ],
1607 )
1608 .expect("BUG: cannot create array");
1609 trace!("{arr:?}");
1610 assert_eq!(frame.select(Some(&["a".into(), "b".into()])), arr);
1611 let result = frame.push(data_value::stdhashmap!(
1612 "a" => DataValue::U32(34),
1613 "b" => DataValue::I32(44),
1614 "c" => DataValue::I64(54),
1615 "e" => DataValue::F32(6f32)
1616 ));
1617 assert!(result.is_ok(), "{result:?}");
1618 let arr = Array2::from_shape_vec(
1619 (4, 2),
1620 vec![
1621 DataValue::U64(4),
1622 DataValue::Null,
1623 DataValue::U64(42),
1624 DataValue::Null,
1625 DataValue::U64(5),
1626 DataValue::Null,
1627 DataValue::Null,
1628 DataValue::F32(6f32),
1629 ],
1630 )
1631 .expect("BUG: cannot create array");
1632 trace!("{arr:?}");
1633 assert_eq!(frame.select(Some(&["d".into(), "e".into()])), arr);
1634 }
1635
1636 #[rstest]
1637 #[case(
1638 column_frame! {
1639 "group_id" => vec![1, 2],
1640 "feed_tag" => vec![3, 4]
1641 },
1642 Some(vec![Key::from("group_id")]),
1643 ndarray::array!([1.into()], [2.into()])
1644 )]
1645 #[case(
1646 column_frame! {
1647 "group_id" => vec![1, 2],
1648 "feed_tag" => vec![3, 4]
1649 },
1650 Some(vec!["group_id".into(), "feed_tag".into()]),
1651 ndarray::array!([1.into(), 3.into()], [2.into(), 4.into()])
1652 )]
1653 #[case(
1654 column_frame! {
1655 "group_id" => vec![1, 2],
1656 "feed_tag" => vec![3, DataValue::Null]
1657 },
1658 Some(vec!["feed_tag".into()]),
1659 ndarray::array![[3.into()], [DataValue::Null]]
1660 )]
1661 #[case(
1662 column_frame! {
1663 "group_id" => vec![1, 2],
1664 "feed_tag" => vec![1, DataValue::Null]
1665 },
1666 Some(vec!["feed_tag2".into()]),
1667 Array2::<DataValue>::default((0, 0))
1668 )]
1669 #[traced_test]
1670 fn test_select(
1671 #[case] input: ColumnFrame,
1672 #[case] keys: Option<Vec<Key>>,
1673 #[case] expected: Array2<DataValue>,
1674 ) {
1675 trace!("input={input:?}");
1676 let keys_slice = keys.as_deref();
1677 let selected = input.select(keys_slice);
1678 trace!("selected={selected:?}");
1679 assert_eq!(selected, expected);
1680 let selected = input.select_transposed(keys_slice);
1681 trace!("selected_transposed={selected:?}");
1682 assert!(selected.is_ok());
1683 assert_eq!(selected.unwrap(), expected.t());
1684 }
1685
1686 #[rstest]
1687 #[case(
1688 column_frame! {
1689 "group_id" => vec![1, 2],
1690 "feed_tag" => vec![3, 4]
1691 },
1692 Key::from("group_id"),
1693 Some(ndarray::array!(1.into(), 2.into()))
1694 )]
1695 #[case(
1696 column_frame! {
1697 "group_id" => vec![1, 2, 5, 6],
1698 "feed_tag" => vec![3, 4, 7, 8]
1699 },
1700 Key::from("group_id"),
1701 Some(ndarray::array!(1.into(), 2.into(), 5.into(), 6.into()))
1702 )]
1703 #[case(
1704 column_frame! {
1705 "group_id" => vec![1, 2],
1706 "feed_tag" => vec![1, 1]
1707 },
1708 Key::from("feed_tag1"),
1709 None
1710 )]
1711 #[traced_test]
1712 fn test_select_column(
1713 #[case] input: ColumnFrame,
1714 #[case] key: Key,
1715 #[case] expected: Option<Array1<DataValue>>,
1716 ) {
1717 let selected = input.select_column(&key);
1718 trace!("selected={selected:?}");
1719 match expected {
1720 Some(expected) => {
1721 assert!(selected.is_some());
1722 assert_eq!(selected.expect("BUG: checked above"), expected);
1723 }
1724 None => assert!(selected.is_none()),
1725 }
1726 }
1727
1728 #[test]
1729 #[traced_test]
1730 fn empty_join_test() {
1731 let join = JoinRelation::add_columns();
1732 let mut column_frame = ColumnFrame::default();
1733 column_frame
1734 .add_single_column("group_id", Array1::from_vec(vec![]))
1735 .expect("BUG: cannot add column");
1736 let column_frame2 = column_frame! {
1737 "group_id" => vec![2, 1, 3],
1738 "feed_tag" => vec![1, 1, 1],
1739 "clicks" => vec![100, 10, 10],
1740 "imps" => vec![1000, 200, 200]
1741 };
1742 assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
1743
1744 let joined = column_frame.join(column_frame2, &join);
1745 assert!(joined.is_ok(), "{joined:?}");
1746
1747 trace!("{column_frame:?}");
1748 assert_eq!(
1749 column_frame.select(Some(&[
1750 "group_id".into(),
1751 "feed_tag".into(),
1752 "clicks".into(),
1753 "imps".into()
1754 ])),
1755 ndarray::array!(
1756 [2.into(), 1.into(), 100.into(), 1000.into()],
1757 [1.into(), 1.into(), 10.into(), 200.into()],
1758 [3.into(), 1.into(), 10.into(), 200.into()],
1759 )
1760 );
1761
1762 let mut column_frame2 = column_frame! {
1763 "feed_tag" => vec![1, 1, 1],
1764 "clicks" => vec![100, 10, 10],
1765 "imps" => vec![1000, 200, 200]
1766 };
1767 let mut column_frame = ColumnFrame::default();
1768 column_frame
1769 .add_single_column("group_id", Array1::from_vec(vec![]))
1770 .expect("BUG: cannot add column");
1771 let joined = column_frame2.join(column_frame, &join);
1772 assert!(joined.is_ok(), "{joined:?}");
1773
1774 trace!("{column_frame2:?}");
1775 assert_eq!(
1776 column_frame2.select(Some(&[
1777 "group_id".into(),
1778 "feed_tag".into(),
1779 "clicks".into(),
1780 "imps".into()
1781 ])),
1782 ndarray::array!(
1783 [DataValue::Null, 1.into(), 100.into(), 1000.into()],
1784 [DataValue::Null, 1.into(), 10.into(), 200.into()],
1785 [DataValue::Null, 1.into(), 10.into(), 200.into()],
1786 )
1787 );
1788
1789 let mut column_frame = ColumnFrame::default();
1790 column_frame.index = KeyIndex::new(vec!["group_id2".into()]);
1791 let joined = column_frame2.join(column_frame, &join);
1792 assert!(joined.is_ok(), "{joined:?}");
1793
1794 trace!("{column_frame2:?}");
1795 assert_eq!(
1796 column_frame2.select(Some(&[
1797 "group_id2".into(),
1798 "feed_tag".into(),
1799 "clicks".into(),
1800 "imps".into()
1801 ])),
1802 ndarray::array!(
1803 [DataValue::Null, 1.into(), 100.into(), 1000.into()],
1804 [DataValue::Null, 1.into(), 10.into(), 200.into()],
1805 [DataValue::Null, 1.into(), 10.into(), 200.into()],
1806 )
1807 );
1808 }
1809
1810 #[test]
1811 #[traced_test]
1812 fn join_test_multiple() {
1813 let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec!["group_id".into()])));
1814 let mut column_frame = column_frame! {
1815 "group_id" => vec![1, 1, 3]
1816 };
1817 let column_frame2 = column_frame! {
1818 "group_id" => vec![2, 1, 1],
1819 "clicks" => vec![100, 10, 10],
1820 "imps" => vec![1000, 200, 200]
1821 };
1822
1823 let joined = column_frame.join(column_frame2, &join);
1824 assert!(joined.is_ok(), "{joined:?}");
1825
1826 trace!("{column_frame:?}");
1827 assert_eq!(
1828 column_frame.select(Some(&["group_id".into(), "clicks".into(), "imps".into(),])),
1829 ndarray::array!(
1830 [1.into(), 10.into(), 200.into()],
1831 [1.into(), 10.into(), 200.into()],
1832 [3.into(), DataValue::Null, DataValue::Null],
1833 )
1834 )
1835 }
1836
1837 #[test]
1838 #[traced_test]
1839 fn join_test_no_matches() {
1840 let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec!["group_id".into()])));
1841 let mut column_frame = column_frame! {
1842 "group_id" => vec![DataValue::I32(1), DataValue::I32(2), DataValue::I32(3)]
1843 };
1844 let column_frame2 = column_frame! {
1845 "group_id" => vec![DataValue::I32(4), DataValue::I32(5), DataValue::I32(6)],
1846 "clicks" => vec![DataValue::I32(100), DataValue::I32(200), DataValue::I32(300)],
1847 };
1848
1849 let joined = column_frame.join(column_frame2, &join);
1850 assert!(joined.is_ok(), "{joined:?}");
1851
1852 trace!("{column_frame:?}");
1853 assert_eq!(
1854 column_frame.select(Some(&["group_id".into(), "clicks".into()])),
1855 ndarray::array!(
1856 [DataValue::I32(1), DataValue::Null],
1857 [DataValue::I32(2), DataValue::Null],
1858 [DataValue::I32(3), DataValue::Null],
1859 )
1860 )
1861 }
1862 #[test]
1863 #[traced_test]
1864 fn join_test() {
1865 let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
1866 "group_id".into(),
1867 "feed_tag".into(),
1868 ])));
1869 let mut column_frame = column_frame! {
1870 "group_id" => vec![1, 2, 8],
1871 "feed_tag" => vec![1, 1, 10]
1872 };
1873 let column_frame2 = column_frame! {
1874 "group_id" => vec![2, 1, 3],
1875 "feed_tag" => vec![1, 1, 1],
1876 "clicks" => vec![100, 10, 10],
1877 "imps" => vec![1000, 200, 200]
1878 };
1879 assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
1880
1881 let joined = column_frame.join(column_frame2, &join);
1882 assert!(joined.is_ok(), "{joined:?}");
1883
1884 trace!("{column_frame:?}");
1885 assert_eq!(
1886 column_frame.select(Some(&[
1887 "group_id".into(),
1888 "feed_tag".into(),
1889 "clicks".into(),
1890 "imps".into()
1891 ])),
1892 ndarray::array!(
1893 [1.into(), 1.into(), 10.into(), 200.into()],
1894 [2.into(), 1.into(), 100.into(), 1000.into()],
1895 [8.into(), 10.into(), DataValue::Null, DataValue::Null]
1896 )
1897 )
1898 }
1899
1900 #[test]
1901 #[traced_test]
1902 fn join_test_with_additional() {
1903 let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
1904 "group_id".into(),
1905 "feed_tag".into(),
1906 ])));
1907 let mut column_frame = column_frame! {
1908 "group_id" => vec![1, 2, 8],
1909 "feed_tag" => vec![1, 1, 10],
1910 "clicked" => vec![0, 0, 1]
1911 };
1912 let column_frame2 = column_frame! {
1913 "group_id" => vec![2, 1, 3],
1914 "feed_tag" => vec![1, 1, 1],
1915 "clicks" => vec![100, 10, 10],
1916 "imps" => vec![1000, 200, 200]
1917 };
1918 assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
1919
1920 let joined = column_frame.join(column_frame2, &join);
1921 assert!(joined.is_ok(), "{joined:?}");
1922
1923 trace!("{column_frame:?}");
1924 assert_eq!(
1925 column_frame.select(Some(&[
1926 "group_id".into(),
1927 "feed_tag".into(),
1928 "clicks".into(),
1929 "imps".into(),
1930 "clicked".into()
1931 ])),
1932 ndarray::array!(
1933 [1.into(), 1.into(), 10.into(), 200.into(), 0.into()],
1934 [2.into(), 1.into(), 100.into(), 1000.into(), 0.into()],
1935 [
1936 8.into(),
1937 10.into(),
1938 DataValue::Null,
1939 DataValue::Null,
1940 1.into()
1941 ]
1942 )
1943 )
1944 }
1945
1946 #[test]
1947 #[traced_test]
1948 fn join_test_with_additional_single() {
1949 let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
1950 "group_id".into(),
1951 "feed_tag".into(),
1952 ])));
1953 let mut column_frame = column_frame! {
1954 "group_id" => vec![1, 2, 8],
1955 "feed_tag" => vec![1, 1, 10],
1956 "clicked" => vec![0, 0, 1]
1957 };
1958 let column_frame2 = column_frame! {
1959 "a" => vec![1],
1960 "group_id" => vec![2],
1961 "feed_tag" => vec![1],
1962 "clicks" => vec![10],
1963 "imps" => vec![200]
1964 };
1965 assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
1966
1967 let joined = column_frame.join(column_frame2, &join);
1968 assert!(joined.is_ok(), "{joined:?}");
1969
1970 trace!("{column_frame:?}");
1971 assert_eq!(
1972 column_frame.select(Some(&[
1973 "group_id".into(),
1974 "feed_tag".into(),
1975 "clicks".into(),
1976 "imps".into(),
1977 "clicked".into()
1978 ])),
1979 ndarray::array!(
1980 [
1981 1.into(),
1982 1.into(),
1983 DataValue::Null,
1984 DataValue::Null,
1985 0.into(),
1986 ],
1987 [2.into(), 1.into(), 10.into(), 200.into(), 0.into()],
1988 [
1989 8.into(),
1990 10.into(),
1991 DataValue::Null,
1992 DataValue::Null,
1993 1.into()
1994 ]
1995 )
1996 )
1997 }
1998
1999 #[rstest]
2000 #[traced_test]
2001 fn cartesian_product_join() {
2002 let mut df = column_frame! {
2003 "group_id" => vec![1, 2, 3],
2004 "feed_tag" => vec![1, 2, 3]
2005 };
2006 let df2 = column_frame! {
2007 "zone_id" => vec![111111, 111133],
2008 "zone_avg_ctr" => vec![0.1, 0.001]
2009 };
2010 assert!(df
2011 .join(
2012 ColumnFrame::default(),
2013 &JoinRelation::new(JoinBy::CartesianProduct)
2014 )
2015 .is_ok());
2016 let join = JoinRelation::new(JoinBy::CartesianProduct);
2017 let result = df.join(df2, &join);
2018 assert!(result.is_ok(), "{result:?}");
2019 let selected = df.select(None);
2020 trace!("{selected:?}");
2021 assert_eq!(
2022 selected,
2023 ndarray::array!(
2024 [1.into(), 1.into(), 111111.into(), 0.1.into()],
2025 [1.into(), 1.into(), 111133.into(), 0.001.into()],
2026 [2.into(), 2.into(), 111111.into(), 0.1.into()],
2027 [2.into(), 2.into(), 111133.into(), 0.001.into()],
2028 [3.into(), 3.into(), 111111.into(), 0.1.into()],
2029 [3.into(), 3.into(), 111133.into(), 0.001.into()],
2030 )
2031 );
2032
2033 let df2 = column_frame! {
2034 "zone_id" => vec![111]
2035 };
2036 let result = df.join(df2, &join);
2037 assert!(result.is_ok(), "{result:?}");
2038 let selected = df.select(None);
2039 trace!("{selected:?}");
2040 assert_eq!(
2041 selected,
2042 ndarray::array!(
2043 [1.into(), 1.into(), 111111.into(), 0.1.into(), 111.into()],
2044 [1.into(), 1.into(), 111133.into(), 0.001.into(), 111.into()],
2045 [2.into(), 2.into(), 111111.into(), 0.1.into(), 111.into()],
2046 [2.into(), 2.into(), 111133.into(), 0.001.into(), 111.into()],
2047 [3.into(), 3.into(), 111111.into(), 0.1.into(), 111.into()],
2048 [3.into(), 3.into(), 111133.into(), 0.001.into(), 111.into()],
2049 )
2050 );
2051 }
2052
2053 #[rstest]
2054 #[traced_test]
2055 fn broadcast_join() {
2056 let mut df = column_frame! {
2057 "group_id" => vec![1, 2, 3],
2058 "feed_tag" => vec![1, 2, 3]
2059 };
2060 let df2 = column_frame! {
2061 "zone_id" => vec![111111]
2062 };
2063 assert!(df
2064 .join(
2065 ColumnFrame::default(),
2066 &JoinRelation::new(JoinBy::Broadcast)
2067 )
2068 .is_ok());
2069 let join = JoinRelation::new(JoinBy::Broadcast);
2070 assert!(df.join(df2, &join).is_ok());
2071 let selected = df.select(None);
2072 trace!("{selected:?}");
2073 assert_eq!(
2074 selected,
2075 ndarray::array!(
2076 [1.into(), 1.into(), 111111.into()],
2077 [2.into(), 2.into(), 111111.into()],
2078 [3.into(), 3.into(), 111111.into()]
2079 )
2080 );
2081 }
2082 #[rstest]
2083 #[traced_test]
2084 fn merge_test() {
2085 let mut df = column_frame! {
2086 "group_id" => vec![1, 2, 3],
2087 "feed_tag" => vec![1, 2, 3]
2088 };
2089 let df2 = column_frame! {
2090 "group_id" => vec![11, 21, 31],
2091 "feed_tag" => vec![12, 22, 32]
2092 };
2093
2094 let join = JoinRelation::new(JoinBy::Replace);
2095 assert!(df.join(df2, &join).is_ok());
2096 let selected = df.select(None);
2097 trace!("{selected:?}");
2098 assert_eq!(
2099 selected,
2100 ndarray::array!(
2101 [11.into(), 12.into()],
2102 [21.into(), 22.into()],
2103 [31.into(), 32.into()]
2104 )
2105 );
2106 }
2107
2108 #[rstest]
2109 #[traced_test]
2110 fn extend_test() {
2111 let mut df = column_frame! {
2112 "group_id" => vec![1, 2, 3],
2113 "feed_tag" => vec![1, 2, 3]
2114 };
2115 let df2 = column_frame! {
2116 "group_id" => vec![11, 21, 31],
2117 "feed_tag" => vec![5, 6, 7]
2118 };
2119 assert!(df
2120 .join(ColumnFrame::default(), &JoinRelation::new(JoinBy::Extend))
2121 .is_ok());
2122
2123 let join = JoinRelation::new(JoinBy::Extend);
2124 assert!(df.join(df2, &join).is_ok());
2125 let selected = df.select(Some(&["feed_tag".into(), "group_id".into()]));
2126 trace!("{selected:?}");
2127 assert_eq!(
2128 selected,
2129 ndarray::array!(
2130 [1.into(), 1.into()],
2131 [2.into(), 2.into()],
2132 [3.into(), 3.into()],
2133 [5.into(), 11.into()],
2134 [6.into(), 21.into()],
2135 [7.into(), 31.into()]
2136 )
2137 );
2138 let as_map = df.select_as_map(Some(&["feed_tag".into(), "group_id".into()]));
2139 trace!("{as_map:?}");
2140 assert_eq!(
2141 as_map,
2142 stdhashmap!(
2143 "feed_tag" => vec![1, 2, 3, 5, 6, 7],
2144 "group_id" => vec![1, 2, 3, 11, 21, 31]
2145 )
2146 );
2147
2148 let as_map = df.select_as_map(Some(&["feed_tag1".into()]));
2149 trace!("{as_map:?}");
2150 assert_eq!(as_map, HashMap::default());
2151 }
2152
2153 #[rstest]
2154 #[traced_test]
2155 fn extend_test_with_non_existing_cols() {
2156 let mut df = column_frame! {
2157 "group_id" => vec![1, 2, 3],
2158 "feed_tag" => vec![1, 2, 3]
2159 };
2160 let mut df2 = column_frame! {
2161 "group_id" => vec![11, 21, 31],
2162 "feed_tag" => vec![5, 6, 7],
2163 "clicks" => vec![100, 200, 300],
2164 "impressions" => vec![1000, 2000, 3000]
2165 };
2166 let df_bckp = df.clone();
2167 let join = JoinRelation::new(JoinBy::Extend);
2168 assert!(df.join(df2.clone(), &join).is_ok());
2169 let selected = df.select(None);
2170 trace!("{selected:?}");
2171 assert_eq!(
2172 selected,
2173 ndarray::array!(
2174 [1.into(), 1.into(), DataValue::Null, DataValue::Null],
2175 [2.into(), 2.into(), DataValue::Null, DataValue::Null],
2176 [3.into(), 3.into(), DataValue::Null, DataValue::Null],
2177 [11.into(), 5.into(), 100.into(), 1000.into()],
2178 [21.into(), 6.into(), 200.into(), 2000.into()],
2179 [31.into(), 7.into(), 300.into(), 3000.into()]
2180 )
2181 );
2182 let join = JoinRelation::new(JoinBy::Extend);
2183 let r = df2.join(df_bckp, &join);
2184 assert!(r.is_ok(), "{r:?}");
2185 let selected = df2.select(None);
2186 trace!("{selected:?}");
2187 assert_eq!(
2188 selected,
2189 ndarray::array!(
2190 [11.into(), 5.into(), 100.into(), 1000.into()],
2191 [21.into(), 6.into(), 200.into(), 2000.into()],
2192 [31.into(), 7.into(), 300.into(), 3000.into()],
2193 [1.into(), 1.into(), DataValue::Null, DataValue::Null],
2194 [2.into(), 2.into(), DataValue::Null, DataValue::Null],
2195 [3.into(), 3.into(), DataValue::Null, DataValue::Null]
2196 )
2197 );
2198 }
2199
2200 #[rstest]
2201 #[traced_test]
2202 fn extend_test_with_non_existing_cols_wrong_order() {
2203 let mut df = column_frame! {
2204 "group_id" => vec![1, 2, 3],
2205 "feed_tag" => vec![1, 2, 3]
2206 };
2207 let df2 = column_frame! {
2208 "feed_tag" => vec![5, 6, 7],
2209 "group_id" => vec![11, 21, 31]
2210 };
2211 let join = JoinRelation::new(JoinBy::Extend);
2212 let err = df.join(df2, &join);
2213 assert!(err.is_ok(), "{err:?}");
2214 }
2215
2216 #[rstest]
2217 #[traced_test]
2218 fn test_replace_not_compatible() {
2219 let mut df = column_frame! {
2220 "group_id" => vec![1, 2, 3],
2221 "feed_tag" => vec![1, 2, 3]
2222 };
2223 let df2 = column_frame! {
2224 "feed_tag" => vec![5, 6],
2225 "group_id" => vec![11, 21]
2226 };
2227 let join = JoinRelation::new(JoinBy::Replace);
2228 let err = df.join(df2, &join);
2229 assert!(err.is_err(), "{err:?}");
2230 let empty = ColumnFrame::default();
2231 let err = df.join(empty, &join);
2232 assert!(err.is_ok(), "{err:?}");
2233 }
2234
2235 #[rstest]
2236 #[traced_test]
2237 fn test_different_data() {
2238 let mut df = column_frame! {
2239 "group_id" => vec![1, 2, 3],
2240 "feed_tag" => vec![1, 2, 3]
2241 };
2242 let df2 = column_frame! {
2243 "group_id" => vec![11, 21],
2244 "a" => vec![5, 6]
2245 };
2246 let join = JoinRelation::new(JoinBy::Extend);
2247 let err = df.join(df2, &join);
2248 assert!(err.is_ok(), "{err:?}");
2249 println!("{df:?}");
2250 let expected_df = ColumnFrame::new(
2251 KeyIndex::from(vec!["group_id".into(), "feed_tag".into(), "a".into()]),
2252 ndarray::array!(
2253 [1.into(), 1.into(), DataValue::Null],
2254 [2.into(), 2.into(), DataValue::Null],
2255 [3.into(), 3.into(), DataValue::Null],
2256 [11.into(), DataValue::Null, 5.into()],
2257 [21.into(), DataValue::Null, 6.into()]
2258 ),
2259 );
2260 assert_eq!(df, expected_df)
2261 }
2262
2263 #[rstest]
2264 #[traced_test]
2265 fn serde_column_frame() {
2266 let df = column_frame! {
2267 "group_id" => vec![1u64, 2u64, 3u64],
2268 "feed_tag" => vec![1u64, 2u64, 3u64]
2269 };
2270 let key_idx = df.index.clone();
2271 let serialized = serde_json::to_string(&key_idx).expect("BUG: cannot serialize");
2272 let deserialized: KeyIndex =
2273 serde_json::from_str(&serialized).expect("BUG: cannot deserialize");
2274 assert_eq!(key_idx, deserialized);
2275 assert!(key_idx.get_key(0).is_some_and(|x| x == "group_id".into()));
2276 let serialized = serde_json::to_string(&df).expect("BUG: cannot serialize");
2277 let deserialized: ColumnFrame =
2278 serde_json::from_str(&serialized).expect("BUG: cannot deserialize");
2279 assert_eq!(df, deserialized);
2280 }
2281
2282 #[rstest]
2283 #[traced_test]
2284 fn update_value() {
2285 let mut df = column_frame! {
2286 "group_id" => vec![1, 2, 3],
2287 "feed_tag" => vec![1, 2, 3]
2288 };
2289 let group_id: Key = "group_id".into();
2290 let v = df.get_mut_by_row_index(&group_id, 1);
2291 assert!(v.is_some());
2292 let v = v.unwrap();
2293 assert_eq!(v, &DataValue::I32(2));
2294 *v = DataValue::U64(22);
2295 let v = df.get_by_row_index(&group_id, 1);
2296 assert!(v.is_some());
2297 let v = v.unwrap();
2298 assert_eq!(v, &DataValue::U64(22));
2299
2300 assert!(df.get_mut_by_row_index(&"group_id2".into(), 1).is_none());
2301 }
2302}