1use ndarray::{concatenate, s, Array, Array1, Array2, ArrayView1, ArrayViewMut2, Axis};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4
5use crate::error::Error;
6use crate::{dataframe::index::Index, CandidateData, JoinBy, JoinRelation, Key};
7use data_value::{DataValue, Extract};
8use tracing::*;
9mod from;
10mod key_index;
11mod ops;
12pub mod sorted_df;
13pub use key_index::KeyIndex;
14pub mod filter_df;
15
16#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
22pub struct ColumnFrame {
23 pub index: KeyIndex,
24 pub data_frame: Array2<DataValue>,
25}
26
/// Result of the shared pre-merge check performed by the join/extend operations.
enum Continue {
    /// The caller should go on with its own merge logic.
    Continue,
    /// The pre-check already produced the final state; the caller should return early.
    End,
}

impl Continue {
    /// Returns `true` when the caller should stop and return immediately.
    pub fn should_end(&self) -> bool {
        match self {
            Self::End => true,
            Self::Continue => false,
        }
    }
}
37
38use std::fmt;
39
40impl fmt::Display for ColumnFrame {
41 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42 write!(f, "\n|")?;
44
45 for key in &self.index.keys {
46 write!(f, " {key} |")?;
47 }
48
49 if self.index.is_empty() {
50 writeln!(f, "|")?;
51 }
52
53 if let Some(row) = self.data_frame.axis_iter(Axis(0)).next() {
55 write!(f, "\n|")?;
56 for value in row.iter() {
57 write!(f, " {:10?} |", crate::detect_dtype(value))?;
59 }
60 writeln!(f)?;
61 }
62
63 writeln!(f, "---")?;
64
65 for (n, row) in self.data_frame.axis_iter(Axis(0)).enumerate() {
67 write!(f, "|")?;
68
69 for value in row.iter() {
70 write!(f, " {value} |")?;
72 }
73 writeln!(f)?;
74
75 if n >= 256 {
76 writeln!(f, "... (dataframe is too long)")?;
77 break;
78 }
79 }
80
81 writeln!(f, "---")
82 }
83}
84pub fn convert_data_value(item: DataValue, dtype: crate::DataType) -> DataValue {
89 let x = &item;
90 match dtype {
91 crate::DataType::Bool => DataValue::Bool(bool::extract(x)),
92 crate::DataType::U32 => DataValue::U32(u32::extract(x)),
93 crate::DataType::I32 => DataValue::I32(i32::extract(x)),
94 crate::DataType::U64 => DataValue::U64(u64::extract(x)),
95 crate::DataType::I64 => DataValue::I64(i64::extract(x)),
96 crate::DataType::F32 => DataValue::F32(f32::extract(x)),
97 crate::DataType::U128 => DataValue::U128(u128::extract(x)),
98 crate::DataType::I128 => DataValue::I128(i128::extract(x)),
99 crate::DataType::F64 => DataValue::F64(f64::extract(x)),
100 crate::DataType::U8 => DataValue::U8(u8::extract(x)),
101 crate::DataType::String => DataValue::String(String::extract(x).into()),
102 crate::DataType::Bytes => item,
103 crate::DataType::Map => item,
104 crate::DataType::Vec => item,
105 crate::DataType::Unknown => {
106 if matches!(item, DataValue::Null) {
107 return item;
108 }
109 let dtype = crate::detect_dtype(&item);
110 if matches!(dtype, crate::DataType::Unknown) {
112 tracing::error!("Unknown datatype {dtype:?} - {item:?}");
113 return item;
114 }
115 convert_data_value(item, dtype)
116 }
117 }
118}
119pub fn convert_dv_to_dtype(key: &Key, item: DataValue) -> DataValue {
123 convert_data_value(item, key.ctype)
124}
125impl ColumnFrame {
126 pub fn new<K: Into<KeyIndex>>(index: K, data_frame: Array2<DataValue>) -> Self {
130 Self {
131 index: index.into(),
132 data_frame,
133 }
134 }
135
136 pub fn keys(&self) -> &[Key] {
138 self.index.get_keys()
139 }
140
141 pub fn len(&self) -> usize {
143 self.data_frame.nrows()
144 }
145
146 pub fn is_empty(&self) -> bool {
148 self.data_frame.nrows() == 0
149 }
150
151 pub fn shrink(&mut self) {
154 let shape = self.data_frame.shape();
155 if shape[0] > 0 && shape[1] > 0 {
156 let mut new_data = Vec::with_capacity(shape[0] * shape[1]);
157 for elem in self.data_frame.iter() {
158 new_data.push(elem.clone());
159 }
160 if let Ok(arr) = Array2::from_shape_vec((shape[0], shape[1]), new_data) {
161 self.data_frame = arr;
162 }
163 }
164 }
165
166 pub fn try_fix_dtype_for_keys(&mut self, force: bool) -> Result<(), Error> {
170 for i in 0..self.index.keys.len() {
171 let should_fix = force || matches!(self.index.keys[i].ctype, crate::DataType::Unknown);
172
173 if should_fix {
174 let column = self
175 .get_single_column(&self.index.keys[i])
176 .ok_or_else(|| Error::EmptyData)?;
177 let dtype = crate::detect_dtype(column.get(0).ok_or_else(|| Error::EmptyData)?);
178 self.index.keys[i].ctype = dtype;
179 }
180 }
181
182 Ok(())
183 }
184 pub fn try_fix_dtype(&mut self) -> Result<(), Error> {
189 let mut errors = vec![];
190 let keys = self.index.keys.clone();
191 for key in keys {
192 tracing::trace!("key: {key:?}- {:?}", key.ctype);
193 if let Err(e) = self.try_fix_column_by_key(&key) {
194 errors.push((key, e.to_string()));
195 }
196 }
197 if errors.is_empty() {
198 Ok(())
199 } else {
200 Err(Error::CastFailed(errors))
201 }
202 }
203
204 pub fn try_fix_column_by_key(&mut self, key: &Key) -> Result<(), Error> {
208 let idx = self
209 .index
210 .get_column_index(key)
211 .ok_or(Error::MissingField(format!("{key}").into()))?;
212 let mut col = self.data_frame.column_mut(idx);
213
214 col.mapv_inplace(|item| convert_dv_to_dtype(key, item));
215 Ok(())
216 }
217
218 pub fn enforce_dtype_for_column(
223 &mut self,
224 key: &str,
225 dtype: crate::DataType,
226 ) -> Result<(), Error> {
227 if let Some(idx) = self.index.get_column_index_by_name(key) {
228 let new_key = Key::new(key, dtype);
229 let mut col = self.data_frame.column_mut(idx);
230
231 col.mapv_inplace(|item| convert_dv_to_dtype(&new_key, item));
232 self.index.rename_key(key, new_key)?;
233 Ok(())
234 } else {
235 Err(Error::NotFound(Key::new(key, crate::DataType::Unknown)))
236 }
237 }
238
239 pub fn get_mut_view(&mut self) -> ArrayViewMut2<'_, DataValue> {
244 self.data_frame.view_mut()
245 }
246
247 pub fn rename_key(&mut self, old: &str, new: Key) -> Result<(), Error> {
252 self.index.rename_key(old, new)
253 }
254
255 pub fn add_alias(&mut self, key: &str, alias: &str) -> Result<(), Error> {
259 self.index.add_alias(key, alias)
260 }
261
262 pub fn select_transposed_typed<D: Extract>(&self, keys: &[Key]) -> Vec<Vec<D>> {
268 let selected = self.select(Some(keys));
269 let mut result = Vec::with_capacity(selected.nrows());
270 for row in selected.rows() {
271 let mut r = Vec::with_capacity(selected.ncols());
272 for value in row.iter() {
273 r.push(D::extract(value));
274 }
275 result.push(r);
276 }
277 result
278 }
279
280 pub fn select_transposed(&self, keys: Option<&[Key]>) -> Result<Array2<DataValue>, Error> {
288 let keys = keys.unwrap_or_else(|| self.index.get_keys());
289 let key_indexes = self.index.select(keys);
290 if key_indexes.is_empty() {
291 return Ok(Array2::default((0, 0)));
292 }
293 let data_vec: Vec<Array1<DataValue>> = key_indexes
294 .indexes()
295 .iter()
296 .map(|x| self.data_frame.column(*x).to_owned())
297 .collect();
298 to_array2(data_vec)
299 }
300
301 pub fn select_column(&self, key: &Key) -> Option<ArrayView1<'_, DataValue>> {
305 self.index
306 .get_column_index(key)
307 .map(|x| self.data_frame.column(x))
308 }
309
310 pub fn apply_function<F>(&mut self, keys: &[Key], mut func: F) -> Result<(), Error>
315 where
316 F: FnMut(&[Key], &mut ColumnFrame) -> Result<(), Error>,
317 {
318 func(keys, self)
319 }
320
321 pub fn validate_entry_access(&self, column: &Key, row_index: usize) -> Result<usize, Error> {
326 if row_index >= self.data_frame.nrows() {
327 return Err(Error::IndexOutOfRange(row_index, self.data_frame.nrows()));
328 }
329 let Some(column_index) = self.index.get_column_index(column) else {
330 return Err(Error::NotFound(column.clone()));
331 };
332 Ok(column_index)
333 }
334
335 pub fn get_by_row_index(&self, column: &Key, row_index: usize) -> Option<&DataValue> {
339 trace!(
340 "Column: {column} row_index: {row_index} data_frame: cols:{}-rows:{}",
341 self.data_frame.len(),
342 self.data_frame.nrows()
343 );
344 trace!("{:?}", self.data_frame);
345 match self.validate_entry_access(column, row_index) {
346 Ok(column_index) => self.data_frame.get((row_index, column_index)),
347 Err(e) => {
348 trace!("Error: {e}");
349 None
350 }
351 }
352 }
353
354 pub fn get_mut_by_row_index(
358 &mut self,
359 column: &Key,
360 row_index: usize,
361 ) -> Option<&mut DataValue> {
362 trace!(
363 "Column: {column} row_index: {row_index} data_frame: cols:{}-rows:{}",
364 self.data_frame.len(),
365 self.data_frame.nrows()
366 );
367 trace!("{:?}", self.data_frame);
368 match self.validate_entry_access(column, row_index) {
369 Ok(column_index) => self.data_frame.get_mut((row_index, column_index)),
370 Err(e) => {
371 trace!("Error: {e}");
372 None
373 }
374 }
375 }
376
377 pub fn select_as_map(&self, keys: Option<&[Key]>) -> HashMap<Key, Vec<DataValue>> {
382 let keys = keys.unwrap_or_else(|| self.index.get_keys());
383 let indexes = self.index.select(keys);
384 if indexes.is_empty() {
385 return Default::default();
386 }
387
388 let mut new_data_frame = HashMap::with_capacity(keys.len());
389
390 for key in keys.iter() {
391 if let Some(column_index_in_source) = indexes.get_column_index(key) {
392 let column = self.data_frame.column(column_index_in_source);
393 new_data_frame.insert(key.clone(), column.to_vec());
394 }
395 }
396
397 new_data_frame
398 }
399
400 pub fn select(&self, keys: Option<&[Key]>) -> Array2<DataValue> {
408 let keys = keys.unwrap_or_else(|| self.index.get_keys());
409 let indexes = self.index.select(keys);
410 if indexes.is_empty() || keys.is_empty() {
411 return Array2::default((0, 0));
412 }
413
414 let nrows = self.data_frame.nrows();
415 let ncols = keys.len();
416
417 let mut data = Vec::with_capacity(nrows * ncols);
419
420 let col_mappings: Vec<(usize, usize)> = keys
422 .iter()
423 .enumerate()
424 .filter_map(|(dst_idx, key)| {
425 indexes
426 .get_column_index(key)
427 .map(|src_idx| (dst_idx, src_idx))
428 })
429 .collect();
430
431 data.resize(nrows * ncols, DataValue::Null);
433
434 for row_idx in 0..nrows {
436 for (dst_col, src_col) in &col_mappings {
437 let dst_idx = row_idx * ncols + dst_col;
438 data[dst_idx] = self.data_frame[(row_idx, *src_col)].clone();
439 }
440 }
441
442 Array2::from_shape_vec((nrows, ncols), data).unwrap_or_else(|_| Array2::default((0, 0)))
443 }
444
445 pub fn select_typed<T: Extract>(&self, keys: Option<&[Key]>) -> Array2<T> {
457 self.select(keys).mapv(|x| T::extract(&x))
458 }
459
460 fn extend_dataframe_for_column(&mut self, key: Key) -> Result<(), Error> {
461 self.index.store_key(key);
462 let len = self.data_frame.nrows();
463 self.data_frame.push_column(Array1::default(len).view())?;
464 Ok(())
465 }
466
467 pub fn push<C: CandidateData>(&mut self, row_candidate: C) -> Result<(), Error> {
473 let num_keys = self.index.len();
475 let candidate_keys = row_candidate.keys();
476 let mut arr = Vec::with_capacity(num_keys.max(candidate_keys.len()));
477
478 for key in &candidate_keys {
480 if self.index.get_column_index(key).is_none() {
481 self.extend_dataframe_for_column(key.clone())?;
482 }
483 }
484
485 arr.reserve(self.index.len());
487 for index in self.index.get_keys() {
488 arr.push(
489 row_candidate
490 .get_value_ref(index)
491 .cloned()
492 .unwrap_or(DataValue::Null),
493 );
494 }
495
496 self.data_frame.push_row(Array::from_vec(arr).view())?;
497 Ok(())
498 }
499
500 pub fn remove_column(&mut self, keys: &[Key]) -> Result<Self, Error> {
504 let mut indexes = KeyIndex::default();
506 let data = self.select(Some(keys));
508 for key in keys {
510 if let Some((current, _idx)) = self.index.remove_key(key) {
511 indexes.store_key(current);
512 }
513 }
514 let rest = self.select(Some(self.keys()));
516 let keys = self.index.get_keys().to_vec();
517 self.data_frame = rest;
518 self.index = KeyIndex::new(keys);
519
520 Ok(Self::new(indexes, data))
522 }
523
524 fn check_or_init_frame(&mut self, other: &Self) -> Result<Continue, Error> {
525 if self.index.is_empty() {
526 self.index = other.index.clone();
527 self.data_frame = other.data_frame.clone();
528 return Ok(Continue::End);
529 }
530 if other.index.is_empty() {
531 return Ok(Continue::End);
532 }
533 if self.is_empty() {
534 self.data_frame = Array2::default((other.data_frame.nrows(), self.index.len()));
535 }
536
537 Ok(Continue::Continue)
538 }
539
540 fn extend_columns_from_other(&mut self, other: &Self) -> Result<(), Error> {
541 let missing_keys: Vec<Key> = other
542 .index
543 .get_keys()
544 .iter()
545 .filter(|key| self.index.get_column_index(key).is_none())
546 .cloned()
547 .collect();
548
549 if missing_keys.is_empty() {
550 return Ok(());
551 }
552
553 for key in missing_keys {
554 self.index.store_key(key);
555 }
556
557 let nrows = self.data_frame.nrows();
558 let new_cols = self.index.len() - self.data_frame.ncols();
559
560 if new_cols > 0 {
561 let new_data = Array2::default((nrows, new_cols));
563 self.data_frame = concatenate(Axis(1), &[self.data_frame.view(), new_data.view()])?;
564 }
565
566 Ok(())
567 }
568
569 fn try_extend(&mut self, other: Self) -> Result<(), Error> {
570 let mut joined_keys = self.index.clone();
571 for key in other.keys() {
572 if self.index.get_column_index(key).is_none() {
573 joined_keys.store_key(key.clone());
574 }
575 }
576
577 let sum_len = self.data_frame.nrows() + other.data_frame.nrows();
578 let mut arr = Array2::default((sum_len, joined_keys.len()));
579 let increment = self.data_frame.nrows();
580
581 for key in joined_keys.get_keys() {
582 let index_result = joined_keys.get_column_index(key).ok_or_else(|| {
583 Error::UnknownError(format!(
584 "Index lookup failed for key '{}' in try_extend",
585 key.name()
586 ))
587 })?;
588
589 let mut col = arr.column_mut(index_result);
590
591 if let Some(index) = self.index.get_column_index(key) {
592 let src_col = self.data_frame.column(index);
593 col.slice_mut(s![..increment]).assign(&src_col);
594 }
595
596 if let Some(index) = other.index.get_column_index(key) {
597 let src_col = other.data_frame.column(index);
598 col.slice_mut(s![increment..]).assign(&src_col);
599 }
600 }
601
602 *self = ColumnFrame::new(joined_keys, arr);
603 Ok(())
604 }
605
606 pub fn extend(&mut self, mut other: Self) -> Result<(), Error> {
614 if self.check_or_init_frame(&other)?.should_end() {
615 return Ok(());
616 }
617
618 if self.index.check_order_of_indexes(&other.index).is_err() {
619 return self.try_extend(other);
620 }
621
622 trace!(
623 "Extend columns from other {:?} vs {:?}",
624 other.index.get_keys(),
625 self.index.get_keys()
626 );
627
628 if other.data_frame.ncols() < self.data_frame.ncols() {
629 other.extend_columns_from_other(self)?;
630 } else {
631 self.extend_columns_from_other(&other)?;
632 }
633 self.data_frame = concatenate(Axis(0), &[self.data_frame.view(), other.data_frame.view()])?;
634
635 Ok(())
636 }
637
638 pub fn replace(&mut self, other: Self) -> Result<(), Error> {
644 if self.check_or_init_frame(&other)?.should_end() {
645 return Ok(());
646 }
647
648 if self.data_frame.len() > other.data_frame.len() {
649 return Err(Error::DataSetSizeDoesntMatch(
650 self.data_frame.len(),
651 other.data_frame.len(),
652 ));
653 }
654 self.index = other.index;
655 self.data_frame = other.data_frame;
656
657 Ok(())
658 }
659
660 pub fn join_by_id_inner(&mut self, right: Self, keys: &[Key]) -> Result<(), Error> {
664 if self.check_or_init_frame(&right)?.should_end() {
665 return Ok(());
666 }
667
668 let timer = std::time::Instant::now();
669 let new_columns = right.index.get_complement_keys(self.index.get_keys());
670
671 self.extend_columns_from_other(&right)?;
673 tracing::debug!("Extend took {}ns", timer.elapsed().as_nanos());
674
675 let column_mappings: Vec<(usize, usize)> = new_columns
677 .iter()
678 .filter_map(|key| {
679 let left_idx = self.index.get_column_index(key)?;
680 let right_idx = right.index.get_column_index(key)?;
681 Some((left_idx, right_idx))
682 })
683 .collect();
684
685 let timer = std::time::Instant::now();
687 let index = Index::new(keys.to_vec(), self);
688 tracing::debug!("Left index build took: {}ns", timer.elapsed().as_nanos());
689 tracing::trace!("Index {index:?}");
690
691 let timer = std::time::Instant::now();
692 let right_index = Index::new(keys.to_vec(), &right);
693 let joined_idx = index.join(right_index);
694 tracing::debug!(
695 "Right index build and join took: {}ns",
696 timer.elapsed().as_nanos()
697 );
698
699 let timer = std::time::Instant::now();
702 let joined_idx_len = joined_idx.len();
703
704 for (left_col_idx, right_col_idx) in &column_mappings {
705 let mut left_col = self.data_frame.column_mut(*left_col_idx);
706 let right_col = right.data_frame.column(*right_col_idx);
707
708 for (left_indices, right_indices) in &joined_idx {
709 for right_row_idx in right_indices {
710 for left_idx in left_indices {
711 left_col[*left_idx] = right_col[*right_row_idx].clone();
712 }
713 }
714 }
715 }
716
717 let elapsed = timer.elapsed();
718 tracing::debug!(
719 "Filled {} rows in {}ms|{}s",
720 joined_idx_len,
721 elapsed.as_millis(),
722 elapsed.as_secs()
723 );
724
725 Ok(())
726 }
727
728 pub fn add_single_column<K: Into<Key>>(
732 &mut self,
733 key: K,
734 column: Array1<DataValue>,
735 ) -> Result<(), Error> {
736 let key = key.into();
737 if self.index.get_column_index(&key).is_some() {
738 return Err(Error::ColumnAlreadyExists(key));
739 }
740 if self.len() != column.len() && !self.is_empty() {
741 return Err(Error::DataSetSizeDoesntMatch(self.len(), column.len()));
742 }
743
744 self.index.store_key(key.clone());
745 let rows = column.len();
746 let column_index = self
747 .index
748 .get_column_index(&key)
749 .ok_or(Error::UnknownError(format!("Column {key} should exists")))?;
750 if self.is_empty() && self.index.len() == 1 {
751 self.data_frame = column.into_shape_clone((rows, 1))?;
752 assert_eq!(self.data_frame.column(column_index).len(), rows);
753 } else if self.is_empty() {
754 self.data_frame = Array2::default((column.len(), self.index.len() - 1));
755 self.data_frame.push_column(column.view())?;
756 assert_eq!(self.data_frame.column(column_index).len(), rows);
757 } else {
758 self.data_frame.push_column(column.view())?;
759 }
760 assert_eq!(self.data_frame.column(column_index).len(), rows);
761
762 Ok(())
763 }
764 pub fn add_columns(&mut self, other: Self) -> Result<(), Error> {
768 if self.check_or_init_frame(&other)?.should_end() {
769 return Ok(());
770 }
771
772 self.extend_columns_from_other(&other)?;
773 for (idx, key) in other.index.get_keys().iter().enumerate() {
774 if let Some(index) = self.index.get_column_index(key) {
775 trace!("Other array = {:?}", other.data_frame.dim());
776 if other.data_frame.dim() == (0, 0) {
777 self.data_frame.column_mut(index).fill(DataValue::Null);
778 continue;
779 }
780 let arr = other.data_frame.column(idx);
781 trace!(
782 "Adding column {key:?} at index {idx} vs {index} datasize: self:{} vs other:{}",
783 self.data_frame.nrows(),
784 arr.len()
785 );
786 if arr.len() != self.data_frame.nrows() {
787 self.data_frame.column_mut(index).fill(DataValue::Null);
788 } else {
789 self.data_frame.column_mut(index).assign(&arr);
790 }
791 }
792 }
793 Ok(())
794 }
795
796 pub fn broadcast(&mut self, other: Self) -> Result<(), Error> {
801 if self.check_or_init_frame(&other)?.should_end() {
802 return Ok(());
803 }
804 if other.data_frame.nrows() != 1 {
805 return Err(Error::CannotBroadcast);
806 }
807
808 let all_keys = self.index.get_keys().to_vec();
810 let other_keys: Vec<_> = other
811 .index
812 .get_keys()
813 .iter()
814 .filter(|k| self.index.get_column_index(k).is_none())
815 .cloned()
816 .collect();
817
818 for key in &other_keys {
820 self.index.store_key(key.clone());
821 }
822
823 let nrows = self.len();
824 let ncols = self.index.len();
825 let ncols_old = all_keys.len();
826
827 let mut data = Vec::with_capacity(nrows * ncols);
829
830 for row_idx in 0..nrows {
832 for col_idx in 0..ncols_old {
834 data.push(self.data_frame[(row_idx, col_idx)].clone());
835 }
836 for key in &other_keys {
838 if let Some(other_idx) = other.index.get_column_index(key) {
839 data.push(other.data_frame[(0, other_idx)].clone());
840 }
841 }
842 }
843
844 self.data_frame = Array2::from_shape_vec((nrows, ncols), data)
845 .map_err(|e| Error::UnknownError(format!("Broadcast reshape failed: {e}")))?;
846
847 Ok(())
848 }
849
850 pub fn cartesian_product(&mut self, other: Self) -> Result<(), Error> {
855 if self.check_or_init_frame(&other)?.should_end() {
856 return Ok(());
857 }
858 for other_key in other.keys() {
861 if self.index.get_column_index(other_key).is_none() {
862 self.index.store_key(other_key.clone());
863 } else {
864 self.index.store_key(Key::new(
865 format!("{}-{}", other_key, other_key.id()).as_str(),
866 other_key.ctype,
867 ));
868 }
869 }
870 let max_rows = self.len() * other.len();
871 let ncols = self.index.len();
872 let mut new_df = Array2::default((max_rows, ncols));
874
875 let mut cur_idx = 0;
876 for cur_row in self.data_frame.rows() {
877 for other_row in other.data_frame.rows() {
878 new_df
879 .slice_mut(s![cur_idx, ..])
880 .assign(&concatenate(Axis(0), &[cur_row, other_row])?);
881 cur_idx += 1;
882 }
883 }
884 self.data_frame = new_df;
885 Ok(())
886 }
887
888 pub fn join(&mut self, right: Self, join_type: &JoinRelation) -> Result<(), Error> {
896 use JoinBy::*;
897 match &join_type.join_type {
898 AddColumns => self.add_columns(right),
899 Replace => self.replace(right),
900 Extend => self.extend(right),
901 Broadcast => self.broadcast(right),
902 CartesianProduct => self.cartesian_product(right),
903 JoinById(join) => self.join_by_id_inner(right, &join.keys),
904 }
905 }
906
907 pub fn get_single_column(&self, key: &Key) -> Option<ArrayView1<'_, DataValue>> {
912 self.index
913 .get_column_index(key)
914 .map(|x| self.data_frame.column(x))
915 }
916
917 pub fn get_single_column_typed<T: Extract>(&self, key: &Key) -> Option<Array1<T>> {
930 self.get_single_column(key)
931 .map(|x| x.mapv(|x| T::extract(&x)))
932 }
933
934 pub fn sorted(&self, key: &Key) -> Result<sorted_df::SortedDataFrame<'_>, Error> {
940 let index = self
941 .index
942 .get_column_index(key)
943 .ok_or(Error::NotFound(key.clone()))?;
944 let column = self.data_frame.column(index);
945 let mut data_with_index = column.iter().enumerate().collect::<Vec<_>>();
946 tracing::trace!("Sorting by key: {key:?} vals {data_with_index:?}");
947 data_with_index.sort_by(
948 |(a_idx, a_val), (b_idx, b_val)| match a_val.partial_cmp(b_val) {
949 Some(ordering) => ordering.then_with(|| a_idx.cmp(b_idx)),
950 None => {
951 let a_null = matches!(a_val, DataValue::Null);
952 let b_null = matches!(b_val, DataValue::Null);
953 match (a_null, b_null) {
954 (true, true) => std::cmp::Ordering::Equal.then_with(|| a_idx.cmp(b_idx)),
955 (true, false) => std::cmp::Ordering::Greater.then_with(|| a_idx.cmp(b_idx)),
956 (false, true) => std::cmp::Ordering::Less.then_with(|| a_idx.cmp(b_idx)),
957 (false, false) => std::cmp::Ordering::Equal.then_with(|| a_idx.cmp(b_idx)),
958 }
959 }
960 },
961 );
962
963 tracing::trace!("Sorted by key: {key:?} vals {data_with_index:?}");
964 let indicies = data_with_index
965 .into_iter()
966 .map(|(idx, _)| idx)
967 .collect::<Vec<_>>();
968
969 Ok(sorted_df::SortedDataFrame::new(self, indicies))
970 }
971
972 pub fn filter(&self, filter: &crate::filter::FilterRules) -> Result<Self, Error> {
977 let mut final_indices = Vec::new();
978 let filter_df = filter_df::ColumnFrameFiltering { column_frame: self };
979 for rule in &filter.rules {
980 final_indices.extend(crate::filter::filter_combination(&filter_df, rule)?);
981 }
982
983 final_indices.sort_unstable();
984 final_indices.dedup();
985 let mut new_df = ColumnFrame::new(
986 self.index.clone(),
987 Array2::default((final_indices.len(), self.index.len())),
988 );
989 final_indices
990 .iter()
991 .enumerate()
992 .for_each(|(cur_idx, row_idx)| {
993 new_df
994 .data_frame
995 .slice_mut(s![cur_idx, ..])
996 .assign(&self.data_frame.slice(s![*row_idx, ..]));
997 });
998
999 Ok(new_df)
1000 }
1001}
1002
1003pub fn to_array2<T: Clone>(source: Vec<Array1<T>>) -> Result<Array2<T>, Error> {
1007 let width = source.len();
1008 let flattened: Array1<T> = source.into_iter().flat_map(|row| row.to_vec()).collect();
1009 let height = flattened.len() / width;
1010 Ok(flattened.into_shape_with_order((width, height))?)
1011}
/// Builds a `DataFrame` by wrapping the `ColumnFrame` produced by `column_frame!`.
///
/// Accepts exactly the same input syntax as `column_frame!`.
#[macro_export]
macro_rules! df {
    ($($tokens:tt)*) => {
        $crate::DataFrame::new($crate::column_frame!($($tokens)*))
    };
}
1018
/// Builds a `ColumnFrame` from `key => values` pairs.
///
/// The arms are tried in order:
/// 1. `key => value, ...` with a trailing comma: the comma is stripped and the pairs are
///    re-dispatched.
/// 2. `key => vec![v, ...], ...`: rewritten to form 3.
/// 3. `key => [v, ...], ...`: one column per key. Each value is converted with `.into()`.
///    The columns are stacked with `ndarray::arr2` and then `reversed_axes()`, so the
///    stored array is `(row, column)`. Columns of different lengths presumably fail to
///    type-check, since `arr2` takes equally sized nested arrays.
/// 4. `key => value, ...`: a single-row frame.
///
/// Keys are converted with `.into()` and passed to `KeyIndex::new`.
#[macro_export]
macro_rules! column_frame {
    // NOTE(review): the re-dispatched `$value`s are opaque `expr` fragments, so they can no
    // longer match the literal `[...]` / `vec![...]` patterns of arms 2 and 3. As far as
    // macro_rules forwarding rules go, a trailing comma therefore only works for scalar
    // values (arm 4); confirm before relying on `key => [..],` with a trailing comma.
    ($($key:expr => $value:expr,)+) => { $crate::column_frame!($($key => $value),+) };
    // `vec![...]` columns are treated exactly like array-literal columns.
    ($($key:expr => vec![$($value:expr),*]),*) => {
        $crate::column_frame!($($key => [$($value),*]),*)
    };
    // Column-wise input: build a (column, row) array, then transpose it.
    ($($key:expr => [$($value:expr),*]),*) => {
        {
            let data = ::ndarray::arr2(&[$(
                [$($value.into(),)*],
            )*]);

            let _keys = vec![$($key.into(),)*];

            // `reversed_axes()` transposes without copying, so the stored array is not in
            // standard (row-major) memory layout.
            $crate::ColumnFrame::new(
                $crate::KeyIndex::new(_keys),
                data.reversed_axes()
            )
        }
    };
    // Scalar input: exactly one row, with one cell per key.
    ($($key:expr => $value:expr),*) => {
        {
            let _data = ::ndarray::arr2(&[[$($value.into(),)*]]);
            let _keys = vec![$($key.into(),)*];

            $crate::ColumnFrame::new(
                $crate::KeyIndex::new(_keys),
                _data,
            )
        }
    };
}
1055
1056#[cfg(test)]
1057mod test {
1058 use crate::{filter::FilterRules, JoinById};
1059
1060 use super::*;
1061 use data_value::stdhashmap;
1062 use ndarray::ArrayView;
1063 use rstest::*;
1064 use tracing_test::traced_test;
1065
1066 #[rstest]
1067 #[case(
1068 column_frame! {
1069 "t" => [1751001987000000u64, 1752001987000000u64, 1753001987000000u64],
1070 "b" => [4, 5, 6],
1071 "c" => [7, 8, 9]
1072 },
1073 column_frame! {
1074 "t" => [1752001987000000u64],
1075 "b" => [5],
1076 "c" => [8]
1077 },
1078 FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
1079 )]
1080 #[case(
1081 column_frame! {
1082 "t" => [1751001987000000f64, 1752001987000000f64, 1753001987000000f64],
1083 "b" => [4, 5, 6],
1084 "c" => [7, 8, 9]
1085 },
1086 column_frame! {
1087 "t" => [1752001987000000f64],
1088 "b" => [5],
1089 "c" => [8]
1090 },
1091 FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
1092 )]
1093 #[case(
1094 column_frame! {
1095 "t" => [1751001987000000i64, 1752001987000000i64, 1753001987000000i64],
1096 "b" => [4, 5, 6],
1097 "c" => [7, 8, 9]
1098 },
1099 column_frame! {
1100 "t" => [1752001987000000i64],
1101 "b" => [5],
1102 "c" => [8]
1103 },
1104 FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
1105 )]
1106 #[case(
1107 column_frame! {
1108 "t" => [1751001987000000u64, 1752001987000000u64, 1753001987000000u64],
1109 "b" => [4, 5, 6],
1110 "c" => [7, 8, 9]
1111 },
1112 column_frame! {
1113 "t" => [1751001987000000u64],
1114 "b" => [4],
1115 "c" => [7]
1116 },
1117 FilterRules::try_from("t.to_datetime_us() < '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
1118 )]
1119 #[case(
1120 column_frame! {
1121 "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
1122 "b" => [4, 5, 6],
1123 "c" => [7, 8, 9]
1124 },
1125 column_frame! {
1126 "t" => ["2025-07-08 18:13:07"],
1127 "b" => [4],
1128 "c" => [7]
1129 },
1130 FilterRules::try_from("t.to_datetime_us() < '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
1131 )]
1132 #[case(
1133 column_frame! {
1134 "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
1135 "b" => [4, 5, 6],
1136 "c" => [7, 8, 9]
1137 },
1138 column_frame! {
1139 "t" => [],
1140 "b" => [],
1141 "c" => []
1142 },
1143 FilterRules::try_from("t.len() < 10u64").expect("BUG: cannot create filter rules"),
1144 )]
1145 #[case(
1146 column_frame! {
1147 "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
1148 "b" => [4, 5, 6],
1149 "c" => [7, 8, 9]
1150 },
1151 column_frame! {
1152 "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
1153 "b" => [4, 5, 6],
1154 "c" => [7, 8, 9]
1155 },
1156 FilterRules::try_from("t.len() > 10u64").expect("BUG: cannot create filter rules"),
1157 )]
1158 #[case(
1159 column_frame! {
1160 "t" => [DataValue::Vec(vec![1.into(), 2.into(), 3.into()]), DataValue::Vec(vec![]), DataValue::Vec(vec![1.into()])],
1161 "b" => [4, 5, 6],
1162 "c" => [7, 8, 9]
1163 },
1164 column_frame! {
1165 "t" => [DataValue::Vec(vec![])],
1166 "b" => [5],
1167 "c" => [ 8]
1168 },
1169 FilterRules::try_from("t.len() == 0u64").expect("BUG: cannot create filter rules"),
1170 )]
1171 #[case(
1172 column_frame! {
1173 "t" => [DataValue::Vec(vec![1.into(), 2.into(), 3.into()]), DataValue::Vec(vec![]), DataValue::Vec(vec![1.into()])],
1174 "b" => [4, 5, 6],
1175 "c" => [7, 8, 9]
1176 },
1177 column_frame! {
1178 "t" => [DataValue::Vec(vec![1.into()])],
1179 "b" => [6],
1180 "c" => [9]
1181 },
1182 FilterRules::try_from("t.len() == 1u64").expect("BUG: cannot create filter rules"),
1183 )]
1184 #[case(
1185 column_frame! {
1186 "a" => [1, 2, 3],
1187 "b" => [4, 5, 6],
1188 "c" => [7, 8, 9]
1189 },
1190 column_frame! {
1191 "a" => [1, 2],
1192 "b" => [4, 5],
1193 "c" => [7, 8]
1194 },
1195 FilterRules::try_from("a <= 2i32").expect("BUG: cannot create filter rules"),
1196 )]
1197 #[case(
1198 column_frame! {
1199 "a" => [1, 2, 3],
1200 "b" => [4, 5, 6],
1201 "c" => [7, 8, 9]
1202 },
1203 column_frame! {
1204 "a" => [2],
1205 "b" => [5],
1206 "c" => [8]
1207 },
1208 FilterRules::try_from("a <= 2i32 && c > 7i32").expect("BUG: cannot create filter rules"),
1209 )]
1210 #[case(
1211 column_frame! {
1212 "a" => [1, 2, 3],
1213 "b" => [4, 5, 6],
1214 "c" => [7, 8, 9]
1215 },
1216 column_frame! {
1217 "a" => [],
1218 "b" => [],
1219 "c" => []
1220 },
1221 FilterRules::try_from("a <= 2i32 && c > 9i32").expect("BUG: cannot create filter rules"),
1222 )]
1223 #[case(
1224 column_frame! {
1225 "a" => [1, 2, 3],
1226 "b" => [4, 5, 6],
1227 "c" => [7, 8, 9]
1228 },
1229 column_frame! {
1230 "a" => [1, 2],
1231 "b" => [4, 5],
1232 "c" => [7, 8]
1233 },
1234 FilterRules::try_from("a <= 2i32 || c > 9i32").expect("BUG: cannot create filter rules"),
1235 )]
1236 #[case(
1237 column_frame! {
1238 "a" => [1, 2, 3],
1239 "b" => [4, 5, 6],
1240 "c" => [7, 8, 9]
1241 },
1242 column_frame! {
1243 "a" => [2],
1244 "b" => [5],
1245 "c" => [8]
1246 },
1247 FilterRules::try_from("a <= 2i32 && (c > 9i32 || b == 5i32)").expect("BUG: cannot create filter rules"),
1248 )]
1249 #[case(
1250 column_frame! {
1251 "a" => ["abcd", "ab", "abcdefg"],
1252 "b" => [4, 5, 6],
1253 "c" => [7, 8, 9]
1254 },
1255 column_frame! {
1256 "a" => ["abcd","abcdefg"],
1257 "b" => [4, 6],
1258 "c" => [7, 9]
1259 },
1260 FilterRules::try_from("a ~= 'abcd.*'").expect("BUG: cannot create filter rules"),
1261 )]
1262 #[case(
1263 column_frame! {
1264 "a" => [1, 2, 3],
1265 "b" => [4, 5, 6],
1266 "c" => [7, 8, 9]
1267 },
1268 column_frame! {
1269 "a" => [1],
1270 "b" => [4],
1271 "c" => [7]
1272 },
1273 FilterRules::try_from("a in [1u32, 1i32]'").expect("BUG: cannot create filter rules"),
1274 )]
1275 #[case(
1276 column_frame! {
1277 "a" => [1, 2, 3],
1278 "b" => [4, 5, 6],
1279 "c" => [7, 8, 9]
1280 },
1281 column_frame! {
1282 "a" => [2, 3],
1283 "b" => [5, 6],
1284 "c" => [8, 9]
1285 },
1286 FilterRules::try_from("a notIn [1u32, 1i32]'").expect("BUG: cannot create filter rules"),
1287 )]
1288 #[case(
1289 column_frame! {
1290 "a" => [1f64, 2f64, 3f64],
1291 "b" => [4, 5, 6],
1292 "c" => [7, 8, 9]
1293 },
1294 column_frame! {
1295 "a" => [1f64, 2f64],
1296 "b" => [4, 5],
1297 "c" => [7, 8]
1298 },
1299 FilterRules::try_from("a < 3f64 || (a < 3f64 && b <= 5i32)").expect("BUG: cannot create filter rules"),
1300 )]
1301 #[case(
1302 column_frame! {
1303 "a" => [1f64, 2f64, 3f64],
1304 "b" => [4i64, 5i64, 6i64],
1305 "c" => [7i64, 8i64, 9i64]
1306 },
1307 column_frame! {
1308 "a" => [1f64, 2f64],
1309 "b" => [4i64, 5i64],
1310 "c" => [7i64, 8i64]
1311 },
1312 FilterRules::try_from("a >= 1f64 && (b <= 5 || c <= 8) && b >= 4").expect("BUG: cannot create filter rules"),
1313 )]
1314 #[traced_test]
1315 fn filter_test(
1316 #[case] df: ColumnFrame,
1317 #[case] expected: ColumnFrame,
1318 #[case] filter: FilterRules,
1319 ) {
1320 let filtered = df.filter(&filter).expect("BUG: cannot filter");
1321 assert_eq!(filtered, expected);
1322 }
1323
1324 #[rstest]
1325 #[traced_test]
1326 fn test_macro() {
1327 let df = column_frame! {
1328 "a" => 1,
1329 "b" => 2,
1330 "c" => 3,
1331 "d" => 4,
1332 };
1333
1334 assert_eq!(df.len(), 1);
1335 assert_eq!(df.keys(), &["a".into(), "b".into(), "c".into(), "d".into()]);
1336 let f = Array2::from_shape_vec((1, 4), vec![1.into(), 2.into(), 3.into(), 4.into()])
1337 .expect("BUG: cannot create array");
1338 assert_eq!(df.select(None), f);
1339
1340 let df = column_frame! {
1341 "a" => [1, 2, 3],
1342 "b" => [4, 5, 6],
1343 "c" => [7, 8, 9]
1344 };
1345
1346 assert_eq!(df.len(), 3);
1347 assert_eq!(df.keys(), &["a".into(), "b".into(), "c".into()]);
1348 let f = Array2::from_shape_vec(
1349 (3, 3),
1350 vec![
1351 1.into(),
1352 4.into(),
1353 7.into(),
1354 2.into(),
1355 5.into(),
1356 8.into(),
1357 3.into(),
1358 6.into(),
1359 9.into(),
1360 ],
1361 )
1362 .expect("BUG: cannot create array");
1363 let selected = df.select(None);
1364 trace!("{selected:?}");
1365 assert_eq!(selected, f);
1366
1367 let df1 = df! {
1368 "a" => [1, 2, 3],
1369 "b" => [4, 5, 6],
1370 "c" => [7, 8, 9]
1371 };
1372
1373 let formatted = format!("{}", df);
1375 debug!("{}", formatted);
1376
1377 assert_eq!(df1, crate::DataFrame::from(df));
1378 }
1379
1380 #[rstest]
1381 #[case(
1382 column_frame! {
1383 "a" => [1, 2, 3],
1384 "b" => [4, 5, 6],
1385 "c" => [7, 8, 9]
1386 },
1387 column_frame! {
1388 "a_new" => [1, 2, 3],
1389 "b" => [4, 5, 6],
1390 "c" => [7, 8, 9]
1391 },
1392 vec!["a_new", "b", "c"].into_iter().map(|x| x.into()).collect(),
1393 vec![("a", "a_new".into())]
1394 )]
1395 #[traced_test]
1396 fn rename_test(
1397 #[case] df: ColumnFrame,
1398 #[case] expected: ColumnFrame,
1399 #[case] keys: Vec<Key>,
1400 #[case] renames: Vec<(&str, Key)>,
1401 ) {
1402 let mut df = df;
1403 for (old, new) in renames {
1404 df.rename_key(old, new).expect("BUG: cannot rename key");
1405 }
1406 assert_eq!(df, expected);
1407 assert_eq!(df.keys(), keys.as_slice());
1408 }
1409
1410 #[rstest]
1411 #[case(
1412 column_frame!("a" => [1, 2, 3]),
1413 Key::new("a", crate::DataType::I32),
1414 column_frame!("a" => [1i32, 2i32, 3i32])
1415 )]
1416 #[case(
1417 column_frame!("a" => [1, 2, 3]),
1418 Key::new("a", crate::DataType::U32),
1419 column_frame!("a" => [1u32, 2u32, 3u32])
1420 )]
1421 #[case(
1422 column_frame!("a" => [1, 2, 3]),
1423 Key::new("a", crate::DataType::I64),
1424 column_frame!("a" => [1i64, 2i64, 3i64])
1425 )]
1426 #[case(
1427 column_frame!("a" => [1, 2, 3]),
1428 Key::new("a", crate::DataType::U64),
1429 column_frame!("a" => [1u64, 2u64, 3u64])
1430 )]
1431 #[case(
1432 column_frame!("a" => [1, 2, 3]),
1433 Key::new("a", crate::DataType::F64),
1434 column_frame!("a" => [1f64, 2f64, 3f64])
1435 )]
1436 #[case(
1437 column_frame!("a" => [1, 2, 3]),
1438 Key::new("a", crate::DataType::F32),
1439 column_frame!("a" => [1f32, 2f32, 3f32])
1440 )]
1441 fn test_try_fix_dtype(
1447 #[case] mut df: ColumnFrame,
1448 #[case] key: Key,
1449 #[case] expected: ColumnFrame,
1450 ) {
1451 assert!(df.try_fix_column_by_key(&key).is_ok());
1452 assert_eq!(
1453 df.select(Some(&[key.clone()])),
1454 expected.select(Some(&[key.clone()]))
1455 );
1456 }
1457
1458 #[fixture]
1459 fn unknown_df() -> ColumnFrame {
1460 let mut hm: HashMap<String, Vec<DataValue>> = HashMap::new();
1461
1462 hm.insert("a".into(), vec![1u32.into()]);
1463 hm.insert("b".into(), vec![3i64.into()]);
1464 hm.insert("c".into(), vec![1f64.into()]);
1465 hm.insert("d".into(), vec![1u64.into()]);
1466
1467 hm.into()
1468 }
1469 #[rstest]
1470 #[case(stdhashmap!(
1471 "a" => crate::DataType::U32,
1472 "b" => crate::DataType::I64,
1473 "c" => crate::DataType::F64,
1474 "d" => crate::DataType::U64)
1475 )]
1476 fn test_try_fix_dtype_unknown(
1477 mut unknown_df: ColumnFrame,
1478 #[case] dtypes: HashMap<String, crate::DataType>,
1479 ) {
1480 for dtype in dtypes.iter() {
1481 let t: &Key = unknown_df
1482 .keys()
1483 .iter()
1484 .find(|x| x.name() == dtype.0)
1485 .unwrap();
1486 assert_ne!(t.ctype, crate::DataType::Unknown);
1487 }
1488 assert!(unknown_df.try_fix_dtype_for_keys(false).is_ok());
1489 for dtype in dtypes.iter() {
1490 let t: &Key = unknown_df
1491 .keys()
1492 .iter()
1493 .find(|x| x.name() == dtype.0)
1494 .unwrap();
1495 assert_eq!(t.ctype, *dtype.1);
1496 assert!(unknown_df.try_fix_dtype_for_keys(false).is_ok());
1497 }
1498 assert!(unknown_df.try_fix_dtype_for_keys(true).is_ok());
1499 }
1500
1501 #[rstest]
1502 #[case(
1503 column_frame!(Key::new("a", crate::DataType::F32) => [1, 2, 3]),
1504 Key::new("a", crate::DataType::F32),
1505 column_frame!("a" => [1f32, 2f32, 3f32])
1506 )]
1507 #[traced_test]
1508 fn test_try_fix(#[case] mut df: ColumnFrame, #[case] key: Key, #[case] expected: ColumnFrame) {
1509 assert!(df.try_fix_dtype().is_ok());
1510 assert_eq!(
1511 df.select(Some(&[key.clone()])),
1512 expected.select(Some(&[key]))
1513 )
1514 }
1515
1516 #[rstest]
1517 #[traced_test]
1518 fn test_not_key_fix() {
1519 let mut cf = column_frame!("a" => [1]);
1520 let non_existing = Key::new("b", crate::DataType::I32);
1521 assert!(cf.try_fix_column_by_key(&non_existing).is_err());
1522 }
1523
1524 #[rstest]
1525 #[case(
1526 column_frame! {
1527 "a" => [1, 2, 3],
1528 "b" => [4, 5, 6],
1529 "c" => [7, 8, 9]
1530 },
1531 vec!["a_alias", "b", "c"].into_iter().map(|x| x.into()).collect(),
1532 vec![("a", "a_alias")]
1533 )]
1534 #[traced_test]
1535 fn alias_test(
1536 #[case] df: ColumnFrame,
1537 #[case] keys: Vec<Key>,
1538 #[case] aliases: Vec<(&str, &str)>,
1539 ) {
1540 let mut df = df;
1541 for (old, new) in aliases {
1542 df.add_alias(old, new).expect("BUG: cannot rename key");
1543 }
1544 let origin_keys = df.keys().to_vec();
1545 let selected_aliases = df.select(Some(keys.as_slice()));
1546 let selected = df.select(Some(origin_keys.as_slice()));
1547 assert_eq!(selected, selected_aliases);
1548 }
1549
1550 #[rstest]
1551 #[traced_test]
1552 fn test_mut_view() {
1553 let data = vec![
1554 DataValue::from(1f64),
1555 DataValue::from(4f32),
1556 DataValue::from(2f64),
1557 DataValue::from(f32::NAN),
1558 DataValue::from(f64::NAN),
1559 DataValue::from(f32::INFINITY),
1560 ];
1561 let keys: Vec<Key> = vec!["a".into(), "b".into()];
1562
1563 let index = KeyIndex::new(keys.clone());
1564 let df = Array2::from_shape_vec((3, keys.len()), data).expect("BUG: cannot create array");
1565 let mut df = ColumnFrame::new(index.clone(), df);
1566 df.get_mut_view().mapv_inplace(|x| match x {
1567 DataValue::F32(f) if f.is_infinite() || f.is_nan() => DataValue::F32(0f32),
1568 DataValue::F64(f) if f.is_infinite() || f.is_nan() => DataValue::F64(0f64),
1569 e => e,
1570 });
1571 let data = vec![
1572 DataValue::from(1f64),
1573 DataValue::from(4f32),
1574 DataValue::from(2f64),
1575 DataValue::from(0f32),
1576 DataValue::from(0f64),
1577 DataValue::from(0f32),
1578 ];
1579 let expected = ColumnFrame::new(
1580 index,
1581 Array2::from_shape_vec((3, keys.len()), data).expect("BUG: cannot create ndarray"),
1582 );
1583 assert_eq!(df, expected);
1584 }
1585
1586 #[rstest]
1587 #[traced_test]
1588 fn dummy_test() {
1589 let data = vec![
1590 DataValue::U32(1),
1591 DataValue::I32(2),
1592 DataValue::I64(3),
1593 DataValue::U64(4),
1594 ];
1595
1596 let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1597
1598 let index = KeyIndex::new(keys.clone());
1599 let mut data_frame = Array2::default((1, keys.len()));
1600 for (idx, entry) in data.iter().enumerate() {
1601 data_frame
1602 .column_mut(idx)
1603 .assign(&ArrayView::from(&[entry.clone()]));
1604 }
1605
1606 let frame = ColumnFrame::new(index, data_frame);
1607 assert_eq!(
1608 frame.get_by_row_index(&"a".into(), 0),
1609 Some(&DataValue::U32(1))
1610 );
1611 assert_eq!(frame.get_by_row_index(&"aa".into(), 0), None);
1612 assert_eq!(frame.get_by_row_index(&"a".into(), 1), None);
1613 assert_eq!(
1614 frame.select(Some(&["a".into(), "b".into()])),
1615 Array2::from_shape_vec((1, 2), vec![DataValue::U32(1), DataValue::I32(2)])
1616 .expect("BUG: cannot create array")
1617 );
1618 }
1619
1620 #[rstest]
1621 #[traced_test]
1622 fn dummy_test_multiple_rows() {
1623 let data = vec![
1624 DataValue::U32(1),
1625 DataValue::I32(2),
1626 DataValue::I64(3),
1627 DataValue::U64(4),
1628 DataValue::U32(12),
1629 DataValue::I32(22),
1630 DataValue::I64(32),
1631 DataValue::U64(42),
1632 ];
1633
1634 let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1635
1636 let index = KeyIndex::new(keys.clone());
1637 let data_frame =
1638 Array2::from_shape_vec((2, keys.len()), data).expect("BUG: cannot create array");
1639
1640 let frame = ColumnFrame::new(index, data_frame);
1641 assert_eq!(
1642 frame.get_by_row_index(&"a".into(), 0),
1643 Some(&DataValue::U32(1))
1644 );
1645 assert_eq!(frame.get_by_row_index(&"aa".into(), 0), None);
1646 assert_eq!(frame.get_by_row_index(&"a".into(), 3), None);
1647 let arr = Array2::from_shape_vec(
1648 (2, 2),
1649 vec![
1650 DataValue::U32(1),
1651 DataValue::I32(2),
1652 DataValue::U32(12),
1653 DataValue::I32(22),
1654 ],
1655 )
1656 .expect("BUG: cannot create array");
1657 trace!("{arr:?}");
1658 assert_eq!(frame.select(Some(&["a".into(), "b".into()])), arr);
1659 }
1660
1661 #[rstest]
1662 #[traced_test]
1663 fn dummy_test_multiple_rows_push() {
1664 let data = vec![
1665 DataValue::U32(1),
1666 DataValue::I32(2),
1667 DataValue::I64(3),
1668 DataValue::U64(4),
1669 DataValue::U32(12),
1670 DataValue::I32(22),
1671 DataValue::I64(32),
1672 DataValue::U64(42),
1673 ];
1674 let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1675
1676 let index = KeyIndex::new(keys.clone());
1677 let data_frame =
1678 Array2::from_shape_vec((2, keys.len()), data).expect("BUG: cannot create array");
1679
1680 let mut frame = ColumnFrame::new(index, data_frame);
1681 assert!(frame
1682 .push(data_value::stdhashmap!(
1683 "a" => DataValue::U32(2),
1684 "b" => DataValue::I32(3),
1685 "c" => DataValue::I64(4),
1686 "d" => DataValue::U64(5)
1687 ))
1688 .is_ok());
1689 let arr = Array2::from_shape_vec(
1690 (3, 2),
1691 vec![
1692 DataValue::U32(1),
1693 DataValue::I32(2),
1694 DataValue::U32(12),
1695 DataValue::I32(22),
1696 DataValue::U32(2),
1697 DataValue::I32(3),
1698 ],
1699 )
1700 .expect("BUG: cannot create array");
1701 trace!("{arr:?}");
1702 assert_eq!(frame.select(Some(&["a".into(), "b".into()])), arr);
1703 let result = frame.push(data_value::stdhashmap!(
1704 "a" => DataValue::U32(34),
1705 "b" => DataValue::I32(44),
1706 "c" => DataValue::I64(54),
1707 "e" => DataValue::F32(6f32)
1708 ));
1709 assert!(result.is_ok(), "{result:?}");
1710 let arr = Array2::from_shape_vec(
1711 (4, 2),
1712 vec![
1713 DataValue::U64(4),
1714 DataValue::Null,
1715 DataValue::U64(42),
1716 DataValue::Null,
1717 DataValue::U64(5),
1718 DataValue::Null,
1719 DataValue::Null,
1720 DataValue::F32(6f32),
1721 ],
1722 )
1723 .expect("BUG: cannot create array");
1724 trace!("{arr:?}");
1725 assert_eq!(frame.select(Some(&["d".into(), "e".into()])), arr);
1726 }
1727
1728 #[rstest]
1729 #[case(
1730 column_frame! {
1731 "group_id" => vec![1, 2],
1732 "feed_tag" => vec![3, 4]
1733 },
1734 Some(vec![Key::from("group_id")]),
1735 ndarray::array!([1.into()], [2.into()])
1736 )]
1737 #[case(
1738 column_frame! {
1739 "group_id" => vec![1, 2],
1740 "feed_tag" => vec![3, 4]
1741 },
1742 Some(vec!["group_id".into(), "feed_tag".into()]),
1743 ndarray::array!([1.into(), 3.into()], [2.into(), 4.into()])
1744 )]
1745 #[case(
1746 column_frame! {
1747 "group_id" => vec![1, 2],
1748 "feed_tag" => vec![3, DataValue::Null]
1749 },
1750 Some(vec!["feed_tag".into()]),
1751 ndarray::array![[3.into()], [DataValue::Null]]
1752 )]
1753 #[case(
1754 column_frame! {
1755 "group_id" => vec![1, 2],
1756 "feed_tag" => vec![1, DataValue::Null]
1757 },
1758 Some(vec!["feed_tag2".into()]),
1759 Array2::<DataValue>::default((0, 0))
1760 )]
1761 #[traced_test]
1762 fn test_select(
1763 #[case] input: ColumnFrame,
1764 #[case] keys: Option<Vec<Key>>,
1765 #[case] expected: Array2<DataValue>,
1766 ) {
1767 trace!("input={input:?}");
1768 let keys_slice = keys.as_deref();
1769 let selected = input.select(keys_slice);
1770 trace!("selected={selected:?}");
1771 assert_eq!(selected, expected);
1772 let selected = input.select_transposed(keys_slice);
1773 trace!("selected_transposed={selected:?}");
1774 assert!(selected.is_ok());
1775 assert_eq!(selected.unwrap(), expected.t());
1776 }
1777
1778 #[rstest]
1779 #[case(
1780 column_frame! {
1781 "group_id" => vec![1, 2],
1782 "feed_tag" => vec![3, 4]
1783 },
1784 Key::from("group_id"),
1785 Some(ndarray::array!(1.into(), 2.into()))
1786 )]
1787 #[case(
1788 column_frame! {
1789 "group_id" => vec![1, 2, 5, 6],
1790 "feed_tag" => vec![3, 4, 7, 8]
1791 },
1792 Key::from("group_id"),
1793 Some(ndarray::array!(1.into(), 2.into(), 5.into(), 6.into()))
1794 )]
1795 #[case(
1796 column_frame! {
1797 "group_id" => vec![1, 2],
1798 "feed_tag" => vec![1, 1]
1799 },
1800 Key::from("feed_tag1"),
1801 None
1802 )]
1803 #[traced_test]
1804 fn test_select_column(
1805 #[case] input: ColumnFrame,
1806 #[case] key: Key,
1807 #[case] expected: Option<Array1<DataValue>>,
1808 ) {
1809 let selected = input.select_column(&key);
1810 trace!("selected={selected:?}");
1811 match expected {
1812 Some(expected) => {
1813 assert!(selected.is_some());
1814 assert_eq!(selected.expect("BUG: checked above"), expected);
1815 }
1816 None => assert!(selected.is_none()),
1817 }
1818 }
1819
// Joins with `JoinRelation::add_columns()` where one side has no rows. An empty
// frame that only declares `group_id` takes on the other side's rows. A
// populated frame gains the empty side's declared columns, filled with `Null`.
#[test]
#[traced_test]
fn empty_join_test() {
    // Columns checked after each join; only the id column's name varies.
    fn report_keys(id_column: &'static str) -> [Key; 4] {
        [
            id_column.into(),
            "feed_tag".into(),
            "clicks".into(),
            "imps".into(),
        ]
    }

    let join = JoinRelation::add_columns();

    // 1. Empty left side (declares `group_id` only) joined with populated data.
    let mut empty_left = ColumnFrame::default();
    empty_left
        .add_single_column("group_id", Array1::from_vec(vec![]))
        .expect("BUG: cannot add column");
    assert!(empty_left.join(ColumnFrame::default(), &join).is_ok());

    let populated = column_frame! {
        "group_id" => vec![2, 1, 3],
        "feed_tag" => vec![1, 1, 1],
        "clicks" => vec![100, 10, 10],
        "imps" => vec![1000, 200, 200]
    };
    let joined = empty_left.join(populated, &join);
    assert!(joined.is_ok(), "{joined:?}");

    trace!("{empty_left:?}");
    assert_eq!(
        empty_left.select(Some(&report_keys("group_id"))),
        ndarray::array!(
            [2.into(), 1.into(), 100.into(), 1000.into()],
            [1.into(), 1.into(), 10.into(), 200.into()],
            [3.into(), 1.into(), 10.into(), 200.into()],
        )
    );

    // 2. Populated left side joined with an empty frame declaring `group_id`.
    let mut populated_left = column_frame! {
        "feed_tag" => vec![1, 1, 1],
        "clicks" => vec![100, 10, 10],
        "imps" => vec![1000, 200, 200]
    };
    let mut empty_right = ColumnFrame::default();
    empty_right
        .add_single_column("group_id", Array1::from_vec(vec![]))
        .expect("BUG: cannot add column");
    let joined = populated_left.join(empty_right, &join);
    assert!(joined.is_ok(), "{joined:?}");

    trace!("{populated_left:?}");
    assert_eq!(
        populated_left.select(Some(&report_keys("group_id"))),
        ndarray::array!(
            [DataValue::Null, 1.into(), 100.into(), 1000.into()],
            [DataValue::Null, 1.into(), 10.into(), 200.into()],
            [DataValue::Null, 1.into(), 10.into(), 200.into()],
        )
    );

    // 3. Same frame joined with one that has a key index but no data at all.
    let index_only = ColumnFrame {
        index: KeyIndex::new(vec!["group_id2".into()]),
        ..ColumnFrame::default()
    };
    let joined = populated_left.join(index_only, &join);
    assert!(joined.is_ok(), "{joined:?}");

    trace!("{populated_left:?}");
    assert_eq!(
        populated_left.select(Some(&report_keys("group_id2"))),
        ndarray::array!(
            [DataValue::Null, 1.into(), 100.into(), 1000.into()],
            [DataValue::Null, 1.into(), 10.into(), 200.into()],
            [DataValue::Null, 1.into(), 10.into(), 200.into()],
        )
    );
}
1901
// Join on `group_id` where the left side repeats an id: both left rows with
// id 1 get the matching right-hand values, and id 3 (no match) gets `Null`.
#[test]
#[traced_test]
fn join_test_multiple() {
    let by_group = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec!["group_id".into()])));

    let mut left = column_frame! {
        "group_id" => vec![1, 1, 3]
    };
    let right = column_frame! {
        "group_id" => vec![2, 1, 1],
        "clicks" => vec![100, 10, 10],
        "imps" => vec![1000, 200, 200]
    };

    let result = left.join(right, &by_group);
    assert!(result.is_ok(), "{result:?}");

    trace!("{left:?}");
    let report: [Key; 3] = ["group_id".into(), "clicks".into(), "imps".into()];
    assert_eq!(
        left.select(Some(&report)),
        ndarray::array!(
            [1.into(), 10.into(), 200.into()],
            [1.into(), 10.into(), 200.into()],
            [3.into(), DataValue::Null, DataValue::Null],
        )
    )
}
1928
// When no right-hand `group_id` matches, every left row is kept and the
// joined `clicks` column is `Null` throughout.
#[test]
#[traced_test]
fn join_test_no_matches() {
    let by_group = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec!["group_id".into()])));

    let mut left = column_frame! {
        "group_id" => vec![DataValue::I32(1), DataValue::I32(2), DataValue::I32(3)]
    };
    let right = column_frame! {
        "group_id" => vec![DataValue::I32(4), DataValue::I32(5), DataValue::I32(6)],
        "clicks" => vec![DataValue::I32(100), DataValue::I32(200), DataValue::I32(300)],
    };

    let result = left.join(right, &by_group);
    assert!(result.is_ok(), "{result:?}");

    trace!("{left:?}");
    let report: [Key; 2] = ["group_id".into(), "clicks".into()];
    assert_eq!(
        left.select(Some(&report)),
        ndarray::array!(
            [DataValue::I32(1), DataValue::Null],
            [DataValue::I32(2), DataValue::Null],
            [DataValue::I32(3), DataValue::Null],
        )
    )
}
// Join on the composite id (`group_id`, `feed_tag`). Left rows keep their
// order, matched rows receive `clicks`/`imps`, and the unmatched (8, 10) row
// gets `Null`. A preceding join with an empty frame must succeed.
#[test]
#[traced_test]
fn join_test() {
    let by_group_and_tag = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
        "group_id".into(),
        "feed_tag".into(),
    ])));

    let mut left = column_frame! {
        "group_id" => vec![1, 2, 8],
        "feed_tag" => vec![1, 1, 10]
    };
    assert!(left.join(ColumnFrame::default(), &by_group_and_tag).is_ok());

    let right = column_frame! {
        "group_id" => vec![2, 1, 3],
        "feed_tag" => vec![1, 1, 1],
        "clicks" => vec![100, 10, 10],
        "imps" => vec![1000, 200, 200]
    };
    let result = left.join(right, &by_group_and_tag);
    assert!(result.is_ok(), "{result:?}");

    trace!("{left:?}");
    let report: [Key; 4] = [
        "group_id".into(),
        "feed_tag".into(),
        "clicks".into(),
        "imps".into(),
    ];
    assert_eq!(
        left.select(Some(&report)),
        ndarray::array!(
            [1.into(), 1.into(), 10.into(), 200.into()],
            [2.into(), 1.into(), 100.into(), 1000.into()],
            [8.into(), 10.into(), DataValue::Null, DataValue::Null]
        )
    )
}
1991
// Same composite-id join as `join_test`, but the left frame carries an extra
// `clicked` column that must survive the join unchanged.
#[test]
#[traced_test]
fn join_test_with_additional() {
    let by_group_and_tag = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
        "group_id".into(),
        "feed_tag".into(),
    ])));

    let mut left = column_frame! {
        "group_id" => vec![1, 2, 8],
        "feed_tag" => vec![1, 1, 10],
        "clicked" => vec![0, 0, 1]
    };
    assert!(left.join(ColumnFrame::default(), &by_group_and_tag).is_ok());

    let right = column_frame! {
        "group_id" => vec![2, 1, 3],
        "feed_tag" => vec![1, 1, 1],
        "clicks" => vec![100, 10, 10],
        "imps" => vec![1000, 200, 200]
    };
    let result = left.join(right, &by_group_and_tag);
    assert!(result.is_ok(), "{result:?}");

    trace!("{left:?}");
    let report: [Key; 5] = [
        "group_id".into(),
        "feed_tag".into(),
        "clicks".into(),
        "imps".into(),
        "clicked".into(),
    ];
    assert_eq!(
        left.select(Some(&report)),
        ndarray::array!(
            [1.into(), 1.into(), 10.into(), 200.into(), 0.into()],
            [2.into(), 1.into(), 100.into(), 1000.into(), 0.into()],
            [
                8.into(),
                10.into(),
                DataValue::Null,
                DataValue::Null,
                1.into()
            ]
        )
    )
}
2037
// Composite-id join against a single right-hand row (which also carries an
// unrelated column "a"). Only the (2, 1) left row matches; the others get
// `Null` for `clicks`/`imps`, and `clicked` is preserved.
#[test]
#[traced_test]
fn join_test_with_additional_single() {
    let by_group_and_tag = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
        "group_id".into(),
        "feed_tag".into(),
    ])));

    let mut left = column_frame! {
        "group_id" => vec![1, 2, 8],
        "feed_tag" => vec![1, 1, 10],
        "clicked" => vec![0, 0, 1]
    };
    assert!(left.join(ColumnFrame::default(), &by_group_and_tag).is_ok());

    let right = column_frame! {
        "a" => vec![1],
        "group_id" => vec![2],
        "feed_tag" => vec![1],
        "clicks" => vec![10],
        "imps" => vec![200]
    };
    let result = left.join(right, &by_group_and_tag);
    assert!(result.is_ok(), "{result:?}");

    trace!("{left:?}");
    let report: [Key; 5] = [
        "group_id".into(),
        "feed_tag".into(),
        "clicks".into(),
        "imps".into(),
        "clicked".into(),
    ];
    assert_eq!(
        left.select(Some(&report)),
        ndarray::array!(
            [
                1.into(),
                1.into(),
                DataValue::Null,
                DataValue::Null,
                0.into(),
            ],
            [2.into(), 1.into(), 10.into(), 200.into(), 0.into()],
            [
                8.into(),
                10.into(),
                DataValue::Null,
                DataValue::Null,
                1.into()
            ]
        )
    )
}
2090
2091 #[rstest]
2092 #[traced_test]
2093 fn cartesian_product_join() {
2094 let mut df = column_frame! {
2095 "group_id" => vec![1, 2, 3],
2096 "feed_tag" => vec![1, 2, 3]
2097 };
2098 let df2 = column_frame! {
2099 "zone_id" => vec![111111, 111133],
2100 "zone_avg_ctr" => vec![0.1, 0.001]
2101 };
2102 assert!(df
2103 .join(
2104 ColumnFrame::default(),
2105 &JoinRelation::new(JoinBy::CartesianProduct)
2106 )
2107 .is_ok());
2108 let join = JoinRelation::new(JoinBy::CartesianProduct);
2109 let result = df.join(df2, &join);
2110 assert!(result.is_ok(), "{result:?}");
2111 let selected = df.select(None);
2112 trace!("{selected:?}");
2113 assert_eq!(
2114 selected,
2115 ndarray::array!(
2116 [1.into(), 1.into(), 111111.into(), 0.1.into()],
2117 [1.into(), 1.into(), 111133.into(), 0.001.into()],
2118 [2.into(), 2.into(), 111111.into(), 0.1.into()],
2119 [2.into(), 2.into(), 111133.into(), 0.001.into()],
2120 [3.into(), 3.into(), 111111.into(), 0.1.into()],
2121 [3.into(), 3.into(), 111133.into(), 0.001.into()],
2122 )
2123 );
2124
2125 let df2 = column_frame! {
2126 "zone_id" => vec![111]
2127 };
2128 let result = df.join(df2, &join);
2129 assert!(result.is_ok(), "{result:?}");
2130 let selected = df.select(None);
2131 trace!("{selected:?}");
2132 assert_eq!(
2133 selected,
2134 ndarray::array!(
2135 [1.into(), 1.into(), 111111.into(), 0.1.into(), 111.into()],
2136 [1.into(), 1.into(), 111133.into(), 0.001.into(), 111.into()],
2137 [2.into(), 2.into(), 111111.into(), 0.1.into(), 111.into()],
2138 [2.into(), 2.into(), 111133.into(), 0.001.into(), 111.into()],
2139 [3.into(), 3.into(), 111111.into(), 0.1.into(), 111.into()],
2140 [3.into(), 3.into(), 111133.into(), 0.001.into(), 111.into()],
2141 )
2142 );
2143 }
2144
2145 #[rstest]
2146 #[traced_test]
2147 fn broadcast_join() {
2148 let mut df = column_frame! {
2149 "group_id" => vec![1, 2, 3],
2150 "feed_tag" => vec![1, 2, 3]
2151 };
2152 let df2 = column_frame! {
2153 "zone_id" => vec![111111]
2154 };
2155 assert!(df
2156 .join(
2157 ColumnFrame::default(),
2158 &JoinRelation::new(JoinBy::Broadcast)
2159 )
2160 .is_ok());
2161 let join = JoinRelation::new(JoinBy::Broadcast);
2162 assert!(df.join(df2, &join).is_ok());
2163 let selected = df.select(None);
2164 trace!("{selected:?}");
2165 assert_eq!(
2166 selected,
2167 ndarray::array!(
2168 [1.into(), 1.into(), 111111.into()],
2169 [2.into(), 2.into(), 111111.into()],
2170 [3.into(), 3.into(), 111111.into()]
2171 )
2172 );
2173 }
2174 #[rstest]
2175 #[traced_test]
2176 fn merge_test() {
2177 let mut df = column_frame! {
2178 "group_id" => vec![1, 2, 3],
2179 "feed_tag" => vec![1, 2, 3]
2180 };
2181 let df2 = column_frame! {
2182 "group_id" => vec![11, 21, 31],
2183 "feed_tag" => vec![12, 22, 32]
2184 };
2185
2186 let join = JoinRelation::new(JoinBy::Replace);
2187 assert!(df.join(df2, &join).is_ok());
2188 let selected = df.select(None);
2189 trace!("{selected:?}");
2190 assert_eq!(
2191 selected,
2192 ndarray::array!(
2193 [11.into(), 12.into()],
2194 [21.into(), 22.into()],
2195 [31.into(), 32.into()]
2196 )
2197 );
2198 }
2199
2200 #[rstest]
2201 #[traced_test]
2202 fn extend_test() {
2203 let mut df = column_frame! {
2204 "group_id" => vec![1, 2, 3],
2205 "feed_tag" => vec![1, 2, 3]
2206 };
2207 let df2 = column_frame! {
2208 "group_id" => vec![11, 21, 31],
2209 "feed_tag" => vec![5, 6, 7]
2210 };
2211 assert!(df
2212 .join(ColumnFrame::default(), &JoinRelation::new(JoinBy::Extend))
2213 .is_ok());
2214
2215 let join = JoinRelation::new(JoinBy::Extend);
2216 assert!(df.join(df2, &join).is_ok());
2217 let selected = df.select(Some(&["feed_tag".into(), "group_id".into()]));
2218 trace!("{selected:?}");
2219 assert_eq!(
2220 selected,
2221 ndarray::array!(
2222 [1.into(), 1.into()],
2223 [2.into(), 2.into()],
2224 [3.into(), 3.into()],
2225 [5.into(), 11.into()],
2226 [6.into(), 21.into()],
2227 [7.into(), 31.into()]
2228 )
2229 );
2230 let as_map = df.select_as_map(Some(&["feed_tag".into(), "group_id".into()]));
2231 trace!("{as_map:?}");
2232 assert_eq!(
2233 as_map,
2234 stdhashmap!(
2235 "feed_tag" => vec![1, 2, 3, 5, 6, 7],
2236 "group_id" => vec![1, 2, 3, 11, 21, 31]
2237 )
2238 );
2239
2240 let as_map = df.select_as_map(Some(&["feed_tag1".into()]));
2241 trace!("{as_map:?}");
2242 assert_eq!(as_map, HashMap::default());
2243 }
2244
2245 #[rstest]
2246 #[traced_test]
2247 fn extend_test_with_non_existing_cols() {
2248 let mut df = column_frame! {
2249 "group_id" => vec![1, 2, 3],
2250 "feed_tag" => vec![1, 2, 3]
2251 };
2252 let mut df2 = column_frame! {
2253 "group_id" => vec![11, 21, 31],
2254 "feed_tag" => vec![5, 6, 7],
2255 "clicks" => vec![100, 200, 300],
2256 "impressions" => vec![1000, 2000, 3000]
2257 };
2258 let df_bckp = df.clone();
2259 let join = JoinRelation::new(JoinBy::Extend);
2260 assert!(df.join(df2.clone(), &join).is_ok());
2261 let selected = df.select(None);
2262 trace!("{selected:?}");
2263 assert_eq!(
2264 selected,
2265 ndarray::array!(
2266 [1.into(), 1.into(), DataValue::Null, DataValue::Null],
2267 [2.into(), 2.into(), DataValue::Null, DataValue::Null],
2268 [3.into(), 3.into(), DataValue::Null, DataValue::Null],
2269 [11.into(), 5.into(), 100.into(), 1000.into()],
2270 [21.into(), 6.into(), 200.into(), 2000.into()],
2271 [31.into(), 7.into(), 300.into(), 3000.into()]
2272 )
2273 );
2274 let join = JoinRelation::new(JoinBy::Extend);
2275 let r = df2.join(df_bckp, &join);
2276 assert!(r.is_ok(), "{r:?}");
2277 let selected = df2.select(None);
2278 trace!("{selected:?}");
2279 assert_eq!(
2280 selected,
2281 ndarray::array!(
2282 [11.into(), 5.into(), 100.into(), 1000.into()],
2283 [21.into(), 6.into(), 200.into(), 2000.into()],
2284 [31.into(), 7.into(), 300.into(), 3000.into()],
2285 [1.into(), 1.into(), DataValue::Null, DataValue::Null],
2286 [2.into(), 2.into(), DataValue::Null, DataValue::Null],
2287 [3.into(), 3.into(), DataValue::Null, DataValue::Null]
2288 )
2289 );
2290 }
2291
2292 #[rstest]
2293 #[traced_test]
2294 fn extend_test_with_non_existing_cols_wrong_order() {
2295 let mut df = column_frame! {
2296 "group_id" => vec![1, 2, 3],
2297 "feed_tag" => vec![1, 2, 3]
2298 };
2299 let df2 = column_frame! {
2300 "feed_tag" => vec![5, 6, 7],
2301 "group_id" => vec![11, 21, 31]
2302 };
2303 let join = JoinRelation::new(JoinBy::Extend);
2304 let err = df.join(df2, &join);
2305 assert!(err.is_ok(), "{err:?}");
2306 }
2307
2308 #[rstest]
2309 #[traced_test]
2310 fn test_replace_not_compatible() {
2311 let mut df = column_frame! {
2312 "group_id" => vec![1, 2, 3],
2313 "feed_tag" => vec![1, 2, 3]
2314 };
2315 let df2 = column_frame! {
2316 "feed_tag" => vec![5, 6],
2317 "group_id" => vec![11, 21]
2318 };
2319 let join = JoinRelation::new(JoinBy::Replace);
2320 let err = df.join(df2, &join);
2321 assert!(err.is_err(), "{err:?}");
2322 let empty = ColumnFrame::default();
2323 let err = df.join(empty, &join);
2324 assert!(err.is_ok(), "{err:?}");
2325 }
2326
2327 #[rstest]
2328 #[traced_test]
2329 fn test_different_data() {
2330 let mut df = column_frame! {
2331 "group_id" => vec![1, 2, 3],
2332 "feed_tag" => vec![1, 2, 3]
2333 };
2334 let df2 = column_frame! {
2335 "group_id" => vec![11, 21],
2336 "a" => vec![5, 6]
2337 };
2338 let join = JoinRelation::new(JoinBy::Extend);
2339 let err = df.join(df2, &join);
2340 assert!(err.is_ok(), "{err:?}");
2341 println!("{df:?}");
2342 let expected_df = ColumnFrame::new(
2343 KeyIndex::from(vec!["group_id".into(), "feed_tag".into(), "a".into()]),
2344 ndarray::array!(
2345 [1.into(), 1.into(), DataValue::Null],
2346 [2.into(), 2.into(), DataValue::Null],
2347 [3.into(), 3.into(), DataValue::Null],
2348 [11.into(), DataValue::Null, 5.into()],
2349 [21.into(), DataValue::Null, 6.into()]
2350 ),
2351 );
2352 assert_eq!(df, expected_df)
2353 }
2354
2355 #[rstest]
2356 #[traced_test]
2357 fn serde_column_frame() {
2358 let df = column_frame! {
2359 "group_id" => vec![1u64, 2u64, 3u64],
2360 "feed_tag" => vec![1u64, 2u64, 3u64]
2361 };
2362 let key_idx = df.index.clone();
2363 let serialized = serde_json::to_string(&key_idx).expect("BUG: cannot serialize");
2364 let deserialized: KeyIndex =
2365 serde_json::from_str(&serialized).expect("BUG: cannot deserialize");
2366 assert_eq!(key_idx, deserialized);
2367 assert!(key_idx.get_key(0).is_some_and(|x| x == "group_id".into()));
2368 let serialized = serde_json::to_string(&df).expect("BUG: cannot serialize");
2369 let deserialized: ColumnFrame =
2370 serde_json::from_str(&serialized).expect("BUG: cannot deserialize");
2371 assert_eq!(df, deserialized);
2372 }
2373
2374 #[rstest]
2375 #[traced_test]
2376 fn update_value() {
2377 let mut df = column_frame! {
2378 "group_id" => vec![1, 2, 3],
2379 "feed_tag" => vec![1, 2, 3]
2380 };
2381 let group_id: Key = "group_id".into();
2382 let v = df.get_mut_by_row_index(&group_id, 1);
2383 assert!(v.is_some());
2384 let v = v.unwrap();
2385 assert_eq!(v, &DataValue::I32(2));
2386 *v = DataValue::U64(22);
2387 let v = df.get_by_row_index(&group_id, 1);
2388 assert!(v.is_some());
2389 let v = v.unwrap();
2390 assert_eq!(v, &DataValue::U64(22));
2391
2392 assert!(df.get_mut_by_row_index(&"group_id2".into(), 1).is_none());
2393 }
2394
2395 #[rstest]
2396 fn get_single_column_typed_f64() {
2397 let df = column_frame! {
2398 "a" => [1i32, 2i32, 3i32],
2399 "b" => [10u64, 20u64, 30u64]
2400 };
2401 let key: Key = "a".into();
2402 let col = df.get_single_column_typed::<f64>(&key).unwrap();
2403 assert_eq!(col, ndarray::arr1(&[1.0, 2.0, 3.0]));
2404 }
2405
2406 #[rstest]
2407 fn get_single_column_typed_i64() {
2408 let df = column_frame! {
2409 "x" => [10u32, 20u32, 30u32]
2410 };
2411 let key: Key = "x".into();
2412 let col = df.get_single_column_typed::<i64>(&key).unwrap();
2413 assert_eq!(col, ndarray::arr1(&[10i64, 20i64, 30i64]));
2414 }
2415
2416 #[rstest]
2417 fn get_single_column_typed_string() {
2418 let df = column_frame! {
2419 "name" => ["alice", "bob", "carol"]
2420 };
2421 let key: Key = "name".into();
2422 let col = df.get_single_column_typed::<String>(&key).unwrap();
2423 assert_eq!(
2424 col,
2425 ndarray::arr1(&["alice".to_string(), "bob".to_string(), "carol".to_string()])
2426 );
2427 }
2428
2429 #[rstest]
2430 fn get_single_column_typed_bool() {
2431 let df = column_frame! {
2432 "flag" => [1i32, 0i32, 1i32]
2433 };
2434 let key: Key = "flag".into();
2435 let col = df.get_single_column_typed::<bool>(&key).unwrap();
2436 assert_eq!(col, ndarray::arr1(&[true, false, true]));
2437 }
2438
2439 #[rstest]
2440 fn get_single_column_typed_missing_key_returns_none() {
2441 let df = column_frame! {
2442 "a" => [1, 2, 3]
2443 };
2444 let missing: Key = "nonexistent".into();
2445 assert!(df.get_single_column_typed::<f64>(&missing).is_none());
2446 }
2447
2448 #[rstest]
2449 fn get_single_column_typed_numeric_coercion_from_mixed() {
2450 let df = column_frame! {
2451 "vals" => [1.5f64, 2.7f64, 3.9f64]
2452 };
2453 let key: Key = "vals".into();
2454 let col = df.get_single_column_typed::<i32>(&key).unwrap();
2456 assert_eq!(col, ndarray::arr1(&[1i32, 2i32, 3i32]));
2457 }
2458
2459 #[rstest]
2460 fn get_single_column_typed_selects_correct_column() {
2461 let df = column_frame! {
2462 "x" => [1, 2, 3],
2463 "y" => [10, 20, 30],
2464 "z" => [100, 200, 300]
2465 };
2466 let key: Key = "y".into();
2467 let col = df.get_single_column_typed::<i64>(&key).unwrap();
2468 assert_eq!(col, ndarray::arr1(&[10i64, 20i64, 30i64]));
2469 }
2470
2471 #[rstest]
2472 fn get_single_column_typed_u64_identity() {
2473 let df = column_frame! {
2474 "id" => [100u64, 200u64, 300u64]
2475 };
2476 let key: Key = "id".into();
2477 let col = df.get_single_column_typed::<u64>(&key).unwrap();
2478 assert_eq!(col, ndarray::arr1(&[100u64, 200u64, 300u64]));
2479 }
2480
2481 #[rstest]
2482 fn get_single_column_typed_single_row() {
2483 let df = column_frame! {
2484 "x" => [42i32]
2485 };
2486 let key: Key = "x".into();
2487 let col = df.get_single_column_typed::<f64>(&key).unwrap();
2488 assert_eq!(col, ndarray::arr1(&[42.0f64]));
2489 }
2490
2491 #[rstest]
2492 fn get_single_column_typed_empty_frame() {
2493 let df = ColumnFrame::default();
2494 let key: Key = "x".into();
2495 let col = df.get_single_column_typed::<f64>(&key);
2496 assert!(col.is_none());
2497 }
2498
2499 #[rstest]
2500 fn select_typed_all_columns() {
2501 let df = column_frame! {
2502 "a" => [1i32, 2i32],
2503 "b" => [3i32, 4i32]
2504 };
2505 let result = df.select_typed::<f64>(None);
2506 assert_eq!(result.nrows(), 2);
2507 assert_eq!(result.ncols(), 2);
2508 assert_eq!(result[[0, 0]], 1.0);
2509 assert_eq!(result[[0, 1]], 3.0);
2510 assert_eq!(result[[1, 0]], 2.0);
2511 assert_eq!(result[[1, 1]], 4.0);
2512 }
2513
2514 #[rstest]
2515 fn select_typed_subset_of_columns() {
2516 let df = column_frame! {
2517 "a" => [10u64, 20u64],
2518 "b" => [30u64, 40u64],
2519 "c" => [50u64, 60u64]
2520 };
2521 let keys: Vec<Key> = vec!["a".into(), "c".into()];
2522 let result = df.select_typed::<i64>(Some(&keys));
2523 assert_eq!(result.nrows(), 2);
2524 assert_eq!(result.ncols(), 2);
2525 assert_eq!(result[[0, 0]], 10i64);
2526 assert_eq!(result[[0, 1]], 50i64);
2527 assert_eq!(result[[1, 0]], 20i64);
2528 assert_eq!(result[[1, 1]], 60i64);
2529 }
2530
2531 #[rstest]
2532 fn select_typed_nonexistent_keys_returns_empty() {
2533 let df = column_frame! {
2534 "a" => [1i32, 2i32]
2535 };
2536 let keys: Vec<Key> = vec!["z".into()];
2537 let result = df.select_typed::<f64>(Some(&keys));
2538 assert_eq!(result.shape(), &[0, 0]);
2539 }
2540
2541 #[rstest]
2542 fn select_typed_string_extraction() {
2543 let df = column_frame! {
2544 "name" => ["hello", "world"]
2545 };
2546 let result = df.select_typed::<String>(None);
2547 assert_eq!(result[[0, 0]], "hello");
2548 assert_eq!(result[[1, 0]], "world");
2549 }
2550
2551 #[rstest]
2552 fn select_typed_matches_manual_mapv() {
2553 let df = column_frame! {
2554 "x" => [1i32, 2i32, 3i32],
2555 "y" => [4i32, 5i32, 6i32]
2556 };
2557 let typed = df.select_typed::<f64>(None);
2558 let manual = df.select(None).mapv(|v| f64::extract(&v));
2559 assert_eq!(typed, manual);
2560 }
2561}