Skip to main content

trs_dataframe/dataframe/
column_store.rs

1use ndarray::{concatenate, s, Array, Array1, Array2, ArrayView1, ArrayViewMut2, Axis};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4
5use crate::error::Error;
6use crate::{dataframe::index::Index, CandidateData, JoinBy, JoinRelation, Key};
7use data_value::{DataValue, Extract};
8use tracing::*;
9mod from;
10mod key_index;
11mod ops;
12pub mod sorted_df;
13pub use key_index::KeyIndex;
14pub mod filter_df;
15
/// [`ColumnFrame`] stores the data for the candidates as a two-dimensional table.
///
/// The values live in an [`Array2`] of [`DataValue`]. The methods of this type treat
/// axis 0 as rows and axis 1 as columns.
/// Columns are addressed through the [`KeyIndex`], which resolves a column [`Key`]
/// to the position of that column inside the array.
/// The memory layout is the one used by [`ndarray`] for the stored array
/// (row-major order by default).
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct ColumnFrame {
    /// Column keys and the lookup from [`Key`] to column position in `data_frame`.
    pub index: KeyIndex,
    /// Cell values: rows along axis 0, columns along axis 1.
    pub data_frame: Array2<DataValue>,
}
26
/// Outcome of the shared pre-check (`check_or_init_frame`) that runs before
/// two frames are combined.
enum Continue {
    /// The caller still has to perform the actual combination.
    Continue,
    /// The pre-check already produced the final state; the caller can return.
    End,
}

impl Continue {
    /// Returns `true` when the variant is [`Continue::End`].
    pub fn should_end(&self) -> bool {
        match self {
            Self::End => true,
            Self::Continue => false,
        }
    }
}
37
38use std::fmt;
39
40impl fmt::Display for ColumnFrame {
41    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42        // Display keys and and indices
43        write!(f, "\n|")?;
44
45        for key in &self.index.keys {
46            write!(f, " {key} |")?;
47        }
48
49        if self.index.is_empty() {
50            writeln!(f, "|")?;
51        }
52
53        // Display type for each key
54        if let Some(row) = self.data_frame.axis_iter(Axis(0)).next() {
55            write!(f, "\n|")?;
56            for value in row.iter() {
57                // Display types during first iteration
58                write!(f, " {:10?} |", crate::detect_dtype(value))?;
59            }
60            writeln!(f)?;
61        }
62
63        writeln!(f, "---")?;
64
65        // Display items, limit output to 256 rows
66        for (n, row) in self.data_frame.axis_iter(Axis(0)).enumerate() {
67            write!(f, "|")?;
68
69            for value in row.iter() {
70                // Display types during first iteration
71                write!(f, " {value} |")?;
72            }
73            writeln!(f)?;
74
75            if n >= 256 {
76                writeln!(f, "... (dataframe is too long)")?;
77                break;
78            }
79        }
80
81        writeln!(f, "---")
82    }
83}
84pub fn convert_data_value(item: DataValue, dtype: crate::DataType) -> DataValue {
85    let x = &item;
86    match dtype {
87        crate::DataType::Bool => DataValue::Bool(bool::extract(x)),
88        crate::DataType::U32 => DataValue::U32(u32::extract(x)),
89        crate::DataType::I32 => DataValue::I32(i32::extract(x)),
90        crate::DataType::U64 => DataValue::U64(u64::extract(x)),
91        crate::DataType::I64 => DataValue::I64(i64::extract(x)),
92        crate::DataType::F32 => DataValue::F32(f32::extract(x)),
93        crate::DataType::U128 => DataValue::U128(u128::extract(x)),
94        crate::DataType::I128 => DataValue::I128(i128::extract(x)),
95        crate::DataType::F64 => DataValue::F64(f64::extract(x)),
96        crate::DataType::U8 => DataValue::U8(u8::extract(x)),
97        crate::DataType::String => DataValue::String(String::extract(x).into()),
98        crate::DataType::Bytes => item,
99        crate::DataType::Map => item,
100        crate::DataType::Vec => item,
101        crate::DataType::Unknown => {
102            if matches!(item, DataValue::Null) {
103                return item;
104            }
105            let dtype = crate::detect_dtype(&item);
106            // this situation should not ever happen
107            if matches!(dtype, crate::DataType::Unknown) {
108                tracing::error!("Unknown datatype {dtype:?} - {item:?}");
109                return item;
110            }
111            convert_data_value(item, dtype)
112        }
113    }
114}
/// Converts `item` to the dtype stored on `key` (`key.ctype`).
///
/// This is a convenience wrapper around [`convert_data_value`].
pub fn convert_dv_to_dtype(key: &Key, item: DataValue) -> DataValue {
    convert_data_value(item, key.ctype)
}
118impl ColumnFrame {
119    pub fn new<K: Into<KeyIndex>>(index: K, data_frame: Array2<DataValue>) -> Self {
120        Self {
121            index: index.into(),
122            data_frame,
123        }
124    }
125
    /// Returns the column keys of this frame as reported by the [`KeyIndex`].
    pub fn keys(&self) -> &[Key] {
        self.index.get_keys()
    }
129
130    pub fn len(&self) -> usize {
131        self.data_frame.nrows()
132    }
133
134    pub fn is_empty(&self) -> bool {
135        self.data_frame.nrows() == 0
136    }
137
138    pub fn shrink(&mut self) {
139        let shape = self.data_frame.shape();
140        if shape[0] > 0 && shape[1] > 0 {
141            let mut new_data = Vec::with_capacity(shape[0] * shape[1]);
142            for elem in self.data_frame.iter() {
143                new_data.push(elem.clone());
144            }
145            if let Ok(arr) = Array2::from_shape_vec((shape[0], shape[1]), new_data) {
146                self.data_frame = arr;
147            }
148        }
149    }
150
151    /// This method will try to fix dtype based on data stored in each column. If dtype is [`crate::DataType::Unknown`]
152    /// this method will replace dtype for "correct" one based on [`DataValue`].
153    /// NOTE: flag `force` will enforce this dtype even if dtype is known
154    pub fn try_fix_dtype_for_keys(&mut self, force: bool) -> Result<(), Error> {
155        for i in 0..self.index.keys.len() {
156            let should_fix = force || matches!(self.index.keys[i].ctype, crate::DataType::Unknown);
157
158            if should_fix {
159                let column = self
160                    .get_single_column(&self.index.keys[i])
161                    .ok_or_else(|| Error::EmptyData)?;
162                let dtype = crate::detect_dtype(column.get(0).ok_or_else(|| Error::EmptyData)?);
163                self.index.keys[i].ctype = dtype;
164            }
165        }
166
167        Ok(())
168    }
169    pub fn try_fix_dtype(&mut self) -> Result<(), Error> {
170        let mut errors = vec![];
171        let keys = self.index.keys.clone();
172        for key in keys {
173            tracing::trace!("key: {key:?}- {:?}", key.ctype);
174            if let Err(e) = self.try_fix_column_by_key(&key) {
175                errors.push((key, e.to_string()));
176            }
177        }
178        if errors.is_empty() {
179            Ok(())
180        } else {
181            Err(Error::CastFailed(errors))
182        }
183    }
184
185    pub fn try_fix_column_by_key(&mut self, key: &Key) -> Result<(), Error> {
186        let idx = self
187            .index
188            .get_column_index(key)
189            .ok_or(Error::MissingField(format!("{key}").into()))?;
190        let mut col = self.data_frame.column_mut(idx);
191
192        col.mapv_inplace(|item| convert_dv_to_dtype(key, item));
193        Ok(())
194    }
195
196    pub fn enforce_dtype_for_column(
197        &mut self,
198        key: &str,
199        dtype: crate::DataType,
200    ) -> Result<(), Error> {
201        if let Some(idx) = self.index.get_column_index_by_name(key) {
202            let new_key = Key::new(key, dtype);
203            let mut col = self.data_frame.column_mut(idx);
204
205            col.mapv_inplace(|item| convert_dv_to_dtype(&new_key, item));
206            self.index.rename_key(key, new_key)?;
207            Ok(())
208        } else {
209            Err(Error::NotFound(Key::new(key, crate::DataType::Unknown)))
210        }
211    }
212
    /// Returns a mutable two-dimensional view over the whole backing array.
    pub fn get_mut_view(&mut self) -> ArrayViewMut2<'_, DataValue> {
        self.data_frame.view_mut()
    }
216
    /// Replaces the key named `old` with `new` by delegating to the [`KeyIndex`].
    ///
    /// Any error reported by the index is returned unchanged.
    pub fn rename_key(&mut self, old: &str, new: Key) -> Result<(), Error> {
        self.index.rename_key(old, new)
    }
220
    /// Registers `alias` for the column `key` by delegating to the [`KeyIndex`].
    ///
    /// Any error reported by the index is returned unchanged.
    pub fn add_alias(&mut self, key: &str, alias: &str) -> Result<(), Error> {
        self.index.add_alias(key, alias)
    }
224
225    /// Selects the data from the [`ColumnFrame`] by the given keys
226    /// If the keys are not provided, the data is selected by the [`KeyIndex`] keys
227    /// The data is returned as the [`Vec<Vec<DataValue>>`]
228    /// If the keys are not found, the empty [`Vec<Vec<DataValue>>`] is returned
229    /// Returns the data in the column-major order
230    pub fn select_transposed_typed<D: Extract>(&self, keys: &[Key]) -> Vec<Vec<D>> {
231        let selected = self.select(Some(keys));
232        let mut result = Vec::with_capacity(selected.nrows());
233        for row in selected.rows() {
234            let mut r = Vec::with_capacity(selected.ncols());
235            for value in row.iter() {
236                r.push(D::extract(value));
237            }
238            result.push(r);
239        }
240        result
241    }
242
243    /// Selects the data from the [`ColumnFrame`] by the given keys
244    /// If the keys are not provided, the data is selected by the [`KeyIndex`] keys
245    /// The data is returned as the [`Array2`] with the [`DataValue`] values
246    /// If the keys are not found, the empty [`Array2`] is returned
247    /// Returns the [`Array2`] with the data in row-major order (same as ndarray default)
248    /// Note: Despite the method name, the data is NOT transposed - it returns data in row-major order
249    /// Use this when you want row-oriented access to the selected columns
250    pub fn select_transposed(&self, keys: Option<&[Key]>) -> Result<Array2<DataValue>, Error> {
251        let keys = keys.unwrap_or_else(|| self.index.get_keys());
252        let key_indexes = self.index.select(keys);
253        if key_indexes.is_empty() {
254            return Ok(Array2::default((0, 0)));
255        }
256        let data_vec: Vec<Array1<DataValue>> = key_indexes
257            .indexes()
258            .iter()
259            .map(|x| self.data_frame.column(*x).to_owned())
260            .collect();
261        to_array2(data_vec)
262    }
263
264    /// Selects whole column from the [`ColumnFrame`] by the given key
265    /// If the key is not found, the None is returned
266    /// If the key is found, the [`ArrayView1`] with the [`DataValue`] values is returned
267    pub fn select_column(&self, key: &Key) -> Option<ArrayView1<'_, DataValue>> {
268        self.index
269            .get_column_index(key)
270            .map(|x| self.data_frame.column(x))
271    }
272
    /// Calls `func` with `keys` and a mutable reference to this frame.
    ///
    /// The result of `func` is returned unchanged; the method itself does not
    /// touch the frame.
    pub fn apply_function<F>(&mut self, keys: &[Key], mut func: F) -> Result<(), Error>
    where
        F: FnMut(&[Key], &mut ColumnFrame) -> Result<(), Error>,
    {
        func(keys, self)
    }
