trs_dataframe/dataframe/
column_store.rs

1use ndarray::{concatenate, s, Array, Array1, Array2, ArrayView1, ArrayViewMut2, Axis};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4
5use crate::error::Error;
6use crate::{dataframe::index::Index, CandidateData, JoinBy, JoinRelation, Key};
7use data_value::{DataValue, Extract};
8use tracing::*;
9mod from;
10mod key_index;
11mod ops;
12pub mod sorted_df;
13pub use key_index::KeyIndex;
14pub mod filter_df;
15
16/// [`ColumnFrame`] is used to store the data for the candidates
17/// The data is stored in the [`Array2`] with the [`DataValue`] values
18/// The data is stored in the columns and the columns are indexed by the [`KeyIndex`]
19/// The [`KeyIndex`] is used to access the data by the column [`Key`]
20/// Memory layout is same like in [`ndarray`] - the data is stored in the row-major order
21#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
22pub struct ColumnFrame {
23    pub index: KeyIndex,
24    pub data_frame: Array2<DataValue>,
25}
26
/// Tells a caller of a pre-check whether it should keep processing or return early.
enum Continue {
    /// The pre-check did not finish the work; the caller has to carry on.
    Continue,
    /// Nothing is left to do; the caller can return immediately.
    End,
}

impl Continue {
    /// Returns `true` only for [`Continue::End`].
    pub fn should_end(&self) -> bool {
        match self {
            Self::End => true,
            Self::Continue => false,
        }
    }
}
37
38use std::fmt;
39
40impl fmt::Display for ColumnFrame {
41    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42        // Display keys and and indices
43        write!(f, "\n|")?;
44
45        for key in &self.index.keys {
46            write!(f, " {key} |")?;
47        }
48
49        if self.index.is_empty() {
50            writeln!(f, "|")?;
51        }
52
53        // Display type for each key
54        if let Some(row) = self.data_frame.axis_iter(Axis(0)).next() {
55            write!(f, "\n|")?;
56            for value in row.iter() {
57                // Display types during first iteration
58                write!(f, " {:10?} |", crate::detect_dtype(value))?;
59            }
60            writeln!(f)?;
61        }
62
63        writeln!(f, "---")?;
64
65        // Display items, limit output to 256 rows
66        for (n, row) in self.data_frame.axis_iter(Axis(0)).enumerate() {
67            write!(f, "|")?;
68
69            for value in row.iter() {
70                // Display types during first iteration
71                write!(f, " {value} |")?;
72            }
73            writeln!(f)?;
74
75            if n >= 256 {
76                writeln!(f, "... (dataframe is too long)")?;
77                break;
78            }
79        }
80
81        writeln!(f, "---")
82    }
83}
84pub fn convert_data_value(item: DataValue, dtype: crate::DataType) -> DataValue {
85    let x = &item;
86    match dtype {
87        crate::DataType::Bool => DataValue::Bool(bool::extract(x)),
88        crate::DataType::U32 => DataValue::U32(u32::extract(x)),
89        crate::DataType::I32 => DataValue::I32(i32::extract(x)),
90        crate::DataType::U64 => DataValue::U64(u64::extract(x)),
91        crate::DataType::I64 => DataValue::I64(i64::extract(x)),
92        crate::DataType::F32 => DataValue::F32(f32::extract(x)),
93        crate::DataType::U128 => DataValue::U128(u128::extract(x)),
94        crate::DataType::I128 => DataValue::I128(i128::extract(x)),
95        crate::DataType::F64 => DataValue::F64(f64::extract(x)),
96        crate::DataType::U8 => DataValue::U8(u8::extract(x)),
97        crate::DataType::String => DataValue::String(String::extract(x).into()),
98        crate::DataType::Bytes => item,
99        crate::DataType::Map => item,
100        crate::DataType::Vec => item,
101        crate::DataType::Unknown => {
102            if matches!(item, DataValue::Null) {
103                return item;
104            }
105            let dtype = crate::detect_dtype(&item);
106            // this situation should not ever happen
107            if matches!(dtype, crate::DataType::Unknown) {
108                tracing::error!("Unknown datatype {dtype:?} - {item:?}");
109                return item;
110            }
111            convert_data_value(item, dtype)
112        }
113    }
114}
115pub fn convert_dv_to_dtype(key: &Key, item: DataValue) -> DataValue {
116    convert_data_value(item, key.ctype)
117}
118impl ColumnFrame {
119    pub fn new<K: Into<KeyIndex>>(index: K, data_frame: Array2<DataValue>) -> Self {
120        Self {
121            index: index.into(),
122            data_frame,
123        }
124    }
125
126    pub fn keys(&self) -> &[Key] {
127        self.index.get_keys()
128    }
129
130    pub fn len(&self) -> usize {
131        self.data_frame.nrows()
132    }
133
134    pub fn is_empty(&self) -> bool {
135        self.data_frame.nrows() == 0
136    }
137
138    pub fn shrink(&mut self) {
139        //fixme
140    }
141
142    /// This method will try to fix dtype based on data stored in each column. If dtype is [`crate::DataType::Unknown`]
143    /// this method will replace dtype for "correct" one based on [`DataValue`].
144    /// NOTE: flag `force` will enforce this dtype even if dtype is known
145    pub fn try_fix_dtype_for_keys(&mut self, force: bool) -> Result<(), Error> {
146        let mut keys = self.index.keys.clone();
147        for key in keys.iter_mut() {
148            if !force && matches!(key.ctype, crate::DataType::Unknown) {
149                let column = self
150                    .get_single_column(key)
151                    .ok_or_else(|| Error::EmptyData)?;
152                let dtype = crate::detect_dtype(column.get(0).ok_or_else(|| Error::EmptyData)?);
153                key.ctype = dtype;
154            } else if force {
155                let column = self
156                    .get_single_column(key)
157                    .ok_or_else(|| Error::EmptyData)?;
158                let dtype = crate::detect_dtype(column.get(0).ok_or_else(|| Error::EmptyData)?);
159                key.ctype = dtype;
160            }
161        }
162        self.index.keys = keys;
163
164        Ok(())
165    }
166    pub fn try_fix_dtype(&mut self) -> Result<(), Error> {
167        let mut errors = vec![];
168        let keys = self.index.keys.clone();
169        for key in keys {
170            tracing::trace!("key: {key:?}- {:?}", key.ctype);
171            if let Err(e) = self.try_fix_column_by_key(&key) {
172                errors.push((key, e.to_string()));
173            }
174        }
175        if errors.is_empty() {
176            Ok(())
177        } else {
178            Err(Error::CastFailed(errors))
179        }
180    }
181
182    pub fn try_fix_column_by_key(&mut self, key: &Key) -> Result<(), Error> {
183        let idx = self
184            .index
185            .get_column_index(key)
186            .ok_or(Error::MissingField(format!("{key}").into()))?;
187        let mut col = self.data_frame.column_mut(idx);
188
189        col.mapv_inplace(|item| convert_dv_to_dtype(key, item));
190        Ok(())
191    }
192
193    pub fn enforce_dtype_for_column(
194        &mut self,
195        key: &str,
196        dtype: crate::DataType,
197    ) -> Result<(), Error> {
198        if let Some(idx) = self.index.get_column_index_by_name(key) {
199            let new_key = Key::new(key, dtype);
200            let mut col = self.data_frame.column_mut(idx);
201
202            col.mapv_inplace(|item| convert_dv_to_dtype(&new_key, item));
203            self.index.rename_key(key, new_key)?;
204            Ok(())
205        } else {
206            Err(Error::NotFound(Key::new(key, crate::DataType::Unknown)))
207        }
208    }
209
210    pub fn get_mut_view(&mut self) -> ArrayViewMut2<'_, DataValue> {
211        self.data_frame.view_mut()
212    }
213
214    pub fn rename_key(&mut self, old: &str, new: Key) -> Result<(), Error> {
215        self.index.rename_key(old, new)
216    }
217
218    pub fn add_alias(&mut self, key: &str, alias: &str) -> Result<(), Error> {
219        self.index.add_alias(key, alias)
220    }
221
222    /// Selects the data from the [`ColumnFrame`] by the given keys
223    /// If the keys are not provided, the data is selected by the [`KeyIndex`] keys
224    /// The data is returned as the [`Vec<Vec<DataValue>>`]
225    /// If the keys are not found, the empty [`Vec<Vec<DataValue>>`] is returned
226    /// Returns the data in the column-major order
227    pub fn select_transposed_typed<D: Extract>(&self, keys: &[Key]) -> Vec<Vec<D>> {
228        let selected = self.select(Some(keys));
229        let mut result = Vec::with_capacity(selected.nrows());
230        for row in selected.rows() {
231            let mut r = Vec::with_capacity(selected.ncols());
232            for value in row.iter() {
233                r.push(D::extract(value));
234            }
235            result.push(r);
236        }
237        result
238    }
239
240    /// Selects the data from the [`ColumnFrame`] by the given keys
241    /// If the keys are not provided, the data is selected by the [`KeyIndex`] keys
242    /// The data is returned as the [`Array2`] with the [`DataValue`] values
243    /// If the keys are not found, the empty [`Array2`] is returned
244    /// Returns the [`Array2`] with the data in the column-major order
245    pub fn select_transposed(&self, keys: Option<&[Key]>) -> Result<Array2<DataValue>, Error> {
246        let keys = keys.unwrap_or_else(|| self.index.get_keys());
247        let key_indexes = self.index.select(keys);
248        if key_indexes.is_empty() {
249            return Ok(Array2::default((0, 0)));
250        }
251        let data_vec: Vec<Array1<DataValue>> = key_indexes
252            .indexes()
253            .iter()
254            .map(|x| self.data_frame.column(*x).to_owned())
255            .collect();
256        to_array2(data_vec)
257    }
258
259    /// Selects whole column from the [`ColumnFrame`] by the given key
260    /// If the key is not found, the None is returned
261    /// If the key is found, the [`ArrayView1`] with the [`DataValue`] values is returned
262    pub fn select_column(&self, key: &Key) -> Option<ArrayView1<'_, DataValue>> {
263        self.index
264            .get_column_index(key)
265            .map(|x| self.data_frame.column(x))
266    }
267
268    pub fn apply_function<F>(&mut self, keys: &[Key], mut func: F) -> Result<(), Error>
269    where
270        F: FnMut(&[Key], &mut ColumnFrame) -> Result<(), Error>,
271    {
272        func(keys, self)
273    }
274
275    /// Validates the access to the entry by the given column [`Key`] and row index
276    /// If the column is not found, an error is returned [`Error::NotFound`]
277    /// If the row index is out of bounds, an error is returned [`Error::IndexOutOfRange`]
278    /// Otherwise, the column index is returned
279    pub fn validate_entry_access(&self, column: &Key, row_index: usize) -> Result<usize, Error> {
280        if row_index >= self.data_frame.nrows() {
281            return Err(Error::IndexOutOfRange(row_index, self.data_frame.nrows()));
282        }
283        let Some(column_index) = self.index.get_column_index(column) else {
284            return Err(Error::NotFound(column.clone()));
285        };
286        Ok(column_index)
287    }
288
289    /// Returns the value [`DataValue`] for the given column defined by [`Key`] and row index
290    /// If the column is not found, None is returned
291    /// If the row index is out of bounds, None is returned
292    pub fn get_by_row_index(&self, column: &Key, row_index: usize) -> Option<&DataValue> {
293        trace!(
294            "Column: {column} row_index: {row_index} data_frame: cols:{}-rows:{}",
295            self.data_frame.len(),
296            self.data_frame.nrows()
297        );
298        trace!("{:?}", self.data_frame);
299        match self.validate_entry_access(column, row_index) {
300            Ok(column_index) => self.data_frame.get((row_index, column_index)),
301            Err(e) => {
302                trace!("Error: {e}");
303                None
304            }
305        }
306    }
307
308    /// Returns mutable reference for the value [`DataValue`] for the given column defined by [`Key`] and row index
309    /// If the column is not found, None is returned
310    /// If the row index is out of bounds, None is returned
311    pub fn get_mut_by_row_index(
312        &mut self,
313        column: &Key,
314        row_index: usize,
315    ) -> Option<&mut DataValue> {
316        trace!(
317            "Column: {column} row_index: {row_index} data_frame: cols:{}-rows:{}",
318            self.data_frame.len(),
319            self.data_frame.nrows()
320        );
321        trace!("{:?}", self.data_frame);
322        match self.validate_entry_access(column, row_index) {
323            Ok(column_index) => self.data_frame.get_mut((row_index, column_index)),
324            Err(e) => {
325                trace!("Error: {e}");
326                None
327            }
328        }
329    }
330
331    /// Returns the value [`HashMap<Key, Vec<DataValue>>`] for the given columns defined by [`Key`].
332    /// If the keys are not provided, the data is selected by the [`KeyIndex`] keys
333    /// If the keys are not found, the empty [`HashMap`] is returned
334    /// Returns the [`Array2`] with the data in the row-major order
335    pub fn select_as_map(&self, keys: Option<&[Key]>) -> HashMap<Key, Vec<DataValue>> {
336        let keys = keys.unwrap_or_else(|| self.index.get_keys());
337        let indexes = self.index.select(keys);
338        if indexes.is_empty() {
339            return Default::default();
340        }
341
342        let mut new_data_frame = HashMap::with_capacity(keys.len());
343
344        for key in keys.iter() {
345            if let Some(column_index_in_source) = indexes.get_column_index(key) {
346                let column = self.data_frame.column(column_index_in_source);
347                new_data_frame.insert(key.clone(), column.to_vec());
348            }
349        }
350
351        new_data_frame
352    }
353
354    /// Returns the value [`Array2<DataValue>`] for the given columns defined by [`Key`].
355    /// If the keys are not provided, the data is selected by the [`KeyIndex`] keys
356    /// If the keys are not found, the empty [`Array2`] is returned
357    /// Returns the [`Array2`] with the data in the row-major order
358    pub fn select(&self, keys: Option<&[Key]>) -> Array2<DataValue> {
359        let keys = keys.unwrap_or_else(|| self.index.get_keys());
360        let indexes = self.index.select(keys);
361        if indexes.is_empty() {
362            return Array2::default((0, 0));
363        }
364        let mut new_data_frame = Array2::default((self.data_frame.nrows(), keys.len()));
365
366        for (idx, key) in keys.iter().enumerate() {
367            if let Some(column_index_in_source) = indexes.get_column_index(key) {
368                new_data_frame
369                    .column_mut(idx)
370                    .assign(&self.data_frame.column(column_index_in_source));
371            }
372        }
373
374        new_data_frame
375    }
376
377    fn extend_dataframe_for_column(&mut self, key: Key) -> Result<(), Error> {
378        self.index.store_key(key);
379        let len = self.data_frame.nrows();
380        self.data_frame.push_column(Array1::default(len).view())?;
381        Ok(())
382    }
383
384    /// Pushes the row candidate into the [`ColumnFrame`]
385    /// If the column is not found this method will add the column to the [`ColumnFrame`]
386    ///
387    pub fn push<C: CandidateData>(&mut self, row_candidate: C) -> Result<(), Error> {
388        let mut arr = vec![];
389        for key in row_candidate.keys() {
390            if self.index.get_column_index(&key).is_none() {
391                self.extend_dataframe_for_column(key)?;
392            }
393        }
394        for index in self.index.get_keys() {
395            if let Some(value) = row_candidate.get_value_ref(index) {
396                arr.push(value.clone());
397            } else {
398                arr.push(DataValue::Null);
399            }
400        }
401        self.data_frame.push_row(Array::from_vec(arr).view())?;
402        Ok(())
403    }
404
405    pub fn remove_column(&mut self, keys: &[Key]) -> Result<Self, Error> {
406        // fixme this is naive approach
407        let mut indexes = KeyIndex::default();
408        // take remove data
409        let data = self.select(Some(keys));
410        // remove labels from the index
411        for key in keys {
412            if let Some((current, _idx)) = self.index.remove_key(key) {
413                indexes.store_key(current);
414            }
415        }
416        // copy the rest of the data to the new data frame with new index
417        let rest = self.select(Some(self.keys()));
418        let keys = self.index.get_keys().to_vec();
419        self.data_frame = rest;
420        self.index = KeyIndex::new(keys);
421
422        //remove_self.data_frame = to_array2(columns)?;
423        Ok(Self::new(indexes, data))
424    }
425
426    fn check_or_init_frame(&mut self, other: &Self) -> Result<Continue, Error> {
427        if self.index.is_empty() {
428            self.index = other.index.clone();
429            self.data_frame = other.data_frame.clone();
430            return Ok(Continue::End);
431        }
432        if other.index.is_empty() {
433            return Ok(Continue::End);
434        }
435        if self.is_empty() {
436            self.data_frame = Array2::default((other.data_frame.nrows(), self.index.len()));
437        }
438
439        Ok(Continue::Continue)
440    }
441
442    fn extend_columns_from_other(&mut self, other: &Self) -> Result<(), Error> {
443        for key in other.index.get_keys() {
444            if self.index.get_column_index(key).is_none() {
445                self.extend_dataframe_for_column(key.clone())?;
446            }
447        }
448        Ok(())
449    }
450
451    fn try_extend(&mut self, other: Self) -> Result<(), Error> {
452        let mut joined_keys = self.index.clone();
453        // extend keys
454        for key in other.keys() {
455            if self.index.get_column_index(key).is_none() {
456                joined_keys.store_key(key.clone());
457            }
458        }
459
460        let sum_len = self.data_frame.nrows() + other.data_frame.nrows();
461        let mut arr = Array2::default((sum_len, joined_keys.len()));
462        let increment = self.data_frame.nrows();
463        for key in joined_keys.get_keys() {
464            let index_result = joined_keys
465                .get_column_index(key)
466                .expect("BUG: index for this has to be defined");
467            if let Some(index) = self.index.get_column_index(key) {
468                for (idx, value) in self.data_frame.column(index).iter().enumerate() {
469                    if let Some(x) = arr.get_mut((idx, index_result)) {
470                        *x = value.to_owned();
471                    }
472                }
473            }
474
475            if let Some(index) = other.index.get_column_index(key) {
476                for (idx, value) in other.data_frame.column(index).iter().enumerate() {
477                    if let Some(x) = arr.get_mut((increment + idx, index_result)) {
478                        *x = value.to_owned();
479                    }
480                }
481            }
482        }
483        *self = ColumnFrame::new(joined_keys, arr);
484        Ok(())
485    }
486
487    /// Extends the [`ColumnFrame`] with the data from the other [`ColumnFrame`]
488    /// If the [`KeyIndex`] is empty, the [`ColumnFrame`] is replaced with the other [`ColumnFrame`]
489    /// If the other [`KeyIndex`] is empty, nothing happens
490    /// If the length of the [`KeyIndex`] of the other data frame is greater then current,
491    /// an error is returned [`Error::DataSetSizeDoesntMatch`]
492    /// If [`Key`] from other data frame - extends the [`KeyIndex`] and add column to the current [`ColumnFrame`]
493    ///
494    pub fn extend(&mut self, mut other: Self) -> Result<(), Error> {
495        if self.check_or_init_frame(&other)?.should_end() {
496            return Ok(());
497        }
498
499        if self.index.check_order_of_indexes(&other.index).is_err() {
500            return self.try_extend(other);
501        }
502
503        trace!(
504            "Extend columns from other {:?} vs {:?}",
505            other.index.get_keys(),
506            self.index.get_keys()
507        );
508
509        if other.data_frame.ncols() < self.data_frame.ncols() {
510            other.extend_columns_from_other(self)?;
511        } else {
512            self.extend_columns_from_other(&other)?;
513        }
514        self.data_frame = concatenate(Axis(0), &[self.data_frame.view(), other.data_frame.view()])?;
515
516        Ok(())
517    }
518
519    /// Replace the [`ColumnFrame`] with the other [`ColumnFrame`]
520    /// If the current [`KeyIndex`] is empty, the [`ColumnFrame`] is replaced with the other [`ColumnFrame`]
521    /// If the other [`KeyIndex`] is empty, nothing happens
522    /// If the [`KeyIndex`] of the other data frame and current doesn't match an error is returned [`Error::DataSetSizeDoesntMatch`]
523    /// If the [`Key`] from other data frame is not present in the current [`ColumnFrame`] - extends the [`KeyIndex`] and add column to the current [`ColumnFrame`]
524    pub fn replace(&mut self, other: Self) -> Result<(), Error> {
525        if self.check_or_init_frame(&other)?.should_end() {
526            return Ok(());
527        }
528
529        if self.data_frame.len() > other.data_frame.len() {
530            return Err(Error::DataSetSizeDoesntMatch(
531                self.data_frame.len(),
532                other.data_frame.len(),
533            ));
534        }
535        self.index = other.index;
536        self.data_frame = other.data_frame;
537
538        Ok(())
539    }
540
541    /// Joins the candidates by the keys in the `JoinRelation::JoinById` struct.
542    /// This function creates [`Index`] for the keys and then joins the candidates by the keys.
543    pub fn join_by_id_inner(&mut self, right: Self, keys: &[Key]) -> Result<(), Error> {
544        if self.check_or_init_frame(&right)?.should_end() {
545            return Ok(());
546        }
547
548        let timer = std::time::Instant::now();
549        let new_columnns = right.index.get_complement_keys(self.index.get_keys());
550        // add new columns and keys into the column frame index
551        self.extend_columns_from_other(&right)?;
552        tracing::debug!("Extend took {}ns", timer.elapsed().as_nanos());
553        // get the indexes for the keys
554        let timer = std::time::Instant::now();
555        let index = Index::new(keys.to_vec(), self);
556        tracing::debug!(" took: {}ns", timer.elapsed().as_nanos());
557        tracing::trace!("Index {index:?}");
558        let timer = std::time::Instant::now();
559        let right_index = Index::new(keys.to_vec(), &right);
560
561        let joined_idx = index.join(right_index);
562        // let right_indexes = right.index.select(keys);
563        tracing::debug!("right_idx {}ns", timer.elapsed().as_nanos());
564        // tracing::trace!("RIndex {right_indexes:?}",);
565        let timer = std::time::Instant::now();
566
567        let mut new_df = Array2::default((self.len(), self.index.len()));
568        new_df.assign(&self.data_frame);
569        debug!(
570            "New DF: {new_df:?} create new df: {}ns",
571            timer.elapsed().as_nanos()
572        );
573        trace!("Right DF: {:?}", right.data_frame);
574        trace!("current {:?}", self.data_frame);
575        let right_data = right.select(Some(&new_columnns));
576        // iterate over all rows in the right data frame and find the corresponding row in the current data frame
577        // then fill the new data frame with the values from the right data frame
578        // if the row is not found, the row is skipped - the data is not filled - [`DataValue::Null`] is used
579        let timer = std::time::Instant::now();
580
581        for (left_index, right_index) in joined_idx {
582            if let Some(right_index) = right_index {
583                let right_row = right_data.row(right_index);
584                for (right_column_index, complement_key) in new_columnns.iter().enumerate() {
585                    let column_index = self
586                        .index
587                        .get_column_index(complement_key)
588                        .expect("BUG: Something is very wrong");
589                    trace!("Filling Index: [{complement_key:?}] ri:{right_index} rci:{right_column_index:?}| li: {left_index} lci: {column_index} -> {:?} vs {:?}", right_row[right_column_index], new_df.get_mut((left_index, column_index)));
590                    if let Some(v) = new_df.get_mut((left_index, column_index)) {
591                        trace!("Filling result: [{complement_key:?}] ri:{right_index} rci:{right_column_index:?}| li: {left_index} lci: {column_index} -> {:?}", right_row[right_column_index]);
592                        *v = right_row[right_column_index].to_owned();
593                    }
594                }
595            }
596        }
597        let elapsed = timer.elapsed();
598        tracing::debug!(
599            "Filling data {}ms|{}s",
600            elapsed.as_millis(),
601            elapsed.as_secs()
602        );
603        self.data_frame = new_df;
604
605        Ok(())
606    }
607
608    /// Adds the single column to the current [`ColumnFrame`]
609    /// If the column is already present, an error is returned [`Error::ColumnAlreadyExists`]
610    /// If the length of the column is different from the current data frame, an error is returned [`Error::DataSetSizeDoesntMatch`]
611    pub fn add_single_column<K: Into<Key>>(
612        &mut self,
613        key: K,
614        column: Array1<DataValue>,
615    ) -> Result<(), Error> {
616        let key = key.into();
617        if self.index.get_column_index(&key).is_some() {
618            return Err(Error::ColumnAlreadyExists(key));
619        }
620        if self.len() != column.len() && !self.is_empty() {
621            return Err(Error::DataSetSizeDoesntMatch(self.len(), column.len()));
622        }
623
624        self.index.store_key(key.clone());
625        let rows = column.len();
626        let column_index = self
627            .index
628            .get_column_index(&key)
629            .ok_or(Error::UnknownError(format!("Column {key} should exists")))?;
630        if self.is_empty() && self.index.len() == 1 {
631            self.data_frame = column.into_shape_clone((rows, 1))?;
632            assert_eq!(self.data_frame.column(column_index).len(), rows);
633        } else if self.is_empty() {
634            self.data_frame = Array2::default((column.len(), self.index.len() - 1));
635            self.data_frame.push_column(column.view())?;
636            assert_eq!(self.data_frame.column(column_index).len(), rows);
637        } else {
638            self.data_frame.push_column(column.view())?;
639        }
640        assert_eq!(self.data_frame.column(column_index).len(), rows);
641
642        Ok(())
643    }
644    /// Adds the columns from the other [`ColumnFrame`] to the current [`ColumnFrame`]
645    /// If the current [`KeyIndex`] is empty, the [`ColumnFrame`] is replaced with the other [`ColumnFrame`]
646    /// If the other [`KeyIndex`] is empty, nothing happens
647    pub fn add_columns(&mut self, other: Self) -> Result<(), Error> {
648        if self.check_or_init_frame(&other)?.should_end() {
649            return Ok(());
650        }
651
652        self.extend_columns_from_other(&other)?;
653        for (idx, key) in other.index.get_keys().iter().enumerate() {
654            if let Some(index) = self.index.get_column_index(key) {
655                trace!("Other array = {:?}", other.data_frame.dim());
656                if other.data_frame.dim() == (0, 0) {
657                    self.data_frame.column_mut(index).fill(DataValue::Null);
658                    continue;
659                }
660                let arr = other.data_frame.column(idx);
661                trace!(
662                    "Adding column {key:?} at index {idx} vs {index} datasize: self:{} vs other:{}",
663                    self.data_frame.nrows(),
664                    arr.len()
665                );
666                if arr.len() != self.data_frame.nrows() {
667                    self.data_frame.column_mut(index).fill(DataValue::Null);
668                } else {
669                    self.data_frame.column_mut(index).assign(&arr);
670                }
671            }
672        }
673        Ok(())
674    }
675
676    /// Broadcasts the data from the other [`ColumnFrame`] to the current [`ColumnFrame`]
677    /// If the current [`KeyIndex`] is empty, the [`ColumnFrame`] is replaced with the other [`ColumnFrame`]
678    /// If the other [`KeyIndex`] is empty, nothing happens
679    /// If the length (number of rows) of the other data frame is greater then 1 an error is returned [`Error::CannotBroadcast`]
680    pub fn broadcast(&mut self, other: Self) -> Result<(), Error> {
681        if self.check_or_init_frame(&other)?.should_end() {
682            return Ok(());
683        }
684        if other.data_frame.nrows() != 1 {
685            return Err(Error::CannotBroadcast);
686        }
687        self.extend_columns_from_other(&other)?;
688        let mut new_df = Array2::default((self.len(), self.index.len()));
689        for (idx, key) in self.index.get_keys().iter().enumerate() {
690            if let Some(other_idx) = other.index.get_column_index(key) {
691                new_df
692                    .column_mut(idx)
693                    .assign(&other.data_frame.column(other_idx));
694            } else {
695                new_df.column_mut(idx).assign(&self.data_frame.column(idx));
696            }
697        }
698        self.data_frame = new_df;
699        Ok(())
700    }
701
702    /// Computes the Cartesian product of the input structures,
703    /// resulting in all possible combinations of elements.
704    /// The data is stored in the row-major order
705    /// The keys are stored in the order they are added - the order is preserved - new keys from the `other` [`ColumnFrame`] are added to the end
706    pub fn cartesian_product(&mut self, other: Self) -> Result<(), Error> {
707        if self.check_or_init_frame(&other)?.should_end() {
708            return Ok(());
709        }
710        // extend the columns
711        // let mut new_index = self.index.clone();
712        for other_key in other.keys() {
713            if self.index.get_column_index(other_key).is_none() {
714                self.index.store_key(other_key.clone());
715            } else {
716                self.index.store_key(Key::new(
717                    format!("{}-{}", other_key, other_key.id()).as_str(),
718                    other_key.ctype,
719                ));
720            }
721        }
722        let max_rows = self.len() * other.len();
723        let ncols = self.index.len();
724        // create new data frame
725        let mut new_df = Array2::default((max_rows, ncols));
726
727        let mut cur_idx = 0;
728        for cur_row in self.data_frame.rows() {
729            for other_row in other.data_frame.rows() {
730                new_df
731                    .slice_mut(s![cur_idx, ..])
732                    .assign(&concatenate(Axis(0), &[cur_row, other_row])?);
733                cur_idx += 1;
734            }
735        }
736        self.data_frame = new_df;
737        Ok(())
738    }
739
740    /// Joins the candidates with the other candidates by the [`JoinRelation`] policy.
741    /// For [`JoinBy::AddColumns`] the columns are added to the existing structure via [`Self::add_columns`]
742    /// For [`JoinBy::Replace`] the columns are replaced with the new columns
743    /// For [`JoinBy::Extend`] the candidates are extended via [`Self::extend`]
744    /// For [`JoinBy::Broadcast`] each candidate is extended with the values of the other candidates `Self::broadcast`
745    /// For [`JoinBy::CartesianProduct`] the candidates are multiplied by the other candidates
746    /// For [`JoinBy::JoinById`] the candidates are joined by the keys in the `JoinRelation::JoinById` struct see [`Self::join_by_id_inner`]
747    pub fn join(&mut self, right: Self, join_type: &JoinRelation) -> Result<(), Error> {
748        use JoinBy::*;
749        match &join_type.join_type {
750            AddColumns => self.add_columns(right),
751            Replace => self.replace(right),
752            Extend => self.extend(right),
753            Broadcast => self.broadcast(right),
754            CartesianProduct => self.cartesian_product(right),
755            JoinById(join) => self.join_by_id_inner(right, &join.keys),
756        }
757    }
758
759    pub fn get_single_column(&self, key: &Key) -> Option<ArrayView1<'_, DataValue>> {
760        self.index
761            .get_column_index(key)
762            .map(|x| self.data_frame.column(x))
763    }
764
765    pub fn sorted(&self, key: &Key) -> Result<sorted_df::SortedDataFrame<'_>, Error> {
766        let index = self
767            .index
768            .get_column_index(key)
769            .ok_or(Error::NotFound(key.clone()))?;
770        let column = self.data_frame.column(index);
771        let mut data_with_index = column.iter().enumerate().collect::<Vec<_>>();
772        tracing::trace!("Sorting by key: {key:?} vals {data_with_index:?}");
773        data_with_index.sort_by(|(a_idx, a_val), (b_idx, b_val)| {
774            a_val
775                .partial_cmp(b_val)
776                .unwrap_or(std::cmp::Ordering::Equal)
777                .then_with(|| a_idx.cmp(b_idx))
778        });
779
780        tracing::trace!("Sorted by key: {key:?} vals {data_with_index:?}");
781        let indicies = data_with_index
782            .into_iter()
783            .map(|(idx, _)| idx)
784            .collect::<Vec<_>>();
785
786        Ok(sorted_df::SortedDataFrame::new(self, indicies))
787    }
788
789    pub fn filter(&self, filter: &crate::filter::FilterRules) -> Result<Self, Error> {
790        let mut final_indices = Vec::new();
791        let filter_df = filter_df::ColumnFrameFiltering { column_frame: self };
792        for rule in &filter.rules {
793            final_indices.extend(crate::filter::filter_combination(&filter_df, rule)?);
794        }
795
796        final_indices.sort_unstable();
797        final_indices.dedup();
798        let mut new_df = ColumnFrame::new(
799            self.index.clone(),
800            Array2::default((final_indices.len(), self.index.len())),
801        );
802        final_indices
803            .iter()
804            .enumerate()
805            .for_each(|(cur_idx, row_idx)| {
806                new_df
807                    .data_frame
808                    .slice_mut(s![cur_idx, ..])
809                    .assign(&self.data_frame.slice(s![*row_idx, ..]));
810            });
811
812        Ok(new_df)
813    }
814}
815
816pub fn to_array2<T: Clone>(source: Vec<Array1<T>>) -> Result<Array2<T>, Error> {
817    let width = source.len();
818    let flattened: Array1<T> = source.into_iter().flat_map(|row| row.to_vec()).collect();
819    let height = flattened.len() / width;
820    Ok(flattened.into_shape_with_order((width, height))?)
821}
/// Builds a `$crate::DataFrame` by forwarding all tokens to [`column_frame!`] and passing
/// the resulting frame to `$crate::DataFrame::new`.
#[macro_export]
macro_rules! df {
    ($($tokens:tt)*) => {
        $crate::DataFrame::new($crate::column_frame!($($tokens)*))
    };
}
828
/// Builds a `$crate::ColumnFrame` from `key => value` pairs.
///
/// Supported forms:
/// * `"a" => [1, 2, 3], "b" => [4, 5, 6]` - one bracketed list of values per key
/// * `"a" => vec![1, 2, 3], "b" => vec![4, 5, 6]` - rewritten into the bracketed form
/// * `"a" => 1, "b" => 2` - a single row with one value per key
///   (this form may end with a trailing comma)
///
/// Keys and values are converted with `.into()`.
///
/// NOTE(review): the trailing-comma rule captures every value as an `expr` fragment
/// before it re-invokes the macro, so list forms written with a trailing comma presumably
/// end up in the single-row rule - confirm before relying on it.
#[macro_export]
macro_rules! column_frame {
    // case { "a" => 1, } - strip the trailing comma and start over
    ($($name:expr => $cell:expr,)+) => { $crate::column_frame!($($name => $cell),+) };
    // case { "a" => vec![1, 2, 3] } - handled like the bracketed form
    ($($name:expr => vec![$($cell:expr),*]),*) => {
        $crate::column_frame!($($name => [$($cell),*]),*)
    };
    // case { "a" => [1, 2, 3] }
    ($($name:expr => [$($cell:expr),*]),*) => {{
        // `arr2` receives one inner array per key; the axes are reversed afterwards
        let per_key = ::ndarray::arr2(&[$(
            [$($cell.into(),)*],
        )*]);
        let keys = vec![$($name.into(),)*];

        $crate::ColumnFrame::new($crate::KeyIndex::new(keys), per_key.reversed_axes())
    }};
    // case { "a" => 1 } - all values form one single row
    ($($name:expr => $cell:expr),*) => {{
        let row = ::ndarray::arr2(&[[$($cell.into(),)*]]);
        let keys = vec![$($name.into(),)*];

        $crate::ColumnFrame::new($crate::KeyIndex::new(keys), row)
    }};
}
865
866#[cfg(test)]
867mod test {
868    use crate::{filter::FilterRules, JoinById};
869
870    use super::*;
871    use data_value::stdhashmap;
872    use ndarray::ArrayView;
873    use rstest::*;
874    use tracing_test::traced_test;
875
876    #[rstest]
877    #[case(
878        column_frame! {
879            "t" => [1751001987000000u64, 1752001987000000u64, 1753001987000000u64],
880            "b" => [4, 5, 6],
881            "c" => [7, 8, 9]
882        },
883        column_frame! {
884            "t" => [1752001987000000u64],
885            "b" => [5],
886            "c" => [8]
887        },
888        FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
889    )]
890    #[case(
891        column_frame! {
892            "t" => [1751001987000000f64, 1752001987000000f64, 1753001987000000f64],
893            "b" => [4, 5, 6],
894            "c" => [7, 8, 9]
895        },
896        column_frame! {
897            "t" => [1752001987000000f64],
898            "b" => [5],
899            "c" => [8]
900        },
901        FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
902    )]
903    #[case(
904        column_frame! {
905            "t" => [1751001987000000i64, 1752001987000000i64, 1753001987000000i64],
906            "b" => [4, 5, 6],
907            "c" => [7, 8, 9]
908        },
909        column_frame! {
910            "t" => [1752001987000000i64],
911            "b" => [5],
912            "c" => [8]
913        },
914        FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
915    )]
916    #[case(
917        column_frame! {
918            "t" => [1751001987000000u64, 1752001987000000u64, 1753001987000000u64],
919            "b" => [4, 5, 6],
920            "c" => [7, 8, 9]
921        },
922        column_frame! {
923            "t" => [1751001987000000u64],
924            "b" => [4],
925            "c" => [7]
926        },
927        FilterRules::try_from("t.to_datetime_us() < '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
928    )]
929    #[case(
930        column_frame! {
931            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
932            "b" => [4, 5, 6],
933            "c" => [7, 8, 9]
934        },
935        column_frame! {
936            "t" => ["2025-07-08 18:13:07"],
937            "b" => [4],
938            "c" => [7]
939        },
940        FilterRules::try_from("t.to_datetime_us() < '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
941    )]
942    #[case(
943        column_frame! {
944            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
945            "b" => [4, 5, 6],
946            "c" => [7, 8, 9]
947        },
948        column_frame! {
949            "t" => [],
950            "b" => [],
951            "c" => []
952        },
953        FilterRules::try_from("t.len() < 10u64").expect("BUG: cannot create filter rules"),
954    )]
955    #[case(
956        column_frame! {
957            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
958            "b" => [4, 5, 6],
959            "c" => [7, 8, 9]
960        },
961        column_frame! {
962            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
963            "b" => [4, 5, 6],
964            "c" => [7, 8, 9]
965        },
966        FilterRules::try_from("t.len() > 10u64").expect("BUG: cannot create filter rules"),
967    )]
968    #[case(
969        column_frame! {
970            "t" => [DataValue::Vec(vec![1.into(), 2.into(), 3.into()]), DataValue::Vec(vec![]), DataValue::Vec(vec![1.into()])],
971            "b" => [4, 5, 6],
972            "c" => [7, 8, 9]
973        },
974        column_frame! {
975            "t" => [DataValue::Vec(vec![])],
976            "b" => [5],
977            "c" => [ 8]
978        },
979        FilterRules::try_from("t.len() == 0u64").expect("BUG: cannot create filter rules"),
980    )]
981    #[case(
982        column_frame! {
983            "t" => [DataValue::Vec(vec![1.into(), 2.into(), 3.into()]), DataValue::Vec(vec![]), DataValue::Vec(vec![1.into()])],
984            "b" => [4, 5, 6],
985            "c" => [7, 8, 9]
986        },
987        column_frame! {
988            "t" => [DataValue::Vec(vec![1.into()])],
989            "b" => [6],
990            "c" => [9]
991        },
992        FilterRules::try_from("t.len() == 1u64").expect("BUG: cannot create filter rules"),
993    )]
994    #[case(
995        column_frame! {
996            "a" => [1, 2, 3],
997            "b" => [4, 5, 6],
998            "c" => [7, 8, 9]
999        },
1000        column_frame! {
1001            "a" => [1, 2],
1002            "b" => [4, 5],
1003            "c" => [7, 8]
1004        },
1005        FilterRules::try_from("a <= 2i32").expect("BUG: cannot create filter rules"),
1006    )]
1007    #[case(
1008        column_frame! {
1009            "a" => [1, 2, 3],
1010            "b" => [4, 5, 6],
1011            "c" => [7, 8, 9]
1012        },
1013        column_frame! {
1014            "a" => [2],
1015            "b" => [5],
1016            "c" => [8]
1017        },
1018        FilterRules::try_from("a <= 2i32 && c > 7i32").expect("BUG: cannot create filter rules"),
1019    )]
1020    #[case(
1021        column_frame! {
1022            "a" => [1, 2, 3],
1023            "b" => [4, 5, 6],
1024            "c" => [7, 8, 9]
1025        },
1026        column_frame! {
1027            "a" => [],
1028            "b" => [],
1029            "c" => []
1030        },
1031        FilterRules::try_from("a <= 2i32 && c > 9i32").expect("BUG: cannot create filter rules"),
1032    )]
1033    #[case(
1034        column_frame! {
1035            "a" => [1, 2, 3],
1036            "b" => [4, 5, 6],
1037            "c" => [7, 8, 9]
1038        },
1039        column_frame! {
1040            "a" => [1, 2],
1041            "b" => [4, 5],
1042            "c" => [7, 8]
1043        },
1044        FilterRules::try_from("a <= 2i32 || c > 9i32").expect("BUG: cannot create filter rules"),
1045    )]
1046    #[case(
1047        column_frame! {
1048            "a" => [1, 2, 3],
1049            "b" => [4, 5, 6],
1050            "c" => [7, 8, 9]
1051        },
1052        column_frame! {
1053            "a" => [2],
1054            "b" => [5],
1055            "c" => [8]
1056        },
1057        FilterRules::try_from("a <= 2i32 && (c > 9i32 || b == 5i32)").expect("BUG: cannot create filter rules"),
1058    )]
1059    #[case(
1060        column_frame! {
1061            "a" => ["abcd", "ab", "abcdefg"],
1062            "b" => [4, 5, 6],
1063            "c" => [7, 8, 9]
1064        },
1065        column_frame! {
1066            "a" => ["abcd","abcdefg"],
1067            "b" => [4, 6],
1068            "c" => [7, 9]
1069        },
1070        FilterRules::try_from("a ~= 'abcd.*'").expect("BUG: cannot create filter rules"),
1071    )]
1072    #[case(
1073        column_frame! {
1074            "a" => [1, 2, 3],
1075            "b" => [4, 5, 6],
1076            "c" => [7, 8, 9]
1077        },
1078        column_frame! {
1079            "a" => [1],
1080            "b" => [4],
1081            "c" => [7]
1082        },
1083        FilterRules::try_from("a in [1u32, 1i32]'").expect("BUG: cannot create filter rules"),
1084    )]
1085    #[case(
1086        column_frame! {
1087            "a" => [1, 2, 3],
1088            "b" => [4, 5, 6],
1089            "c" => [7, 8, 9]
1090        },
1091        column_frame! {
1092            "a" => [2, 3],
1093            "b" => [5, 6],
1094            "c" => [8, 9]
1095        },
1096        FilterRules::try_from("a notIn [1u32, 1i32]'").expect("BUG: cannot create filter rules"),
1097    )]
1098    #[case(
1099        column_frame! {
1100            "a" => [1f64, 2f64, 3f64],
1101            "b" => [4, 5, 6],
1102            "c" => [7, 8, 9]
1103        },
1104        column_frame! {
1105            "a" => [1f64, 2f64],
1106            "b" => [4, 5],
1107            "c" => [7, 8]
1108        },
1109        FilterRules::try_from("a < 3f64 || (a < 3f64 && b <= 5i32)").expect("BUG: cannot create filter rules"),
1110    )]
1111    #[case(
1112        column_frame! {
1113            "a" => [1f64, 2f64, 3f64],
1114            "b" => [4i64, 5i64, 6i64],
1115            "c" => [7i64, 8i64, 9i64]
1116        },
1117        column_frame! {
1118            "a" => [1f64, 2f64],
1119            "b" => [4i64, 5i64],
1120            "c" => [7i64, 8i64]
1121        },
1122        FilterRules::try_from("a >= 1f64 && (b <= 5 || c <= 8) && b >= 4").expect("BUG: cannot create filter rules"),
1123    )]
1124    #[traced_test]
1125    fn filter_test(
1126        #[case] df: ColumnFrame,
1127        #[case] expected: ColumnFrame,
1128        #[case] filter: FilterRules,
1129    ) {
1130        let filtered = df.filter(&filter).expect("BUG: cannot filter");
1131        assert_eq!(filtered, expected);
1132    }
1133
1134    #[rstest]
1135    #[traced_test]
1136    fn test_macro() {
1137        let df = column_frame! {
1138            "a" => 1,
1139            "b" => 2,
1140            "c" => 3,
1141            "d" => 4,
1142        };
1143
1144        assert_eq!(df.len(), 1);
1145        assert_eq!(df.keys(), &["a".into(), "b".into(), "c".into(), "d".into()]);
1146        let f = Array2::from_shape_vec((1, 4), vec![1.into(), 2.into(), 3.into(), 4.into()])
1147            .expect("BUG: cannot create array");
1148        assert_eq!(df.select(None), f);
1149
1150        let df = column_frame! {
1151            "a" => [1, 2, 3],
1152            "b" => [4, 5, 6],
1153            "c" => [7, 8, 9]
1154        };
1155
1156        assert_eq!(df.len(), 3);
1157        assert_eq!(df.keys(), &["a".into(), "b".into(), "c".into()]);
1158        let f = Array2::from_shape_vec(
1159            (3, 3),
1160            vec![
1161                1.into(),
1162                4.into(),
1163                7.into(),
1164                2.into(),
1165                5.into(),
1166                8.into(),
1167                3.into(),
1168                6.into(),
1169                9.into(),
1170            ],
1171        )
1172        .expect("BUG: cannot create array");
1173        let selected = df.select(None);
1174        trace!("{selected:?}");
1175        assert_eq!(selected, f);
1176
1177        let df1 = df! {
1178            "a" => [1, 2, 3],
1179            "b" => [4, 5, 6],
1180            "c" => [7, 8, 9]
1181        };
1182
1183        // just to make sure we've tested Display
1184        let formatted = format!("{}", df);
1185        debug!("{}", formatted);
1186
1187        assert_eq!(df1, crate::DataFrame::from(df));
1188    }
1189
1190    #[rstest]
1191    #[case(
1192        column_frame! {
1193            "a" => [1, 2, 3],
1194            "b" => [4, 5, 6],
1195            "c" => [7, 8, 9]
1196        },
1197        column_frame! {
1198            "a_new" => [1, 2, 3],
1199            "b" => [4, 5, 6],
1200            "c" => [7, 8, 9]
1201        },
1202        vec!["a_new", "b", "c"].into_iter().map(|x| x.into()).collect(),
1203        vec![("a", "a_new".into())]
1204    )]
1205    #[traced_test]
1206    fn rename_test(
1207        #[case] df: ColumnFrame,
1208        #[case] expected: ColumnFrame,
1209        #[case] keys: Vec<Key>,
1210        #[case] renames: Vec<(&str, Key)>,
1211    ) {
1212        let mut df = df;
1213        for (old, new) in renames {
1214            df.rename_key(old, new).expect("BUG: cannot rename key");
1215        }
1216        assert_eq!(df, expected);
1217        assert_eq!(df.keys(), keys.as_slice());
1218    }
1219
1220    #[rstest]
1221    #[case(
1222        column_frame!("a" => [1, 2, 3]),
1223        Key::new("a", crate::DataType::I32),
1224        column_frame!("a" => [1i32, 2i32, 3i32])
1225    )]
1226    #[case(
1227        column_frame!("a" => [1, 2, 3]),
1228        Key::new("a", crate::DataType::U32),
1229        column_frame!("a" => [1u32, 2u32, 3u32])
1230    )]
1231    #[case(
1232        column_frame!("a" => [1, 2, 3]),
1233        Key::new("a", crate::DataType::I64),
1234        column_frame!("a" => [1i64, 2i64, 3i64])
1235    )]
1236    #[case(
1237        column_frame!("a" => [1, 2, 3]),
1238        Key::new("a", crate::DataType::U64),
1239        column_frame!("a" => [1u64, 2u64, 3u64])
1240    )]
1241    #[case(
1242        column_frame!("a" => [1, 2, 3]),
1243        Key::new("a", crate::DataType::F64),
1244        column_frame!("a" => [1f64, 2f64, 3f64])
1245    )]
1246    #[case(
1247        column_frame!("a" => [1, 2, 3]),
1248        Key::new("a", crate::DataType::F32),
1249        column_frame!("a" => [1f32, 2f32, 3f32])
1250    )]
1251    // #[case(
1252    //     column_frame!("a" => [1, 2, 3]),
1253    //     Key::new("a", crate::DataType::String),
1254    //     column_frame!("a" => ["1", "2", "3"])
1255    // )]
1256    fn test_try_fix_dtype(
1257        #[case] mut df: ColumnFrame,
1258        #[case] key: Key,
1259        #[case] expected: ColumnFrame,
1260    ) {
1261        assert!(df.try_fix_column_by_key(&key).is_ok());
1262        assert_eq!(
1263            df.select(Some(&[key.clone()])),
1264            expected.select(Some(&[key.clone()]))
1265        );
1266    }
1267
1268    #[fixture]
1269    fn unknown_df() -> ColumnFrame {
1270        let mut hm: HashMap<String, Vec<DataValue>> = HashMap::new();
1271
1272        hm.insert("a".into(), vec![1u32.into()]);
1273        hm.insert("b".into(), vec![3i64.into()]);
1274        hm.insert("c".into(), vec![1f64.into()]);
1275        hm.insert("d".into(), vec![1u64.into()]);
1276
1277        hm.into()
1278    }
1279    #[rstest]
1280    #[case(stdhashmap!(
1281        "a" => crate::DataType::U32,
1282        "b" => crate::DataType::I64,
1283        "c" => crate::DataType::F64,
1284        "d" => crate::DataType::U64)
1285    )]
1286    fn test_try_fix_dtype_unknown(
1287        mut unknown_df: ColumnFrame,
1288        #[case] dtypes: HashMap<String, crate::DataType>,
1289    ) {
1290        for dtype in dtypes.iter() {
1291            let t: &Key = unknown_df
1292                .keys()
1293                .iter()
1294                .find(|x| x.name() == dtype.0)
1295                .unwrap();
1296            assert_ne!(t.ctype, crate::DataType::Unknown);
1297        }
1298        assert!(unknown_df.try_fix_dtype_for_keys(false).is_ok());
1299        for dtype in dtypes.iter() {
1300            let t: &Key = unknown_df
1301                .keys()
1302                .iter()
1303                .find(|x| x.name() == dtype.0)
1304                .unwrap();
1305            assert_eq!(t.ctype, *dtype.1);
1306            assert!(unknown_df.try_fix_dtype_for_keys(false).is_ok());
1307        }
1308        assert!(unknown_df.try_fix_dtype_for_keys(true).is_ok());
1309    }
1310
1311    #[rstest]
1312    #[case(
1313        column_frame!(Key::new("a", crate::DataType::F32) => [1, 2, 3]),
1314        Key::new("a", crate::DataType::F32),
1315        column_frame!("a" => [1f32, 2f32, 3f32])
1316    )]
1317    #[traced_test]
1318    fn test_try_fix(#[case] mut df: ColumnFrame, #[case] key: Key, #[case] expected: ColumnFrame) {
1319        assert!(df.try_fix_dtype().is_ok());
1320        assert_eq!(
1321            df.select(Some(&[key.clone()])),
1322            expected.select(Some(&[key]))
1323        )
1324    }
1325
1326    #[rstest]
1327    #[traced_test]
1328    fn test_not_key_fix() {
1329        let mut cf = column_frame!("a" => [1]);
1330        let non_existing = Key::new("b", crate::DataType::I32);
1331        assert!(cf.try_fix_column_by_key(&non_existing).is_err());
1332    }
1333
1334    #[rstest]
1335    #[case(
1336        column_frame! {
1337            "a" => [1, 2, 3],
1338            "b" => [4, 5, 6],
1339            "c" => [7, 8, 9]
1340        },
1341        vec!["a_alias", "b", "c"].into_iter().map(|x| x.into()).collect(),
1342        vec![("a", "a_alias")]
1343    )]
1344    #[traced_test]
1345    fn alias_test(
1346        #[case] df: ColumnFrame,
1347        #[case] keys: Vec<Key>,
1348        #[case] aliases: Vec<(&str, &str)>,
1349    ) {
1350        let mut df = df;
1351        for (old, new) in aliases {
1352            df.add_alias(old, new).expect("BUG: cannot rename key");
1353        }
1354        let origin_keys = df.keys().to_vec();
1355        let selected_aliases = df.select(Some(keys.as_slice()));
1356        let selected = df.select(Some(origin_keys.as_slice()));
1357        assert_eq!(selected, selected_aliases);
1358    }
1359
1360    #[rstest]
1361    #[traced_test]
1362    fn test_mut_view() {
1363        let data = vec![
1364            DataValue::from(1f64),
1365            DataValue::from(4f32),
1366            DataValue::from(2f64),
1367            DataValue::from(f32::NAN),
1368            DataValue::from(f64::NAN),
1369            DataValue::from(f32::INFINITY),
1370        ];
1371        let keys: Vec<Key> = vec!["a".into(), "b".into()];
1372
1373        let index = KeyIndex::new(keys.clone());
1374        let df = Array2::from_shape_vec((3, keys.len()), data).expect("BUG: cannot create array");
1375        let mut df = ColumnFrame::new(index.clone(), df);
1376        df.get_mut_view().mapv_inplace(|x| match x {
1377            DataValue::F32(f) if f.is_infinite() || f.is_nan() => DataValue::F32(0f32),
1378            DataValue::F64(f) if f.is_infinite() || f.is_nan() => DataValue::F64(0f64),
1379            e => e,
1380        });
1381        let data = vec![
1382            DataValue::from(1f64),
1383            DataValue::from(4f32),
1384            DataValue::from(2f64),
1385            DataValue::from(0f32),
1386            DataValue::from(0f64),
1387            DataValue::from(0f32),
1388        ];
1389        let expected = ColumnFrame::new(
1390            index,
1391            Array2::from_shape_vec((3, keys.len()), data).expect("BUG: cannot create ndarray"),
1392        );
1393        assert_eq!(df, expected);
1394    }
1395
1396    #[rstest]
1397    #[traced_test]
1398    fn dummy_test() {
1399        let data = vec![
1400            DataValue::U32(1),
1401            DataValue::I32(2),
1402            DataValue::I64(3),
1403            DataValue::U64(4),
1404        ];
1405
1406        let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1407
1408        let index = KeyIndex::new(keys.clone());
1409        let mut data_frame = Array2::default((1, keys.len()));
1410        for (idx, entry) in data.iter().enumerate() {
1411            data_frame
1412                .column_mut(idx)
1413                .assign(&ArrayView::from(&[entry.clone()]));
1414        }
1415
1416        let frame = ColumnFrame::new(index, data_frame);
1417        assert_eq!(
1418            frame.get_by_row_index(&"a".into(), 0),
1419            Some(&DataValue::U32(1))
1420        );
1421        assert_eq!(frame.get_by_row_index(&"aa".into(), 0), None);
1422        assert_eq!(frame.get_by_row_index(&"a".into(), 1), None);
1423        assert_eq!(
1424            frame.select(Some(&["a".into(), "b".into()])),
1425            Array2::from_shape_vec((1, 2), vec![DataValue::U32(1), DataValue::I32(2)])
1426                .expect("BUG: cannot create array")
1427        );
1428    }
1429
1430    #[rstest]
1431    #[traced_test]
1432    fn dummy_test_multiple_rows() {
1433        let data = vec![
1434            DataValue::U32(1),
1435            DataValue::I32(2),
1436            DataValue::I64(3),
1437            DataValue::U64(4),
1438            DataValue::U32(12),
1439            DataValue::I32(22),
1440            DataValue::I64(32),
1441            DataValue::U64(42),
1442        ];
1443
1444        let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1445
1446        let index = KeyIndex::new(keys.clone());
1447        let data_frame =
1448            Array2::from_shape_vec((2, keys.len()), data).expect("BUG: cannot create array");
1449
1450        let frame = ColumnFrame::new(index, data_frame);
1451        assert_eq!(
1452            frame.get_by_row_index(&"a".into(), 0),
1453            Some(&DataValue::U32(1))
1454        );
1455        assert_eq!(frame.get_by_row_index(&"aa".into(), 0), None);
1456        assert_eq!(frame.get_by_row_index(&"a".into(), 3), None);
1457        let arr = Array2::from_shape_vec(
1458            (2, 2),
1459            vec![
1460                DataValue::U32(1),
1461                DataValue::I32(2),
1462                DataValue::U32(12),
1463                DataValue::I32(22),
1464            ],
1465        )
1466        .expect("BUG: cannot create array");
1467        trace!("{arr:?}");
1468        assert_eq!(frame.select(Some(&["a".into(), "b".into()])), arr);
1469    }
1470
1471    #[rstest]
1472    #[traced_test]
1473    fn dummy_test_multiple_rows_push() {
1474        let data = vec![
1475            DataValue::U32(1),
1476            DataValue::I32(2),
1477            DataValue::I64(3),
1478            DataValue::U64(4),
1479            DataValue::U32(12),
1480            DataValue::I32(22),
1481            DataValue::I64(32),
1482            DataValue::U64(42),
1483        ];
1484        let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1485
1486        let index = KeyIndex::new(keys.clone());
1487        let data_frame =
1488            Array2::from_shape_vec((2, keys.len()), data).expect("BUG: cannot create array");
1489
1490        let mut frame = ColumnFrame::new(index, data_frame);
1491        assert!(frame
1492            .push(data_value::stdhashmap!(
1493                "a" => DataValue::U32(2),
1494                "b" => DataValue::I32(3),
1495                "c" => DataValue::I64(4),
1496                "d" => DataValue::U64(5)
1497            ))
1498            .is_ok());
1499        let arr = Array2::from_shape_vec(
1500            (3, 2),
1501            vec![
1502                DataValue::U32(1),
1503                DataValue::I32(2),
1504                DataValue::U32(12),
1505                DataValue::I32(22),
1506                DataValue::U32(2),
1507                DataValue::I32(3),
1508            ],
1509        )
1510        .expect("BUG: cannot create array");
1511        trace!("{arr:?}");
1512        assert_eq!(frame.select(Some(&["a".into(), "b".into()])), arr);
1513        let result = frame.push(data_value::stdhashmap!(
1514            "a" => DataValue::U32(34),
1515            "b" => DataValue::I32(44),
1516            "c" => DataValue::I64(54),
1517            "e" => DataValue::F32(6f32)
1518        ));
1519        assert!(result.is_ok(), "{result:?}");
1520        let arr = Array2::from_shape_vec(
1521            (4, 2),
1522            vec![
1523                DataValue::U64(4),
1524                DataValue::Null,
1525                DataValue::U64(42),
1526                DataValue::Null,
1527                DataValue::U64(5),
1528                DataValue::Null,
1529                DataValue::Null,
1530                DataValue::F32(6f32),
1531            ],
1532        )
1533        .expect("BUG: cannot create array");
1534        trace!("{arr:?}");
1535        assert_eq!(frame.select(Some(&["d".into(), "e".into()])), arr);
1536    }
1537
1538    #[rstest]
1539    #[case(
1540        column_frame! {
1541            "group_id" => vec![1, 2],
1542            "feed_tag" => vec![3, 4]
1543        },
1544        Some(vec![Key::from("group_id")]),
1545        ndarray::array!([1.into()], [2.into()])
1546    )]
1547    #[case(
1548        column_frame! {
1549            "group_id" => vec![1, 2],
1550            "feed_tag" => vec![3, 4]
1551        },
1552        Some(vec!["group_id".into(), "feed_tag".into()]),
1553        ndarray::array!([1.into(), 3.into()], [2.into(), 4.into()])
1554    )]
1555    #[case(
1556        column_frame! {
1557            "group_id" => vec![1, 2],
1558            "feed_tag" => vec![3, DataValue::Null]
1559        },
1560        Some(vec!["feed_tag".into()]),
1561        ndarray::array![[3.into()], [DataValue::Null]]
1562    )]
1563    #[case(
1564        column_frame! {
1565            "group_id" => vec![1, 2],
1566            "feed_tag" => vec![1, DataValue::Null]
1567        },
1568        Some(vec!["feed_tag2".into()]),
1569       Array2::<DataValue>::default((0, 0))
1570    )]
1571    #[traced_test]
1572    fn test_select(
1573        #[case] input: ColumnFrame,
1574        #[case] keys: Option<Vec<Key>>,
1575        #[case] expected: Array2<DataValue>,
1576    ) {
1577        trace!("input={input:?}");
1578        let keys_slice = keys.as_deref();
1579        let selected = input.select(keys_slice);
1580        trace!("selected={selected:?}");
1581        assert_eq!(selected, expected);
1582        let selected = input.select_transposed(keys_slice);
1583        trace!("selected_transposed={selected:?}");
1584        assert!(selected.is_ok());
1585        assert_eq!(selected.unwrap(), expected.t());
1586    }
1587
1588    #[rstest]
1589    #[case(
1590        column_frame! {
1591            "group_id" => vec![1, 2],
1592            "feed_tag" => vec![3, 4]
1593        },
1594        Key::from("group_id"),
1595        Some(ndarray::array!(1.into(), 2.into()))
1596    )]
1597    #[case(
1598        column_frame! {
1599            "group_id" => vec![1, 2, 5, 6],
1600            "feed_tag" => vec![3, 4, 7, 8]
1601        },
1602        Key::from("group_id"),
1603        Some(ndarray::array!(1.into(), 2.into(), 5.into(), 6.into()))
1604    )]
1605    #[case(
1606        column_frame! {
1607            "group_id" => vec![1, 2],
1608            "feed_tag" => vec![1, 1]
1609        },
1610        Key::from("feed_tag1"),
1611        None
1612    )]
1613    #[traced_test]
1614    fn test_select_column(
1615        #[case] input: ColumnFrame,
1616        #[case] key: Key,
1617        #[case] expected: Option<Array1<DataValue>>,
1618    ) {
1619        let selected = input.select_column(&key);
1620        trace!("selected={selected:?}");
1621        match expected {
1622            Some(expected) => {
1623                assert!(selected.is_some());
1624                assert_eq!(selected.expect("BUG: checked above"), expected);
1625            }
1626            None => assert!(selected.is_none()),
1627        }
1628    }
1629
1630    #[test]
1631    #[traced_test]
1632    fn empty_join_test() {
1633        let join = JoinRelation::add_columns();
1634        let mut column_frame = ColumnFrame::default();
1635        column_frame
1636            .add_single_column("group_id", Array1::from_vec(vec![]))
1637            .expect("BUG: cannot add column");
1638        let column_frame2 = column_frame! {
1639            "group_id" => vec![2, 1, 3],
1640            "feed_tag" => vec![1, 1, 1],
1641            "clicks" => vec![100, 10, 10],
1642            "imps" => vec![1000, 200, 200]
1643        };
1644        assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
1645
1646        let joined = column_frame.join(column_frame2, &join);
1647        assert!(joined.is_ok(), "{joined:?}");
1648
1649        trace!("{column_frame:?}");
1650        assert_eq!(
1651            column_frame.select(Some(&[
1652                "group_id".into(),
1653                "feed_tag".into(),
1654                "clicks".into(),
1655                "imps".into()
1656            ])),
1657            ndarray::array!(
1658                [2.into(), 1.into(), 100.into(), 1000.into()],
1659                [1.into(), 1.into(), 10.into(), 200.into()],
1660                [3.into(), 1.into(), 10.into(), 200.into()],
1661            )
1662        );
1663
1664        let mut column_frame2 = column_frame! {
1665            "feed_tag" => vec![1, 1, 1],
1666            "clicks" => vec![100, 10, 10],
1667            "imps" => vec![1000, 200, 200]
1668        };
1669        let mut column_frame = ColumnFrame::default();
1670        column_frame
1671            .add_single_column("group_id", Array1::from_vec(vec![]))
1672            .expect("BUG: cannot add column");
1673        let joined = column_frame2.join(column_frame, &join);
1674        assert!(joined.is_ok(), "{joined:?}");
1675
1676        trace!("{column_frame2:?}");
1677        assert_eq!(
1678            column_frame2.select(Some(&[
1679                "group_id".into(),
1680                "feed_tag".into(),
1681                "clicks".into(),
1682                "imps".into()
1683            ])),
1684            ndarray::array!(
1685                [DataValue::Null, 1.into(), 100.into(), 1000.into()],
1686                [DataValue::Null, 1.into(), 10.into(), 200.into()],
1687                [DataValue::Null, 1.into(), 10.into(), 200.into()],
1688            )
1689        );
1690
1691        let mut column_frame = ColumnFrame::default();
1692        column_frame.index = KeyIndex::new(vec!["group_id2".into()]);
1693        let joined = column_frame2.join(column_frame, &join);
1694        assert!(joined.is_ok(), "{joined:?}");
1695
1696        trace!("{column_frame2:?}");
1697        assert_eq!(
1698            column_frame2.select(Some(&[
1699                "group_id2".into(),
1700                "feed_tag".into(),
1701                "clicks".into(),
1702                "imps".into()
1703            ])),
1704            ndarray::array!(
1705                [DataValue::Null, 1.into(), 100.into(), 1000.into()],
1706                [DataValue::Null, 1.into(), 10.into(), 200.into()],
1707                [DataValue::Null, 1.into(), 10.into(), 200.into()],
1708            )
1709        );
1710    }
1711
1712    #[test]
1713    #[traced_test]
1714    fn join_test() {
1715        let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
1716            "group_id".into(),
1717            "feed_tag".into(),
1718        ])));
1719        let mut column_frame = column_frame! {
1720            "group_id" => vec![1, 2, 8],
1721            "feed_tag" => vec![1, 1, 10]
1722        };
1723        let column_frame2 = column_frame! {
1724            "group_id" => vec![2, 1, 3],
1725            "feed_tag" => vec![1, 1, 1],
1726            "clicks" => vec![100, 10, 10],
1727            "imps" => vec![1000, 200, 200]
1728        };
1729        assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
1730
1731        let joined = column_frame.join(column_frame2, &join);
1732        assert!(joined.is_ok(), "{joined:?}");
1733
1734        trace!("{column_frame:?}");
1735        assert_eq!(
1736            column_frame.select(Some(&[
1737                "group_id".into(),
1738                "feed_tag".into(),
1739                "clicks".into(),
1740                "imps".into()
1741            ])),
1742            ndarray::array!(
1743                [1.into(), 1.into(), 10.into(), 200.into()],
1744                [2.into(), 1.into(), 100.into(), 1000.into()],
1745                [8.into(), 10.into(), DataValue::Null, DataValue::Null]
1746            )
1747        )
1748    }
1749
1750    #[test]
1751    #[traced_test]
1752    fn join_test_with_additional() {
1753        let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
1754            "group_id".into(),
1755            "feed_tag".into(),
1756        ])));
1757        let mut column_frame = column_frame! {
1758            "group_id" => vec![1, 2, 8],
1759            "feed_tag" => vec![1, 1, 10],
1760            "clicked" => vec![0, 0, 1]
1761        };
1762        let column_frame2 = column_frame! {
1763            "group_id" => vec![2, 1, 3],
1764            "feed_tag" => vec![1, 1, 1],
1765            "clicks" => vec![100, 10, 10],
1766            "imps" => vec![1000, 200, 200]
1767        };
1768        assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
1769
1770        let joined = column_frame.join(column_frame2, &join);
1771        assert!(joined.is_ok(), "{joined:?}");
1772
1773        trace!("{column_frame:?}");
1774        assert_eq!(
1775            column_frame.select(Some(&[
1776                "group_id".into(),
1777                "feed_tag".into(),
1778                "clicks".into(),
1779                "imps".into(),
1780                "clicked".into()
1781            ])),
1782            ndarray::array!(
1783                [1.into(), 1.into(), 10.into(), 200.into(), 0.into()],
1784                [2.into(), 1.into(), 100.into(), 1000.into(), 0.into()],
1785                [
1786                    8.into(),
1787                    10.into(),
1788                    DataValue::Null,
1789                    DataValue::Null,
1790                    1.into()
1791                ]
1792            )
1793        )
1794    }
1795
1796    #[test]
1797    #[traced_test]
1798    fn join_test_with_additional_single() {
1799        let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
1800            "group_id".into(),
1801            "feed_tag".into(),
1802        ])));
1803        let mut column_frame = column_frame! {
1804            "group_id" => vec![1, 2, 8],
1805            "feed_tag" => vec![1, 1, 10],
1806            "clicked" => vec![0, 0, 1]
1807        };
1808        let column_frame2 = column_frame! {
1809            "a" => vec![1],
1810            "group_id" => vec![2],
1811            "feed_tag" => vec![1],
1812            "clicks" => vec![10],
1813            "imps" => vec![200]
1814        };
1815        assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
1816
1817        let joined = column_frame.join(column_frame2, &join);
1818        assert!(joined.is_ok(), "{joined:?}");
1819
1820        trace!("{column_frame:?}");
1821        assert_eq!(
1822            column_frame.select(Some(&[
1823                "group_id".into(),
1824                "feed_tag".into(),
1825                "clicks".into(),
1826                "imps".into(),
1827                "clicked".into()
1828            ])),
1829            ndarray::array!(
1830                [
1831                    1.into(),
1832                    1.into(),
1833                    DataValue::Null,
1834                    DataValue::Null,
1835                    0.into(),
1836                ],
1837                [2.into(), 1.into(), 10.into(), 200.into(), 0.into()],
1838                [
1839                    8.into(),
1840                    10.into(),
1841                    DataValue::Null,
1842                    DataValue::Null,
1843                    1.into()
1844                ]
1845            )
1846        )
1847    }
1848
1849    #[rstest]
1850    #[traced_test]
1851    fn cartesian_product_join() {
1852        let mut df = column_frame! {
1853            "group_id" => vec![1, 2, 3],
1854            "feed_tag" => vec![1, 2, 3]
1855        };
1856        let df2 = column_frame! {
1857            "zone_id" => vec![111111, 111133],
1858            "zone_avg_ctr" => vec![0.1, 0.001]
1859        };
1860        assert!(df
1861            .join(
1862                ColumnFrame::default(),
1863                &JoinRelation::new(JoinBy::CartesianProduct)
1864            )
1865            .is_ok());
1866        let join = JoinRelation::new(JoinBy::CartesianProduct);
1867        let result = df.join(df2, &join);
1868        assert!(result.is_ok(), "{result:?}");
1869        let selected = df.select(None);
1870        trace!("{selected:?}");
1871        assert_eq!(
1872            selected,
1873            ndarray::array!(
1874                [1.into(), 1.into(), 111111.into(), 0.1.into()],
1875                [1.into(), 1.into(), 111133.into(), 0.001.into()],
1876                [2.into(), 2.into(), 111111.into(), 0.1.into()],
1877                [2.into(), 2.into(), 111133.into(), 0.001.into()],
1878                [3.into(), 3.into(), 111111.into(), 0.1.into()],
1879                [3.into(), 3.into(), 111133.into(), 0.001.into()],
1880            )
1881        );
1882
1883        let df2 = column_frame! {
1884            "zone_id" => vec![111]
1885        };
1886        let result = df.join(df2, &join);
1887        assert!(result.is_ok(), "{result:?}");
1888        let selected = df.select(None);
1889        trace!("{selected:?}");
1890        assert_eq!(
1891            selected,
1892            ndarray::array!(
1893                [1.into(), 1.into(), 111111.into(), 0.1.into(), 111.into()],
1894                [1.into(), 1.into(), 111133.into(), 0.001.into(), 111.into()],
1895                [2.into(), 2.into(), 111111.into(), 0.1.into(), 111.into()],
1896                [2.into(), 2.into(), 111133.into(), 0.001.into(), 111.into()],
1897                [3.into(), 3.into(), 111111.into(), 0.1.into(), 111.into()],
1898                [3.into(), 3.into(), 111133.into(), 0.001.into(), 111.into()],
1899            )
1900        );
1901    }
1902
1903    #[rstest]
1904    #[traced_test]
1905    fn broadcast_join() {
1906        let mut df = column_frame! {
1907            "group_id" => vec![1, 2, 3],
1908            "feed_tag" => vec![1, 2, 3]
1909        };
1910        let df2 = column_frame! {
1911            "zone_id" => vec![111111]
1912        };
1913        assert!(df
1914            .join(
1915                ColumnFrame::default(),
1916                &JoinRelation::new(JoinBy::Broadcast)
1917            )
1918            .is_ok());
1919        let join = JoinRelation::new(JoinBy::Broadcast);
1920        assert!(df.join(df2, &join).is_ok());
1921        let selected = df.select(None);
1922        trace!("{selected:?}");
1923        assert_eq!(
1924            selected,
1925            ndarray::array!(
1926                [1.into(), 1.into(), 111111.into()],
1927                [2.into(), 2.into(), 111111.into()],
1928                [3.into(), 3.into(), 111111.into()]
1929            )
1930        );
1931    }
1932    #[rstest]
1933    #[traced_test]
1934    fn merge_test() {
1935        let mut df = column_frame! {
1936            "group_id" => vec![1, 2, 3],
1937            "feed_tag" => vec![1, 2, 3]
1938        };
1939        let df2 = column_frame! {
1940            "group_id" => vec![11, 21, 31],
1941            "feed_tag" => vec![12, 22, 32]
1942        };
1943
1944        let join = JoinRelation::new(JoinBy::Replace);
1945        assert!(df.join(df2, &join).is_ok());
1946        let selected = df.select(None);
1947        trace!("{selected:?}");
1948        assert_eq!(
1949            selected,
1950            ndarray::array!(
1951                [11.into(), 12.into()],
1952                [21.into(), 22.into()],
1953                [31.into(), 32.into()]
1954            )
1955        );
1956    }
1957
1958    #[rstest]
1959    #[traced_test]
1960    fn extend_test() {
1961        let mut df = column_frame! {
1962            "group_id" => vec![1, 2, 3],
1963            "feed_tag" => vec![1, 2, 3]
1964        };
1965        let df2 = column_frame! {
1966            "group_id" => vec![11, 21, 31],
1967            "feed_tag" => vec![5, 6, 7]
1968        };
1969        assert!(df
1970            .join(ColumnFrame::default(), &JoinRelation::new(JoinBy::Extend))
1971            .is_ok());
1972
1973        let join = JoinRelation::new(JoinBy::Extend);
1974        assert!(df.join(df2, &join).is_ok());
1975        let selected = df.select(Some(&["feed_tag".into(), "group_id".into()]));
1976        trace!("{selected:?}");
1977        assert_eq!(
1978            selected,
1979            ndarray::array!(
1980                [1.into(), 1.into()],
1981                [2.into(), 2.into()],
1982                [3.into(), 3.into()],
1983                [5.into(), 11.into()],
1984                [6.into(), 21.into()],
1985                [7.into(), 31.into()]
1986            )
1987        );
1988        let as_map = df.select_as_map(Some(&["feed_tag".into(), "group_id".into()]));
1989        trace!("{as_map:?}");
1990        assert_eq!(
1991            as_map,
1992            stdhashmap!(
1993                "feed_tag" => vec![1, 2, 3, 5, 6, 7],
1994                "group_id" => vec![1, 2, 3, 11, 21, 31]
1995            )
1996        );
1997
1998        let as_map = df.select_as_map(Some(&["feed_tag1".into()]));
1999        trace!("{as_map:?}");
2000        assert_eq!(as_map, HashMap::default());
2001    }
2002
2003    #[rstest]
2004    #[traced_test]
2005    fn extend_test_with_non_existing_cols() {
2006        let mut df = column_frame! {
2007            "group_id" => vec![1, 2, 3],
2008            "feed_tag" => vec![1, 2, 3]
2009        };
2010        let mut df2 = column_frame! {
2011            "group_id" => vec![11, 21, 31],
2012            "feed_tag" => vec![5, 6, 7],
2013            "clicks" => vec![100, 200, 300],
2014            "impressions" => vec![1000, 2000, 3000]
2015        };
2016        let df_bckp = df.clone();
2017        let join = JoinRelation::new(JoinBy::Extend);
2018        assert!(df.join(df2.clone(), &join).is_ok());
2019        let selected = df.select(None);
2020        trace!("{selected:?}");
2021        assert_eq!(
2022            selected,
2023            ndarray::array!(
2024                [1.into(), 1.into(), DataValue::Null, DataValue::Null],
2025                [2.into(), 2.into(), DataValue::Null, DataValue::Null],
2026                [3.into(), 3.into(), DataValue::Null, DataValue::Null],
2027                [11.into(), 5.into(), 100.into(), 1000.into()],
2028                [21.into(), 6.into(), 200.into(), 2000.into()],
2029                [31.into(), 7.into(), 300.into(), 3000.into()]
2030            )
2031        );
2032        let join = JoinRelation::new(JoinBy::Extend);
2033        let r = df2.join(df_bckp, &join);
2034        assert!(r.is_ok(), "{r:?}");
2035        let selected = df2.select(None);
2036        trace!("{selected:?}");
2037        assert_eq!(
2038            selected,
2039            ndarray::array!(
2040                [11.into(), 5.into(), 100.into(), 1000.into()],
2041                [21.into(), 6.into(), 200.into(), 2000.into()],
2042                [31.into(), 7.into(), 300.into(), 3000.into()],
2043                [1.into(), 1.into(), DataValue::Null, DataValue::Null],
2044                [2.into(), 2.into(), DataValue::Null, DataValue::Null],
2045                [3.into(), 3.into(), DataValue::Null, DataValue::Null]
2046            )
2047        );
2048    }
2049
2050    #[rstest]
2051    #[traced_test]
2052    fn extend_test_with_non_existing_cols_wrong_order() {
2053        let mut df = column_frame! {
2054            "group_id" => vec![1, 2, 3],
2055            "feed_tag" => vec![1, 2, 3]
2056        };
2057        let df2 = column_frame! {
2058            "feed_tag" => vec![5, 6, 7],
2059            "group_id" => vec![11, 21, 31]
2060        };
2061        let join = JoinRelation::new(JoinBy::Extend);
2062        let err = df.join(df2, &join);
2063        assert!(err.is_ok(), "{err:?}");
2064    }
2065
2066    #[rstest]
2067    #[traced_test]
2068    fn test_replace_not_compatible() {
2069        let mut df = column_frame! {
2070            "group_id" => vec![1, 2, 3],
2071            "feed_tag" => vec![1, 2, 3]
2072        };
2073        let df2 = column_frame! {
2074            "feed_tag" => vec![5, 6],
2075            "group_id" => vec![11, 21]
2076        };
2077        let join = JoinRelation::new(JoinBy::Replace);
2078        let err = df.join(df2, &join);
2079        assert!(err.is_err(), "{err:?}");
2080        let empty = ColumnFrame::default();
2081        let err = df.join(empty, &join);
2082        assert!(err.is_ok(), "{err:?}");
2083    }
2084
2085    #[rstest]
2086    #[traced_test]
2087    fn test_different_data() {
2088        let mut df = column_frame! {
2089            "group_id" => vec![1, 2, 3],
2090            "feed_tag" => vec![1, 2, 3]
2091        };
2092        let df2 = column_frame! {
2093            "group_id" => vec![11, 21],
2094            "a" => vec![5, 6]
2095        };
2096        let join = JoinRelation::new(JoinBy::Extend);
2097        let err = df.join(df2, &join);
2098        assert!(err.is_ok(), "{err:?}");
2099        println!("{df:?}");
2100        let expected_df = ColumnFrame::new(
2101            KeyIndex::from(vec!["group_id".into(), "feed_tag".into(), "a".into()]),
2102            ndarray::array!(
2103                [1.into(), 1.into(), DataValue::Null],
2104                [2.into(), 2.into(), DataValue::Null],
2105                [3.into(), 3.into(), DataValue::Null],
2106                [11.into(), DataValue::Null, 5.into()],
2107                [21.into(), DataValue::Null, 6.into()]
2108            ),
2109        );
2110        assert_eq!(df, expected_df)
2111    }
2112
2113    #[rstest]
2114    #[traced_test]
2115    fn serde_column_frame() {
2116        let df = column_frame! {
2117            "group_id" => vec![1u64, 2u64, 3u64],
2118            "feed_tag" => vec![1u64, 2u64, 3u64]
2119        };
2120        let key_idx = df.index.clone();
2121        let serialized = serde_json::to_string(&key_idx).expect("BUG: cannot serialize");
2122        let deserialized: KeyIndex =
2123            serde_json::from_str(&serialized).expect("BUG: cannot deserialize");
2124        assert_eq!(key_idx, deserialized);
2125        assert!(key_idx.get_key(0).is_some_and(|x| x == "group_id".into()));
2126        let serialized = serde_json::to_string(&df).expect("BUG: cannot serialize");
2127        let deserialized: ColumnFrame =
2128            serde_json::from_str(&serialized).expect("BUG: cannot deserialize");
2129        assert_eq!(df, deserialized);
2130    }
2131
2132    #[rstest]
2133    #[traced_test]
2134    fn update_value() {
2135        let mut df = column_frame! {
2136            "group_id" => vec![1, 2, 3],
2137            "feed_tag" => vec![1, 2, 3]
2138        };
2139        let group_id: Key = "group_id".into();
2140        let v = df.get_mut_by_row_index(&group_id, 1);
2141        assert!(v.is_some());
2142        let v = v.unwrap();
2143        assert_eq!(v, &DataValue::I32(2));
2144        *v = DataValue::U64(22);
2145        let v = df.get_by_row_index(&group_id, 1);
2146        assert!(v.is_some());
2147        let v = v.unwrap();
2148        assert_eq!(v, &DataValue::U64(22));
2149
2150        assert!(df.get_mut_by_row_index(&"group_id2".into(), 1).is_none());
2151    }
2152}