Skip to main content

trs_dataframe/dataframe/
column_store.rs

1use ndarray::{concatenate, s, Array, Array1, Array2, ArrayView1, ArrayViewMut2, Axis};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4
5use crate::error::Error;
6use crate::{dataframe::index::Index, CandidateData, JoinBy, JoinRelation, Key};
7use data_value::{DataValue, Extract};
8use tracing::*;
9mod from;
10mod key_index;
11mod ops;
12pub mod sorted_df;
13pub use key_index::KeyIndex;
14pub mod filter_df;
15
16/// [`ColumnFrame`] is used to store the data for the candidates
17/// The data is stored in the [`Array2`] with the [`DataValue`] values
18/// The data is stored in the columns and the columns are indexed by the [`KeyIndex`]
19/// The [`KeyIndex`] is used to access the data by the column [`Key`]
20/// Memory layout is same like in [`ndarray`] - the data is stored in the row-major order
21#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
22pub struct ColumnFrame {
23    pub index: KeyIndex,
24    pub data_frame: Array2<DataValue>,
25}
26
27enum Continue {
28    Continue,
29    End,
30}
31
32impl Continue {
33    pub fn should_end(&self) -> bool {
34        matches!(self, Self::End)
35    }
36}
37
38use std::fmt;
39
40impl fmt::Display for ColumnFrame {
41    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42        // Display keys and and indices
43        write!(f, "\n|")?;
44
45        for key in &self.index.keys {
46            write!(f, " {key} |")?;
47        }
48
49        if self.index.is_empty() {
50            writeln!(f, "|")?;
51        }
52
53        // Display type for each key
54        if let Some(row) = self.data_frame.axis_iter(Axis(0)).next() {
55            write!(f, "\n|")?;
56            for value in row.iter() {
57                // Display types during first iteration
58                write!(f, " {:10?} |", crate::detect_dtype(value))?;
59            }
60            writeln!(f)?;
61        }
62
63        writeln!(f, "---")?;
64
65        // Display items, limit output to 256 rows
66        for (n, row) in self.data_frame.axis_iter(Axis(0)).enumerate() {
67            write!(f, "|")?;
68
69            for value in row.iter() {
70                // Display types during first iteration
71                write!(f, " {value} |")?;
72            }
73            writeln!(f)?;
74
75            if n >= 256 {
76                writeln!(f, "... (dataframe is too long)")?;
77                break;
78            }
79        }
80
81        writeln!(f, "---")
82    }
83}
84pub fn convert_data_value(item: DataValue, dtype: crate::DataType) -> DataValue {
85    let x = &item;
86    match dtype {
87        crate::DataType::Bool => DataValue::Bool(bool::extract(x)),
88        crate::DataType::U32 => DataValue::U32(u32::extract(x)),
89        crate::DataType::I32 => DataValue::I32(i32::extract(x)),
90        crate::DataType::U64 => DataValue::U64(u64::extract(x)),
91        crate::DataType::I64 => DataValue::I64(i64::extract(x)),
92        crate::DataType::F32 => DataValue::F32(f32::extract(x)),
93        crate::DataType::U128 => DataValue::U128(u128::extract(x)),
94        crate::DataType::I128 => DataValue::I128(i128::extract(x)),
95        crate::DataType::F64 => DataValue::F64(f64::extract(x)),
96        crate::DataType::U8 => DataValue::U8(u8::extract(x)),
97        crate::DataType::String => DataValue::String(String::extract(x).into()),
98        crate::DataType::Bytes => item,
99        crate::DataType::Map => item,
100        crate::DataType::Vec => item,
101        crate::DataType::Unknown => {
102            if matches!(item, DataValue::Null) {
103                return item;
104            }
105            let dtype = crate::detect_dtype(&item);
106            // this situation should not ever happen
107            if matches!(dtype, crate::DataType::Unknown) {
108                tracing::error!("Unknown datatype {dtype:?} - {item:?}");
109                return item;
110            }
111            convert_data_value(item, dtype)
112        }
113    }
114}
115pub fn convert_dv_to_dtype(key: &Key, item: DataValue) -> DataValue {
116    convert_data_value(item, key.ctype)
117}
118impl ColumnFrame {
119    pub fn new<K: Into<KeyIndex>>(index: K, data_frame: Array2<DataValue>) -> Self {
120        Self {
121            index: index.into(),
122            data_frame,
123        }
124    }
125
126    pub fn keys(&self) -> &[Key] {
127        self.index.get_keys()
128    }
129
130    pub fn len(&self) -> usize {
131        self.data_frame.nrows()
132    }
133
134    pub fn is_empty(&self) -> bool {
135        self.data_frame.nrows() == 0
136    }
137
138    pub fn shrink(&mut self) {
139        let shape = self.data_frame.shape();
140        if shape[0] > 0 && shape[1] > 0 {
141            let mut new_data = Vec::with_capacity(shape[0] * shape[1]);
142            for elem in self.data_frame.iter() {
143                new_data.push(elem.clone());
144            }
145            if let Ok(arr) = Array2::from_shape_vec((shape[0], shape[1]), new_data) {
146                self.data_frame = arr;
147            }
148        }
149    }
150
151    /// This method will try to fix dtype based on data stored in each column. If dtype is [`crate::DataType::Unknown`]
152    /// this method will replace dtype for "correct" one based on [`DataValue`].
153    /// NOTE: flag `force` will enforce this dtype even if dtype is known
154    pub fn try_fix_dtype_for_keys(&mut self, force: bool) -> Result<(), Error> {
155        for i in 0..self.index.keys.len() {
156            let should_fix = force || matches!(self.index.keys[i].ctype, crate::DataType::Unknown);
157
158            if should_fix {
159                let column = self
160                    .get_single_column(&self.index.keys[i])
161                    .ok_or_else(|| Error::EmptyData)?;
162                let dtype = crate::detect_dtype(column.get(0).ok_or_else(|| Error::EmptyData)?);
163                self.index.keys[i].ctype = dtype;
164            }
165        }
166
167        Ok(())
168    }
169    pub fn try_fix_dtype(&mut self) -> Result<(), Error> {
170        let mut errors = vec![];
171        let keys = self.index.keys.clone();
172        for key in keys {
173            tracing::trace!("key: {key:?}- {:?}", key.ctype);
174            if let Err(e) = self.try_fix_column_by_key(&key) {
175                errors.push((key, e.to_string()));
176            }
177        }
178        if errors.is_empty() {
179            Ok(())
180        } else {
181            Err(Error::CastFailed(errors))
182        }
183    }
184
185    pub fn try_fix_column_by_key(&mut self, key: &Key) -> Result<(), Error> {
186        let idx = self
187            .index
188            .get_column_index(key)
189            .ok_or(Error::MissingField(format!("{key}").into()))?;
190        let mut col = self.data_frame.column_mut(idx);
191
192        col.mapv_inplace(|item| convert_dv_to_dtype(key, item));
193        Ok(())
194    }
195
196    pub fn enforce_dtype_for_column(
197        &mut self,
198        key: &str,
199        dtype: crate::DataType,
200    ) -> Result<(), Error> {
201        if let Some(idx) = self.index.get_column_index_by_name(key) {
202            let new_key = Key::new(key, dtype);
203            let mut col = self.data_frame.column_mut(idx);
204
205            col.mapv_inplace(|item| convert_dv_to_dtype(&new_key, item));
206            self.index.rename_key(key, new_key)?;
207            Ok(())
208        } else {
209            Err(Error::NotFound(Key::new(key, crate::DataType::Unknown)))
210        }
211    }
212
213    pub fn get_mut_view(&mut self) -> ArrayViewMut2<'_, DataValue> {
214        self.data_frame.view_mut()
215    }
216
217    pub fn rename_key(&mut self, old: &str, new: Key) -> Result<(), Error> {
218        self.index.rename_key(old, new)
219    }
220
221    pub fn add_alias(&mut self, key: &str, alias: &str) -> Result<(), Error> {
222        self.index.add_alias(key, alias)
223    }
224
225    /// Selects the data from the [`ColumnFrame`] by the given keys
226    /// If the keys are not provided, the data is selected by the [`KeyIndex`] keys
227    /// The data is returned as the [`Vec<Vec<DataValue>>`]
228    /// If the keys are not found, the empty [`Vec<Vec<DataValue>>`] is returned
229    /// Returns the data in the column-major order
230    pub fn select_transposed_typed<D: Extract>(&self, keys: &[Key]) -> Vec<Vec<D>> {
231        let selected = self.select(Some(keys));
232        let mut result = Vec::with_capacity(selected.nrows());
233        for row in selected.rows() {
234            let mut r = Vec::with_capacity(selected.ncols());
235            for value in row.iter() {
236                r.push(D::extract(value));
237            }
238            result.push(r);
239        }
240        result
241    }
242
243    /// Selects the data from the [`ColumnFrame`] by the given keys
244    /// If the keys are not provided, the data is selected by the [`KeyIndex`] keys
245    /// The data is returned as the [`Array2`] with the [`DataValue`] values
246    /// If the keys are not found, the empty [`Array2`] is returned
247    /// Returns the [`Array2`] with the data in row-major order (same as ndarray default)
248    /// Note: Despite the method name, the data is NOT transposed - it returns data in row-major order
249    /// Use this when you want row-oriented access to the selected columns
250    pub fn select_transposed(&self, keys: Option<&[Key]>) -> Result<Array2<DataValue>, Error> {
251        let keys = keys.unwrap_or_else(|| self.index.get_keys());
252        let key_indexes = self.index.select(keys);
253        if key_indexes.is_empty() {
254            return Ok(Array2::default((0, 0)));
255        }
256        let data_vec: Vec<Array1<DataValue>> = key_indexes
257            .indexes()
258            .iter()
259            .map(|x| self.data_frame.column(*x).to_owned())
260            .collect();
261        to_array2(data_vec)
262    }
263
264    /// Selects whole column from the [`ColumnFrame`] by the given key
265    /// If the key is not found, the None is returned
266    /// If the key is found, the [`ArrayView1`] with the [`DataValue`] values is returned
267    pub fn select_column(&self, key: &Key) -> Option<ArrayView1<'_, DataValue>> {
268        self.index
269            .get_column_index(key)
270            .map(|x| self.data_frame.column(x))
271    }
272
273    pub fn apply_function<F>(&mut self, keys: &[Key], mut func: F) -> Result<(), Error>
274    where
275        F: FnMut(&[Key], &mut ColumnFrame) -> Result<(), Error>,
276    {
277        func(keys, self)
278    }
279
280    /// Validates the access to the entry by the given column [`Key`] and row index
281    /// If the column is not found, an error is returned [`Error::NotFound`]
282    /// If the row index is out of bounds, an error is returned [`Error::IndexOutOfRange`]
283    /// Otherwise, the column index is returned
284    pub fn validate_entry_access(&self, column: &Key, row_index: usize) -> Result<usize, Error> {
285        if row_index >= self.data_frame.nrows() {
286            return Err(Error::IndexOutOfRange(row_index, self.data_frame.nrows()));
287        }
288        let Some(column_index) = self.index.get_column_index(column) else {
289            return Err(Error::NotFound(column.clone()));
290        };
291        Ok(column_index)
292    }
293
294    /// Returns the value [`DataValue`] for the given column defined by [`Key`] and row index
295    /// If the column is not found, None is returned
296    /// If the row index is out of bounds, None is returned
297    pub fn get_by_row_index(&self, column: &Key, row_index: usize) -> Option<&DataValue> {
298        trace!(
299            "Column: {column} row_index: {row_index} data_frame: cols:{}-rows:{}",
300            self.data_frame.len(),
301            self.data_frame.nrows()
302        );
303        trace!("{:?}", self.data_frame);
304        match self.validate_entry_access(column, row_index) {
305            Ok(column_index) => self.data_frame.get((row_index, column_index)),
306            Err(e) => {
307                trace!("Error: {e}");
308                None
309            }
310        }
311    }
312
313    /// Returns mutable reference for the value [`DataValue`] for the given column defined by [`Key`] and row index
314    /// If the column is not found, None is returned
315    /// If the row index is out of bounds, None is returned
316    pub fn get_mut_by_row_index(
317        &mut self,
318        column: &Key,
319        row_index: usize,
320    ) -> Option<&mut DataValue> {
321        trace!(
322            "Column: {column} row_index: {row_index} data_frame: cols:{}-rows:{}",
323            self.data_frame.len(),
324            self.data_frame.nrows()
325        );
326        trace!("{:?}", self.data_frame);
327        match self.validate_entry_access(column, row_index) {
328            Ok(column_index) => self.data_frame.get_mut((row_index, column_index)),
329            Err(e) => {
330                trace!("Error: {e}");
331                None
332            }
333        }
334    }
335
336    /// Returns the value [`HashMap<Key, Vec<DataValue>>`] for the given columns defined by [`Key`].
337    /// If the keys are not provided, the data is selected by the [`KeyIndex`] keys
338    /// If the keys are not found, the empty [`HashMap`] is returned
339    /// Returns the [`Array2`] with the data in the row-major order
340    pub fn select_as_map(&self, keys: Option<&[Key]>) -> HashMap<Key, Vec<DataValue>> {
341        let keys = keys.unwrap_or_else(|| self.index.get_keys());
342        let indexes = self.index.select(keys);
343        if indexes.is_empty() {
344            return Default::default();
345        }
346
347        let mut new_data_frame = HashMap::with_capacity(keys.len());
348
349        for key in keys.iter() {
350            if let Some(column_index_in_source) = indexes.get_column_index(key) {
351                let column = self.data_frame.column(column_index_in_source);
352                new_data_frame.insert(key.clone(), column.to_vec());
353            }
354        }
355
356        new_data_frame
357    }
358
359    /// Returns the value [`Array2<DataValue>`] for the given columns defined by [`Key`].
360    /// If the keys are not provided, the data is selected by the [`KeyIndex`] keys
361    /// If the keys are not found, the empty [`Array2`] is returned
362    /// Returns the [`Array2`] with the data in the row-major order
363    ///
364    /// This operation clones all selected data. For read-only access, consider using
365    /// [`Self::select_column`] to get a view of a single column without copying.
366    pub fn select(&self, keys: Option<&[Key]>) -> Array2<DataValue> {
367        let keys = keys.unwrap_or_else(|| self.index.get_keys());
368        let indexes = self.index.select(keys);
369        if indexes.is_empty() || keys.is_empty() {
370            return Array2::default((0, 0));
371        }
372
373        let nrows = self.data_frame.nrows();
374        let ncols = keys.len();
375
376        // Pre-allocate vector with exact capacity
377        let mut data = Vec::with_capacity(nrows * ncols);
378
379        // Collect valid column mappings
380        let col_mappings: Vec<(usize, usize)> = keys
381            .iter()
382            .enumerate()
383            .filter_map(|(dst_idx, key)| {
384                indexes
385                    .get_column_index(key)
386                    .map(|src_idx| (dst_idx, src_idx))
387            })
388            .collect();
389
390        // Fill with nulls first, then overwrite valid data
391        data.resize(nrows * ncols, DataValue::Null);
392
393        // Copy column by column - better for row-major layout
394        for row_idx in 0..nrows {
395            for (dst_col, src_col) in &col_mappings {
396                let dst_idx = row_idx * ncols + dst_col;
397                data[dst_idx] = self.data_frame[(row_idx, *src_col)].clone();
398            }
399        }
400
401        Array2::from_shape_vec((nrows, ncols), data).unwrap_or_else(|_| Array2::default((0, 0)))
402    }
403
404    fn extend_dataframe_for_column(&mut self, key: Key) -> Result<(), Error> {
405        self.index.store_key(key);
406        let len = self.data_frame.nrows();
407        self.data_frame.push_column(Array1::default(len).view())?;
408        Ok(())
409    }
410
411    /// Pushes the row candidate into the [`ColumnFrame`]
412    /// If the column is not found this method will add the column to the [`ColumnFrame`]
413    ///
414    /// This operation clones all values from the candidate. For batch operations,
415    /// consider using [`Self::extend`] instead which can be more efficient.
416    pub fn push<C: CandidateData>(&mut self, row_candidate: C) -> Result<(), Error> {
417        // Pre-allocate with capacity to avoid reallocations
418        let num_keys = self.index.len();
419        let candidate_keys = row_candidate.keys();
420        let mut arr = Vec::with_capacity(num_keys.max(candidate_keys.len()));
421
422        // First pass: extend dataframe with any new columns
423        for key in &candidate_keys {
424            if self.index.get_column_index(key).is_none() {
425                self.extend_dataframe_for_column(key.clone())?;
426            }
427        }
428
429        // Second pass: collect values - capacity may have changed after extension
430        arr.reserve(self.index.len());
431        for index in self.index.get_keys() {
432            arr.push(
433                row_candidate
434                    .get_value_ref(index)
435                    .cloned()
436                    .unwrap_or(DataValue::Null),
437            );
438        }
439
440        self.data_frame.push_row(Array::from_vec(arr).view())?;
441        Ok(())
442    }
443
444    pub fn remove_column(&mut self, keys: &[Key]) -> Result<Self, Error> {
445        // fixme this is naive approach
446        let mut indexes = KeyIndex::default();
447        // take remove data
448        let data = self.select(Some(keys));
449        // remove labels from the index
450        for key in keys {
451            if let Some((current, _idx)) = self.index.remove_key(key) {
452                indexes.store_key(current);
453            }
454        }
455        // copy the rest of the data to the new data frame with new index
456        let rest = self.select(Some(self.keys()));
457        let keys = self.index.get_keys().to_vec();
458        self.data_frame = rest;
459        self.index = KeyIndex::new(keys);
460
461        //remove_self.data_frame = to_array2(columns)?;
462        Ok(Self::new(indexes, data))
463    }
464
465    fn check_or_init_frame(&mut self, other: &Self) -> Result<Continue, Error> {
466        if self.index.is_empty() {
467            self.index = other.index.clone();
468            self.data_frame = other.data_frame.clone();
469            return Ok(Continue::End);
470        }
471        if other.index.is_empty() {
472            return Ok(Continue::End);
473        }
474        if self.is_empty() {
475            self.data_frame = Array2::default((other.data_frame.nrows(), self.index.len()));
476        }
477
478        Ok(Continue::Continue)
479    }
480
481    fn extend_columns_from_other(&mut self, other: &Self) -> Result<(), Error> {
482        let missing_keys: Vec<Key> = other
483            .index
484            .get_keys()
485            .iter()
486            .filter(|key| self.index.get_column_index(key).is_none())
487            .cloned()
488            .collect();
489
490        if missing_keys.is_empty() {
491            return Ok(());
492        }
493
494        for key in missing_keys {
495            self.index.store_key(key);
496        }
497
498        let nrows = self.data_frame.nrows();
499        let new_cols = self.index.len() - self.data_frame.ncols();
500
501        if new_cols > 0 {
502            // Use ndarray's optimized concatenate - faster than manual copying
503            let new_data = Array2::default((nrows, new_cols));
504            self.data_frame = concatenate(Axis(1), &[self.data_frame.view(), new_data.view()])?;
505        }
506
507        Ok(())
508    }
509
510    fn try_extend(&mut self, other: Self) -> Result<(), Error> {
511        let mut joined_keys = self.index.clone();
512        for key in other.keys() {
513            if self.index.get_column_index(key).is_none() {
514                joined_keys.store_key(key.clone());
515            }
516        }
517
518        let sum_len = self.data_frame.nrows() + other.data_frame.nrows();
519        let mut arr = Array2::default((sum_len, joined_keys.len()));
520        let increment = self.data_frame.nrows();
521
522        for key in joined_keys.get_keys() {
523            let index_result = joined_keys.get_column_index(key).ok_or_else(|| {
524                Error::UnknownError(format!(
525                    "Index lookup failed for key '{}' in try_extend",
526                    key.name()
527                ))
528            })?;
529
530            let mut col = arr.column_mut(index_result);
531
532            if let Some(index) = self.index.get_column_index(key) {
533                let src_col = self.data_frame.column(index);
534                col.slice_mut(s![..increment]).assign(&src_col);
535            }
536
537            if let Some(index) = other.index.get_column_index(key) {
538                let src_col = other.data_frame.column(index);
539                col.slice_mut(s![increment..]).assign(&src_col);
540            }
541        }
542
543        *self = ColumnFrame::new(joined_keys, arr);
544        Ok(())
545    }
546
547    /// Extends the [`ColumnFrame`] with the data from the other [`ColumnFrame`]
548    /// If the [`KeyIndex`] is empty, the [`ColumnFrame`] is replaced with the other [`ColumnFrame`]
549    /// If the other [`KeyIndex`] is empty, nothing happens
550    /// If the length of the [`KeyIndex`] of the other data frame is greater then current,
551    /// an error is returned [`Error::DataSetSizeDoesntMatch`]
552    /// If [`Key`] from other data frame - extends the [`KeyIndex`] and add column to the current [`ColumnFrame`]
553    ///
554    pub fn extend(&mut self, mut other: Self) -> Result<(), Error> {
555        if self.check_or_init_frame(&other)?.should_end() {
556            return Ok(());
557        }
558
559        if self.index.check_order_of_indexes(&other.index).is_err() {
560            return self.try_extend(other);
561        }
562
563        trace!(
564            "Extend columns from other {:?} vs {:?}",
565            other.index.get_keys(),
566            self.index.get_keys()
567        );
568
569        if other.data_frame.ncols() < self.data_frame.ncols() {
570            other.extend_columns_from_other(self)?;
571        } else {
572            self.extend_columns_from_other(&other)?;
573        }
574        self.data_frame = concatenate(Axis(0), &[self.data_frame.view(), other.data_frame.view()])?;
575
576        Ok(())
577    }
578
579    /// Replace the [`ColumnFrame`] with the other [`ColumnFrame`]
580    /// If the current [`KeyIndex`] is empty, the [`ColumnFrame`] is replaced with the other [`ColumnFrame`]
581    /// If the other [`KeyIndex`] is empty, nothing happens
582    /// If the [`KeyIndex`] of the other data frame and current doesn't match an error is returned [`Error::DataSetSizeDoesntMatch`]
583    /// If the [`Key`] from other data frame is not present in the current [`ColumnFrame`] - extends the [`KeyIndex`] and add column to the current [`ColumnFrame`]
584    pub fn replace(&mut self, other: Self) -> Result<(), Error> {
585        if self.check_or_init_frame(&other)?.should_end() {
586            return Ok(());
587        }
588
589        if self.data_frame.len() > other.data_frame.len() {
590            return Err(Error::DataSetSizeDoesntMatch(
591                self.data_frame.len(),
592                other.data_frame.len(),
593            ));
594        }
595        self.index = other.index;
596        self.data_frame = other.data_frame;
597
598        Ok(())
599    }
600
601    /// Joins the candidates by the keys in the `JoinRelation::JoinById` struct.
602    /// This function creates [`Index`] for the keys and then joins the candidates by the keys.
603    ///
604    pub fn join_by_id_inner(&mut self, right: Self, keys: &[Key]) -> Result<(), Error> {
605        if self.check_or_init_frame(&right)?.should_end() {
606            return Ok(());
607        }
608
609        let timer = std::time::Instant::now();
610        let new_columns = right.index.get_complement_keys(self.index.get_keys());
611
612        // add new columns and keys into the column frame index
613        self.extend_columns_from_other(&right)?;
614        tracing::debug!("Extend took {}ns", timer.elapsed().as_nanos());
615
616        // Pre-compute column index mappings AFTER extension to get correct indices
617        let column_mappings: Vec<(usize, usize)> = new_columns
618            .iter()
619            .filter_map(|key| {
620                let left_idx = self.index.get_column_index(key)?;
621                let right_idx = right.index.get_column_index(key)?;
622                Some((left_idx, right_idx))
623            })
624            .collect();
625
626        // get the indexes for the keys
627        let timer = std::time::Instant::now();
628        let index = Index::new(keys.to_vec(), self);
629        tracing::debug!("Left index build took: {}ns", timer.elapsed().as_nanos());
630        tracing::trace!("Index {index:?}");
631
632        let timer = std::time::Instant::now();
633        let right_index = Index::new(keys.to_vec(), &right);
634        let joined_idx = index.join(right_index);
635        tracing::debug!(
636            "Right index build and join took: {}ns",
637            timer.elapsed().as_nanos()
638        );
639
640        // Instead of copying all data, we only fill in the new columns
641        // The existing columns are already correct from extend_columns_from_other
642        let timer = std::time::Instant::now();
643        let joined_idx_len = joined_idx.len();
644
645        for (left_col_idx, right_col_idx) in &column_mappings {
646            let mut left_col = self.data_frame.column_mut(*left_col_idx);
647            let right_col = right.data_frame.column(*right_col_idx);
648
649            for (left_indices, right_indices) in &joined_idx {
650                for right_row_idx in right_indices {
651                    for left_idx in left_indices {
652                        left_col[*left_idx] = right_col[*right_row_idx].clone();
653                    }
654                }
655            }
656        }
657
658        let elapsed = timer.elapsed();
659        tracing::debug!(
660            "Filled {} rows in {}ms|{}s",
661            joined_idx_len,
662            elapsed.as_millis(),
663            elapsed.as_secs()
664        );
665
666        Ok(())
667    }
668
669    /// Adds the single column to the current [`ColumnFrame`]
670    /// If the column is already present, an error is returned [`Error::ColumnAlreadyExists`]
671    /// If the length of the column is different from the current data frame, an error is returned [`Error::DataSetSizeDoesntMatch`]
672    pub fn add_single_column<K: Into<Key>>(
673        &mut self,
674        key: K,
675        column: Array1<DataValue>,
676    ) -> Result<(), Error> {
677        let key = key.into();
678        if self.index.get_column_index(&key).is_some() {
679            return Err(Error::ColumnAlreadyExists(key));
680        }
681        if self.len() != column.len() && !self.is_empty() {
682            return Err(Error::DataSetSizeDoesntMatch(self.len(), column.len()));
683        }
684
685        self.index.store_key(key.clone());
686        let rows = column.len();
687        let column_index = self
688            .index
689            .get_column_index(&key)
690            .ok_or(Error::UnknownError(format!("Column {key} should exists")))?;
691        if self.is_empty() && self.index.len() == 1 {
692            self.data_frame = column.into_shape_clone((rows, 1))?;
693            assert_eq!(self.data_frame.column(column_index).len(), rows);
694        } else if self.is_empty() {
695            self.data_frame = Array2::default((column.len(), self.index.len() - 1));
696            self.data_frame.push_column(column.view())?;
697            assert_eq!(self.data_frame.column(column_index).len(), rows);
698        } else {
699            self.data_frame.push_column(column.view())?;
700        }
701        assert_eq!(self.data_frame.column(column_index).len(), rows);
702
703        Ok(())
704    }
705    /// Adds the columns from the other [`ColumnFrame`] to the current [`ColumnFrame`]
706    /// If the current [`KeyIndex`] is empty, the [`ColumnFrame`] is replaced with the other [`ColumnFrame`]
707    /// If the other [`KeyIndex`] is empty, nothing happens
708    pub fn add_columns(&mut self, other: Self) -> Result<(), Error> {
709        if self.check_or_init_frame(&other)?.should_end() {
710            return Ok(());
711        }
712
713        self.extend_columns_from_other(&other)?;
714        for (idx, key) in other.index.get_keys().iter().enumerate() {
715            if let Some(index) = self.index.get_column_index(key) {
716                trace!("Other array = {:?}", other.data_frame.dim());
717                if other.data_frame.dim() == (0, 0) {
718                    self.data_frame.column_mut(index).fill(DataValue::Null);
719                    continue;
720                }
721                let arr = other.data_frame.column(idx);
722                trace!(
723                    "Adding column {key:?} at index {idx} vs {index} datasize: self:{} vs other:{}",
724                    self.data_frame.nrows(),
725                    arr.len()
726                );
727                if arr.len() != self.data_frame.nrows() {
728                    self.data_frame.column_mut(index).fill(DataValue::Null);
729                } else {
730                    self.data_frame.column_mut(index).assign(&arr);
731                }
732            }
733        }
734        Ok(())
735    }
736
737    /// Broadcasts the data from the other [`ColumnFrame`] to the current [`ColumnFrame`]
738    /// If the current [`KeyIndex`] is empty, the [`ColumnFrame`] is replaced with the other [`ColumnFrame`]
739    /// If the other [`KeyIndex`] is empty, nothing happens
740    /// If the length (number of rows) of the other data frame is greater then 1 an error is returned [`Error::CannotBroadcast`]
741    pub fn broadcast(&mut self, other: Self) -> Result<(), Error> {
742        if self.check_or_init_frame(&other)?.should_end() {
743            return Ok(());
744        }
745        if other.data_frame.nrows() != 1 {
746            return Err(Error::CannotBroadcast);
747        }
748
749        // Get all keys (current + new from other)
750        let all_keys = self.index.get_keys().to_vec();
751        let other_keys: Vec<_> = other
752            .index
753            .get_keys()
754            .iter()
755            .filter(|k| self.index.get_column_index(k).is_none())
756            .cloned()
757            .collect();
758
759        // Add new keys to index
760        for key in &other_keys {
761            self.index.store_key(key.clone());
762        }
763
764        let nrows = self.len();
765        let ncols = self.index.len();
766        let ncols_old = all_keys.len();
767
768        // Pre-allocate result buffer
769        let mut data = Vec::with_capacity(nrows * ncols);
770
771        // Build result row by row - single pass allocation
772        for row_idx in 0..nrows {
773            // Copy existing columns
774            for col_idx in 0..ncols_old {
775                data.push(self.data_frame[(row_idx, col_idx)].clone());
776            }
777            // Broadcast values from other dataframe (only 1 row in other)
778            for key in &other_keys {
779                if let Some(other_idx) = other.index.get_column_index(key) {
780                    data.push(other.data_frame[(0, other_idx)].clone());
781                }
782            }
783        }
784
785        self.data_frame = Array2::from_shape_vec((nrows, ncols), data)
786            .map_err(|e| Error::UnknownError(format!("Broadcast reshape failed: {e}")))?;
787
788        Ok(())
789    }
790
791    /// Computes the Cartesian product of the input structures,
792    /// resulting in all possible combinations of elements.
793    /// The data is stored in the row-major order
794    /// The keys are stored in the order they are added - the order is preserved - new keys from the `other` [`ColumnFrame`] are added to the end
795    pub fn cartesian_product(&mut self, other: Self) -> Result<(), Error> {
796        if self.check_or_init_frame(&other)?.should_end() {
797            return Ok(());
798        }
799        // extend the columns
800        // let mut new_index = self.index.clone();
801        for other_key in other.keys() {
802            if self.index.get_column_index(other_key).is_none() {
803                self.index.store_key(other_key.clone());
804            } else {
805                self.index.store_key(Key::new(
806                    format!("{}-{}", other_key, other_key.id()).as_str(),
807                    other_key.ctype,
808                ));
809            }
810        }
811        let max_rows = self.len() * other.len();
812        let ncols = self.index.len();
813        // create new data frame
814        let mut new_df = Array2::default((max_rows, ncols));
815
816        let mut cur_idx = 0;
817        for cur_row in self.data_frame.rows() {
818            for other_row in other.data_frame.rows() {
819                new_df
820                    .slice_mut(s![cur_idx, ..])
821                    .assign(&concatenate(Axis(0), &[cur_row, other_row])?);
822                cur_idx += 1;
823            }
824        }
825        self.data_frame = new_df;
826        Ok(())
827    }
828
829    /// Joins the candidates with the other candidates by the [`JoinRelation`] policy.
830    /// For [`JoinBy::AddColumns`] the columns are added to the existing structure via [`Self::add_columns`]
831    /// For [`JoinBy::Replace`] the columns are replaced with the new columns
832    /// For [`JoinBy::Extend`] the candidates are extended via [`Self::extend`]
833    /// For [`JoinBy::Broadcast`] each candidate is extended with the values of the other candidates `Self::broadcast`
834    /// For [`JoinBy::CartesianProduct`] the candidates are multiplied by the other candidates
835    /// For [`JoinBy::JoinById`] the candidates are joined by the keys in the `JoinRelation::JoinById` struct see [`Self::join_by_id_inner`]
836    pub fn join(&mut self, right: Self, join_type: &JoinRelation) -> Result<(), Error> {
837        use JoinBy::*;
838        match &join_type.join_type {
839            AddColumns => self.add_columns(right),
840            Replace => self.replace(right),
841            Extend => self.extend(right),
842            Broadcast => self.broadcast(right),
843            CartesianProduct => self.cartesian_product(right),
844            JoinById(join) => self.join_by_id_inner(right, &join.keys),
845        }
846    }
847
848    pub fn get_single_column(&self, key: &Key) -> Option<ArrayView1<'_, DataValue>> {
849        self.index
850            .get_column_index(key)
851            .map(|x| self.data_frame.column(x))
852    }
853
854    pub fn sorted(&self, key: &Key) -> Result<sorted_df::SortedDataFrame<'_>, Error> {
855        let index = self
856            .index
857            .get_column_index(key)
858            .ok_or(Error::NotFound(key.clone()))?;
859        let column = self.data_frame.column(index);
860        let mut data_with_index = column.iter().enumerate().collect::<Vec<_>>();
861        tracing::trace!("Sorting by key: {key:?} vals {data_with_index:?}");
862        data_with_index.sort_by(
863            |(a_idx, a_val), (b_idx, b_val)| match a_val.partial_cmp(b_val) {
864                Some(ordering) => ordering.then_with(|| a_idx.cmp(b_idx)),
865                None => {
866                    let a_null = matches!(a_val, DataValue::Null);
867                    let b_null = matches!(b_val, DataValue::Null);
868                    match (a_null, b_null) {
869                        (true, true) => std::cmp::Ordering::Equal.then_with(|| a_idx.cmp(b_idx)),
870                        (true, false) => std::cmp::Ordering::Greater.then_with(|| a_idx.cmp(b_idx)),
871                        (false, true) => std::cmp::Ordering::Less.then_with(|| a_idx.cmp(b_idx)),
872                        (false, false) => std::cmp::Ordering::Equal.then_with(|| a_idx.cmp(b_idx)),
873                    }
874                }
875            },
876        );
877
878        tracing::trace!("Sorted by key: {key:?} vals {data_with_index:?}");
879        let indicies = data_with_index
880            .into_iter()
881            .map(|(idx, _)| idx)
882            .collect::<Vec<_>>();
883
884        Ok(sorted_df::SortedDataFrame::new(self, indicies))
885    }
886
887    pub fn filter(&self, filter: &crate::filter::FilterRules) -> Result<Self, Error> {
888        let mut final_indices = Vec::new();
889        let filter_df = filter_df::ColumnFrameFiltering { column_frame: self };
890        for rule in &filter.rules {
891            final_indices.extend(crate::filter::filter_combination(&filter_df, rule)?);
892        }
893
894        final_indices.sort_unstable();
895        final_indices.dedup();
896        let mut new_df = ColumnFrame::new(
897            self.index.clone(),
898            Array2::default((final_indices.len(), self.index.len())),
899        );
900        final_indices
901            .iter()
902            .enumerate()
903            .for_each(|(cur_idx, row_idx)| {
904                new_df
905                    .data_frame
906                    .slice_mut(s![cur_idx, ..])
907                    .assign(&self.data_frame.slice(s![*row_idx, ..]));
908            });
909
910        Ok(new_df)
911    }
912}
913
914pub fn to_array2<T: Clone>(source: Vec<Array1<T>>) -> Result<Array2<T>, Error> {
915    let width = source.len();
916    let flattened: Array1<T> = source.into_iter().flat_map(|row| row.to_vec()).collect();
917    let height = flattened.len() / width;
918    Ok(flattened.into_shape_with_order((width, height))?)
919}
920#[macro_export]
921macro_rules! df {
922    ($($everything:tt)*) => {
923        $crate::DataFrame::new($crate::column_frame!($($everything)*))
924    };
925}
926
927#[macro_export]
928macro_rules! column_frame {
929    // case { "a" => 1, }
930    ($($key:expr => $value:expr,)+) => { $crate::column_frame!($($key => $value),+) };
931    // case { "a" => vec![1, 2, 3] }
932    ($($key:expr => vec![$($value:expr),*]),*) => {
933        $crate::column_frame!($($key => [$($value),*]),*)
934    };
935    // case { "a" => [1, 2, 3] }
936    ($($key:expr => [$($value:expr),*]),*) => {
937        {
938           let data = ::ndarray::arr2(&[$(
939                [$($value.into(),)*],
940            )*]);
941
942           let _keys = vec![$($key.into(),)*];
943
944            $crate::ColumnFrame::new(
945                $crate::KeyIndex::new(_keys),
946                data.reversed_axes()
947            )
948        }
949    };
950    // case { "a" => 1 }
951    ($($key:expr => $value:expr),*) => {
952        {
953           let _data = ::ndarray::arr2(&[[$($value.into(),)*]]);
954           let _keys = vec![$($key.into(),)*];
955
956            $crate::ColumnFrame::new(
957                $crate::KeyIndex::new(_keys),
958                _data,
959            )
960        }
961    };
962}
963
964#[cfg(test)]
965mod test {
966    use crate::{filter::FilterRules, JoinById};
967
968    use super::*;
969    use data_value::stdhashmap;
970    use ndarray::ArrayView;
971    use rstest::*;
972    use tracing_test::traced_test;
973
974    #[rstest]
975    #[case(
976        column_frame! {
977            "t" => [1751001987000000u64, 1752001987000000u64, 1753001987000000u64],
978            "b" => [4, 5, 6],
979            "c" => [7, 8, 9]
980        },
981        column_frame! {
982            "t" => [1752001987000000u64],
983            "b" => [5],
984            "c" => [8]
985        },
986        FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
987    )]
988    #[case(
989        column_frame! {
990            "t" => [1751001987000000f64, 1752001987000000f64, 1753001987000000f64],
991            "b" => [4, 5, 6],
992            "c" => [7, 8, 9]
993        },
994        column_frame! {
995            "t" => [1752001987000000f64],
996            "b" => [5],
997            "c" => [8]
998        },
999        FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
1000    )]
1001    #[case(
1002        column_frame! {
1003            "t" => [1751001987000000i64, 1752001987000000i64, 1753001987000000i64],
1004            "b" => [4, 5, 6],
1005            "c" => [7, 8, 9]
1006        },
1007        column_frame! {
1008            "t" => [1752001987000000i64],
1009            "b" => [5],
1010            "c" => [8]
1011        },
1012        FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
1013    )]
1014    #[case(
1015        column_frame! {
1016            "t" => [1751001987000000u64, 1752001987000000u64, 1753001987000000u64],
1017            "b" => [4, 5, 6],
1018            "c" => [7, 8, 9]
1019        },
1020        column_frame! {
1021            "t" => [1751001987000000u64],
1022            "b" => [4],
1023            "c" => [7]
1024        },
1025        FilterRules::try_from("t.to_datetime_us() < '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
1026    )]
1027    #[case(
1028        column_frame! {
1029            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
1030            "b" => [4, 5, 6],
1031            "c" => [7, 8, 9]
1032        },
1033        column_frame! {
1034            "t" => ["2025-07-08 18:13:07"],
1035            "b" => [4],
1036            "c" => [7]
1037        },
1038        FilterRules::try_from("t.to_datetime_us() < '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
1039    )]
1040    #[case(
1041        column_frame! {
1042            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
1043            "b" => [4, 5, 6],
1044            "c" => [7, 8, 9]
1045        },
1046        column_frame! {
1047            "t" => [],
1048            "b" => [],
1049            "c" => []
1050        },
1051        FilterRules::try_from("t.len() < 10u64").expect("BUG: cannot create filter rules"),
1052    )]
1053    #[case(
1054        column_frame! {
1055            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
1056            "b" => [4, 5, 6],
1057            "c" => [7, 8, 9]
1058        },
1059        column_frame! {
1060            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
1061            "b" => [4, 5, 6],
1062            "c" => [7, 8, 9]
1063        },
1064        FilterRules::try_from("t.len() > 10u64").expect("BUG: cannot create filter rules"),
1065    )]
1066    #[case(
1067        column_frame! {
1068            "t" => [DataValue::Vec(vec![1.into(), 2.into(), 3.into()]), DataValue::Vec(vec![]), DataValue::Vec(vec![1.into()])],
1069            "b" => [4, 5, 6],
1070            "c" => [7, 8, 9]
1071        },
1072        column_frame! {
1073            "t" => [DataValue::Vec(vec![])],
1074            "b" => [5],
1075            "c" => [ 8]
1076        },
1077        FilterRules::try_from("t.len() == 0u64").expect("BUG: cannot create filter rules"),
1078    )]
1079    #[case(
1080        column_frame! {
1081            "t" => [DataValue::Vec(vec![1.into(), 2.into(), 3.into()]), DataValue::Vec(vec![]), DataValue::Vec(vec![1.into()])],
1082            "b" => [4, 5, 6],
1083            "c" => [7, 8, 9]
1084        },
1085        column_frame! {
1086            "t" => [DataValue::Vec(vec![1.into()])],
1087            "b" => [6],
1088            "c" => [9]
1089        },
1090        FilterRules::try_from("t.len() == 1u64").expect("BUG: cannot create filter rules"),
1091    )]
1092    #[case(
1093        column_frame! {
1094            "a" => [1, 2, 3],
1095            "b" => [4, 5, 6],
1096            "c" => [7, 8, 9]
1097        },
1098        column_frame! {
1099            "a" => [1, 2],
1100            "b" => [4, 5],
1101            "c" => [7, 8]
1102        },
1103        FilterRules::try_from("a <= 2i32").expect("BUG: cannot create filter rules"),
1104    )]
1105    #[case(
1106        column_frame! {
1107            "a" => [1, 2, 3],
1108            "b" => [4, 5, 6],
1109            "c" => [7, 8, 9]
1110        },
1111        column_frame! {
1112            "a" => [2],
1113            "b" => [5],
1114            "c" => [8]
1115        },
1116        FilterRules::try_from("a <= 2i32 && c > 7i32").expect("BUG: cannot create filter rules"),
1117    )]
1118    #[case(
1119        column_frame! {
1120            "a" => [1, 2, 3],
1121            "b" => [4, 5, 6],
1122            "c" => [7, 8, 9]
1123        },
1124        column_frame! {
1125            "a" => [],
1126            "b" => [],
1127            "c" => []
1128        },
1129        FilterRules::try_from("a <= 2i32 && c > 9i32").expect("BUG: cannot create filter rules"),
1130    )]
1131    #[case(
1132        column_frame! {
1133            "a" => [1, 2, 3],
1134            "b" => [4, 5, 6],
1135            "c" => [7, 8, 9]
1136        },
1137        column_frame! {
1138            "a" => [1, 2],
1139            "b" => [4, 5],
1140            "c" => [7, 8]
1141        },
1142        FilterRules::try_from("a <= 2i32 || c > 9i32").expect("BUG: cannot create filter rules"),
1143    )]
1144    #[case(
1145        column_frame! {
1146            "a" => [1, 2, 3],
1147            "b" => [4, 5, 6],
1148            "c" => [7, 8, 9]
1149        },
1150        column_frame! {
1151            "a" => [2],
1152            "b" => [5],
1153            "c" => [8]
1154        },
1155        FilterRules::try_from("a <= 2i32 && (c > 9i32 || b == 5i32)").expect("BUG: cannot create filter rules"),
1156    )]
1157    #[case(
1158        column_frame! {
1159            "a" => ["abcd", "ab", "abcdefg"],
1160            "b" => [4, 5, 6],
1161            "c" => [7, 8, 9]
1162        },
1163        column_frame! {
1164            "a" => ["abcd","abcdefg"],
1165            "b" => [4, 6],
1166            "c" => [7, 9]
1167        },
1168        FilterRules::try_from("a ~= 'abcd.*'").expect("BUG: cannot create filter rules"),
1169    )]
1170    #[case(
1171        column_frame! {
1172            "a" => [1, 2, 3],
1173            "b" => [4, 5, 6],
1174            "c" => [7, 8, 9]
1175        },
1176        column_frame! {
1177            "a" => [1],
1178            "b" => [4],
1179            "c" => [7]
1180        },
1181        FilterRules::try_from("a in [1u32, 1i32]'").expect("BUG: cannot create filter rules"),
1182    )]
1183    #[case(
1184        column_frame! {
1185            "a" => [1, 2, 3],
1186            "b" => [4, 5, 6],
1187            "c" => [7, 8, 9]
1188        },
1189        column_frame! {
1190            "a" => [2, 3],
1191            "b" => [5, 6],
1192            "c" => [8, 9]
1193        },
1194        FilterRules::try_from("a notIn [1u32, 1i32]'").expect("BUG: cannot create filter rules"),
1195    )]
1196    #[case(
1197        column_frame! {
1198            "a" => [1f64, 2f64, 3f64],
1199            "b" => [4, 5, 6],
1200            "c" => [7, 8, 9]
1201        },
1202        column_frame! {
1203            "a" => [1f64, 2f64],
1204            "b" => [4, 5],
1205            "c" => [7, 8]
1206        },
1207        FilterRules::try_from("a < 3f64 || (a < 3f64 && b <= 5i32)").expect("BUG: cannot create filter rules"),
1208    )]
1209    #[case(
1210        column_frame! {
1211            "a" => [1f64, 2f64, 3f64],
1212            "b" => [4i64, 5i64, 6i64],
1213            "c" => [7i64, 8i64, 9i64]
1214        },
1215        column_frame! {
1216            "a" => [1f64, 2f64],
1217            "b" => [4i64, 5i64],
1218            "c" => [7i64, 8i64]
1219        },
1220        FilterRules::try_from("a >= 1f64 && (b <= 5 || c <= 8) && b >= 4").expect("BUG: cannot create filter rules"),
1221    )]
1222    #[traced_test]
1223    fn filter_test(
1224        #[case] df: ColumnFrame,
1225        #[case] expected: ColumnFrame,
1226        #[case] filter: FilterRules,
1227    ) {
1228        let filtered = df.filter(&filter).expect("BUG: cannot filter");
1229        assert_eq!(filtered, expected);
1230    }
1231
1232    #[rstest]
1233    #[traced_test]
1234    fn test_macro() {
1235        let df = column_frame! {
1236            "a" => 1,
1237            "b" => 2,
1238            "c" => 3,
1239            "d" => 4,
1240        };
1241
1242        assert_eq!(df.len(), 1);
1243        assert_eq!(df.keys(), &["a".into(), "b".into(), "c".into(), "d".into()]);
1244        let f = Array2::from_shape_vec((1, 4), vec![1.into(), 2.into(), 3.into(), 4.into()])
1245            .expect("BUG: cannot create array");
1246        assert_eq!(df.select(None), f);
1247
1248        let df = column_frame! {
1249            "a" => [1, 2, 3],
1250            "b" => [4, 5, 6],
1251            "c" => [7, 8, 9]
1252        };
1253
1254        assert_eq!(df.len(), 3);
1255        assert_eq!(df.keys(), &["a".into(), "b".into(), "c".into()]);
1256        let f = Array2::from_shape_vec(
1257            (3, 3),
1258            vec![
1259                1.into(),
1260                4.into(),
1261                7.into(),
1262                2.into(),
1263                5.into(),
1264                8.into(),
1265                3.into(),
1266                6.into(),
1267                9.into(),
1268            ],
1269        )
1270        .expect("BUG: cannot create array");
1271        let selected = df.select(None);
1272        trace!("{selected:?}");
1273        assert_eq!(selected, f);
1274
1275        let df1 = df! {
1276            "a" => [1, 2, 3],
1277            "b" => [4, 5, 6],
1278            "c" => [7, 8, 9]
1279        };
1280
1281        // just to make sure we've tested Display
1282        let formatted = format!("{}", df);
1283        debug!("{}", formatted);
1284
1285        assert_eq!(df1, crate::DataFrame::from(df));
1286    }
1287
1288    #[rstest]
1289    #[case(
1290        column_frame! {
1291            "a" => [1, 2, 3],
1292            "b" => [4, 5, 6],
1293            "c" => [7, 8, 9]
1294        },
1295        column_frame! {
1296            "a_new" => [1, 2, 3],
1297            "b" => [4, 5, 6],
1298            "c" => [7, 8, 9]
1299        },
1300        vec!["a_new", "b", "c"].into_iter().map(|x| x.into()).collect(),
1301        vec![("a", "a_new".into())]
1302    )]
1303    #[traced_test]
1304    fn rename_test(
1305        #[case] df: ColumnFrame,
1306        #[case] expected: ColumnFrame,
1307        #[case] keys: Vec<Key>,
1308        #[case] renames: Vec<(&str, Key)>,
1309    ) {
1310        let mut df = df;
1311        for (old, new) in renames {
1312            df.rename_key(old, new).expect("BUG: cannot rename key");
1313        }
1314        assert_eq!(df, expected);
1315        assert_eq!(df.keys(), keys.as_slice());
1316    }
1317
1318    #[rstest]
1319    #[case(
1320        column_frame!("a" => [1, 2, 3]),
1321        Key::new("a", crate::DataType::I32),
1322        column_frame!("a" => [1i32, 2i32, 3i32])
1323    )]
1324    #[case(
1325        column_frame!("a" => [1, 2, 3]),
1326        Key::new("a", crate::DataType::U32),
1327        column_frame!("a" => [1u32, 2u32, 3u32])
1328    )]
1329    #[case(
1330        column_frame!("a" => [1, 2, 3]),
1331        Key::new("a", crate::DataType::I64),
1332        column_frame!("a" => [1i64, 2i64, 3i64])
1333    )]
1334    #[case(
1335        column_frame!("a" => [1, 2, 3]),
1336        Key::new("a", crate::DataType::U64),
1337        column_frame!("a" => [1u64, 2u64, 3u64])
1338    )]
1339    #[case(
1340        column_frame!("a" => [1, 2, 3]),
1341        Key::new("a", crate::DataType::F64),
1342        column_frame!("a" => [1f64, 2f64, 3f64])
1343    )]
1344    #[case(
1345        column_frame!("a" => [1, 2, 3]),
1346        Key::new("a", crate::DataType::F32),
1347        column_frame!("a" => [1f32, 2f32, 3f32])
1348    )]
1349    // #[case(
1350    //     column_frame!("a" => [1, 2, 3]),
1351    //     Key::new("a", crate::DataType::String),
1352    //     column_frame!("a" => ["1", "2", "3"])
1353    // )]
1354    fn test_try_fix_dtype(
1355        #[case] mut df: ColumnFrame,
1356        #[case] key: Key,
1357        #[case] expected: ColumnFrame,
1358    ) {
1359        assert!(df.try_fix_column_by_key(&key).is_ok());
1360        assert_eq!(
1361            df.select(Some(&[key.clone()])),
1362            expected.select(Some(&[key.clone()]))
1363        );
1364    }
1365
1366    #[fixture]
1367    fn unknown_df() -> ColumnFrame {
1368        let mut hm: HashMap<String, Vec<DataValue>> = HashMap::new();
1369
1370        hm.insert("a".into(), vec![1u32.into()]);
1371        hm.insert("b".into(), vec![3i64.into()]);
1372        hm.insert("c".into(), vec![1f64.into()]);
1373        hm.insert("d".into(), vec![1u64.into()]);
1374
1375        hm.into()
1376    }
1377    #[rstest]
1378    #[case(stdhashmap!(
1379        "a" => crate::DataType::U32,
1380        "b" => crate::DataType::I64,
1381        "c" => crate::DataType::F64,
1382        "d" => crate::DataType::U64)
1383    )]
1384    fn test_try_fix_dtype_unknown(
1385        mut unknown_df: ColumnFrame,
1386        #[case] dtypes: HashMap<String, crate::DataType>,
1387    ) {
1388        for dtype in dtypes.iter() {
1389            let t: &Key = unknown_df
1390                .keys()
1391                .iter()
1392                .find(|x| x.name() == dtype.0)
1393                .unwrap();
1394            assert_ne!(t.ctype, crate::DataType::Unknown);
1395        }
1396        assert!(unknown_df.try_fix_dtype_for_keys(false).is_ok());
1397        for dtype in dtypes.iter() {
1398            let t: &Key = unknown_df
1399                .keys()
1400                .iter()
1401                .find(|x| x.name() == dtype.0)
1402                .unwrap();
1403            assert_eq!(t.ctype, *dtype.1);
1404            assert!(unknown_df.try_fix_dtype_for_keys(false).is_ok());
1405        }
1406        assert!(unknown_df.try_fix_dtype_for_keys(true).is_ok());
1407    }
1408
1409    #[rstest]
1410    #[case(
1411        column_frame!(Key::new("a", crate::DataType::F32) => [1, 2, 3]),
1412        Key::new("a", crate::DataType::F32),
1413        column_frame!("a" => [1f32, 2f32, 3f32])
1414    )]
1415    #[traced_test]
1416    fn test_try_fix(#[case] mut df: ColumnFrame, #[case] key: Key, #[case] expected: ColumnFrame) {
1417        assert!(df.try_fix_dtype().is_ok());
1418        assert_eq!(
1419            df.select(Some(&[key.clone()])),
1420            expected.select(Some(&[key]))
1421        )
1422    }
1423
1424    #[rstest]
1425    #[traced_test]
1426    fn test_not_key_fix() {
1427        let mut cf = column_frame!("a" => [1]);
1428        let non_existing = Key::new("b", crate::DataType::I32);
1429        assert!(cf.try_fix_column_by_key(&non_existing).is_err());
1430    }
1431
1432    #[rstest]
1433    #[case(
1434        column_frame! {
1435            "a" => [1, 2, 3],
1436            "b" => [4, 5, 6],
1437            "c" => [7, 8, 9]
1438        },
1439        vec!["a_alias", "b", "c"].into_iter().map(|x| x.into()).collect(),
1440        vec![("a", "a_alias")]
1441    )]
1442    #[traced_test]
1443    fn alias_test(
1444        #[case] df: ColumnFrame,
1445        #[case] keys: Vec<Key>,
1446        #[case] aliases: Vec<(&str, &str)>,
1447    ) {
1448        let mut df = df;
1449        for (old, new) in aliases {
1450            df.add_alias(old, new).expect("BUG: cannot rename key");
1451        }
1452        let origin_keys = df.keys().to_vec();
1453        let selected_aliases = df.select(Some(keys.as_slice()));
1454        let selected = df.select(Some(origin_keys.as_slice()));
1455        assert_eq!(selected, selected_aliases);
1456    }
1457
1458    #[rstest]
1459    #[traced_test]
1460    fn test_mut_view() {
1461        let data = vec![
1462            DataValue::from(1f64),
1463            DataValue::from(4f32),
1464            DataValue::from(2f64),
1465            DataValue::from(f32::NAN),
1466            DataValue::from(f64::NAN),
1467            DataValue::from(f32::INFINITY),
1468        ];
1469        let keys: Vec<Key> = vec!["a".into(), "b".into()];
1470
1471        let index = KeyIndex::new(keys.clone());
1472        let df = Array2::from_shape_vec((3, keys.len()), data).expect("BUG: cannot create array");
1473        let mut df = ColumnFrame::new(index.clone(), df);
1474        df.get_mut_view().mapv_inplace(|x| match x {
1475            DataValue::F32(f) if f.is_infinite() || f.is_nan() => DataValue::F32(0f32),
1476            DataValue::F64(f) if f.is_infinite() || f.is_nan() => DataValue::F64(0f64),
1477            e => e,
1478        });
1479        let data = vec![
1480            DataValue::from(1f64),
1481            DataValue::from(4f32),
1482            DataValue::from(2f64),
1483            DataValue::from(0f32),
1484            DataValue::from(0f64),
1485            DataValue::from(0f32),
1486        ];
1487        let expected = ColumnFrame::new(
1488            index,
1489            Array2::from_shape_vec((3, keys.len()), data).expect("BUG: cannot create ndarray"),
1490        );
1491        assert_eq!(df, expected);
1492    }
1493
1494    #[rstest]
1495    #[traced_test]
1496    fn dummy_test() {
1497        let data = vec![
1498            DataValue::U32(1),
1499            DataValue::I32(2),
1500            DataValue::I64(3),
1501            DataValue::U64(4),
1502        ];
1503
1504        let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1505
1506        let index = KeyIndex::new(keys.clone());
1507        let mut data_frame = Array2::default((1, keys.len()));
1508        for (idx, entry) in data.iter().enumerate() {
1509            data_frame
1510                .column_mut(idx)
1511                .assign(&ArrayView::from(&[entry.clone()]));
1512        }
1513
1514        let frame = ColumnFrame::new(index, data_frame);
1515        assert_eq!(
1516            frame.get_by_row_index(&"a".into(), 0),
1517            Some(&DataValue::U32(1))
1518        );
1519        assert_eq!(frame.get_by_row_index(&"aa".into(), 0), None);
1520        assert_eq!(frame.get_by_row_index(&"a".into(), 1), None);
1521        assert_eq!(
1522            frame.select(Some(&["a".into(), "b".into()])),
1523            Array2::from_shape_vec((1, 2), vec![DataValue::U32(1), DataValue::I32(2)])
1524                .expect("BUG: cannot create array")
1525        );
1526    }
1527
1528    #[rstest]
1529    #[traced_test]
1530    fn dummy_test_multiple_rows() {
1531        let data = vec![
1532            DataValue::U32(1),
1533            DataValue::I32(2),
1534            DataValue::I64(3),
1535            DataValue::U64(4),
1536            DataValue::U32(12),
1537            DataValue::I32(22),
1538            DataValue::I64(32),
1539            DataValue::U64(42),
1540        ];
1541
1542        let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1543
1544        let index = KeyIndex::new(keys.clone());
1545        let data_frame =
1546            Array2::from_shape_vec((2, keys.len()), data).expect("BUG: cannot create array");
1547
1548        let frame = ColumnFrame::new(index, data_frame);
1549        assert_eq!(
1550            frame.get_by_row_index(&"a".into(), 0),
1551            Some(&DataValue::U32(1))
1552        );
1553        assert_eq!(frame.get_by_row_index(&"aa".into(), 0), None);
1554        assert_eq!(frame.get_by_row_index(&"a".into(), 3), None);
1555        let arr = Array2::from_shape_vec(
1556            (2, 2),
1557            vec![
1558                DataValue::U32(1),
1559                DataValue::I32(2),
1560                DataValue::U32(12),
1561                DataValue::I32(22),
1562            ],
1563        )
1564        .expect("BUG: cannot create array");
1565        trace!("{arr:?}");
1566        assert_eq!(frame.select(Some(&["a".into(), "b".into()])), arr);
1567    }
1568
1569    #[rstest]
1570    #[traced_test]
1571    fn dummy_test_multiple_rows_push() {
1572        let data = vec![
1573            DataValue::U32(1),
1574            DataValue::I32(2),
1575            DataValue::I64(3),
1576            DataValue::U64(4),
1577            DataValue::U32(12),
1578            DataValue::I32(22),
1579            DataValue::I64(32),
1580            DataValue::U64(42),
1581        ];
1582        let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1583
1584        let index = KeyIndex::new(keys.clone());
1585        let data_frame =
1586            Array2::from_shape_vec((2, keys.len()), data).expect("BUG: cannot create array");
1587
1588        let mut frame = ColumnFrame::new(index, data_frame);
1589        assert!(frame
1590            .push(data_value::stdhashmap!(
1591                "a" => DataValue::U32(2),
1592                "b" => DataValue::I32(3),
1593                "c" => DataValue::I64(4),
1594                "d" => DataValue::U64(5)
1595            ))
1596            .is_ok());
1597        let arr = Array2::from_shape_vec(
1598            (3, 2),
1599            vec![
1600                DataValue::U32(1),
1601                DataValue::I32(2),
1602                DataValue::U32(12),
1603                DataValue::I32(22),
1604                DataValue::U32(2),
1605                DataValue::I32(3),
1606            ],
1607        )
1608        .expect("BUG: cannot create array");
1609        trace!("{arr:?}");
1610        assert_eq!(frame.select(Some(&["a".into(), "b".into()])), arr);
1611        let result = frame.push(data_value::stdhashmap!(
1612            "a" => DataValue::U32(34),
1613            "b" => DataValue::I32(44),
1614            "c" => DataValue::I64(54),
1615            "e" => DataValue::F32(6f32)
1616        ));
1617        assert!(result.is_ok(), "{result:?}");
1618        let arr = Array2::from_shape_vec(
1619            (4, 2),
1620            vec![
1621                DataValue::U64(4),
1622                DataValue::Null,
1623                DataValue::U64(42),
1624                DataValue::Null,
1625                DataValue::U64(5),
1626                DataValue::Null,
1627                DataValue::Null,
1628                DataValue::F32(6f32),
1629            ],
1630        )
1631        .expect("BUG: cannot create array");
1632        trace!("{arr:?}");
1633        assert_eq!(frame.select(Some(&["d".into(), "e".into()])), arr);
1634    }
1635
1636    #[rstest]
1637    #[case(
1638        column_frame! {
1639            "group_id" => vec![1, 2],
1640            "feed_tag" => vec![3, 4]
1641        },
1642        Some(vec![Key::from("group_id")]),
1643        ndarray::array!([1.into()], [2.into()])
1644    )]
1645    #[case(
1646        column_frame! {
1647            "group_id" => vec![1, 2],
1648            "feed_tag" => vec![3, 4]
1649        },
1650        Some(vec!["group_id".into(), "feed_tag".into()]),
1651        ndarray::array!([1.into(), 3.into()], [2.into(), 4.into()])
1652    )]
1653    #[case(
1654        column_frame! {
1655            "group_id" => vec![1, 2],
1656            "feed_tag" => vec![3, DataValue::Null]
1657        },
1658        Some(vec!["feed_tag".into()]),
1659        ndarray::array![[3.into()], [DataValue::Null]]
1660    )]
1661    #[case(
1662        column_frame! {
1663            "group_id" => vec![1, 2],
1664            "feed_tag" => vec![1, DataValue::Null]
1665        },
1666        Some(vec!["feed_tag2".into()]),
1667       Array2::<DataValue>::default((0, 0))
1668    )]
1669    #[traced_test]
1670    fn test_select(
1671        #[case] input: ColumnFrame,
1672        #[case] keys: Option<Vec<Key>>,
1673        #[case] expected: Array2<DataValue>,
1674    ) {
1675        trace!("input={input:?}");
1676        let keys_slice = keys.as_deref();
1677        let selected = input.select(keys_slice);
1678        trace!("selected={selected:?}");
1679        assert_eq!(selected, expected);
1680        let selected = input.select_transposed(keys_slice);
1681        trace!("selected_transposed={selected:?}");
1682        assert!(selected.is_ok());
1683        assert_eq!(selected.unwrap(), expected.t());
1684    }
1685
1686    #[rstest]
1687    #[case(
1688        column_frame! {
1689            "group_id" => vec![1, 2],
1690            "feed_tag" => vec![3, 4]
1691        },
1692        Key::from("group_id"),
1693        Some(ndarray::array!(1.into(), 2.into()))
1694    )]
1695    #[case(
1696        column_frame! {
1697            "group_id" => vec![1, 2, 5, 6],
1698            "feed_tag" => vec![3, 4, 7, 8]
1699        },
1700        Key::from("group_id"),
1701        Some(ndarray::array!(1.into(), 2.into(), 5.into(), 6.into()))
1702    )]
1703    #[case(
1704        column_frame! {
1705            "group_id" => vec![1, 2],
1706            "feed_tag" => vec![1, 1]
1707        },
1708        Key::from("feed_tag1"),
1709        None
1710    )]
1711    #[traced_test]
1712    fn test_select_column(
1713        #[case] input: ColumnFrame,
1714        #[case] key: Key,
1715        #[case] expected: Option<Array1<DataValue>>,
1716    ) {
1717        let selected = input.select_column(&key);
1718        trace!("selected={selected:?}");
1719        match expected {
1720            Some(expected) => {
1721                assert!(selected.is_some());
1722                assert_eq!(selected.expect("BUG: checked above"), expected);
1723            }
1724            None => assert!(selected.is_none()),
1725        }
1726    }
1727
1728    #[test]
1729    #[traced_test]
1730    fn empty_join_test() {
1731        let join = JoinRelation::add_columns();
1732        let mut column_frame = ColumnFrame::default();
1733        column_frame
1734            .add_single_column("group_id", Array1::from_vec(vec![]))
1735            .expect("BUG: cannot add column");
1736        let column_frame2 = column_frame! {
1737            "group_id" => vec![2, 1, 3],
1738            "feed_tag" => vec![1, 1, 1],
1739            "clicks" => vec![100, 10, 10],
1740            "imps" => vec![1000, 200, 200]
1741        };
1742        assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
1743
1744        let joined = column_frame.join(column_frame2, &join);
1745        assert!(joined.is_ok(), "{joined:?}");
1746
1747        trace!("{column_frame:?}");
1748        assert_eq!(
1749            column_frame.select(Some(&[
1750                "group_id".into(),
1751                "feed_tag".into(),
1752                "clicks".into(),
1753                "imps".into()
1754            ])),
1755            ndarray::array!(
1756                [2.into(), 1.into(), 100.into(), 1000.into()],
1757                [1.into(), 1.into(), 10.into(), 200.into()],
1758                [3.into(), 1.into(), 10.into(), 200.into()],
1759            )
1760        );
1761
1762        let mut column_frame2 = column_frame! {
1763            "feed_tag" => vec![1, 1, 1],
1764            "clicks" => vec![100, 10, 10],
1765            "imps" => vec![1000, 200, 200]
1766        };
1767        let mut column_frame = ColumnFrame::default();
1768        column_frame
1769            .add_single_column("group_id", Array1::from_vec(vec![]))
1770            .expect("BUG: cannot add column");
1771        let joined = column_frame2.join(column_frame, &join);
1772        assert!(joined.is_ok(), "{joined:?}");
1773
1774        trace!("{column_frame2:?}");
1775        assert_eq!(
1776            column_frame2.select(Some(&[
1777                "group_id".into(),
1778                "feed_tag".into(),
1779                "clicks".into(),
1780                "imps".into()
1781            ])),
1782            ndarray::array!(
1783                [DataValue::Null, 1.into(), 100.into(), 1000.into()],
1784                [DataValue::Null, 1.into(), 10.into(), 200.into()],
1785                [DataValue::Null, 1.into(), 10.into(), 200.into()],
1786            )
1787        );
1788
1789        let mut column_frame = ColumnFrame::default();
1790        column_frame.index = KeyIndex::new(vec!["group_id2".into()]);
1791        let joined = column_frame2.join(column_frame, &join);
1792        assert!(joined.is_ok(), "{joined:?}");
1793
1794        trace!("{column_frame2:?}");
1795        assert_eq!(
1796            column_frame2.select(Some(&[
1797                "group_id2".into(),
1798                "feed_tag".into(),
1799                "clicks".into(),
1800                "imps".into()
1801            ])),
1802            ndarray::array!(
1803                [DataValue::Null, 1.into(), 100.into(), 1000.into()],
1804                [DataValue::Null, 1.into(), 10.into(), 200.into()],
1805                [DataValue::Null, 1.into(), 10.into(), 200.into()],
1806            )
1807        );
1808    }
1809
1810    #[test]
1811    #[traced_test]
1812    fn join_test_multiple() {
1813        let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec!["group_id".into()])));
1814        let mut column_frame = column_frame! {
1815            "group_id" => vec![1, 1, 3]
1816        };
1817        let column_frame2 = column_frame! {
1818            "group_id" => vec![2, 1, 1],
1819            "clicks" => vec![100, 10, 10],
1820            "imps" => vec![1000, 200, 200]
1821        };
1822
1823        let joined = column_frame.join(column_frame2, &join);
1824        assert!(joined.is_ok(), "{joined:?}");
1825
1826        trace!("{column_frame:?}");
1827        assert_eq!(
1828            column_frame.select(Some(&["group_id".into(), "clicks".into(), "imps".into(),])),
1829            ndarray::array!(
1830                [1.into(), 10.into(), 200.into()],
1831                [1.into(), 10.into(), 200.into()],
1832                [3.into(), DataValue::Null, DataValue::Null],
1833            )
1834        )
1835    }
1836
1837    #[test]
1838    #[traced_test]
1839    fn join_test_no_matches() {
1840        let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec!["group_id".into()])));
1841        let mut column_frame = column_frame! {
1842            "group_id" => vec![DataValue::I32(1), DataValue::I32(2), DataValue::I32(3)]
1843        };
1844        let column_frame2 = column_frame! {
1845            "group_id" => vec![DataValue::I32(4), DataValue::I32(5), DataValue::I32(6)],
1846            "clicks" => vec![DataValue::I32(100), DataValue::I32(200), DataValue::I32(300)],
1847        };
1848
1849        let joined = column_frame.join(column_frame2, &join);
1850        assert!(joined.is_ok(), "{joined:?}");
1851
1852        trace!("{column_frame:?}");
1853        assert_eq!(
1854            column_frame.select(Some(&["group_id".into(), "clicks".into()])),
1855            ndarray::array!(
1856                [DataValue::I32(1), DataValue::Null],
1857                [DataValue::I32(2), DataValue::Null],
1858                [DataValue::I32(3), DataValue::Null],
1859            )
1860        )
1861    }
1862    #[test]
1863    #[traced_test]
1864    fn join_test() {
1865        let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
1866            "group_id".into(),
1867            "feed_tag".into(),
1868        ])));
1869        let mut column_frame = column_frame! {
1870            "group_id" => vec![1, 2, 8],
1871            "feed_tag" => vec![1, 1, 10]
1872        };
1873        let column_frame2 = column_frame! {
1874            "group_id" => vec![2, 1, 3],
1875            "feed_tag" => vec![1, 1, 1],
1876            "clicks" => vec![100, 10, 10],
1877            "imps" => vec![1000, 200, 200]
1878        };
1879        assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
1880
1881        let joined = column_frame.join(column_frame2, &join);
1882        assert!(joined.is_ok(), "{joined:?}");
1883
1884        trace!("{column_frame:?}");
1885        assert_eq!(
1886            column_frame.select(Some(&[
1887                "group_id".into(),
1888                "feed_tag".into(),
1889                "clicks".into(),
1890                "imps".into()
1891            ])),
1892            ndarray::array!(
1893                [1.into(), 1.into(), 10.into(), 200.into()],
1894                [2.into(), 1.into(), 100.into(), 1000.into()],
1895                [8.into(), 10.into(), DataValue::Null, DataValue::Null]
1896            )
1897        )
1898    }
1899
1900    #[test]
1901    #[traced_test]
1902    fn join_test_with_additional() {
1903        let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
1904            "group_id".into(),
1905            "feed_tag".into(),
1906        ])));
1907        let mut column_frame = column_frame! {
1908            "group_id" => vec![1, 2, 8],
1909            "feed_tag" => vec![1, 1, 10],
1910            "clicked" => vec![0, 0, 1]
1911        };
1912        let column_frame2 = column_frame! {
1913            "group_id" => vec![2, 1, 3],
1914            "feed_tag" => vec![1, 1, 1],
1915            "clicks" => vec![100, 10, 10],
1916            "imps" => vec![1000, 200, 200]
1917        };
1918        assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
1919
1920        let joined = column_frame.join(column_frame2, &join);
1921        assert!(joined.is_ok(), "{joined:?}");
1922
1923        trace!("{column_frame:?}");
1924        assert_eq!(
1925            column_frame.select(Some(&[
1926                "group_id".into(),
1927                "feed_tag".into(),
1928                "clicks".into(),
1929                "imps".into(),
1930                "clicked".into()
1931            ])),
1932            ndarray::array!(
1933                [1.into(), 1.into(), 10.into(), 200.into(), 0.into()],
1934                [2.into(), 1.into(), 100.into(), 1000.into(), 0.into()],
1935                [
1936                    8.into(),
1937                    10.into(),
1938                    DataValue::Null,
1939                    DataValue::Null,
1940                    1.into()
1941                ]
1942            )
1943        )
1944    }
1945
1946    #[test]
1947    #[traced_test]
1948    fn join_test_with_additional_single() {
1949        let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
1950            "group_id".into(),
1951            "feed_tag".into(),
1952        ])));
1953        let mut column_frame = column_frame! {
1954            "group_id" => vec![1, 2, 8],
1955            "feed_tag" => vec![1, 1, 10],
1956            "clicked" => vec![0, 0, 1]
1957        };
1958        let column_frame2 = column_frame! {
1959            "a" => vec![1],
1960            "group_id" => vec![2],
1961            "feed_tag" => vec![1],
1962            "clicks" => vec![10],
1963            "imps" => vec![200]
1964        };
1965        assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
1966
1967        let joined = column_frame.join(column_frame2, &join);
1968        assert!(joined.is_ok(), "{joined:?}");
1969
1970        trace!("{column_frame:?}");
1971        assert_eq!(
1972            column_frame.select(Some(&[
1973                "group_id".into(),
1974                "feed_tag".into(),
1975                "clicks".into(),
1976                "imps".into(),
1977                "clicked".into()
1978            ])),
1979            ndarray::array!(
1980                [
1981                    1.into(),
1982                    1.into(),
1983                    DataValue::Null,
1984                    DataValue::Null,
1985                    0.into(),
1986                ],
1987                [2.into(), 1.into(), 10.into(), 200.into(), 0.into()],
1988                [
1989                    8.into(),
1990                    10.into(),
1991                    DataValue::Null,
1992                    DataValue::Null,
1993                    1.into()
1994                ]
1995            )
1996        )
1997    }
1998
1999    #[rstest]
2000    #[traced_test]
2001    fn cartesian_product_join() {
2002        let mut df = column_frame! {
2003            "group_id" => vec![1, 2, 3],
2004            "feed_tag" => vec![1, 2, 3]
2005        };
2006        let df2 = column_frame! {
2007            "zone_id" => vec![111111, 111133],
2008            "zone_avg_ctr" => vec![0.1, 0.001]
2009        };
2010        assert!(df
2011            .join(
2012                ColumnFrame::default(),
2013                &JoinRelation::new(JoinBy::CartesianProduct)
2014            )
2015            .is_ok());
2016        let join = JoinRelation::new(JoinBy::CartesianProduct);
2017        let result = df.join(df2, &join);
2018        assert!(result.is_ok(), "{result:?}");
2019        let selected = df.select(None);
2020        trace!("{selected:?}");
2021        assert_eq!(
2022            selected,
2023            ndarray::array!(
2024                [1.into(), 1.into(), 111111.into(), 0.1.into()],
2025                [1.into(), 1.into(), 111133.into(), 0.001.into()],
2026                [2.into(), 2.into(), 111111.into(), 0.1.into()],
2027                [2.into(), 2.into(), 111133.into(), 0.001.into()],
2028                [3.into(), 3.into(), 111111.into(), 0.1.into()],
2029                [3.into(), 3.into(), 111133.into(), 0.001.into()],
2030            )
2031        );
2032
2033        let df2 = column_frame! {
2034            "zone_id" => vec![111]
2035        };
2036        let result = df.join(df2, &join);
2037        assert!(result.is_ok(), "{result:?}");
2038        let selected = df.select(None);
2039        trace!("{selected:?}");
2040        assert_eq!(
2041            selected,
2042            ndarray::array!(
2043                [1.into(), 1.into(), 111111.into(), 0.1.into(), 111.into()],
2044                [1.into(), 1.into(), 111133.into(), 0.001.into(), 111.into()],
2045                [2.into(), 2.into(), 111111.into(), 0.1.into(), 111.into()],
2046                [2.into(), 2.into(), 111133.into(), 0.001.into(), 111.into()],
2047                [3.into(), 3.into(), 111111.into(), 0.1.into(), 111.into()],
2048                [3.into(), 3.into(), 111133.into(), 0.001.into(), 111.into()],
2049            )
2050        );
2051    }
2052
2053    #[rstest]
2054    #[traced_test]
2055    fn broadcast_join() {
2056        let mut df = column_frame! {
2057            "group_id" => vec![1, 2, 3],
2058            "feed_tag" => vec![1, 2, 3]
2059        };
2060        let df2 = column_frame! {
2061            "zone_id" => vec![111111]
2062        };
2063        assert!(df
2064            .join(
2065                ColumnFrame::default(),
2066                &JoinRelation::new(JoinBy::Broadcast)
2067            )
2068            .is_ok());
2069        let join = JoinRelation::new(JoinBy::Broadcast);
2070        assert!(df.join(df2, &join).is_ok());
2071        let selected = df.select(None);
2072        trace!("{selected:?}");
2073        assert_eq!(
2074            selected,
2075            ndarray::array!(
2076                [1.into(), 1.into(), 111111.into()],
2077                [2.into(), 2.into(), 111111.into()],
2078                [3.into(), 3.into(), 111111.into()]
2079            )
2080        );
2081    }
2082    #[rstest]
2083    #[traced_test]
2084    fn merge_test() {
2085        let mut df = column_frame! {
2086            "group_id" => vec![1, 2, 3],
2087            "feed_tag" => vec![1, 2, 3]
2088        };
2089        let df2 = column_frame! {
2090            "group_id" => vec![11, 21, 31],
2091            "feed_tag" => vec![12, 22, 32]
2092        };
2093
2094        let join = JoinRelation::new(JoinBy::Replace);
2095        assert!(df.join(df2, &join).is_ok());
2096        let selected = df.select(None);
2097        trace!("{selected:?}");
2098        assert_eq!(
2099            selected,
2100            ndarray::array!(
2101                [11.into(), 12.into()],
2102                [21.into(), 22.into()],
2103                [31.into(), 32.into()]
2104            )
2105        );
2106    }
2107
2108    #[rstest]
2109    #[traced_test]
2110    fn extend_test() {
2111        let mut df = column_frame! {
2112            "group_id" => vec![1, 2, 3],
2113            "feed_tag" => vec![1, 2, 3]
2114        };
2115        let df2 = column_frame! {
2116            "group_id" => vec![11, 21, 31],
2117            "feed_tag" => vec![5, 6, 7]
2118        };
2119        assert!(df
2120            .join(ColumnFrame::default(), &JoinRelation::new(JoinBy::Extend))
2121            .is_ok());
2122
2123        let join = JoinRelation::new(JoinBy::Extend);
2124        assert!(df.join(df2, &join).is_ok());
2125        let selected = df.select(Some(&["feed_tag".into(), "group_id".into()]));
2126        trace!("{selected:?}");
2127        assert_eq!(
2128            selected,
2129            ndarray::array!(
2130                [1.into(), 1.into()],
2131                [2.into(), 2.into()],
2132                [3.into(), 3.into()],
2133                [5.into(), 11.into()],
2134                [6.into(), 21.into()],
2135                [7.into(), 31.into()]
2136            )
2137        );
2138        let as_map = df.select_as_map(Some(&["feed_tag".into(), "group_id".into()]));
2139        trace!("{as_map:?}");
2140        assert_eq!(
2141            as_map,
2142            stdhashmap!(
2143                "feed_tag" => vec![1, 2, 3, 5, 6, 7],
2144                "group_id" => vec![1, 2, 3, 11, 21, 31]
2145            )
2146        );
2147
2148        let as_map = df.select_as_map(Some(&["feed_tag1".into()]));
2149        trace!("{as_map:?}");
2150        assert_eq!(as_map, HashMap::default());
2151    }
2152
2153    #[rstest]
2154    #[traced_test]
2155    fn extend_test_with_non_existing_cols() {
2156        let mut df = column_frame! {
2157            "group_id" => vec![1, 2, 3],
2158            "feed_tag" => vec![1, 2, 3]
2159        };
2160        let mut df2 = column_frame! {
2161            "group_id" => vec![11, 21, 31],
2162            "feed_tag" => vec![5, 6, 7],
2163            "clicks" => vec![100, 200, 300],
2164            "impressions" => vec![1000, 2000, 3000]
2165        };
2166        let df_bckp = df.clone();
2167        let join = JoinRelation::new(JoinBy::Extend);
2168        assert!(df.join(df2.clone(), &join).is_ok());
2169        let selected = df.select(None);
2170        trace!("{selected:?}");
2171        assert_eq!(
2172            selected,
2173            ndarray::array!(
2174                [1.into(), 1.into(), DataValue::Null, DataValue::Null],
2175                [2.into(), 2.into(), DataValue::Null, DataValue::Null],
2176                [3.into(), 3.into(), DataValue::Null, DataValue::Null],
2177                [11.into(), 5.into(), 100.into(), 1000.into()],
2178                [21.into(), 6.into(), 200.into(), 2000.into()],
2179                [31.into(), 7.into(), 300.into(), 3000.into()]
2180            )
2181        );
2182        let join = JoinRelation::new(JoinBy::Extend);
2183        let r = df2.join(df_bckp, &join);
2184        assert!(r.is_ok(), "{r:?}");
2185        let selected = df2.select(None);
2186        trace!("{selected:?}");
2187        assert_eq!(
2188            selected,
2189            ndarray::array!(
2190                [11.into(), 5.into(), 100.into(), 1000.into()],
2191                [21.into(), 6.into(), 200.into(), 2000.into()],
2192                [31.into(), 7.into(), 300.into(), 3000.into()],
2193                [1.into(), 1.into(), DataValue::Null, DataValue::Null],
2194                [2.into(), 2.into(), DataValue::Null, DataValue::Null],
2195                [3.into(), 3.into(), DataValue::Null, DataValue::Null]
2196            )
2197        );
2198    }
2199
2200    #[rstest]
2201    #[traced_test]
2202    fn extend_test_with_non_existing_cols_wrong_order() {
2203        let mut df = column_frame! {
2204            "group_id" => vec![1, 2, 3],
2205            "feed_tag" => vec![1, 2, 3]
2206        };
2207        let df2 = column_frame! {
2208            "feed_tag" => vec![5, 6, 7],
2209            "group_id" => vec![11, 21, 31]
2210        };
2211        let join = JoinRelation::new(JoinBy::Extend);
2212        let err = df.join(df2, &join);
2213        assert!(err.is_ok(), "{err:?}");
2214    }
2215
2216    #[rstest]
2217    #[traced_test]
2218    fn test_replace_not_compatible() {
2219        let mut df = column_frame! {
2220            "group_id" => vec![1, 2, 3],
2221            "feed_tag" => vec![1, 2, 3]
2222        };
2223        let df2 = column_frame! {
2224            "feed_tag" => vec![5, 6],
2225            "group_id" => vec![11, 21]
2226        };
2227        let join = JoinRelation::new(JoinBy::Replace);
2228        let err = df.join(df2, &join);
2229        assert!(err.is_err(), "{err:?}");
2230        let empty = ColumnFrame::default();
2231        let err = df.join(empty, &join);
2232        assert!(err.is_ok(), "{err:?}");
2233    }
2234
2235    #[rstest]
2236    #[traced_test]
2237    fn test_different_data() {
2238        let mut df = column_frame! {
2239            "group_id" => vec![1, 2, 3],
2240            "feed_tag" => vec![1, 2, 3]
2241        };
2242        let df2 = column_frame! {
2243            "group_id" => vec![11, 21],
2244            "a" => vec![5, 6]
2245        };
2246        let join = JoinRelation::new(JoinBy::Extend);
2247        let err = df.join(df2, &join);
2248        assert!(err.is_ok(), "{err:?}");
2249        println!("{df:?}");
2250        let expected_df = ColumnFrame::new(
2251            KeyIndex::from(vec!["group_id".into(), "feed_tag".into(), "a".into()]),
2252            ndarray::array!(
2253                [1.into(), 1.into(), DataValue::Null],
2254                [2.into(), 2.into(), DataValue::Null],
2255                [3.into(), 3.into(), DataValue::Null],
2256                [11.into(), DataValue::Null, 5.into()],
2257                [21.into(), DataValue::Null, 6.into()]
2258            ),
2259        );
2260        assert_eq!(df, expected_df)
2261    }
2262
2263    #[rstest]
2264    #[traced_test]
2265    fn serde_column_frame() {
2266        let df = column_frame! {
2267            "group_id" => vec![1u64, 2u64, 3u64],
2268            "feed_tag" => vec![1u64, 2u64, 3u64]
2269        };
2270        let key_idx = df.index.clone();
2271        let serialized = serde_json::to_string(&key_idx).expect("BUG: cannot serialize");
2272        let deserialized: KeyIndex =
2273            serde_json::from_str(&serialized).expect("BUG: cannot deserialize");
2274        assert_eq!(key_idx, deserialized);
2275        assert!(key_idx.get_key(0).is_some_and(|x| x == "group_id".into()));
2276        let serialized = serde_json::to_string(&df).expect("BUG: cannot serialize");
2277        let deserialized: ColumnFrame =
2278            serde_json::from_str(&serialized).expect("BUG: cannot deserialize");
2279        assert_eq!(df, deserialized);
2280    }
2281
2282    #[rstest]
2283    #[traced_test]
2284    fn update_value() {
2285        let mut df = column_frame! {
2286            "group_id" => vec![1, 2, 3],
2287            "feed_tag" => vec![1, 2, 3]
2288        };
2289        let group_id: Key = "group_id".into();
2290        let v = df.get_mut_by_row_index(&group_id, 1);
2291        assert!(v.is_some());
2292        let v = v.unwrap();
2293        assert_eq!(v, &DataValue::I32(2));
2294        *v = DataValue::U64(22);
2295        let v = df.get_by_row_index(&group_id, 1);
2296        assert!(v.is_some());
2297        let v = v.unwrap();
2298        assert_eq!(v, &DataValue::U64(22));
2299
2300        assert!(df.get_mut_by_row_index(&"group_id2".into(), 1).is_none());
2301    }
2302}