279
280    /// Validates the access to the entry by the given column [`Key`] and row index
281    /// If the column is not found, an error is returned [`Error::NotFound`]
282    /// If the row index is out of bounds, an error is returned [`Error::IndexOutOfRange`]
283    /// Otherwise, the column index is returned
284    pub fn validate_entry_access(&self, column: &Key, row_index: usize) -> Result<usize, Error> {
285        if row_index >= self.data_frame.nrows() {
286            return Err(Error::IndexOutOfRange(row_index, self.data_frame.nrows()));
287        }
288        let Some(column_index) = self.index.get_column_index(column) else {
289            return Err(Error::NotFound(column.clone()));
290        };
291        Ok(column_index)
292    }
293
294    /// Returns the value [`DataValue`] for the given column defined by [`Key`] and row index
295    /// If the column is not found, None is returned
296    /// If the row index is out of bounds, None is returned
297    pub fn get_by_row_index(&self, column: &Key, row_index: usize) -> Option<&DataValue> {
298        trace!(
299            "Column: {column} row_index: {row_index} data_frame: cols:{}-rows:{}",
300            self.data_frame.len(),
301            self.data_frame.nrows()
302        );
303        trace!("{:?}", self.data_frame);
304        match self.validate_entry_access(column, row_index) {
305            Ok(column_index) => self.data_frame.get((row_index, column_index)),
306            Err(e) => {
307                trace!("Error: {e}");
308                None
309            }
310        }
311    }
312
313    /// Returns mutable reference for the value [`DataValue`] for the given column defined by [`Key`] and row index
314    /// If the column is not found, None is returned
315    /// If the row index is out of bounds, None is returned
316    pub fn get_mut_by_row_index(
317        &mut self,
318        column: &Key,
319        row_index: usize,
320    ) -> Option<&mut DataValue> {
321        trace!(
322            "Column: {column} row_index: {row_index} data_frame: cols:{}-rows:{}",
323            self.data_frame.len(),
324            self.data_frame.nrows()
325        );
326        trace!("{:?}", self.data_frame);
327        match self.validate_entry_access(column, row_index) {
328            Ok(column_index) => self.data_frame.get_mut((row_index, column_index)),
329            Err(e) => {
330                trace!("Error: {e}");
331                None
332            }
333        }
334    }
335
336    /// Returns the value [`HashMap<Key, Vec<DataValue>>`] for the given columns defined by [`Key`].
337    /// If the keys are not provided, the data is selected by the [`KeyIndex`] keys
338    /// If the keys are not found, the empty [`HashMap`] is returned
339    /// Returns the [`Array2`] with the data in the row-major order
340    pub fn select_as_map(&self, keys: Option<&[Key]>) -> HashMap<Key, Vec<DataValue>> {
341        let keys = keys.unwrap_or_else(|| self.index.get_keys());
342        let indexes = self.index.select(keys);
343        if indexes.is_empty() {
344            return Default::default();
345        }
346
347        let mut new_data_frame = HashMap::with_capacity(keys.len());
348
349        for key in keys.iter() {
350            if let Some(column_index_in_source) = indexes.get_column_index(key) {
351                let column = self.data_frame.column(column_index_in_source);
352                new_data_frame.insert(key.clone(), column.to_vec());
353            }
354        }
355
356        new_data_frame
357    }
358
359    /// Returns the value [`Array2<DataValue>`] for the given columns defined by [`Key`].
360    /// If the keys are not provided, the data is selected by the [`KeyIndex`] keys
361    /// If the keys are not found, the empty [`Array2`] is returned
362    /// Returns the [`Array2`] with the data in the row-major order
363    ///
364    /// This operation clones all selected data. For read-only access, consider using
365    /// [`Self::select_column`] to get a view of a single column without copying.
366    pub fn select(&self, keys: Option<&[Key]>) -> Array2<DataValue> {
367        let keys = keys.unwrap_or_else(|| self.index.get_keys());
368        let indexes = self.index.select(keys);
369        if indexes.is_empty() || keys.is_empty() {
370            return Array2::default((0, 0));
371        }
372
373        let nrows = self.data_frame.nrows();
374        let ncols = keys.len();
375
376        // Pre-allocate vector with exact capacity
377        let mut data = Vec::with_capacity(nrows * ncols);
378
379        // Collect valid column mappings
380        let col_mappings: Vec<(usize, usize)> = keys
381            .iter()
382            .enumerate()
383            .filter_map(|(dst_idx, key)| {
384                indexes
385                    .get_column_index(key)
386                    .map(|src_idx| (dst_idx, src_idx))
387            })
388            .collect();
389
390        // Fill with nulls first, then overwrite valid data
391        data.resize(nrows * ncols, DataValue::Null);
392
393        // Copy column by column - better for row-major layout
394        for row_idx in 0..nrows {
395            for (dst_col, src_col) in &col_mappings {
396                let dst_idx = row_idx * ncols + dst_col;
397                data[dst_idx] = self.data_frame[(row_idx, *src_col)].clone();
398            }
399        }
400
401        Array2::from_shape_vec((nrows, ncols), data).unwrap_or_else(|_| Array2::default((0, 0)))
402    }
403
404    fn extend_dataframe_for_column(&mut self, key: Key) -> Result<(), Error> {
405        self.index.store_key(key);
406        let len = self.data_frame.nrows();
407        self.data_frame.push_column(Array1::default(len).view())?;
408        Ok(())
409    }
410
411    /// Pushes the row candidate into the [`ColumnFrame`]
412    /// If the column is not found this method will add the column to the [`ColumnFrame`]
413    ///
414    /// This operation clones all values from the candidate. For batch operations,
415    /// consider using [`Self::extend`] instead which can be more efficient.
416    pub fn push<C: CandidateData>(&mut self, row_candidate: C) -> Result<(), Error> {
417        // Pre-allocate with capacity to avoid reallocations
418        let num_keys = self.index.len();
419        let candidate_keys = row_candidate.keys();
420        let mut arr = Vec::with_capacity(num_keys.max(candidate_keys.len()));
421
422        // First pass: extend dataframe with any new columns
423        for key in &candidate_keys {
424            if self.index.get_column_index(key).is_none() {
425                self.extend_dataframe_for_column(key.clone())?;
426            }
427        }
428
429        // Second pass: collect values - capacity may have changed after extension
430        arr.reserve(self.index.len());
431        for index in self.index.get_keys() {
432            arr.push(
433                row_candidate
434                    .get_value_ref(index)
435                    .cloned()
436                    .unwrap_or(DataValue::Null),
437            );
438        }
439
440        self.data_frame.push_row(Array::from_vec(arr).view())?;
441        Ok(())
442    }
443
444    pub fn remove_column(&mut self, keys: &[Key]) -> Result<Self, Error> {
445        // fixme this is naive approach
446        let mut indexes = KeyIndex::default();
447        // take remove data
448        let data = self.select(Some(keys));
449        // remove labels from the index
450        for key in keys {
451            if let Some((current, _idx)) = self.index.remove_key(key) {
452                indexes.store_key(current);
453            }
454        }
455        // copy the rest of the data to the new data frame with new index
456        let rest = self.select(Some(self.keys()));
457        let keys = self.index.get_keys().to_vec();
458        self.data_frame = rest;
459        self.index = KeyIndex::new(keys);
460
461        //remove_self.data_frame = to_array2(columns)?;
462        Ok(Self::new(indexes, data))
463    }
464
465    fn check_or_init_frame(&mut self, other: &Self) -> Result<Continue, Error> {
466        if self.index.is_empty() {
467            self.index = other.index.clone();
468            self.data_frame = other.data_frame.clone();
469            return Ok(Continue::End);
470        }
471        if other.index.is_empty() {
472            return Ok(Continue::End);
473        }
474        if self.is_empty() {
475            self.data_frame = Array2::default((other.data_frame.nrows(), self.index.len()));
476        }
477
478        Ok(Continue::Continue)
479    }
480
481    fn extend_columns_from_other(&mut self, other: &Self) -> Result<(), Error> {
482        let missing_keys: Vec<Key> = other
483            .index
484            .get_keys()
485            .iter()
486            .filter(|key| self.index.get_column_index(key).is_none())
487            .cloned()
488            .collect();
489
490        if missing_keys.is_empty() {
491            return Ok(());
492        }
493
494        for key in missing_keys {
495            self.index.store_key(key);
496        }
497
498        let nrows = self.data_frame.nrows();
499        let new_cols = self.index.len() - self.data_frame.ncols();
500
501        if new_cols > 0 {
502            // Use ndarray's optimized concatenate - faster than manual copying
503            let new_data = Array2::default((nrows, new_cols));
504            self.data_frame = concatenate(Axis(1), &[self.data_frame.view(), new_data.view()])?;
505        }
506
507        Ok(())
508    }
509
510    fn try_extend(&mut self, other: Self) -> Result<(), Error> {
511        let mut joined_keys = self.index.clone();
512        for key in other.keys() {
513            if self.index.get_column_index(key).is_none() {
514                joined_keys.store_key(key.clone());
515            }
516        }
517
518        let sum_len = self.data_frame.nrows() + other.data_frame.nrows();
519        let mut arr = Array2::default((sum_len, joined_keys.len()));
520        let increment = self.data_frame.nrows();
521
522        for key in joined_keys.get_keys() {
523            let index_result = joined_keys.get_column_index(key).ok_or_else(|| {
524                Error::UnknownError(format!(
525                    "Index lookup failed for key '{}' in try_extend",
526                    key.name()
527                ))
528            })?;
529
530            let mut col = arr.column_mut(index_result);
531
532            if let Some(index) = self.index.get_column_index(key) {
533                let src_col = self.data_frame.column(index);
534                col.slice_mut(s![..increment]).assign(&src_col);
535            }
536
537            if let Some(index) = other.index.get_column_index(key) {
538                let src_col = other.data_frame.column(index);
539                col.slice_mut(s![increment..]).assign(&src_col);
540            }
541        }
542
543        *self = ColumnFrame::new(joined_keys, arr);
544        Ok(())
545    }
546
547    /// Extends the [`ColumnFrame`] with the data from the other [`ColumnFrame`]
548    /// If the [`KeyIndex`] is empty, the [`ColumnFrame`] is replaced with the other [`ColumnFrame`]
549    /// If the other [`KeyIndex`] is empty, nothing happens
550    /// If the length of the [`KeyIndex`] of the other data frame is greater then current,
551    /// an error is returned [`Error::DataSetSizeDoesntMatch`]
552    /// If [`Key`] from other data frame - extends the [`KeyIndex`] and add column to the current [`ColumnFrame`]
553    ///
554    pub fn extend(&mut self, mut other: Self) -> Result<(), Error> {
555        if self.check_or_init_frame(&other)?.should_end() {
556            return Ok(());
557        }
558
559        if self.index.check_order_of_indexes(&other.index).is_err() {
560            return self.try_extend(other);
561        }
562
563        trace!(
564            "Extend columns from other {:?} vs {:?}",
565            other.index.get_keys(),
566            self.index.get_keys()
567        );
568
569        if other.data_frame.ncols() < self.data_frame.ncols() {
570            other.extend_columns_from_other(self)?;
571        } else {
572            self.extend_columns_from_other(&other)?;
573        }
574        self.data_frame = concatenate(Axis(0), &[self.data_frame.view(), other.data_frame.view()])?;
575
576        Ok(())
577    }
578
579    /// Replace the [`ColumnFrame`] with the other [`ColumnFrame`]
580    /// If the current [`KeyIndex`] is empty, the [`ColumnFrame`] is replaced with the other [`ColumnFrame`]
581    /// If the other [`KeyIndex`] is empty, nothing happens
582    /// If the [`KeyIndex`] of the other data frame and current doesn't match an error is returned [`Error::DataSetSizeDoesntMatch`]
583    /// If the [`Key`] from other data frame is not present in the current [`ColumnFrame`] - extends the [`KeyIndex`] and add column to the current [`ColumnFrame`]
584    pub fn replace(&mut self, other: Self) -> Result<(), Error> {
585        if self.check_or_init_frame(&other)?.should_end() {
586            return Ok(());
587        }
588
589        if self.data_frame.len() > other.data_frame.len() {
590            return Err(Error::DataSetSizeDoesntMatch(
591                self.data_frame.len(),
592                other.data_frame.len(),
593            ));
594        }
595        self.index = other.index;
596        self.data_frame = other.data_frame;
597
598        Ok(())
599    }
600
601    /// Joins the candidates by the keys in the `JoinRelation::JoinById` struct.
602    /// This function creates [`Index`] for the keys and then joins the candidates by the keys.
603    ///
604    pub fn join_by_id_inner(&mut self, right: Self, keys: &[Key]) -> Result<(), Error> {
605        if self.check_or_init_frame(&right)?.should_end() {
606            return Ok(());
607        }
608
609        let timer = std::time::Instant::now();
610        let new_columns = right.index.get_complement_keys(self.index.get_keys());
611
612        // add new columns and keys into the column frame index
613        self.extend_columns_from_other(&right)?;
614        tracing::debug!("Extend took {}ns", timer.elapsed().as_nanos());
615
616        // Pre-compute column index mappings AFTER extension to get correct indices
617        let column_mappings: Vec<(usize, usize)> = new_columns
618            .iter()
619            .filter_map(|key| {
620                let left_idx = self.index.get_column_index(key)?;
621                let right_idx = right.index.get_column_index(key)?;
622                Some((left_idx, right_idx))
623            })
624            .collect();
625
626        // get the indexes for the keys
627        let timer = std::time::Instant::now();
628        let index = Index::new(keys.to_vec(), self);
629        tracing::debug!("Left index build took: {}ns", timer.elapsed().as_nanos());
630        tracing::trace!("Index {index:?}");
631
632        let timer = std::time::Instant::now();
633        let right_index = Index::new(keys.to_vec(), &right);
634        let joined_idx = index.join(right_index);
635        tracing::debug!(
636            "Right index build and join took: {}ns",
637            timer.elapsed().as_nanos()
638        );
639
640        // Instead of copying all data, we only fill in the new columns
641        // The existing columns are already correct from extend_columns_from_other
642        let timer = std::time::Instant::now();
643        let joined_idx_len = joined_idx.len();
644
645        for (left_index, right_index) in joined_idx {
646            if let Some(right_row_idx) = right_index {
647                // Direct column-wise copy instead of row-wise to improve cache locality
648                for (left_col_idx, right_col_idx) in &column_mappings {
649                    if let (Some(left_val), Some(right_val)) = (
650                        self.data_frame.get_mut((left_index, *left_col_idx)),
651                        right.data_frame.get((right_row_idx, *right_col_idx)),
652                    ) {
653                        *left_val = right_val.clone();
654                    }
655                }
656            }
657        }
658
659        let elapsed = timer.elapsed();
660        tracing::debug!(
661            "Filled {} rows in {}ms|{}s",
662            joined_idx_len,
663            elapsed.as_millis(),
664            elapsed.as_secs()
665        );
666
667        Ok(())
668    }
669
670    /// Adds the single column to the current [`ColumnFrame`]
671    /// If the column is already present, an error is returned [`Error::ColumnAlreadyExists`]
672    /// If the length of the column is different from the current data frame, an error is returned [`Error::DataSetSizeDoesntMatch`]
673    pub fn add_single_column<K: Into<Key>>(
674        &mut self,
675        key: K,
676        column: Array1<DataValue>,
677    ) -> Result<(), Error> {
678        let key = key.into();
679        if self.index.get_column_index(&key).is_some() {
680            return Err(Error::ColumnAlreadyExists(key));
681        }
682        if self.len() != column.len() && !self.is_empty() {
683            return Err(Error::DataSetSizeDoesntMatch(self.len(), column.len()));
684        }
685
686        self.index.store_key(key.clone());
687        let rows = column.len();
688        let column_index = self
689            .index
690            .get_column_index(&key)
691            .ok_or(Error::UnknownError(format!("Column {key} should exists")))?;
692        if self.is_empty() && self.index.len() == 1 {
693            self.data_frame = column.into_shape_clone((rows, 1))?;
694            assert_eq!(self.data_frame.column(column_index).len(), rows);
695        } else if self.is_empty() {
696            self.data_frame = Array2::default((column.len(), self.index.len() - 1));
697            self.data_frame.push_column(column.view())?;
698            assert_eq!(self.data_frame.column(column_index).len(), rows);
699        } else {
700            self.data_frame.push_column(column.view())?;
701        }
702        assert_eq!(self.data_frame.column(column_index).len(), rows);
703
704        Ok(())
705    }
706    /// Adds the columns from the other [`ColumnFrame`] to the current [`ColumnFrame`]
707    /// If the current [`KeyIndex`] is empty, the [`ColumnFrame`] is replaced with the other [`ColumnFrame`]
708    /// If the other [`KeyIndex`] is empty, nothing happens
709    pub fn add_columns(&mut self, other: Self) -> Result<(), Error> {
710        if self.check_or_init_frame(&other)?.should_end() {
711            return Ok(());
712        }
713
714        self.extend_columns_from_other(&other)?;
715        for (idx, key) in other.index.get_keys().iter().enumerate() {
716            if let Some(index) = self.index.get_column_index(key) {
717                trace!("Other array = {:?}", other.data_frame.dim());
718                if other.data_frame.dim() == (0, 0) {
719                    self.data_frame.column_mut(index).fill(DataValue::Null);
720                    continue;
721                }
722                let arr = other.data_frame.column(idx);
723                trace!(
724                    "Adding column {key:?} at index {idx} vs {index} datasize: self:{} vs other:{}",
725                    self.data_frame.nrows(),
726                    arr.len()
727                );
728                if arr.len() != self.data_frame.nrows() {
729                    self.data_frame.column_mut(index).fill(DataValue::Null);
730                } else {
731                    self.data_frame.column_mut(index).assign(&arr);
732                }
733            }
734        }
735        Ok(())
736    }
737
738    /// Broadcasts the data from the other [`ColumnFrame`] to the current [`ColumnFrame`]
739    /// If the current [`KeyIndex`] is empty, the [`ColumnFrame`] is replaced with the other [`ColumnFrame`]
740    /// If the other [`KeyIndex`] is empty, nothing happens
741    /// If the length (number of rows) of the other data frame is greater then 1 an error is returned [`Error::CannotBroadcast`]
742    pub fn broadcast(&mut self, other: Self) -> Result<(), Error> {
743        if self.check_or_init_frame(&other)?.should_end() {
744            return Ok(());
745        }
746        if other.data_frame.nrows() != 1 {
747            return Err(Error::CannotBroadcast);
748        }
749
750        // Get all keys (current + new from other)
751        let all_keys = self.index.get_keys().to_vec();
752        let other_keys: Vec<_> = other
753            .index
754            .get_keys()
755            .iter()
756            .filter(|k| self.index.get_column_index(k).is_none())
757            .cloned()
758            .collect();
759
760        // Add new keys to index
761        for key in &other_keys {
762            self.index.store_key(key.clone());
763        }
764
765        let nrows = self.len();
766        let ncols = self.index.len();
767        let ncols_old = all_keys.len();
768
769        // Pre-allocate result buffer
770        let mut data = Vec::with_capacity(nrows * ncols);
771
772        // Build result row by row - single pass allocation
773        for row_idx in 0..nrows {
774            // Copy existing columns
775            for col_idx in 0..ncols_old {
776                data.push(self.data_frame[(row_idx, col_idx)].clone());
777            }
778            // Broadcast values from other dataframe (only 1 row in other)
779            for key in &other_keys {
780                if let Some(other_idx) = other.index.get_column_index(key) {
781                    data.push(other.data_frame[(0, other_idx)].clone());
782                }
783            }
784        }
785
786        self.data_frame = Array2::from_shape_vec((nrows, ncols), data)
787            .map_err(|e| Error::UnknownError(format!("Broadcast reshape failed: {e}")))?;
788
789        Ok(())
790    }
791
792    /// Computes the Cartesian product of the input structures,
793    /// resulting in all possible combinations of elements.
794    /// The data is stored in the row-major order
795    /// The keys are stored in the order they are added - the order is preserved - new keys from the `other` [`ColumnFrame`] are added to the end
796    pub fn cartesian_product(&mut self, other: Self) -> Result<(), Error> {
797        if self.check_or_init_frame(&other)?.should_end() {
798            return Ok(());
799        }
800        // extend the columns
801        // let mut new_index = self.index.clone();
802        for other_key in other.keys() {
803            if self.index.get_column_index(other_key).is_none() {
804                self.index.store_key(other_key.clone());
805            } else {
806                self.index.store_key(Key::new(
807                    format!("{}-{}", other_key, other_key.id()).as_str(),
808                    other_key.ctype,
809                ));
810            }
811        }
812        let max_rows = self.len() * other.len();
813        let ncols = self.index.len();
814        // create new data frame
815        let mut new_df = Array2::default((max_rows, ncols));
816
817        let mut cur_idx = 0;
818        for cur_row in self.data_frame.rows() {
819            for other_row in other.data_frame.rows() {
820                new_df
821                    .slice_mut(s![cur_idx, ..])
822                    .assign(&concatenate(Axis(0), &[cur_row, other_row])?);
823                cur_idx += 1;
824            }
825        }
826        self.data_frame = new_df;
827        Ok(())
828    }
829
830    /// Joins the candidates with the other candidates by the [`JoinRelation`] policy.
831    /// For [`JoinBy::AddColumns`] the columns are added to the existing structure via [`Self::add_columns`]
832    /// For [`JoinBy::Replace`] the columns are replaced with the new columns
833    /// For [`JoinBy::Extend`] the candidates are extended via [`Self::extend`]
834    /// For [`JoinBy::Broadcast`] each candidate is extended with the values of the other candidates `Self::broadcast`
835    /// For [`JoinBy::CartesianProduct`] the candidates are multiplied by the other candidates
836    /// For [`JoinBy::JoinById`] the candidates are joined by the keys in the `JoinRelation::JoinById` struct see [`Self::join_by_id_inner`]
837    pub fn join(&mut self, right: Self, join_type: &JoinRelation) -> Result<(), Error> {
838        use JoinBy::*;
839        match &join_type.join_type {
840            AddColumns => self.add_columns(right),
841            Replace => self.replace(right),
842            Extend => self.extend(right),
843            Broadcast => self.broadcast(right),
844            CartesianProduct => self.cartesian_product(right),
845            JoinById(join) => self.join_by_id_inner(right, &join.keys),
846        }
847    }
848
849    pub fn get_single_column(&self, key: &Key) -> Option<ArrayView1<'_, DataValue>> {
850        self.index
851            .get_column_index(key)
852            .map(|x| self.data_frame.column(x))
853    }
854
855    pub fn sorted(&self, key: &Key) -> Result<sorted_df::SortedDataFrame<'_>, Error> {
856        let index = self
857            .index
858            .get_column_index(key)
859            .ok_or(Error::NotFound(key.clone()))?;
860        let column = self.data_frame.column(index);
861        let mut data_with_index = column.iter().enumerate().collect::<Vec<_>>();
862        tracing::trace!("Sorting by key: {key:?} vals {data_with_index:?}");
863        data_with_index.sort_by(
864            |(a_idx, a_val), (b_idx, b_val)| match a_val.partial_cmp(b_val) {
865                Some(ordering) => ordering.then_with(|| a_idx.cmp(b_idx)),
866                None => {
867                    let a_null = matches!(a_val, DataValue::Null);
868                    let b_null = matches!(b_val, DataValue::Null);
869                    match (a_null, b_null) {
870                        (true, true) => std::cmp::Ordering::Equal.then_with(|| a_idx.cmp(b_idx)),
871                        (true, false) => std::cmp::Ordering::Greater.then_with(|| a_idx.cmp(b_idx)),
872                        (false, true) => std::cmp::Ordering::Less.then_with(|| a_idx.cmp(b_idx)),
873                        (false, false) => std::cmp::Ordering::Equal.then_with(|| a_idx.cmp(b_idx)),
874                    }
875                }
876            },
877        );
878
879        tracing::trace!("Sorted by key: {key:?} vals {data_with_index:?}");
880        let indicies = data_with_index
881            .into_iter()
882            .map(|(idx, _)| idx)
883            .collect::<Vec<_>>();
884
885        Ok(sorted_df::SortedDataFrame::new(self, indicies))
886    }
887
888    pub fn filter(&self, filter: &crate::filter::FilterRules) -> Result<Self, Error> {
889        let mut final_indices = Vec::new();
890        let filter_df = filter_df::ColumnFrameFiltering { column_frame: self };
891        for rule in &filter.rules {
892            final_indices.extend(crate::filter::filter_combination(&filter_df, rule)?);
893        }
894
895        final_indices.sort_unstable();
896        final_indices.dedup();
897        let mut new_df = ColumnFrame::new(
898            self.index.clone(),
899            Array2::default((final_indices.len(), self.index.len())),
900        );
901        final_indices
902            .iter()
903            .enumerate()
904            .for_each(|(cur_idx, row_idx)| {
905                new_df
906                    .data_frame
907                    .slice_mut(s![cur_idx, ..])
908                    .assign(&self.data_frame.slice(s![*row_idx, ..]));
909            });
910
911        Ok(new_df)
912    }
913}
914
915pub fn to_array2<T: Clone>(source: Vec<Array1<T>>) -> Result<Array2<T>, Error> {
916    let width = source.len();
917    let flattened: Array1<T> = source.into_iter().flat_map(|row| row.to_vec()).collect();
918    let height = flattened.len() / width;
919    Ok(flattened.into_shape_with_order((width, height))?)
920}
/// Builds a `DataFrame` from the same syntax that `column_frame!` accepts.
///
/// All tokens are forwarded untouched to `column_frame!` and the resulting
/// `ColumnFrame` is handed to `DataFrame::new`.
#[macro_export]
macro_rules! df {
    ($($tokens:tt)*) => {
        $crate::DataFrame::new($crate::column_frame!($($tokens)*))
    };
}
927
/// Builds a `ColumnFrame` from `key => value` pairs.
///
/// Supported forms:
/// * `"a" => 1, "b" => 2` (optionally with a trailing comma) - a frame with a single row;
/// * `"a" => [1, 2, 3], "b" => [4, 5, 6]` - one column per key, one row per position;
/// * `"a" => vec![1, 2, 3], ...` - rewritten to the bracket form above.
///
/// Keys and values are converted with `.into()`.
///
/// NOTE(review): the trailing-comma arm only forwards to the comma-less form with the values
/// already parsed as expressions, so the array forms should be written without a trailing
/// comma. An invocation without any pair matches the `vec!` arm with zero repetitions and
/// re-invokes the macro with the same empty input - confirm before relying on either.
#[macro_export]
macro_rules! column_frame {
    // trailing comma, e.g. { "a" => 1, }
    ($($name:expr => $cell:expr,)+) => { $crate::column_frame!($($name => $cell),+) };
    // columns written as vectors, e.g. { "a" => vec![1, 2, 3] }
    ($($name:expr => vec![$($cell:expr),*]),*) => {
        $crate::column_frame!($($name => [$($cell),*]),*)
    };
    // columns written as arrays, e.g. { "a" => [1, 2, 3] }
    ($($name:expr => [$($cell:expr),*]),*) => {{
        // The literal lists one inner array per column, so the axes are swapped afterwards
        // to get one row per position.
        let per_column = ::ndarray::arr2(&[$(
            [$($cell.into(),)*],
        )*]);

        let key_list = vec![$($name.into(),)*];

        $crate::ColumnFrame::new(
            $crate::KeyIndex::new(key_list),
            per_column.reversed_axes()
        )
    }};
    // single values, e.g. { "a" => 1 }
    ($($name:expr => $cell:expr),*) => {{
        let single_row = ::ndarray::arr2(&[[$($cell.into(),)*]]);
        let key_list = vec![$($name.into(),)*];

        $crate::ColumnFrame::new(
            $crate::KeyIndex::new(key_list),
            single_row,
        )
    }};
}
964
965#[cfg(test)]
966mod test {
967    use crate::{filter::FilterRules, JoinById};
968
969    use super::*;
970    use data_value::stdhashmap;
971    use ndarray::ArrayView;
972    use rstest::*;
973    use tracing_test::traced_test;
974
975    #[rstest]
976    #[case(
977        column_frame! {
978            "t" => [1751001987000000u64, 1752001987000000u64, 1753001987000000u64],
979            "b" => [4, 5, 6],
980            "c" => [7, 8, 9]
981        },
982        column_frame! {
983            "t" => [1752001987000000u64],
984            "b" => [5],
985            "c" => [8]
986        },
987        FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
988    )]
989    #[case(
990        column_frame! {
991            "t" => [1751001987000000f64, 1752001987000000f64, 1753001987000000f64],
992            "b" => [4, 5, 6],
993            "c" => [7, 8, 9]
994        },
995        column_frame! {
996            "t" => [1752001987000000f64],
997            "b" => [5],
998            "c" => [8]
999        },
1000        FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
1001    )]
1002    #[case(
1003        column_frame! {
1004            "t" => [1751001987000000i64, 1752001987000000i64, 1753001987000000i64],
1005            "b" => [4, 5, 6],
1006            "c" => [7, 8, 9]
1007        },
1008        column_frame! {
1009            "t" => [1752001987000000i64],
1010            "b" => [5],
1011            "c" => [8]
1012        },
1013        FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
1014    )]
1015    #[case(
1016        column_frame! {
1017            "t" => [1751001987000000u64, 1752001987000000u64, 1753001987000000u64],
1018            "b" => [4, 5, 6],
1019            "c" => [7, 8, 9]
1020        },
1021        column_frame! {
1022            "t" => [1751001987000000u64],
1023            "b" => [4],
1024            "c" => [7]
1025        },
1026        FilterRules::try_from("t.to_datetime_us() < '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
1027    )]
1028    #[case(
1029        column_frame! {
1030            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
1031            "b" => [4, 5, 6],
1032            "c" => [7, 8, 9]
1033        },
1034        column_frame! {
1035            "t" => ["2025-07-08 18:13:07"],
1036            "b" => [4],
1037            "c" => [7]
1038        },
1039        FilterRules::try_from("t.to_datetime_us() < '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
1040    )]
1041    #[case(
1042        column_frame! {
1043            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
1044            "b" => [4, 5, 6],
1045            "c" => [7, 8, 9]
1046        },
1047        column_frame! {
1048            "t" => [],
1049            "b" => [],
1050            "c" => []
1051        },
1052        FilterRules::try_from("t.len() < 10u64").expect("BUG: cannot create filter rules"),
1053    )]
1054    #[case(
1055        column_frame! {
1056            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
1057            "b" => [4, 5, 6],
1058            "c" => [7, 8, 9]
1059        },
1060        column_frame! {
1061            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
1062            "b" => [4, 5, 6],
1063            "c" => [7, 8, 9]
1064        },
1065        FilterRules::try_from("t.len() > 10u64").expect("BUG: cannot create filter rules"),
1066    )]
1067    #[case(
1068        column_frame! {
1069            "t" => [DataValue::Vec(vec![1.into(), 2.into(), 3.into()]), DataValue::Vec(vec![]), DataValue::Vec(vec![1.into()])],
1070            "b" => [4, 5, 6],
1071            "c" => [7, 8, 9]
1072        },
1073        column_frame! {
1074            "t" => [DataValue::Vec(vec![])],
1075            "b" => [5],
1076            "c" => [ 8]
1077        },
1078        FilterRules::try_from("t.len() == 0u64").expect("BUG: cannot create filter rules"),
1079    )]
1080    #[case(
1081        column_frame! {
1082            "t" => [DataValue::Vec(vec![1.into(), 2.into(), 3.into()]), DataValue::Vec(vec![]), DataValue::Vec(vec![1.into()])],
1083            "b" => [4, 5, 6],
1084            "c" => [7, 8, 9]
1085        },
1086        column_frame! {
1087            "t" => [DataValue::Vec(vec![1.into()])],
1088            "b" => [6],
1089            "c" => [9]
1090        },
1091        FilterRules::try_from("t.len() == 1u64").expect("BUG: cannot create filter rules"),
1092    )]
1093    #[case(
1094        column_frame! {
1095            "a" => [1, 2, 3],
1096            "b" => [4, 5, 6],
1097            "c" => [7, 8, 9]
1098        },
1099        column_frame! {
1100            "a" => [1, 2],
1101            "b" => [4, 5],
1102            "c" => [7, 8]
1103        },
1104        FilterRules::try_from("a <= 2i32").expect("BUG: cannot create filter rules"),
1105    )]
1106    #[case(
1107        column_frame! {
1108            "a" => [1, 2, 3],
1109            "b" => [4, 5, 6],
1110            "c" => [7, 8, 9]
1111        },
1112        column_frame! {
1113            "a" => [2],
1114            "b" => [5],
1115            "c" => [8]
1116        },
1117        FilterRules::try_from("a <= 2i32 && c > 7i32").expect("BUG: cannot create filter rules"),
1118    )]
1119    #[case(
1120        column_frame! {
1121            "a" => [1, 2, 3],
1122            "b" => [4, 5, 6],
1123            "c" => [7, 8, 9]
1124        },
1125        column_frame! {
1126            "a" => [],
1127            "b" => [],
1128            "c" => []
1129        },
1130        FilterRules::try_from("a <= 2i32 && c > 9i32").expect("BUG: cannot create filter rules"),
1131    )]
1132    #[case(
1133        column_frame! {
1134            "a" => [1, 2, 3],
1135            "b" => [4, 5, 6],
1136            "c" => [7, 8, 9]
1137        },
1138        column_frame! {
1139            "a" => [1, 2],
1140            "b" => [4, 5],
1141            "c" => [7, 8]
1142        },
1143        FilterRules::try_from("a <= 2i32 || c > 9i32").expect("BUG: cannot create filter rules"),
1144    )]
1145    #[case(
1146        column_frame! {
1147            "a" => [1, 2, 3],
1148            "b" => [4, 5, 6],
1149            "c" => [7, 8, 9]
1150        },
1151        column_frame! {
1152            "a" => [2],
1153            "b" => [5],
1154            "c" => [8]
1155        },
1156        FilterRules::try_from("a <= 2i32 && (c > 9i32 || b == 5i32)").expect("BUG: cannot create filter rules"),
1157    )]
1158    #[case(
1159        column_frame! {
1160            "a" => ["abcd", "ab", "abcdefg"],
1161            "b" => [4, 5, 6],
1162            "c" => [7, 8, 9]
1163        },
1164        column_frame! {
1165            "a" => ["abcd","abcdefg"],
1166            "b" => [4, 6],
1167            "c" => [7, 9]
1168        },
1169        FilterRules::try_from("a ~= 'abcd.*'").expect("BUG: cannot create filter rules"),
1170    )]
1171    #[case(
1172        column_frame! {
1173            "a" => [1, 2, 3],
1174            "b" => [4, 5, 6],
1175            "c" => [7, 8, 9]
1176        },
1177        column_frame! {
1178            "a" => [1],
1179            "b" => [4],
1180            "c" => [7]
1181        },
1182        FilterRules::try_from("a in [1u32, 1i32]'").expect("BUG: cannot create filter rules"),
1183    )]
1184    #[case(
1185        column_frame! {
1186            "a" => [1, 2, 3],
1187            "b" => [4, 5, 6],
1188            "c" => [7, 8, 9]
1189        },
1190        column_frame! {
1191            "a" => [2, 3],
1192            "b" => [5, 6],
1193            "c" => [8, 9]
1194        },
1195        FilterRules::try_from("a notIn [1u32, 1i32]'").expect("BUG: cannot create filter rules"),
1196    )]
1197    #[case(
1198        column_frame! {
1199            "a" => [1f64, 2f64, 3f64],
1200            "b" => [4, 5, 6],
1201            "c" => [7, 8, 9]
1202        },
1203        column_frame! {
1204            "a" => [1f64, 2f64],
1205            "b" => [4, 5],
1206            "c" => [7, 8]
1207        },
1208        FilterRules::try_from("a < 3f64 || (a < 3f64 && b <= 5i32)").expect("BUG: cannot create filter rules"),
1209    )]
1210    #[case(
1211        column_frame! {
1212            "a" => [1f64, 2f64, 3f64],
1213            "b" => [4i64, 5i64, 6i64],
1214            "c" => [7i64, 8i64, 9i64]
1215        },
1216        column_frame! {
1217            "a" => [1f64, 2f64],
1218            "b" => [4i64, 5i64],
1219            "c" => [7i64, 8i64]
1220        },
1221        FilterRules::try_from("a >= 1f64 && (b <= 5 || c <= 8) && b >= 4").expect("BUG: cannot create filter rules"),
1222    )]
1223    #[traced_test]
1224    fn filter_test(
1225        #[case] df: ColumnFrame,
1226        #[case] expected: ColumnFrame,
1227        #[case] filter: FilterRules,
1228    ) {
1229        let filtered = df.filter(&filter).expect("BUG: cannot filter");
1230        assert_eq!(filtered, expected);
1231    }
1232
1233    #[rstest]
1234    #[traced_test]
1235    fn test_macro() {
1236        let df = column_frame! {
1237            "a" => 1,
1238            "b" => 2,
1239            "c" => 3,
1240            "d" => 4,
1241        };
1242
1243        assert_eq!(df.len(), 1);
1244        assert_eq!(df.keys(), &["a".into(), "b".into(), "c".into(), "d".into()]);
1245        let f = Array2::from_shape_vec((1, 4), vec![1.into(), 2.into(), 3.into(), 4.into()])
1246            .expect("BUG: cannot create array");
1247        assert_eq!(df.select(None), f);
1248
1249        let df = column_frame! {
1250            "a" => [1, 2, 3],
1251            "b" => [4, 5, 6],
1252            "c" => [7, 8, 9]
1253        };
1254
1255        assert_eq!(df.len(), 3);
1256        assert_eq!(df.keys(), &["a".into(), "b".into(), "c".into()]);
1257        let f = Array2::from_shape_vec(
1258            (3, 3),
1259            vec![
1260                1.into(),
1261                4.into(),
1262                7.into(),
1263                2.into(),
1264                5.into(),
1265                8.into(),
1266                3.into(),
1267                6.into(),
1268                9.into(),
1269            ],
1270        )
1271        .expect("BUG: cannot create array");
1272        let selected = df.select(None);
1273        trace!("{selected:?}");
1274        assert_eq!(selected, f);
1275
1276        let df1 = df! {
1277            "a" => [1, 2, 3],
1278            "b" => [4, 5, 6],
1279            "c" => [7, 8, 9]
1280        };
1281
1282        // just to make sure we've tested Display
1283        let formatted = format!("{}", df);
1284        debug!("{}", formatted);
1285
1286        assert_eq!(df1, crate::DataFrame::from(df));
1287    }
1288
1289    #[rstest]
1290    #[case(
1291        column_frame! {
1292            "a" => [1, 2, 3],
1293            "b" => [4, 5, 6],
1294            "c" => [7, 8, 9]
1295        },
1296        column_frame! {
1297            "a_new" => [1, 2, 3],
1298            "b" => [4, 5, 6],
1299            "c" => [7, 8, 9]
1300        },
1301        vec!["a_new", "b", "c"].into_iter().map(|x| x.into()).collect(),
1302        vec![("a", "a_new".into())]
1303    )]
1304    #[traced_test]
1305    fn rename_test(
1306        #[case] df: ColumnFrame,
1307        #[case] expected: ColumnFrame,
1308        #[case] keys: Vec<Key>,
1309        #[case] renames: Vec<(&str, Key)>,
1310    ) {
1311        let mut df = df;
1312        for (old, new) in renames {
1313            df.rename_key(old, new).expect("BUG: cannot rename key");
1314        }
1315        assert_eq!(df, expected);
1316        assert_eq!(df.keys(), keys.as_slice());
1317    }
1318
1319    #[rstest]
1320    #[case(
1321        column_frame!("a" => [1, 2, 3]),
1322        Key::new("a", crate::DataType::I32),
1323        column_frame!("a" => [1i32, 2i32, 3i32])
1324    )]
1325    #[case(
1326        column_frame!("a" => [1, 2, 3]),
1327        Key::new("a", crate::DataType::U32),
1328        column_frame!("a" => [1u32, 2u32, 3u32])
1329    )]
1330    #[case(
1331        column_frame!("a" => [1, 2, 3]),
1332        Key::new("a", crate::DataType::I64),
1333        column_frame!("a" => [1i64, 2i64, 3i64])
1334    )]
1335    #[case(
1336        column_frame!("a" => [1, 2, 3]),
1337        Key::new("a", crate::DataType::U64),
1338        column_frame!("a" => [1u64, 2u64, 3u64])
1339    )]
1340    #[case(
1341        column_frame!("a" => [1, 2, 3]),
1342        Key::new("a", crate::DataType::F64),
1343        column_frame!("a" => [1f64, 2f64, 3f64])
1344    )]
1345    #[case(
1346        column_frame!("a" => [1, 2, 3]),
1347        Key::new("a", crate::DataType::F32),
1348        column_frame!("a" => [1f32, 2f32, 3f32])
1349    )]
1350    // #[case(
1351    //     column_frame!("a" => [1, 2, 3]),
1352    //     Key::new("a", crate::DataType::String),
1353    //     column_frame!("a" => ["1", "2", "3"])
1354    // )]
1355    fn test_try_fix_dtype(
1356        #[case] mut df: ColumnFrame,
1357        #[case] key: Key,
1358        #[case] expected: ColumnFrame,
1359    ) {
1360        assert!(df.try_fix_column_by_key(&key).is_ok());
1361        assert_eq!(
1362            df.select(Some(&[key.clone()])),
1363            expected.select(Some(&[key.clone()]))
1364        );
1365    }
1366
1367    #[fixture]
1368    fn unknown_df() -> ColumnFrame {
1369        let mut hm: HashMap<String, Vec<DataValue>> = HashMap::new();
1370
1371        hm.insert("a".into(), vec![1u32.into()]);
1372        hm.insert("b".into(), vec![3i64.into()]);
1373        hm.insert("c".into(), vec![1f64.into()]);
1374        hm.insert("d".into(), vec![1u64.into()]);
1375
1376        hm.into()
1377    }
1378    #[rstest]
1379    #[case(stdhashmap!(
1380        "a" => crate::DataType::U32,
1381        "b" => crate::DataType::I64,
1382        "c" => crate::DataType::F64,
1383        "d" => crate::DataType::U64)
1384    )]
1385    fn test_try_fix_dtype_unknown(
1386        mut unknown_df: ColumnFrame,
1387        #[case] dtypes: HashMap<String, crate::DataType>,
1388    ) {
1389        for dtype in dtypes.iter() {
1390            let t: &Key = unknown_df
1391                .keys()
1392                .iter()
1393                .find(|x| x.name() == dtype.0)
1394                .unwrap();
1395            assert_ne!(t.ctype, crate::DataType::Unknown);
1396        }
1397        assert!(unknown_df.try_fix_dtype_for_keys(false).is_ok());
1398        for dtype in dtypes.iter() {
1399            let t: &Key = unknown_df
1400                .keys()
1401                .iter()
1402                .find(|x| x.name() == dtype.0)
1403                .unwrap();
1404            assert_eq!(t.ctype, *dtype.1);
1405            assert!(unknown_df.try_fix_dtype_for_keys(false).is_ok());
1406        }
1407        assert!(unknown_df.try_fix_dtype_for_keys(true).is_ok());
1408    }
1409
1410    #[rstest]
1411    #[case(
1412        column_frame!(Key::new("a", crate::DataType::F32) => [1, 2, 3]),
1413        Key::new("a", crate::DataType::F32),
1414        column_frame!("a" => [1f32, 2f32, 3f32])
1415    )]
1416    #[traced_test]
1417    fn test_try_fix(#[case] mut df: ColumnFrame, #[case] key: Key, #[case] expected: ColumnFrame) {
1418        assert!(df.try_fix_dtype().is_ok());
1419        assert_eq!(
1420            df.select(Some(&[key.clone()])),
1421            expected.select(Some(&[key]))
1422        )
1423    }
1424
1425    #[rstest]
1426    #[traced_test]
1427    fn test_not_key_fix() {
1428        let mut cf = column_frame!("a" => [1]);
1429        let non_existing = Key::new("b", crate::DataType::I32);
1430        assert!(cf.try_fix_column_by_key(&non_existing).is_err());
1431    }
1432
1433    #[rstest]
1434    #[case(
1435        column_frame! {
1436            "a" => [1, 2, 3],
1437            "b" => [4, 5, 6],
1438            "c" => [7, 8, 9]
1439        },
1440        vec!["a_alias", "b", "c"].into_iter().map(|x| x.into()).collect(),
1441        vec![("a", "a_alias")]
1442    )]
1443    #[traced_test]
1444    fn alias_test(
1445        #[case] df: ColumnFrame,
1446        #[case] keys: Vec<Key>,
1447        #[case] aliases: Vec<(&str, &str)>,
1448    ) {
1449        let mut df = df;
1450        for (old, new) in aliases {
1451            df.add_alias(old, new).expect("BUG: cannot rename key");
1452        }
1453        let origin_keys = df.keys().to_vec();
1454        let selected_aliases = df.select(Some(keys.as_slice()));
1455        let selected = df.select(Some(origin_keys.as_slice()));
1456        assert_eq!(selected, selected_aliases);
1457    }
1458
1459    #[rstest]
1460    #[traced_test]
1461    fn test_mut_view() {
1462        let data = vec![
1463            DataValue::from(1f64),
1464            DataValue::from(4f32),
1465            DataValue::from(2f64),
1466            DataValue::from(f32::NAN),
1467            DataValue::from(f64::NAN),
1468            DataValue::from(f32::INFINITY),
1469        ];
1470        let keys: Vec<Key> = vec!["a".into(), "b".into()];
1471
1472        let index = KeyIndex::new(keys.clone());
1473        let df = Array2::from_shape_vec((3, keys.len()), data).expect("BUG: cannot create array");
1474        let mut df = ColumnFrame::new(index.clone(), df);
1475        df.get_mut_view().mapv_inplace(|x| match x {
1476            DataValue::F32(f) if f.is_infinite() || f.is_nan() => DataValue::F32(0f32),
1477            DataValue::F64(f) if f.is_infinite() || f.is_nan() => DataValue::F64(0f64),
1478            e => e,
1479        });
1480        let data = vec![
1481            DataValue::from(1f64),
1482            DataValue::from(4f32),
1483            DataValue::from(2f64),
1484            DataValue::from(0f32),
1485            DataValue::from(0f64),
1486            DataValue::from(0f32),
1487        ];
1488        let expected = ColumnFrame::new(
1489            index,
1490            Array2::from_shape_vec((3, keys.len()), data).expect("BUG: cannot create ndarray"),
1491        );
1492        assert_eq!(df, expected);
1493    }
1494
1495    #[rstest]
1496    #[traced_test]
1497    fn dummy_test() {
1498        let data = vec![
1499            DataValue::U32(1),
1500            DataValue::I32(2),
1501            DataValue::I64(3),
1502            DataValue::U64(4),
1503        ];
1504
1505        let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1506
1507        let index = KeyIndex::new(keys.clone());
1508        let mut data_frame = Array2::default((1, keys.len()));
1509        for (idx, entry) in data.iter().enumerate() {
1510            data_frame
1511                .column_mut(idx)
1512                .assign(&ArrayView::from(&[entry.clone()]));
1513        }
1514
1515        let frame = ColumnFrame::new(index, data_frame);
1516        assert_eq!(
1517            frame.get_by_row_index(&"a".into(), 0),
1518            Some(&DataValue::U32(1))
1519        );
1520        assert_eq!(frame.get_by_row_index(&"aa".into(), 0), None);
1521        assert_eq!(frame.get_by_row_index(&"a".into(), 1), None);
1522        assert_eq!(
1523            frame.select(Some(&["a".into(), "b".into()])),
1524            Array2::from_shape_vec((1, 2), vec![DataValue::U32(1), DataValue::I32(2)])
1525                .expect("BUG: cannot create array")
1526        );
1527    }
1528
1529    #[rstest]
1530    #[traced_test]
1531    fn dummy_test_multiple_rows() {
1532        let data = vec![
1533            DataValue::U32(1),
1534            DataValue::I32(2),
1535            DataValue::I64(3),
1536            DataValue::U64(4),
1537            DataValue::U32(12),
1538            DataValue::I32(22),
1539            DataValue::I64(32),
1540            DataValue::U64(42),
1541        ];
1542
1543        let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1544
1545        let index = KeyIndex::new(keys.clone());
1546        let data_frame =
1547            Array2::from_shape_vec((2, keys.len()), data).expect("BUG: cannot create array");
1548
1549        let frame = ColumnFrame::new(index, data_frame);
1550        assert_eq!(
1551            frame.get_by_row_index(&"a".into(), 0),
1552            Some(&DataValue::U32(1))
1553        );
1554        assert_eq!(frame.get_by_row_index(&"aa".into(), 0), None);
1555        assert_eq!(frame.get_by_row_index(&"a".into(), 3), None);
1556        let arr = Array2::from_shape_vec(
1557            (2, 2),
1558            vec![
1559                DataValue::U32(1),
1560                DataValue::I32(2),
1561                DataValue::U32(12),
1562                DataValue::I32(22),
1563            ],
1564        )
1565        .expect("BUG: cannot create array");
1566        trace!("{arr:?}");
1567        assert_eq!(frame.select(Some(&["a".into(), "b".into()])), arr);
1568    }
1569
1570    #[rstest]
1571    #[traced_test]
1572    fn dummy_test_multiple_rows_push() {
1573        let data = vec![
1574            DataValue::U32(1),
1575            DataValue::I32(2),
1576            DataValue::I64(3),
1577            DataValue::U64(4),
1578            DataValue::U32(12),
1579            DataValue::I32(22),
1580            DataValue::I64(32),
1581            DataValue::U64(42),
1582        ];
1583        let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1584
1585        let index = KeyIndex::new(keys.clone());
1586        let data_frame =
1587            Array2::from_shape_vec((2, keys.len()), data).expect("BUG: cannot create array");
1588
1589        let mut frame = ColumnFrame::new(index, data_frame);
1590        assert!(frame
1591            .push(data_value::stdhashmap!(
1592                "a" => DataValue::U32(2),
1593                "b" => DataValue::I32(3),
1594                "c" => DataValue::I64(4),
1595                "d" => DataValue::U64(5)
1596            ))
1597            .is_ok());
1598        let arr = Array2::from_shape_vec(
1599            (3, 2),
1600            vec![
1601                DataValue::U32(1),
1602                DataValue::I32(2),
1603                DataValue::U32(12),
1604                DataValue::I32(22),
1605                DataValue::U32(2),
1606                DataValue::I32(3),
1607            ],
1608        )
1609        .expect("BUG: cannot create array");
1610        trace!("{arr:?}");
1611        assert_eq!(frame.select(Some(&["a".into(), "b".into()])), arr);
1612        let result = frame.push(data_value::stdhashmap!(
1613            "a" => DataValue::U32(34),
1614            "b" => DataValue::I32(44),
1615            "c" => DataValue::I64(54),
1616            "e" => DataValue::F32(6f32)
1617        ));
1618        assert!(result.is_ok(), "{result:?}");
1619        let arr = Array2::from_shape_vec(
1620            (4, 2),
1621            vec![
1622                DataValue::U64(4),
1623                DataValue::Null,
1624                DataValue::U64(42),
1625                DataValue::Null,
1626                DataValue::U64(5),
1627                DataValue::Null,
1628                DataValue::Null,
1629                DataValue::F32(6f32),
1630            ],
1631        )
1632        .expect("BUG: cannot create array");
1633        trace!("{arr:?}");
1634        assert_eq!(frame.select(Some(&["d".into(), "e".into()])), arr);
1635    }
1636
1637    #[rstest]
1638    #[case(
1639        column_frame! {
1640            "group_id" => vec![1, 2],
1641            "feed_tag" => vec![3, 4]
1642        },
1643        Some(vec![Key::from("group_id")]),
1644        ndarray::array!([1.into()], [2.into()])
1645    )]
1646    #[case(
1647        column_frame! {
1648            "group_id" => vec![1, 2],
1649            "feed_tag" => vec![3, 4]
1650        },
1651        Some(vec!["group_id".into(), "feed_tag".into()]),
1652        ndarray::array!([1.into(), 3.into()], [2.into(), 4.into()])
1653    )]
1654    #[case(
1655        column_frame! {
1656            "group_id" => vec![1, 2],
1657            "feed_tag" => vec![3, DataValue::Null]
1658        },
1659        Some(vec!["feed_tag".into()]),
1660        ndarray::array![[3.into()], [DataValue::Null]]
1661    )]
1662    #[case(
1663        column_frame! {
1664            "group_id" => vec![1, 2],
1665            "feed_tag" => vec![1, DataValue::Null]
1666        },
1667        Some(vec!["feed_tag2".into()]),
1668       Array2::<DataValue>::default((0, 0))
1669    )]
1670    #[traced_test]
1671    fn test_select(
1672        #[case] input: ColumnFrame,
1673        #[case] keys: Option<Vec<Key>>,
1674        #[case] expected: Array2<DataValue>,
1675    ) {
1676        trace!("input={input:?}");
1677        let keys_slice = keys.as_deref();
1678        let selected = input.select(keys_slice);
1679        trace!("selected={selected:?}");
1680        assert_eq!(selected, expected);
1681        let selected = input.select_transposed(keys_slice);
1682        trace!("selected_transposed={selected:?}");
1683        assert!(selected.is_ok());
1684        assert_eq!(selected.unwrap(), expected.t());
1685    }
1686
1687    #[rstest]
1688    #[case(
1689        column_frame! {
1690            "group_id" => vec![1, 2],
1691            "feed_tag" => vec![3, 4]
1692        },
1693        Key::from("group_id"),
1694        Some(ndarray::array!(1.into(), 2.into()))
1695    )]
1696    #[case(
1697        column_frame! {
1698            "group_id" => vec![1, 2, 5, 6],
1699            "feed_tag" => vec![3, 4, 7, 8]
1700        },
1701        Key::from("group_id"),
1702        Some(ndarray::array!(1.into(), 2.into(), 5.into(), 6.into()))
1703    )]
1704    #[case(
1705        column_frame! {
1706            "group_id" => vec![1, 2],
1707            "feed_tag" => vec![1, 1]
1708        },
1709        Key::from("feed_tag1"),
1710        None
1711    )]
1712    #[traced_test]
1713    fn test_select_column(
1714        #[case] input: ColumnFrame,
1715        #[case] key: Key,
1716        #[case] expected: Option<Array1<DataValue>>,
1717    ) {
1718        let selected = input.select_column(&key);
1719        trace!("selected={selected:?}");
1720        match expected {
1721            Some(expected) => {
1722                assert!(selected.is_some());
1723                assert_eq!(selected.expect("BUG: checked above"), expected);
1724            }
1725            None => assert!(selected.is_none()),
1726        }
1727    }
1728
1729    #[test]
1730    #[traced_test]
1731    fn empty_join_test() {
1732        let join = JoinRelation::add_columns();
1733        let mut column_frame = ColumnFrame::default();
1734        column_frame
1735            .add_single_column("group_id", Array1::from_vec(vec![]))
1736            .expect("BUG: cannot add column");
1737        let column_frame2 = column_frame! {
1738            "group_id" => vec![2, 1, 3],
1739            "feed_tag" => vec![1, 1, 1],
1740            "clicks" => vec![100, 10, 10],
1741            "imps" => vec![1000, 200, 200]
1742        };
1743        assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
1744
1745        let joined = column_frame.join(column_frame2, &join);
1746        assert!(joined.is_ok(), "{joined:?}");
1747
1748        trace!("{column_frame:?}");
1749        assert_eq!(
1750            column_frame.select(Some(&[
1751                "group_id".into(),
1752                "feed_tag".into(),
1753                "clicks".into(),
1754                "imps".into()
1755            ])),
1756            ndarray::array!(
1757                [2.into(), 1.into(), 100.into(), 1000.into()],
1758                [1.into(), 1.into(), 10.into(), 200.into()],
1759                [3.into(), 1.into(), 10.into(), 200.into()],
1760            )
1761        );
1762
1763        let mut column_frame2 = column_frame! {
1764            "feed_tag" => vec![1, 1, 1],
1765            "clicks" => vec![100, 10, 10],
1766            "imps" => vec![1000, 200, 200]
1767        };
1768        let mut column_frame = ColumnFrame::default();
1769        column_frame
1770            .add_single_column("group_id", Array1::from_vec(vec![]))
1771            .expect("BUG: cannot add column");
1772        let joined = column_frame2.join(column_frame, &join);
1773        assert!(joined.is_ok(), "{joined:?}");
1774
1775        trace!("{column_frame2:?}");
1776        assert_eq!(
1777            column_frame2.select(Some(&[
1778                "group_id".into(),
1779                "feed_tag".into(),
1780                "clicks".into(),
1781                "imps".into()
1782            ])),
1783            ndarray::array!(
1784                [DataValue::Null, 1.into(), 100.into(), 1000.into()],
1785                [DataValue::Null, 1.into(), 10.into(), 200.into()],
1786                [DataValue::Null, 1.into(), 10.into(), 200.into()],
1787            )
1788        );
1789
1790        let mut column_frame = ColumnFrame::default();
1791        column_frame.index = KeyIndex::new(vec!["group_id2".into()]);
1792        let joined = column_frame2.join(column_frame, &join);
1793        assert!(joined.is_ok(), "{joined:?}");
1794
1795        trace!("{column_frame2:?}");
1796        assert_eq!(
1797            column_frame2.select(Some(&[
1798                "group_id2".into(),
1799                "feed_tag".into(),
1800                "clicks".into(),
1801                "imps".into()
1802            ])),
1803            ndarray::array!(
1804                [DataValue::Null, 1.into(), 100.into(), 1000.into()],
1805                [DataValue::Null, 1.into(), 10.into(), 200.into()],
1806                [DataValue::Null, 1.into(), 10.into(), 200.into()],
1807            )
1808        );
1809    }
1810
1811    #[test]
1812    #[traced_test]
1813    fn join_test() {
1814        let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
1815            "group_id".into(),
1816            "feed_tag".into(),
1817        ])));
1818        let mut column_frame = column_frame! {
1819            "group_id" => vec![1, 2, 8],
1820            "feed_tag" => vec![1, 1, 10]
1821        };
1822        let column_frame2 = column_frame! {
1823            "group_id" => vec![2, 1, 3],
1824            "feed_tag" => vec![1, 1, 1],
1825            "clicks" => vec![100, 10, 10],
1826            "imps" => vec![1000, 200, 200]
1827        };
1828        assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
1829
1830        let joined = column_frame.join(column_frame2, &join);
1831        assert!(joined.is_ok(), "{joined:?}");
1832
1833        trace!("{column_frame:?}");
1834        assert_eq!(
1835            column_frame.select(Some(&[
1836                "group_id".into(),
1837                "feed_tag".into(),
1838                "clicks".into(),
1839                "imps".into()
1840            ])),
1841            ndarray::array!(
1842                [1.into(), 1.into(), 10.into(), 200.into()],
1843                [2.into(), 1.into(), 100.into(), 1000.into()],
1844                [8.into(), 10.into(), DataValue::Null, DataValue::Null]
1845            )
1846        )
1847    }
1848
1849    #[test]
1850    #[traced_test]
1851    fn join_test_with_additional() {
1852        let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
1853            "group_id".into(),
1854            "feed_tag".into(),
1855        ])));
1856        let mut column_frame = column_frame! {
1857            "group_id" => vec![1, 2, 8],
1858            "feed_tag" => vec![1, 1, 10],
1859            "clicked" => vec![0, 0, 1]
1860        };
1861        let column_frame2 = column_frame! {
1862            "group_id" => vec![2, 1, 3],
1863            "feed_tag" => vec![1, 1, 1],
1864            "clicks" => vec![100, 10, 10],
1865            "imps" => vec![1000, 200, 200]
1866        };
1867        assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
1868
1869        let joined = column_frame.join(column_frame2, &join);
1870        assert!(joined.is_ok(), "{joined:?}");
1871
1872        trace!("{column_frame:?}");
1873        assert_eq!(
1874            column_frame.select(Some(&[
1875                "group_id".into(),
1876                "feed_tag".into(),
1877                "clicks".into(),
1878                "imps".into(),
1879                "clicked".into()
1880            ])),
1881            ndarray::array!(
1882                [1.into(), 1.into(), 10.into(), 200.into(), 0.into()],
1883                [2.into(), 1.into(), 100.into(), 1000.into(), 0.into()],
1884                [
1885                    8.into(),
1886                    10.into(),
1887                    DataValue::Null,
1888                    DataValue::Null,
1889                    1.into()
1890                ]
1891            )
1892        )
1893    }
1894
1895    #[test]
1896    #[traced_test]
1897    fn join_test_with_additional_single() {
1898        let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
1899            "group_id".into(),
1900            "feed_tag".into(),
1901        ])));
1902        let mut column_frame = column_frame! {
1903            "group_id" => vec![1, 2, 8],
1904            "feed_tag" => vec![1, 1, 10],
1905            "clicked" => vec![0, 0, 1]
1906        };
1907        let column_frame2 = column_frame! {
1908            "a" => vec![1],
1909            "group_id" => vec![2],
1910            "feed_tag" => vec![1],
1911            "clicks" => vec![10],
1912            "imps" => vec![200]
1913        };
1914        assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
1915
1916        let joined = column_frame.join(column_frame2, &join);
1917        assert!(joined.is_ok(), "{joined:?}");
1918
1919        trace!("{column_frame:?}");
1920        assert_eq!(
1921            column_frame.select(Some(&[
1922                "group_id".into(),
1923                "feed_tag".into(),
1924                "clicks".into(),
1925                "imps".into(),
1926                "clicked".into()
1927            ])),
1928            ndarray::array!(
1929                [
1930                    1.into(),
1931                    1.into(),
1932                    DataValue::Null,
1933                    DataValue::Null,
1934                    0.into(),
1935                ],
1936                [2.into(), 1.into(), 10.into(), 200.into(), 0.into()],
1937                [
1938                    8.into(),
1939                    10.into(),
1940                    DataValue::Null,
1941                    DataValue::Null,
1942                    1.into()
1943                ]
1944            )
1945        )
1946    }
1947
1948    #[rstest]
1949    #[traced_test]
1950    fn cartesian_product_join() {
1951        let mut df = column_frame! {
1952            "group_id" => vec![1, 2, 3],
1953            "feed_tag" => vec![1, 2, 3]
1954        };
1955        let df2 = column_frame! {
1956            "zone_id" => vec![111111, 111133],
1957            "zone_avg_ctr" => vec![0.1, 0.001]
1958        };
1959        assert!(df
1960            .join(
1961                ColumnFrame::default(),
1962                &JoinRelation::new(JoinBy::CartesianProduct)
1963            )
1964            .is_ok());
1965        let join = JoinRelation::new(JoinBy::CartesianProduct);
1966        let result = df.join(df2, &join);
1967        assert!(result.is_ok(), "{result:?}");
1968        let selected = df.select(None);
1969        trace!("{selected:?}");
1970        assert_eq!(
1971            selected,
1972            ndarray::array!(
1973                [1.into(), 1.into(), 111111.into(), 0.1.into()],
1974                [1.into(), 1.into(), 111133.into(), 0.001.into()],
1975                [2.into(), 2.into(), 111111.into(), 0.1.into()],
1976                [2.into(), 2.into(), 111133.into(), 0.001.into()],
1977                [3.into(), 3.into(), 111111.into(), 0.1.into()],
1978                [3.into(), 3.into(), 111133.into(), 0.001.into()],
1979            )
1980        );
1981
1982        let df2 = column_frame! {
1983            "zone_id" => vec![111]
1984        };
1985        let result = df.join(df2, &join);
1986        assert!(result.is_ok(), "{result:?}");
1987        let selected = df.select(None);
1988        trace!("{selected:?}");
1989        assert_eq!(
1990            selected,
1991            ndarray::array!(
1992                [1.into(), 1.into(), 111111.into(), 0.1.into(), 111.into()],
1993                [1.into(), 1.into(), 111133.into(), 0.001.into(), 111.into()],
1994                [2.into(), 2.into(), 111111.into(), 0.1.into(), 111.into()],
1995                [2.into(), 2.into(), 111133.into(), 0.001.into(), 111.into()],
1996                [3.into(), 3.into(), 111111.into(), 0.1.into(), 111.into()],
1997                [3.into(), 3.into(), 111133.into(), 0.001.into(), 111.into()],
1998            )
1999        );
2000    }
2001
2002    #[rstest]
2003    #[traced_test]
2004    fn broadcast_join() {
2005        let mut df = column_frame! {
2006            "group_id" => vec![1, 2, 3],
2007            "feed_tag" => vec![1, 2, 3]
2008        };
2009        let df2 = column_frame! {
2010            "zone_id" => vec![111111]
2011        };
2012        assert!(df
2013            .join(
2014                ColumnFrame::default(),
2015                &JoinRelation::new(JoinBy::Broadcast)
2016            )
2017            .is_ok());
2018        let join = JoinRelation::new(JoinBy::Broadcast);
2019        assert!(df.join(df2, &join).is_ok());
2020        let selected = df.select(None);
2021        trace!("{selected:?}");
2022        assert_eq!(
2023            selected,
2024            ndarray::array!(
2025                [1.into(), 1.into(), 111111.into()],
2026                [2.into(), 2.into(), 111111.into()],
2027                [3.into(), 3.into(), 111111.into()]
2028            )
2029        );
2030    }
2031    #[rstest]
2032    #[traced_test]
2033    fn merge_test() {
2034        let mut df = column_frame! {
2035            "group_id" => vec![1, 2, 3],
2036            "feed_tag" => vec![1, 2, 3]
2037        };
2038        let df2 = column_frame! {
2039            "group_id" => vec![11, 21, 31],
2040            "feed_tag" => vec![12, 22, 32]
2041        };
2042
2043        let join = JoinRelation::new(JoinBy::Replace);
2044        assert!(df.join(df2, &join).is_ok());
2045        let selected = df.select(None);
2046        trace!("{selected:?}");
2047        assert_eq!(
2048            selected,
2049            ndarray::array!(
2050                [11.into(), 12.into()],
2051                [21.into(), 22.into()],
2052                [31.into(), 32.into()]
2053            )
2054        );
2055    }
2056
2057    #[rstest]
2058    #[traced_test]
2059    fn extend_test() {
2060        let mut df = column_frame! {
2061            "group_id" => vec![1, 2, 3],
2062            "feed_tag" => vec![1, 2, 3]
2063        };
2064        let df2 = column_frame! {
2065            "group_id" => vec![11, 21, 31],
2066            "feed_tag" => vec![5, 6, 7]
2067        };
2068        assert!(df
2069            .join(ColumnFrame::default(), &JoinRelation::new(JoinBy::Extend))
2070            .is_ok());
2071
2072        let join = JoinRelation::new(JoinBy::Extend);
2073        assert!(df.join(df2, &join).is_ok());
2074        let selected = df.select(Some(&["feed_tag".into(), "group_id".into()]));
2075        trace!("{selected:?}");
2076        assert_eq!(
2077            selected,
2078            ndarray::array!(
2079                [1.into(), 1.into()],
2080                [2.into(), 2.into()],
2081                [3.into(), 3.into()],
2082                [5.into(), 11.into()],
2083                [6.into(), 21.into()],
2084                [7.into(), 31.into()]
2085            )
2086        );
2087        let as_map = df.select_as_map(Some(&["feed_tag".into(), "group_id".into()]));
2088        trace!("{as_map:?}");
2089        assert_eq!(
2090            as_map,
2091            stdhashmap!(
2092                "feed_tag" => vec![1, 2, 3, 5, 6, 7],
2093                "group_id" => vec![1, 2, 3, 11, 21, 31]
2094            )
2095        );
2096
2097        let as_map = df.select_as_map(Some(&["feed_tag1".into()]));
2098        trace!("{as_map:?}");
2099        assert_eq!(as_map, HashMap::default());
2100    }
2101
2102    #[rstest]
2103    #[traced_test]
2104    fn extend_test_with_non_existing_cols() {
2105        let mut df = column_frame! {
2106            "group_id" => vec![1, 2, 3],
2107            "feed_tag" => vec![1, 2, 3]
2108        };
2109        let mut df2 = column_frame! {
2110            "group_id" => vec![11, 21, 31],
2111            "feed_tag" => vec![5, 6, 7],
2112            "clicks" => vec![100, 200, 300],
2113            "impressions" => vec![1000, 2000, 3000]
2114        };
2115        let df_bckp = df.clone();
2116        let join = JoinRelation::new(JoinBy::Extend);
2117        assert!(df.join(df2.clone(), &join).is_ok());
2118        let selected = df.select(None);
2119        trace!("{selected:?}");
2120        assert_eq!(
2121            selected,
2122            ndarray::array!(
2123                [1.into(), 1.into(), DataValue::Null, DataValue::Null],
2124                [2.into(), 2.into(), DataValue::Null, DataValue::Null],
2125                [3.into(), 3.into(), DataValue::Null, DataValue::Null],
2126                [11.into(), 5.into(), 100.into(), 1000.into()],
2127                [21.into(), 6.into(), 200.into(), 2000.into()],
2128                [31.into(), 7.into(), 300.into(), 3000.into()]
2129            )
2130        );
2131        let join = JoinRelation::new(JoinBy::Extend);
2132        let r = df2.join(df_bckp, &join);
2133        assert!(r.is_ok(), "{r:?}");
2134        let selected = df2.select(None);
2135        trace!("{selected:?}");
2136        assert_eq!(
2137            selected,
2138            ndarray::array!(
2139                [11.into(), 5.into(), 100.into(), 1000.into()],
2140                [21.into(), 6.into(), 200.into(), 2000.into()],
2141                [31.into(), 7.into(), 300.into(), 3000.into()],
2142                [1.into(), 1.into(), DataValue::Null, DataValue::Null],
2143                [2.into(), 2.into(), DataValue::Null, DataValue::Null],
2144                [3.into(), 3.into(), DataValue::Null, DataValue::Null]
2145            )
2146        );
2147    }
2148
2149    #[rstest]
2150    #[traced_test]
2151    fn extend_test_with_non_existing_cols_wrong_order() {
2152        let mut df = column_frame! {
2153            "group_id" => vec![1, 2, 3],
2154            "feed_tag" => vec![1, 2, 3]
2155        };
2156        let df2 = column_frame! {
2157            "feed_tag" => vec![5, 6, 7],
2158            "group_id" => vec![11, 21, 31]
2159        };
2160        let join = JoinRelation::new(JoinBy::Extend);
2161        let err = df.join(df2, &join);
2162        assert!(err.is_ok(), "{err:?}");
2163    }
2164
2165    #[rstest]
2166    #[traced_test]
2167    fn test_replace_not_compatible() {
2168        let mut df = column_frame! {
2169            "group_id" => vec![1, 2, 3],
2170            "feed_tag" => vec![1, 2, 3]
2171        };
2172        let df2 = column_frame! {
2173            "feed_tag" => vec![5, 6],
2174            "group_id" => vec![11, 21]
2175        };
2176        let join = JoinRelation::new(JoinBy::Replace);
2177        let err = df.join(df2, &join);
2178        assert!(err.is_err(), "{err:?}");
2179        let empty = ColumnFrame::default();
2180        let err = df.join(empty, &join);
2181        assert!(err.is_ok(), "{err:?}");
2182    }
2183
2184    #[rstest]
2185    #[traced_test]
2186    fn test_different_data() {
2187        let mut df = column_frame! {
2188            "group_id" => vec![1, 2, 3],
2189            "feed_tag" => vec![1, 2, 3]
2190        };
2191        let df2 = column_frame! {
2192            "group_id" => vec![11, 21],
2193            "a" => vec![5, 6]
2194        };
2195        let join = JoinRelation::new(JoinBy::Extend);
2196        let err = df.join(df2, &join);
2197        assert!(err.is_ok(), "{err:?}");
2198        println!("{df:?}");
2199        let expected_df = ColumnFrame::new(
2200            KeyIndex::from(vec!["group_id".into(), "feed_tag".into(), "a".into()]),
2201            ndarray::array!(
2202                [1.into(), 1.into(), DataValue::Null],
2203                [2.into(), 2.into(), DataValue::Null],
2204                [3.into(), 3.into(), DataValue::Null],
2205                [11.into(), DataValue::Null, 5.into()],
2206                [21.into(), DataValue::Null, 6.into()]
2207            ),
2208        );
2209        assert_eq!(df, expected_df)
2210    }
2211
2212    #[rstest]
2213    #[traced_test]
2214    fn serde_column_frame() {
2215        let df = column_frame! {
2216            "group_id" => vec![1u64, 2u64, 3u64],
2217            "feed_tag" => vec![1u64, 2u64, 3u64]
2218        };
2219        let key_idx = df.index.clone();
2220        let serialized = serde_json::to_string(&key_idx).expect("BUG: cannot serialize");
2221        let deserialized: KeyIndex =
2222            serde_json::from_str(&serialized).expect("BUG: cannot deserialize");
2223        assert_eq!(key_idx, deserialized);
2224        assert!(key_idx.get_key(0).is_some_and(|x| x == "group_id".into()));
2225        let serialized = serde_json::to_string(&df).expect("BUG: cannot serialize");
2226        let deserialized: ColumnFrame =
2227            serde_json::from_str(&serialized).expect("BUG: cannot deserialize");
2228        assert_eq!(df, deserialized);
2229    }
2230
2231    #[rstest]
2232    #[traced_test]
2233    fn update_value() {
2234        let mut df = column_frame! {
2235            "group_id" => vec![1, 2, 3],
2236            "feed_tag" => vec![1, 2, 3]
2237        };
2238        let group_id: Key = "group_id".into();
2239        let v = df.get_mut_by_row_index(&group_id, 1);
2240        assert!(v.is_some());
2241        let v = v.unwrap();
2242        assert_eq!(v, &DataValue::I32(2));
2243        *v = DataValue::U64(22);
2244        let v = df.get_by_row_index(&group_id, 1);
2245        assert!(v.is_some());
2246        let v = v.unwrap();
2247        assert_eq!(v, &DataValue::U64(22));
2248
2249        assert!(df.get_mut_by_row_index(&"group_id2".into(), 1).is_none());
2250    }
2251}