trs_dataframe/dataframe/
column_store.rs

1use ndarray::{concatenate, s, Array, Array1, Array2, ArrayView1, ArrayViewMut2, Axis};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4
5use crate::error::Error;
6use crate::{dataframe::index::Index, CandidateData, JoinBy, JoinRelation, Key};
7use data_value::{DataValue, Extract};
8use tracing::*;
9mod from;
10mod key_index;
11mod ops;
12pub mod sorted_df;
13pub use key_index::KeyIndex;
14pub mod filter_df;
15
/// [`ColumnFrame`] is used to store the data for the candidates.
///
/// The data is stored in an [`Array2`] of [`DataValue`] values.
/// Every column of the array belongs to one [`Key`]; the [`KeyIndex`] is used to
/// translate a column [`Key`] into the position of that column.
/// Memory layout is the same as in [`ndarray`] - the data is stored in row-major order.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
pub struct ColumnFrame {
    /// Index used to resolve a column [`Key`] to its position in `data_frame`.
    pub index: KeyIndex,
    /// Two-dimensional storage of the values.
    pub data_frame: Array2<DataValue>,
}
26
/// Outcome of a pre-check: tells the caller whether there is still work to do.
enum Continue {
    /// The caller should carry on with its regular logic.
    Continue,
    /// The work is already finished; the caller can return early.
    End,
}

impl Continue {
    /// Returns `true` only for [`Continue::End`].
    pub fn should_end(&self) -> bool {
        match self {
            Self::End => true,
            Self::Continue => false,
        }
    }
}
37
38use std::fmt;
39
40impl fmt::Display for ColumnFrame {
41    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42        // Display keys and and indices
43        write!(f, "\n|")?;
44
45        for key in &self.index.keys {
46            write!(f, " {key} |")?;
47        }
48
49        if self.index.is_empty() {
50            writeln!(f, "|")?;
51        }
52
53        // Display type for each key
54        if let Some(row) = self.data_frame.axis_iter(Axis(0)).next() {
55            write!(f, "\n|")?;
56            for value in row.iter() {
57                // Display types during first iteration
58                write!(f, " {:10?} |", crate::detect_dtype(value))?;
59            }
60            writeln!(f)?;
61        }
62
63        writeln!(f, "---")?;
64
65        // Display items, limit output to 256 rows
66        for (n, row) in self.data_frame.axis_iter(Axis(0)).enumerate() {
67            write!(f, "|")?;
68
69            for value in row.iter() {
70                // Display types during first iteration
71                write!(f, " {value} |")?;
72            }
73            writeln!(f)?;
74
75            if n >= 256 {
76                writeln!(f, "... (dataframe is too long)")?;
77                break;
78            }
79        }
80
81        writeln!(f, "---")
82    }
83}
84
85impl ColumnFrame {
86    pub fn new<K: Into<KeyIndex>>(index: K, data_frame: Array2<DataValue>) -> Self {
87        Self {
88            index: index.into(),
89            data_frame,
90        }
91    }
92
93    pub fn keys(&self) -> &[Key] {
94        self.index.get_keys()
95    }
96
97    pub fn len(&self) -> usize {
98        self.data_frame.nrows()
99    }
100
101    pub fn is_empty(&self) -> bool {
102        self.data_frame.nrows() == 0
103    }
104
    /// Currently a no-op: the body is an unimplemented placeholder (see the `fixme`).
    pub fn shrink(&mut self) {
        //fixme
    }
108
109    pub fn get_mut_view(&mut self) -> ArrayViewMut2<DataValue> {
110        self.data_frame.view_mut()
111    }
112
113    pub fn rename_key(&mut self, old: &str, new: Key) -> Result<(), Error> {
114        self.index.rename_key(old, new)
115    }
116
117    pub fn add_alias(&mut self, key: &str, alias: &str) -> Result<(), Error> {
118        self.index.add_alias(key, alias)
119    }
120
121    /// Selects the data from the [`ColumnFrame`] by the given keys
122    /// If the keys are not provided, the data is selected by the [`KeyIndex`] keys
123    /// The data is returned as the [`Vec<Vec<DataValue>>`]
124    /// If the keys are not found, the empty [`Vec<Vec<DataValue>>`] is returned
125    /// Returns the data in the column-major order
126    pub fn select_transposed_typed<D: Extract>(&self, keys: &[Key]) -> Vec<Vec<D>> {
127        let selected = self.select(Some(keys));
128        let mut result = Vec::with_capacity(selected.nrows());
129        for row in selected.rows() {
130            let mut r = Vec::with_capacity(selected.ncols());
131            for value in row.iter() {
132                r.push(D::extract(value));
133            }
134            result.push(r);
135        }
136        result
137    }
138
139    /// Selects the data from the [`ColumnFrame`] by the given keys
140    /// If the keys are not provided, the data is selected by the [`KeyIndex`] keys
141    /// The data is returned as the [`Array2`] with the [`DataValue`] values
142    /// If the keys are not found, the empty [`Array2`] is returned
143    /// Returns the [`Array2`] with the data in the column-major order
144    pub fn select_transposed(&self, keys: Option<&[Key]>) -> Result<Array2<DataValue>, Error> {
145        let keys = keys.unwrap_or_else(|| self.index.get_keys());
146        let key_indexes = self.index.select(keys);
147        if key_indexes.is_empty() {
148            return Ok(Array2::default((0, 0)));
149        }
150        let data_vec: Vec<Array1<DataValue>> = key_indexes
151            .indexes()
152            .iter()
153            .map(|x| self.data_frame.column(*x).to_owned())
154            .collect();
155        to_array2(data_vec)
156    }
157
158    /// Selects whole column from the [`ColumnFrame`] by the given key
159    /// If the key is not found, the None is returned
160    /// If the key is found, the [`ArrayView1`] with the [`DataValue`] values is returned
161    pub fn select_column(&self, key: &Key) -> Option<ArrayView1<DataValue>> {
162        self.index
163            .get_column_index(key)
164            .map(|x| self.data_frame.column(x))
165    }
166
167    pub fn apply_function<F>(&mut self, keys: &[Key], mut func: F) -> Result<(), Error>
168    where
169        F: FnMut(&[Key], &mut ColumnFrame) -> Result<(), Error>,
170    {
171        func(keys, self)
172    }
173
174    /// Validates the access to the entry by the given column [`Key`] and row index
175    /// If the column is not found, an error is returned [`Error::NotFound`]
176    /// If the row index is out of bounds, an error is returned [`Error::IndexOutOfRange`]
177    /// Otherwise, the column index is returned
178    pub fn validate_entry_access(&self, column: &Key, row_index: usize) -> Result<usize, Error> {
179        if row_index >= self.data_frame.nrows() {
180            return Err(Error::IndexOutOfRange(row_index, self.data_frame.nrows()));
181        }
182        let Some(column_index) = self.index.get_column_index(column) else {
183            return Err(Error::NotFound(column.clone()));
184        };
185        Ok(column_index)
186    }
187
188    /// Returns the value [`DataValue`] for the given column defined by [`Key`] and row index
189    /// If the column is not found, None is returned
190    /// If the row index is out of bounds, None is returned
191    pub fn get_by_row_index(&self, column: &Key, row_index: usize) -> Option<&DataValue> {
192        trace!(
193            "Column: {column} row_index: {row_index} data_frame: cols:{}-rows:{}",
194            self.data_frame.len(),
195            self.data_frame.nrows()
196        );
197        trace!("{:?}", self.data_frame);
198        match self.validate_entry_access(column, row_index) {
199            Ok(column_index) => self.data_frame.get((row_index, column_index)),
200            Err(e) => {
201                trace!("Error: {e}");
202                None
203            }
204        }
205    }
206
207    /// Returns mutable reference for the value [`DataValue`] for the given column defined by [`Key`] and row index
208    /// If the column is not found, None is returned
209    /// If the row index is out of bounds, None is returned
210    pub fn get_mut_by_row_index(
211        &mut self,
212        column: &Key,
213        row_index: usize,
214    ) -> Option<&mut DataValue> {
215        trace!(
216            "Column: {column} row_index: {row_index} data_frame: cols:{}-rows:{}",
217            self.data_frame.len(),
218            self.data_frame.nrows()
219        );
220        trace!("{:?}", self.data_frame);
221        match self.validate_entry_access(column, row_index) {
222            Ok(column_index) => self.data_frame.get_mut((row_index, column_index)),
223            Err(e) => {
224                trace!("Error: {e}");
225                None
226            }
227        }
228    }
229
230    /// Returns the value [`HashMap<Key, Vec<DataValue>>`] for the given columns defined by [`Key`].
231    /// If the keys are not provided, the data is selected by the [`KeyIndex`] keys
232    /// If the keys are not found, the empty [`HashMap`] is returned
233    /// Returns the [`Array2`] with the data in the row-major order
234    pub fn select_as_map(&self, keys: Option<&[Key]>) -> HashMap<Key, Vec<DataValue>> {
235        let keys = keys.unwrap_or_else(|| self.index.get_keys());
236        let indexes = self.index.select(keys);
237        if indexes.is_empty() {
238            return Default::default();
239        }
240
241        let mut new_data_frame = HashMap::with_capacity(keys.len());
242
243        for key in keys.iter() {
244            if let Some(column_index_in_source) = indexes.get_column_index(key) {
245                let column = self.data_frame.column(column_index_in_source);
246                new_data_frame.insert(key.clone(), column.to_vec());
247            }
248        }
249
250        new_data_frame
251    }
252
253    /// Returns the value [`Array2<DataValue>`] for the given columns defined by [`Key`].
254    /// If the keys are not provided, the data is selected by the [`KeyIndex`] keys
255    /// If the keys are not found, the empty [`Array2`] is returned
256    /// Returns the [`Array2`] with the data in the row-major order
257    pub fn select(&self, keys: Option<&[Key]>) -> Array2<DataValue> {
258        let keys = keys.unwrap_or_else(|| self.index.get_keys());
259        let indexes = self.index.select(keys);
260        if indexes.is_empty() {
261            return Array2::default((0, 0));
262        }
263        let mut new_data_frame = Array2::default((self.data_frame.nrows(), keys.len()));
264
265        for (idx, key) in keys.iter().enumerate() {
266            if let Some(column_index_in_source) = indexes.get_column_index(key) {
267                new_data_frame
268                    .column_mut(idx)
269                    .assign(&self.data_frame.column(column_index_in_source));
270            }
271        }
272
273        new_data_frame
274    }
275
276    fn extend_dataframe_for_column(&mut self, key: Key) -> Result<(), Error> {
277        self.index.store_key(key);
278        let len = self.data_frame.nrows();
279        self.data_frame.push_column(Array1::default(len).view())?;
280        Ok(())
281    }
282
283    /// Pushes the row candidate into the [`ColumnFrame`]
284    /// If the column is not found this method will add the column to the [`ColumnFrame`]
285    ///
286    pub fn push<C: CandidateData>(&mut self, row_candidate: C) -> Result<(), Error> {
287        let mut arr = vec![];
288        for key in row_candidate.keys() {
289            if self.index.get_column_index(&key).is_none() {
290                self.extend_dataframe_for_column(key)?;
291            }
292        }
293        for index in self.index.get_keys() {
294            if let Some(value) = row_candidate.get_value_ref(index) {
295                arr.push(value.clone());
296            } else {
297                arr.push(DataValue::Null);
298            }
299        }
300        self.data_frame.push_row(Array::from_vec(arr).view())?;
301        Ok(())
302    }
303
304    pub fn remove_column(&mut self, keys: &[Key]) -> Result<Self, Error> {
305        // fixme this is naive approach
306        let mut indexes = KeyIndex::default();
307        // take remove data
308        let data = self.select(Some(keys));
309        // remove labels from the index
310        for key in keys {
311            if let Some((current, _idx)) = self.index.remove_key(key) {
312                indexes.store_key(current);
313            }
314        }
315        // copy the rest of the data to the new data frame with new index
316        let rest = self.select(None);
317        let keys = self.index.get_keys().to_vec();
318        self.data_frame = rest;
319        self.index = KeyIndex::new(keys);
320
321        //remove_self.data_frame = to_array2(columns)?;
322        Ok(Self::new(indexes, data))
323    }
324
325    fn check_or_init_frame(&mut self, other: &Self) -> Result<Continue, Error> {
326        if self.index.is_empty() {
327            self.index = other.index.clone();
328            self.data_frame = other.data_frame.clone();
329            return Ok(Continue::End);
330        }
331        if other.index.is_empty() {
332            return Ok(Continue::End);
333        }
334        if self.is_empty() {
335            self.data_frame = Array2::default((other.data_frame.nrows(), self.index.len()));
336        }
337
338        Ok(Continue::Continue)
339    }
340
341    fn extend_columns_from_other(&mut self, other: &Self) -> Result<(), Error> {
342        for key in other.index.get_keys() {
343            if self.index.get_column_index(key).is_none() {
344                self.extend_dataframe_for_column(key.clone())?;
345            }
346        }
347        Ok(())
348    }
349
350    fn try_extend(&mut self, other: Self) -> Result<(), Error> {
351        let mut joined_keys = self.index.clone();
352        // extend keys
353        for key in other.keys() {
354            if self.index.get_column_index(key).is_none() {
355                joined_keys.store_key(key.clone());
356            }
357        }
358
359        let sum_len = self.data_frame.nrows() + other.data_frame.nrows();
360        let mut arr = Array2::default((sum_len, joined_keys.len()));
361        let increment = self.data_frame.nrows();
362        for key in joined_keys.get_keys() {
363            let index_result = joined_keys
364                .get_column_index(key)
365                .expect("BUG: index for this has to be defined");
366            if let Some(index) = self.index.get_column_index(key) {
367                for (idx, value) in self.data_frame.column(index).iter().enumerate() {
368                    if let Some(x) = arr.get_mut((idx, index_result)) {
369                        *x = value.to_owned();
370                    }
371                }
372            }
373
374            if let Some(index) = other.index.get_column_index(key) {
375                for (idx, value) in other.data_frame.column(index).iter().enumerate() {
376                    if let Some(x) = arr.get_mut((increment + idx, index_result)) {
377                        *x = value.to_owned();
378                    }
379                }
380            }
381        }
382        *self = ColumnFrame::new(joined_keys, arr);
383        Ok(())
384    }
385
386    /// Extends the [`ColumnFrame`] with the data from the other [`ColumnFrame`]
387    /// If the [`KeyIndex`] is empty, the [`ColumnFrame`] is replaced with the other [`ColumnFrame`]
388    /// If the other [`KeyIndex`] is empty, nothing happens
389    /// If the length of the [`KeyIndex`] of the other data frame is greater then current,
390    /// an error is returned [`Error::DataSetSizeDoesntMatch`]
391    /// If [`Key`] from other data frame - extends the [`KeyIndex`] and add column to the current [`ColumnFrame`]
392    ///
393    pub fn extend(&mut self, mut other: Self) -> Result<(), Error> {
394        if self.check_or_init_frame(&other)?.should_end() {
395            return Ok(());
396        }
397
398        if self.index.check_order_of_indexes(&other.index).is_err() {
399            return self.try_extend(other);
400        }
401
402        trace!(
403            "Extend columns from other {:?} vs {:?}",
404            other.index.get_keys(),
405            self.index.get_keys()
406        );
407
408        if other.data_frame.ncols() < self.data_frame.ncols() {
409            other.extend_columns_from_other(self)?;
410        } else {
411            self.extend_columns_from_other(&other)?;
412        }
413        self.data_frame = concatenate(Axis(0), &[self.data_frame.view(), other.data_frame.view()])?;
414
415        Ok(())
416    }
417
418    /// Replace the [`ColumnFrame`] with the other [`ColumnFrame`]
419    /// If the current [`KeyIndex`] is empty, the [`ColumnFrame`] is replaced with the other [`ColumnFrame`]
420    /// If the other [`KeyIndex`] is empty, nothing happens
421    /// If the [`KeyIndex`] of the other data frame and current doesn't match an error is returned [`Error::DataSetSizeDoesntMatch`]
422    /// If the [`Key`] from other data frame is not present in the current [`ColumnFrame`] - extends the [`KeyIndex`] and add column to the current [`ColumnFrame`]
423    pub fn replace(&mut self, other: Self) -> Result<(), Error> {
424        if self.check_or_init_frame(&other)?.should_end() {
425            return Ok(());
426        }
427
428        if self.data_frame.len() > other.data_frame.len() {
429            return Err(Error::DataSetSizeDoesntMatch(
430                self.data_frame.len(),
431                other.data_frame.len(),
432            ));
433        }
434        self.index = other.index;
435        self.data_frame = other.data_frame;
436
437        Ok(())
438    }
439
    /// Joins the candidates by the keys in the `JoinRelation::JoinById` struct.
    /// This function creates [`Index`] for the keys and then joins the candidates by the keys.
    ///
    /// For every row of `right` the values of the join `keys` are looked up in the [`Index`]
    /// built over the current frame. When a row is found, the values of the complement
    /// columns are copied from the `right` row into that row of the current frame.
    /// Rows of `right` without a match are skipped.
    ///
    /// If the current [`KeyIndex`] is empty, the frame is replaced with `right`;
    /// if the [`KeyIndex`] of `right` is empty, nothing happens.
    pub fn join_by_id_inner(&mut self, right: Self, keys: &[Key]) -> Result<(), Error> {
        if self.check_or_init_frame(&right)?.should_end() {
            return Ok(());
        }
        // computed before the columns are added below
        // (presumably the keys of `right` which the current frame does not have yet - verify against `get_complement_keys`)
        let new_columnns = right.index.get_complement_keys(self.index.get_keys());
        // add new columns and keys into the column frame index
        self.extend_columns_from_other(&right)?;
        // get the indexes for the keys
        let index = Index::new(keys.to_vec(), self);
        tracing::trace!("Index {index:?}");
        let right_indexes = right.index.select(keys);

        // work on a copy of the current data which already contains the (default filled) new columns
        let mut new_df = Array2::default((self.len(), self.index.len()));
        new_df.assign(&self.data_frame);
        trace!("Indexes: {right_indexes:?}");
        trace!("New DF: {new_df:?}");
        trace!("Right DF: {:?}", right.data_frame);
        trace!("current {:?}", self.data_frame);
        // iterate over all rows in the right data frame and find the corresponding row in the current data frame
        // then fill the new data frame with the values from the right data frame
        // if the row is not found, the row is skipped - the data is not filled - [`DataValue::Null`] is used
        for c_row in right.data_frame.rows() {
            // values of the join keys for this row of the right frame (default value when a cell is missing)
            let values = right_indexes
                .get_keys()
                .iter()
                .map(|k| {
                    c_row
                        .get(
                            right_indexes
                                .get_column_index(k)
                                .expect("BUG: Something is very wrong"),
                        )
                        .cloned()
                        .unwrap_or_default()
                })
                .collect::<Vec<_>>();
            if let Some(row_index) = index.get(&values) {
                trace!("Index: {row_index} {c_row:?} -> {values:?} vs {new_df:?}");
                // copy the value of every complement column into the matched row
                for complement_key in new_columnns.iter() {
                    let column_index = self
                        .index
                        .get_column_index(complement_key)
                        .expect("BUG: Something is very wrong");
                    if let Some(right_column_index) = right.index.get_column_index(complement_key) {
                        trace!("Filling Index: [{complement_key:?}] ri:{row_index} rci:{right_column_index:?} -> {:?} vs {:?}", c_row[right_column_index], new_df.get_mut((row_index, column_index)));
                        if let Some(v) = new_df.get_mut((row_index, column_index)) {
                            *v = c_row[right_column_index].to_owned();
                        }
                    }
                }
            }
        }

        self.data_frame = new_df;

        Ok(())
    }
499
500    /// Adds the single column to the current [`ColumnFrame`]
501    /// If the column is already present, an error is returned [`Error::ColumnAlreadyExists`]
502    /// If the length of the column is different from the current data frame, an error is returned [`Error::DataSetSizeDoesntMatch`]
503    pub fn add_single_column<K: Into<Key>>(
504        &mut self,
505        key: K,
506        column: Array1<DataValue>,
507    ) -> Result<(), Error> {
508        let key = key.into();
509        if self.index.get_column_index(&key).is_some() {
510            return Err(Error::ColumnAlreadyExists(key));
511        }
512        if self.len() != column.len() && !self.is_empty() {
513            return Err(Error::DataSetSizeDoesntMatch(self.len(), column.len()));
514        }
515
516        self.index.store_key(key.clone());
517        let rows = column.len();
518        let column_index = self
519            .index
520            .get_column_index(&key)
521            .ok_or(Error::UnknownError(format!("Column {key} should exists")))?;
522        if self.is_empty() && self.index.len() == 1 {
523            self.data_frame = column.into_shape_clone((rows, 1))?;
524            assert_eq!(self.data_frame.column(column_index).len(), rows);
525        } else if self.is_empty() {
526            self.data_frame = Array2::default((column.len(), self.index.len() - 1));
527            self.data_frame.push_column(column.view())?;
528            assert_eq!(self.data_frame.column(column_index).len(), rows);
529        } else {
530            self.data_frame.push_column(column.view())?;
531        }
532        assert_eq!(self.data_frame.column(column_index).len(), rows);
533
534        Ok(())
535    }
536    /// Adds the columns from the other [`ColumnFrame`] to the current [`ColumnFrame`]
537    /// If the current [`KeyIndex`] is empty, the [`ColumnFrame`] is replaced with the other [`ColumnFrame`]
538    /// If the other [`KeyIndex`] is empty, nothing happens
539    pub fn add_columns(&mut self, other: Self) -> Result<(), Error> {
540        if self.check_or_init_frame(&other)?.should_end() {
541            return Ok(());
542        }
543
544        self.extend_columns_from_other(&other)?;
545        for (idx, key) in other.index.get_keys().iter().enumerate() {
546            if let Some(index) = self.index.get_column_index(key) {
547                let arr = other.data_frame.column(idx);
548                trace!(
549                    "Adding column {key:?} at index {idx} vs {index} datasize: self:{} vs other:{}",
550                    self.data_frame.nrows(),
551                    arr.len()
552                );
553                if arr.len() != self.data_frame.nrows() {
554                    self.data_frame.column_mut(index).fill(DataValue::Null);
555                } else {
556                    self.data_frame.column_mut(index).assign(&arr);
557                }
558            }
559        }
560        Ok(())
561    }
562
563    /// Broadcasts the data from the other [`ColumnFrame`] to the current [`ColumnFrame`]
564    /// If the current [`KeyIndex`] is empty, the [`ColumnFrame`] is replaced with the other [`ColumnFrame`]
565    /// If the other [`KeyIndex`] is empty, nothing happens
566    /// If the length (number of rows) of the other data frame is greater then 1 an error is returned [`Error::CannotBroadcast`]
567    pub fn broadcast(&mut self, other: Self) -> Result<(), Error> {
568        if self.check_or_init_frame(&other)?.should_end() {
569            return Ok(());
570        }
571        if other.data_frame.nrows() != 1 {
572            return Err(Error::CannotBroadcast);
573        }
574        self.extend_columns_from_other(&other)?;
575        let mut new_df = Array2::default((self.len(), self.index.len()));
576        for (idx, key) in self.index.get_keys().iter().enumerate() {
577            if let Some(other_idx) = other.index.get_column_index(key) {
578                new_df
579                    .column_mut(idx)
580                    .assign(&other.data_frame.column(other_idx));
581            } else {
582                new_df.column_mut(idx).assign(&self.data_frame.column(idx));
583            }
584        }
585        self.data_frame = new_df;
586        Ok(())
587    }
588
589    /// Computes the Cartesian product of the input structures,
590    /// resulting in all possible combinations of elements.
591    /// The data is stored in the row-major order
592    /// The keys are stored in the order they are added - the order is preserved - new keys from the `other` [`ColumnFrame`] are added to the end
593    pub fn cartesian_product(&mut self, other: Self) -> Result<(), Error> {
594        if self.check_or_init_frame(&other)?.should_end() {
595            return Ok(());
596        }
597        // extend the columns
598        // let mut new_index = self.index.clone();
599        for other_key in other.keys() {
600            if self.index.get_column_index(other_key).is_none() {
601                self.index.store_key(other_key.clone());
602            } else {
603                self.index.store_key(Key::new(
604                    format!("{}-{}", other_key, other_key.id()).as_str(),
605                    other_key.ctype,
606                ));
607            }
608        }
609        let max_rows = self.len() * other.len();
610        let ncols = self.index.len();
611        // create new data frame
612        let mut new_df = Array2::default((max_rows, ncols));
613
614        let mut cur_idx = 0;
615        for cur_row in self.data_frame.rows() {
616            for other_row in other.data_frame.rows() {
617                new_df
618                    .slice_mut(s![cur_idx, ..])
619                    .assign(&concatenate(Axis(0), &[cur_row, other_row])?);
620                cur_idx += 1;
621            }
622        }
623        self.data_frame = new_df;
624        Ok(())
625    }
626
627    /// Joins the candidates with the other candidates by the [`JoinRelation`] policy.
628    /// For [`JoinBy::AddColumns`] the columns are added to the existing structure via [`Self::add_columns`]
629    /// For [`JoinBy::Replace`] the columns are replaced with the new columns
630    /// For [`JoinBy::Extend`] the candidates are extended via [`Self::extend`]
631    /// For [`JoinBy::Broadcast`] each candidate is extended with the values of the other candidates `Self::broadcast`
632    /// For [`JoinBy::CartesianProduct`] the candidates are multiplied by the other candidates
633    /// For [`JoinBy::JoinById`] the candidates are joined by the keys in the `JoinRelation::JoinById` struct see [`Self::join_by_id_inner`]
634    pub fn join(&mut self, right: Self, join_type: &JoinRelation) -> Result<(), Error> {
635        use JoinBy::*;
636        match &join_type.join_type {
637            AddColumns => self.add_columns(right),
638            Replace => self.replace(right),
639            Extend => self.extend(right),
640            Broadcast => self.broadcast(right),
641            CartesianProduct => self.cartesian_product(right),
642            JoinById(join) => self.join_by_id_inner(right, &join.keys),
643        }
644    }
645
646    pub fn get_single_column(&self, key: &Key) -> Option<ArrayView1<DataValue>> {
647        self.index
648            .get_column_index(key)
649            .map(|x| self.data_frame.column(x))
650    }
651
652    pub fn sorted(&self, key: &Key) -> Result<sorted_df::SortedDataFrame<'_>, Error> {
653        let index = self
654            .index
655            .get_column_index(key)
656            .ok_or(Error::NotFound(key.clone()))?;
657        let column = self.data_frame.column(index);
658        let mut data_with_index = column.iter().enumerate().collect::<Vec<_>>();
659        tracing::trace!("Sorting by key: {key:?} vals {data_with_index:?}");
660        let middle = data_with_index.len() / 2;
661        data_with_index.select_nth_unstable_by(middle, |(a_idx, a_val), (b_idx, b_val)| {
662            a_val
663                .partial_cmp(b_val)
664                .unwrap_or(std::cmp::Ordering::Equal)
665                .then_with(|| a_idx.cmp(b_idx))
666        });
667        tracing::trace!("Sorted by key: {key:?} vals {data_with_index:?}");
668        let indicies = data_with_index
669            .into_iter()
670            .map(|(idx, _)| idx)
671            .collect::<Vec<_>>();
672
673        Ok(sorted_df::SortedDataFrame::new(self, indicies))
674    }
675
676    pub fn filter(&self, filter: &crate::filter::FilterRules) -> Result<Self, Error> {
677        let mut final_indices = Vec::new();
678        for rule in &filter.rules {
679            final_indices.extend(filter_df::filter_combination(self, rule)?);
680        }
681
682        final_indices.sort_unstable();
683        final_indices.dedup();
684        let mut new_df = ColumnFrame::new(
685            self.index.clone(),
686            Array2::default((final_indices.len(), self.index.len())),
687        );
688        final_indices
689            .iter()
690            .enumerate()
691            .for_each(|(cur_idx, row_idx)| {
692                new_df
693                    .data_frame
694                    .slice_mut(s![cur_idx, ..])
695                    .assign(&self.data_frame.slice(s![*row_idx, ..]));
696            });
697
698        Ok(new_df)
699    }
700}
701
702pub fn to_array2<T: Clone>(source: Vec<Array1<T>>) -> Result<Array2<T>, Error> {
703    let width = source.len();
704    let flattened: Array1<T> = source.into_iter().flat_map(|row| row.to_vec()).collect();
705    let height = flattened.len() / width;
706    Ok(flattened.into_shape_with_order((width, height))?)
707}
/// Builds a `DataFrame` from the same syntax which is accepted by [`column_frame!`].
///
/// All tokens are forwarded to [`column_frame!`] and the resulting frame is passed to
/// `$crate::DataFrame::new`.
#[macro_export]
macro_rules! df {
    ($($everything:tt)*) => {
        $crate::DataFrame::new($crate::column_frame!($($everything)*))
    };
}
714
/// Builds a `ColumnFrame` from `key => value` pairs.
///
/// Supported forms:
/// * `"a" => [1, 2, 3], "b" => [4, 5, 6]` (or with `vec![..]`) - every key defines one column
///   and the listed values are the rows of that column,
/// * `"a" => 1, "b" => 2` - a frame with a single row holding one value per key.
///
/// Keys and values are converted with `.into()`. A trailing comma after the last pair is accepted.
#[macro_export]
macro_rules! column_frame {
    // case { "a" => 1, }
    // trailing comma: strip it and match again
    ($($key:expr => $value:expr,)+) => { $crate::column_frame!($($key => $value),+) };
    // case { "a" => vec![1, 2, 3] }
    // handled the same way as the array form below
    ($($key:expr => vec![$($value:expr),*]),*) => {
        $crate::column_frame!($($key => [$($value),*]),*)
    };
    // case { "a" => [1, 2, 3] }
    // the values of one key form one row of `data`, reversing the axes turns them into columns
    ($($key:expr => [$($value:expr),*]),*) => {
        {
           let data = ::ndarray::arr2(&[$(
                [$($value.into(),)*],
            )*]);

           let _keys = vec![$($key.into(),)*];

            $crate::ColumnFrame::new(
                $crate::KeyIndex::new(_keys),
                data.reversed_axes()
            )
        }
    };
    // case { "a" => 1 }
    // single row with one value per key
    ($($key:expr => $value:expr),*) => {
        {
           let _data = ::ndarray::arr2(&[[$($value.into(),)*]]);
           let _keys = vec![$($key.into(),)*];

            $crate::ColumnFrame::new(
                $crate::KeyIndex::new(_keys),
                _data,
            )
        }
    };
}
751
752#[cfg(test)]
753mod test {
754    use crate::{filter::FilterRules, JoinById};
755
756    use super::*;
757    use data_value::stdhashmap;
758    use ndarray::ArrayView;
759    use rstest::*;
760    use tracing_test::traced_test;
761
762    #[rstest]
763    #[case(
764        column_frame! {
765            "t" => [1751001987000000u64, 1752001987000000u64, 1753001987000000u64],
766            "b" => [4, 5, 6],
767            "c" => [7, 8, 9]
768        },
769        column_frame! {
770            "t" => [1752001987000000u64],
771            "b" => [5],
772            "c" => [8]
773        },
774        FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
775    )]
776    #[case(
777        column_frame! {
778            "t" => [1751001987000000f64, 1752001987000000f64, 1753001987000000f64],
779            "b" => [4, 5, 6],
780            "c" => [7, 8, 9]
781        },
782        column_frame! {
783            "t" => [1752001987000000f64],
784            "b" => [5],
785            "c" => [8]
786        },
787        FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
788    )]
789    #[case(
790        column_frame! {
791            "t" => [1751001987000000i64, 1752001987000000i64, 1753001987000000i64],
792            "b" => [4, 5, 6],
793            "c" => [7, 8, 9]
794        },
795        column_frame! {
796            "t" => [1752001987000000i64],
797            "b" => [5],
798            "c" => [8]
799        },
800        FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
801    )]
802    #[case(
803        column_frame! {
804            "t" => [1751001987000000u64, 1752001987000000u64, 1753001987000000u64],
805            "b" => [4, 5, 6],
806            "c" => [7, 8, 9]
807        },
808        column_frame! {
809            "t" => [1751001987000000u64],
810            "b" => [4],
811            "c" => [7]
812        },
813        FilterRules::try_from("t.to_datetime_us() < '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
814    )]
815    #[case(
816        column_frame! {
817            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
818            "b" => [4, 5, 6],
819            "c" => [7, 8, 9]
820        },
821        column_frame! {
822            "t" => ["2025-07-08 18:13:07"],
823            "b" => [4],
824            "c" => [7]
825        },
826        FilterRules::try_from("t.to_datetime_us() < '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
827    )]
828    #[case(
829        column_frame! {
830            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
831            "b" => [4, 5, 6],
832            "c" => [7, 8, 9]
833        },
834        column_frame! {
835            "t" => [],
836            "b" => [],
837            "c" => []
838        },
839        FilterRules::try_from("t.len() < 10u64").expect("BUG: cannot create filter rules"),
840    )]
841    #[case(
842        column_frame! {
843            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
844            "b" => [4, 5, 6],
845            "c" => [7, 8, 9]
846        },
847        column_frame! {
848            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
849            "b" => [4, 5, 6],
850            "c" => [7, 8, 9]
851        },
852        FilterRules::try_from("t.len() > 10u64").expect("BUG: cannot create filter rules"),
853    )]
854    #[case(
855        column_frame! {
856            "t" => [DataValue::Vec(vec![1.into(), 2.into(), 3.into()]), DataValue::Vec(vec![]), DataValue::Vec(vec![1.into()])],
857            "b" => [4, 5, 6],
858            "c" => [7, 8, 9]
859        },
860        column_frame! {
861            "t" => [DataValue::Vec(vec![])],
862            "b" => [5],
863            "c" => [ 8]
864        },
865        FilterRules::try_from("t.len() == 0u64").expect("BUG: cannot create filter rules"),
866    )]
867    #[case(
868        column_frame! {
869            "t" => [DataValue::Vec(vec![1.into(), 2.into(), 3.into()]), DataValue::Vec(vec![]), DataValue::Vec(vec![1.into()])],
870            "b" => [4, 5, 6],
871            "c" => [7, 8, 9]
872        },
873        column_frame! {
874            "t" => [DataValue::Vec(vec![1.into()])],
875            "b" => [6],
876            "c" => [9]
877        },
878        FilterRules::try_from("t.len() == 1u64").expect("BUG: cannot create filter rules"),
879    )]
880    #[case(
881        column_frame! {
882            "a" => [1, 2, 3],
883            "b" => [4, 5, 6],
884            "c" => [7, 8, 9]
885        },
886        column_frame! {
887            "a" => [1, 2],
888            "b" => [4, 5],
889            "c" => [7, 8]
890        },
891        FilterRules::try_from("a <= 2i32").expect("BUG: cannot create filter rules"),
892    )]
893    #[case(
894        column_frame! {
895            "a" => [1, 2, 3],
896            "b" => [4, 5, 6],
897            "c" => [7, 8, 9]
898        },
899        column_frame! {
900            "a" => [2],
901            "b" => [5],
902            "c" => [8]
903        },
904        FilterRules::try_from("a <= 2i32 && c > 7i32").expect("BUG: cannot create filter rules"),
905    )]
906    #[case(
907        column_frame! {
908            "a" => [1, 2, 3],
909            "b" => [4, 5, 6],
910            "c" => [7, 8, 9]
911        },
912        column_frame! {
913            "a" => [],
914            "b" => [],
915            "c" => []
916        },
917        FilterRules::try_from("a <= 2i32 && c > 9i32").expect("BUG: cannot create filter rules"),
918    )]
919    #[case(
920        column_frame! {
921            "a" => [1, 2, 3],
922            "b" => [4, 5, 6],
923            "c" => [7, 8, 9]
924        },
925        column_frame! {
926            "a" => [1, 2],
927            "b" => [4, 5],
928            "c" => [7, 8]
929        },
930        FilterRules::try_from("a <= 2i32 || c > 9i32").expect("BUG: cannot create filter rules"),
931    )]
932    #[case(
933        column_frame! {
934            "a" => [1, 2, 3],
935            "b" => [4, 5, 6],
936            "c" => [7, 8, 9]
937        },
938        column_frame! {
939            "a" => [2],
940            "b" => [5],
941            "c" => [8]
942        },
943        FilterRules::try_from("a <= 2i32 && (c > 9i32 || b == 5i32)").expect("BUG: cannot create filter rules"),
944    )]
945    #[case(
946        column_frame! {
947            "a" => ["abcd", "ab", "abcdefg"],
948            "b" => [4, 5, 6],
949            "c" => [7, 8, 9]
950        },
951        column_frame! {
952            "a" => ["abcd","abcdefg"],
953            "b" => [4, 6],
954            "c" => [7, 9]
955        },
956        FilterRules::try_from("a ~= 'abcd.*'").expect("BUG: cannot create filter rules"),
957    )]
958    #[case(
959        column_frame! {
960            "a" => [1, 2, 3],
961            "b" => [4, 5, 6],
962            "c" => [7, 8, 9]
963        },
964        column_frame! {
965            "a" => [1],
966            "b" => [4],
967            "c" => [7]
968        },
969        FilterRules::try_from("a in [1u32, 1i32]'").expect("BUG: cannot create filter rules"),
970    )]
971    #[case(
972        column_frame! {
973            "a" => [1, 2, 3],
974            "b" => [4, 5, 6],
975            "c" => [7, 8, 9]
976        },
977        column_frame! {
978            "a" => [2, 3],
979            "b" => [5, 6],
980            "c" => [8, 9]
981        },
982        FilterRules::try_from("a notIn [1u32, 1i32]'").expect("BUG: cannot create filter rules"),
983    )]
984    #[case(
985        column_frame! {
986            "a" => [1f64, 2f64, 3f64],
987            "b" => [4, 5, 6],
988            "c" => [7, 8, 9]
989        },
990        column_frame! {
991            "a" => [1f64, 2f64],
992            "b" => [4, 5],
993            "c" => [7, 8]
994        },
995        FilterRules::try_from("a < 3f64 || (a < 3f64 && b <= 5i32)").expect("BUG: cannot create filter rules"),
996    )]
997    #[case(
998        column_frame! {
999            "a" => [1f64, 2f64, 3f64],
1000            "b" => [4i64, 5i64, 6i64],
1001            "c" => [7i64, 8i64, 9i64]
1002        },
1003        column_frame! {
1004            "a" => [1f64, 2f64],
1005            "b" => [4i64, 5i64],
1006            "c" => [7i64, 8i64]
1007        },
1008        FilterRules::try_from("a >= 1f64 && (b <= 5 || c <= 8) && b >= 4").expect("BUG: cannot create filter rules"),
1009    )]
1010    #[traced_test]
1011    fn filter_test(
1012        #[case] df: ColumnFrame,
1013        #[case] expected: ColumnFrame,
1014        #[case] filter: FilterRules,
1015    ) {
1016        let filtered = df.filter(&filter).expect("BUG: cannot filter");
1017        assert_eq!(filtered, expected);
1018    }
1019
1020    #[rstest]
1021    #[traced_test]
1022    fn test_macro() {
1023        let df = column_frame! {
1024            "a" => 1,
1025            "b" => 2,
1026            "c" => 3,
1027            "d" => 4,
1028        };
1029
1030        assert_eq!(df.len(), 1);
1031        assert_eq!(df.keys(), &["a".into(), "b".into(), "c".into(), "d".into()]);
1032        let f = Array2::from_shape_vec((1, 4), vec![1.into(), 2.into(), 3.into(), 4.into()])
1033            .expect("BUG: cannot create array");
1034        assert_eq!(df.select(None), f);
1035
1036        let df = column_frame! {
1037            "a" => [1, 2, 3],
1038            "b" => [4, 5, 6],
1039            "c" => [7, 8, 9]
1040        };
1041
1042        assert_eq!(df.len(), 3);
1043        assert_eq!(df.keys(), &["a".into(), "b".into(), "c".into()]);
1044        let f = Array2::from_shape_vec(
1045            (3, 3),
1046            vec![
1047                1.into(),
1048                4.into(),
1049                7.into(),
1050                2.into(),
1051                5.into(),
1052                8.into(),
1053                3.into(),
1054                6.into(),
1055                9.into(),
1056            ],
1057        )
1058        .expect("BUG: cannot create array");
1059        let selected = df.select(None);
1060        trace!("{selected:?}");
1061        assert_eq!(selected, f);
1062
1063        let df1 = df! {
1064            "a" => [1, 2, 3],
1065            "b" => [4, 5, 6],
1066            "c" => [7, 8, 9]
1067        };
1068
1069        // just to make sure we've tested Display
1070        let formatted = format!("{}", df);
1071        debug!("{}", formatted);
1072
1073        assert_eq!(df1, crate::DataFrame::from(df));
1074    }
1075
1076    #[rstest]
1077    #[case(
1078        column_frame! {
1079            "a" => [1, 2, 3],
1080            "b" => [4, 5, 6],
1081            "c" => [7, 8, 9]
1082        },
1083        column_frame! {
1084            "a_new" => [1, 2, 3],
1085            "b" => [4, 5, 6],
1086            "c" => [7, 8, 9]
1087        },
1088        vec!["a_new", "b", "c"].into_iter().map(|x| x.into()).collect(),
1089        vec![("a", "a_new".into())]
1090    )]
1091    #[traced_test]
1092    fn rename_test(
1093        #[case] df: ColumnFrame,
1094        #[case] expected: ColumnFrame,
1095        #[case] keys: Vec<Key>,
1096        #[case] renames: Vec<(&str, Key)>,
1097    ) {
1098        let mut df = df;
1099        for (old, new) in renames {
1100            df.rename_key(old, new).expect("BUG: cannot rename key");
1101        }
1102        assert_eq!(df, expected);
1103        assert_eq!(df.keys(), keys.as_slice());
1104    }
1105
1106    #[rstest]
1107    #[case(
1108        column_frame! {
1109            "a" => [1, 2, 3],
1110            "b" => [4, 5, 6],
1111            "c" => [7, 8, 9]
1112        },
1113        vec!["a_alias", "b", "c"].into_iter().map(|x| x.into()).collect(),
1114        vec![("a", "a_alias")]
1115    )]
1116    #[traced_test]
1117    fn alias_test(
1118        #[case] df: ColumnFrame,
1119        #[case] keys: Vec<Key>,
1120        #[case] aliases: Vec<(&str, &str)>,
1121    ) {
1122        let mut df = df;
1123        for (old, new) in aliases {
1124            df.add_alias(old, new).expect("BUG: cannot rename key");
1125        }
1126        let origin_keys = df.keys().to_vec();
1127        let selected_aliases = df.select(Some(keys.as_slice()));
1128        let selected = df.select(Some(origin_keys.as_slice()));
1129        assert_eq!(selected, selected_aliases);
1130    }
1131
1132    #[rstest]
1133    #[traced_test]
1134    fn test_mut_view() {
1135        let data = vec![
1136            DataValue::from(1f64),
1137            DataValue::from(4f32),
1138            DataValue::from(2f64),
1139            DataValue::from(f32::NAN),
1140            DataValue::from(f64::NAN),
1141            DataValue::from(f32::INFINITY),
1142        ];
1143        let keys: Vec<Key> = vec!["a".into(), "b".into()];
1144
1145        let index = KeyIndex::new(keys.clone());
1146        let df = Array2::from_shape_vec((3, keys.len()), data).expect("BUG: cannot create array");
1147        let mut df = ColumnFrame::new(index.clone(), df);
1148        df.get_mut_view().mapv_inplace(|x| match x {
1149            DataValue::F32(f) if f.is_infinite() || f.is_nan() => DataValue::F32(0f32),
1150            DataValue::F64(f) if f.is_infinite() || f.is_nan() => DataValue::F64(0f64),
1151            e => e,
1152        });
1153        let data = vec![
1154            DataValue::from(1f64),
1155            DataValue::from(4f32),
1156            DataValue::from(2f64),
1157            DataValue::from(0f32),
1158            DataValue::from(0f64),
1159            DataValue::from(0f32),
1160        ];
1161        let expected = ColumnFrame::new(
1162            index,
1163            Array2::from_shape_vec((3, keys.len()), data).expect("BUG: cannot create ndarray"),
1164        );
1165        assert_eq!(df, expected);
1166    }
1167
1168    #[rstest]
1169    #[traced_test]
1170    fn dummy_test() {
1171        let data = vec![
1172            DataValue::U32(1),
1173            DataValue::I32(2),
1174            DataValue::I64(3),
1175            DataValue::U64(4),
1176        ];
1177
1178        let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1179
1180        let index = KeyIndex::new(keys.clone());
1181        let mut data_frame = Array2::default((1, keys.len()));
1182        for (idx, entry) in data.iter().enumerate() {
1183            data_frame
1184                .column_mut(idx)
1185                .assign(&ArrayView::from(&[entry.clone()]));
1186        }
1187
1188        let frame = ColumnFrame::new(index, data_frame);
1189        assert_eq!(
1190            frame.get_by_row_index(&"a".into(), 0),
1191            Some(&DataValue::U32(1))
1192        );
1193        assert_eq!(frame.get_by_row_index(&"aa".into(), 0), None);
1194        assert_eq!(frame.get_by_row_index(&"a".into(), 1), None);
1195        assert_eq!(
1196            frame.select(Some(&["a".into(), "b".into()])),
1197            Array2::from_shape_vec((1, 2), vec![DataValue::U32(1), DataValue::I32(2)])
1198                .expect("BUG: cannot create array")
1199        );
1200    }
1201
1202    #[rstest]
1203    #[traced_test]
1204    fn dummy_test_multiple_rows() {
1205        let data = vec![
1206            DataValue::U32(1),
1207            DataValue::I32(2),
1208            DataValue::I64(3),
1209            DataValue::U64(4),
1210            DataValue::U32(12),
1211            DataValue::I32(22),
1212            DataValue::I64(32),
1213            DataValue::U64(42),
1214        ];
1215
1216        let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1217
1218        let index = KeyIndex::new(keys.clone());
1219        let data_frame =
1220            Array2::from_shape_vec((2, keys.len()), data).expect("BUG: cannot create array");
1221
1222        let frame = ColumnFrame::new(index, data_frame);
1223        assert_eq!(
1224            frame.get_by_row_index(&"a".into(), 0),
1225            Some(&DataValue::U32(1))
1226        );
1227        assert_eq!(frame.get_by_row_index(&"aa".into(), 0), None);
1228        assert_eq!(frame.get_by_row_index(&"a".into(), 3), None);
1229        let arr = Array2::from_shape_vec(
1230            (2, 2),
1231            vec![
1232                DataValue::U32(1),
1233                DataValue::I32(2),
1234                DataValue::U32(12),
1235                DataValue::I32(22),
1236            ],
1237        )
1238        .expect("BUG: cannot create array");
1239        trace!("{arr:?}");
1240        assert_eq!(frame.select(Some(&["a".into(), "b".into()])), arr);
1241    }
1242
1243    #[rstest]
1244    #[traced_test]
1245    fn dummy_test_multiple_rows_push() {
1246        let data = vec![
1247            DataValue::U32(1),
1248            DataValue::I32(2),
1249            DataValue::I64(3),
1250            DataValue::U64(4),
1251            DataValue::U32(12),
1252            DataValue::I32(22),
1253            DataValue::I64(32),
1254            DataValue::U64(42),
1255        ];
1256        let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1257
1258        let index = KeyIndex::new(keys.clone());
1259        let data_frame =
1260            Array2::from_shape_vec((2, keys.len()), data).expect("BUG: cannot create array");
1261
1262        let mut frame = ColumnFrame::new(index, data_frame);
1263        assert!(frame
1264            .push(data_value::stdhashmap!(
1265                "a" => DataValue::U32(2),
1266                "b" => DataValue::I32(3),
1267                "c" => DataValue::I64(4),
1268                "d" => DataValue::U64(5)
1269            ))
1270            .is_ok());
1271        let arr = Array2::from_shape_vec(
1272            (3, 2),
1273            vec![
1274                DataValue::U32(1),
1275                DataValue::I32(2),
1276                DataValue::U32(12),
1277                DataValue::I32(22),
1278                DataValue::U32(2),
1279                DataValue::I32(3),
1280            ],
1281        )
1282        .expect("BUG: cannot create array");
1283        trace!("{arr:?}");
1284        assert_eq!(frame.select(Some(&["a".into(), "b".into()])), arr);
1285        let result = frame.push(data_value::stdhashmap!(
1286            "a" => DataValue::U32(34),
1287            "b" => DataValue::I32(44),
1288            "c" => DataValue::I64(54),
1289            "e" => DataValue::F32(6f32)
1290        ));
1291        assert!(result.is_ok(), "{result:?}");
1292        let arr = Array2::from_shape_vec(
1293            (4, 2),
1294            vec![
1295                DataValue::U64(4),
1296                DataValue::Null,
1297                DataValue::U64(42),
1298                DataValue::Null,
1299                DataValue::U64(5),
1300                DataValue::Null,
1301                DataValue::Null,
1302                DataValue::F32(6f32),
1303            ],
1304        )
1305        .expect("BUG: cannot create array");
1306        trace!("{arr:?}");
1307        assert_eq!(frame.select(Some(&["d".into(), "e".into()])), arr);
1308    }
1309
1310    #[rstest]
1311    #[case(
1312        column_frame! {
1313            "group_id" => vec![1, 2],
1314            "feed_tag" => vec![3, 4]
1315        },
1316        Some(vec![Key::from("group_id")]),
1317        ndarray::array!([1.into()], [2.into()])
1318    )]
1319    #[case(
1320        column_frame! {
1321            "group_id" => vec![1, 2],
1322            "feed_tag" => vec![3, 4]
1323        },
1324        Some(vec!["group_id".into(), "feed_tag".into()]),
1325        ndarray::array!([1.into(), 3.into()], [2.into(), 4.into()])
1326    )]
1327    #[case(
1328        column_frame! {
1329            "group_id" => vec![1, 2],
1330            "feed_tag" => vec![3, DataValue::Null]
1331        },
1332        Some(vec!["feed_tag".into()]),
1333        ndarray::array![[3.into()], [DataValue::Null]]
1334    )]
1335    #[case(
1336        column_frame! {
1337            "group_id" => vec![1, 2],
1338            "feed_tag" => vec![1, DataValue::Null]
1339        },
1340        Some(vec!["feed_tag2".into()]),
1341       Array2::<DataValue>::default((0, 0))
1342    )]
1343    #[traced_test]
1344    fn test_select(
1345        #[case] input: ColumnFrame,
1346        #[case] keys: Option<Vec<Key>>,
1347        #[case] expected: Array2<DataValue>,
1348    ) {
1349        trace!("input={input:?}");
1350        let keys_slice = keys.as_deref();
1351        let selected = input.select(keys_slice);
1352        trace!("selected={selected:?}");
1353        assert_eq!(selected, expected);
1354        let selected = input.select_transposed(keys_slice);
1355        trace!("selected_transposed={selected:?}");
1356        assert!(selected.is_ok());
1357        assert_eq!(selected.unwrap(), expected.t());
1358    }
1359
1360    #[rstest]
1361    #[case(
1362        column_frame! {
1363            "group_id" => vec![1, 2],
1364            "feed_tag" => vec![3, 4]
1365        },
1366        Key::from("group_id"),
1367        Some(ndarray::array!(1.into(), 2.into()))
1368    )]
1369    #[case(
1370        column_frame! {
1371            "group_id" => vec![1, 2, 5, 6],
1372            "feed_tag" => vec![3, 4, 7, 8]
1373        },
1374        Key::from("group_id"),
1375        Some(ndarray::array!(1.into(), 2.into(), 5.into(), 6.into()))
1376    )]
1377    #[case(
1378        column_frame! {
1379            "group_id" => vec![1, 2],
1380            "feed_tag" => vec![1, 1]
1381        },
1382        Key::from("feed_tag1"),
1383        None
1384    )]
1385    #[traced_test]
1386    fn test_select_column(
1387        #[case] input: ColumnFrame,
1388        #[case] key: Key,
1389        #[case] expected: Option<Array1<DataValue>>,
1390    ) {
1391        let selected = input.select_column(&key);
1392        trace!("selected={selected:?}");
1393        match expected {
1394            Some(expected) => {
1395                assert!(selected.is_some());
1396                assert_eq!(selected.expect("BUG: checked above"), expected);
1397            }
1398            None => assert!(selected.is_none()),
1399        }
1400    }
1401
1402    #[test]
1403    #[traced_test]
1404    fn empty_join_test() {
1405        let join = JoinRelation::add_columns();
1406        let mut column_frame = ColumnFrame::default();
1407        column_frame
1408            .add_single_column("group_id", Array1::from_vec(vec![]))
1409            .expect("BUG: cannot add column");
1410        let column_frame2 = column_frame! {
1411            "group_id" => vec![2, 1, 3],
1412            "feed_tag" => vec![1, 1, 1],
1413            "clicks" => vec![100, 10, 10],
1414            "imps" => vec![1000, 200, 200]
1415        };
1416        assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
1417
1418        let joined = column_frame.join(column_frame2, &join);
1419        assert!(joined.is_ok(), "{joined:?}");
1420
1421        trace!("{column_frame:?}");
1422        assert_eq!(
1423            column_frame.select(Some(&[
1424                "group_id".into(),
1425                "feed_tag".into(),
1426                "clicks".into(),
1427                "imps".into()
1428            ])),
1429            ndarray::array!(
1430                [2.into(), 1.into(), 100.into(), 1000.into()],
1431                [1.into(), 1.into(), 10.into(), 200.into()],
1432                [3.into(), 1.into(), 10.into(), 200.into()],
1433            )
1434        );
1435
1436        let mut column_frame2 = column_frame! {
1437            "feed_tag" => vec![1, 1, 1],
1438            "clicks" => vec![100, 10, 10],
1439            "imps" => vec![1000, 200, 200]
1440        };
1441        let mut column_frame = ColumnFrame::default();
1442        column_frame
1443            .add_single_column("group_id", Array1::from_vec(vec![]))
1444            .expect("BUG: cannot add column");
1445        let joined = column_frame2.join(column_frame, &join);
1446        assert!(joined.is_ok(), "{joined:?}");
1447
1448        trace!("{column_frame2:?}");
1449        assert_eq!(
1450            column_frame2.select(Some(&[
1451                "group_id".into(),
1452                "feed_tag".into(),
1453                "clicks".into(),
1454                "imps".into()
1455            ])),
1456            ndarray::array!(
1457                [DataValue::Null, 1.into(), 100.into(), 1000.into()],
1458                [DataValue::Null, 1.into(), 10.into(), 200.into()],
1459                [DataValue::Null, 1.into(), 10.into(), 200.into()],
1460            )
1461        );
1462    }
1463
1464    #[test]
1465    #[traced_test]
1466    fn join_test() {
1467        let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
1468            "group_id".into(),
1469            "feed_tag".into(),
1470        ])));
1471        let mut column_frame = column_frame! {
1472            "group_id" => vec![1, 2, 8],
1473            "feed_tag" => vec![1, 1, 10]
1474        };
1475        let column_frame2 = column_frame! {
1476            "group_id" => vec![2, 1, 3],
1477            "feed_tag" => vec![1, 1, 1],
1478            "clicks" => vec![100, 10, 10],
1479            "imps" => vec![1000, 200, 200]
1480        };
1481        assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
1482
1483        let joined = column_frame.join(column_frame2, &join);
1484        assert!(joined.is_ok(), "{joined:?}");
1485
1486        trace!("{column_frame:?}");
1487        assert_eq!(
1488            column_frame.select(Some(&[
1489                "group_id".into(),
1490                "feed_tag".into(),
1491                "clicks".into(),
1492                "imps".into()
1493            ])),
1494            ndarray::array!(
1495                [1.into(), 1.into(), 10.into(), 200.into()],
1496                [2.into(), 1.into(), 100.into(), 1000.into()],
1497                [8.into(), 10.into(), DataValue::Null, DataValue::Null]
1498            )
1499        )
1500    }
1501
1502    #[test]
1503    #[traced_test]
1504    fn join_test_with_additional() {
1505        let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
1506            "group_id".into(),
1507            "feed_tag".into(),
1508        ])));
1509        let mut column_frame = column_frame! {
1510            "group_id" => vec![1, 2, 8],
1511            "feed_tag" => vec![1, 1, 10],
1512            "clicked" => vec![0, 0, 1]
1513        };
1514        let column_frame2 = column_frame! {
1515            "group_id" => vec![2, 1, 3],
1516            "feed_tag" => vec![1, 1, 1],
1517            "clicks" => vec![100, 10, 10],
1518            "imps" => vec![1000, 200, 200]
1519        };
1520        assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
1521
1522        let joined = column_frame.join(column_frame2, &join);
1523        assert!(joined.is_ok(), "{joined:?}");
1524
1525        trace!("{column_frame:?}");
1526        assert_eq!(
1527            column_frame.select(Some(&[
1528                "group_id".into(),
1529                "feed_tag".into(),
1530                "clicks".into(),
1531                "imps".into(),
1532                "clicked".into()
1533            ])),
1534            ndarray::array!(
1535                [1.into(), 1.into(), 10.into(), 200.into(), 0.into()],
1536                [2.into(), 1.into(), 100.into(), 1000.into(), 0.into()],
1537                [
1538                    8.into(),
1539                    10.into(),
1540                    DataValue::Null,
1541                    DataValue::Null,
1542                    1.into()
1543                ]
1544            )
1545        )
1546    }
1547
1548    #[test]
1549    #[traced_test]
1550    fn join_test_with_additional_single() {
1551        let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
1552            "group_id".into(),
1553            "feed_tag".into(),
1554        ])));
1555        let mut column_frame = column_frame! {
1556            "group_id" => vec![1, 2, 8],
1557            "feed_tag" => vec![1, 1, 10],
1558            "clicked" => vec![0, 0, 1]
1559        };
1560        let column_frame2 = column_frame! {
1561            "a" => vec![1],
1562            "group_id" => vec![2],
1563            "feed_tag" => vec![1],
1564            "clicks" => vec![10],
1565            "imps" => vec![200]
1566        };
1567        assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
1568
1569        let joined = column_frame.join(column_frame2, &join);
1570        assert!(joined.is_ok(), "{joined:?}");
1571
1572        trace!("{column_frame:?}");
1573        assert_eq!(
1574            column_frame.select(Some(&[
1575                "group_id".into(),
1576                "feed_tag".into(),
1577                "clicks".into(),
1578                "imps".into(),
1579                "clicked".into()
1580            ])),
1581            ndarray::array!(
1582                [
1583                    1.into(),
1584                    1.into(),
1585                    DataValue::Null,
1586                    DataValue::Null,
1587                    0.into(),
1588                ],
1589                [2.into(), 1.into(), 10.into(), 200.into(), 0.into()],
1590                [
1591                    8.into(),
1592                    10.into(),
1593                    DataValue::Null,
1594                    DataValue::Null,
1595                    1.into()
1596                ]
1597            )
1598        )
1599    }
1600
1601    #[rstest]
1602    #[traced_test]
1603    fn cartesian_product_join() {
1604        let mut df = column_frame! {
1605            "group_id" => vec![1, 2, 3],
1606            "feed_tag" => vec![1, 2, 3]
1607        };
1608        let df2 = column_frame! {
1609            "zone_id" => vec![111111, 111133],
1610            "zone_avg_ctr" => vec![0.1, 0.001]
1611        };
1612        assert!(df
1613            .join(
1614                ColumnFrame::default(),
1615                &JoinRelation::new(JoinBy::CartesianProduct)
1616            )
1617            .is_ok());
1618        let join = JoinRelation::new(JoinBy::CartesianProduct);
1619        let result = df.join(df2, &join);
1620        assert!(result.is_ok(), "{result:?}");
1621        let selected = df.select(None);
1622        trace!("{selected:?}");
1623        assert_eq!(
1624            selected,
1625            ndarray::array!(
1626                [1.into(), 1.into(), 111111.into(), 0.1.into()],
1627                [1.into(), 1.into(), 111133.into(), 0.001.into()],
1628                [2.into(), 2.into(), 111111.into(), 0.1.into()],
1629                [2.into(), 2.into(), 111133.into(), 0.001.into()],
1630                [3.into(), 3.into(), 111111.into(), 0.1.into()],
1631                [3.into(), 3.into(), 111133.into(), 0.001.into()],
1632            )
1633        );
1634
1635        let df2 = column_frame! {
1636            "zone_id" => vec![111]
1637        };
1638        let result = df.join(df2, &join);
1639        assert!(result.is_ok(), "{result:?}");
1640        let selected = df.select(None);
1641        trace!("{selected:?}");
1642        assert_eq!(
1643            selected,
1644            ndarray::array!(
1645                [1.into(), 1.into(), 111111.into(), 0.1.into(), 111.into()],
1646                [1.into(), 1.into(), 111133.into(), 0.001.into(), 111.into()],
1647                [2.into(), 2.into(), 111111.into(), 0.1.into(), 111.into()],
1648                [2.into(), 2.into(), 111133.into(), 0.001.into(), 111.into()],
1649                [3.into(), 3.into(), 111111.into(), 0.1.into(), 111.into()],
1650                [3.into(), 3.into(), 111133.into(), 0.001.into(), 111.into()],
1651            )
1652        );
1653    }
1654
1655    #[rstest]
1656    #[traced_test]
1657    fn broadcast_join() {
1658        let mut df = column_frame! {
1659            "group_id" => vec![1, 2, 3],
1660            "feed_tag" => vec![1, 2, 3]
1661        };
1662        let df2 = column_frame! {
1663            "zone_id" => vec![111111]
1664        };
1665        assert!(df
1666            .join(
1667                ColumnFrame::default(),
1668                &JoinRelation::new(JoinBy::Broadcast)
1669            )
1670            .is_ok());
1671        let join = JoinRelation::new(JoinBy::Broadcast);
1672        assert!(df.join(df2, &join).is_ok());
1673        let selected = df.select(None);
1674        trace!("{selected:?}");
1675        assert_eq!(
1676            selected,
1677            ndarray::array!(
1678                [1.into(), 1.into(), 111111.into()],
1679                [2.into(), 2.into(), 111111.into()],
1680                [3.into(), 3.into(), 111111.into()]
1681            )
1682        );
1683    }
1684    #[rstest]
1685    #[traced_test]
1686    fn merge_test() {
1687        let mut df = column_frame! {
1688            "group_id" => vec![1, 2, 3],
1689            "feed_tag" => vec![1, 2, 3]
1690        };
1691        let df2 = column_frame! {
1692            "group_id" => vec![11, 21, 31],
1693            "feed_tag" => vec![12, 22, 32]
1694        };
1695
1696        let join = JoinRelation::new(JoinBy::Replace);
1697        assert!(df.join(df2, &join).is_ok());
1698        let selected = df.select(None);
1699        trace!("{selected:?}");
1700        assert_eq!(
1701            selected,
1702            ndarray::array!(
1703                [11.into(), 12.into()],
1704                [21.into(), 22.into()],
1705                [31.into(), 32.into()]
1706            )
1707        );
1708    }
1709
1710    #[rstest]
1711    #[traced_test]
1712    fn extend_test() {
1713        let mut df = column_frame! {
1714            "group_id" => vec![1, 2, 3],
1715            "feed_tag" => vec![1, 2, 3]
1716        };
1717        let df2 = column_frame! {
1718            "group_id" => vec![11, 21, 31],
1719            "feed_tag" => vec![5, 6, 7]
1720        };
1721        assert!(df
1722            .join(ColumnFrame::default(), &JoinRelation::new(JoinBy::Extend))
1723            .is_ok());
1724
1725        let join = JoinRelation::new(JoinBy::Extend);
1726        assert!(df.join(df2, &join).is_ok());
1727        let selected = df.select(Some(&["feed_tag".into(), "group_id".into()]));
1728        trace!("{selected:?}");
1729        assert_eq!(
1730            selected,
1731            ndarray::array!(
1732                [1.into(), 1.into()],
1733                [2.into(), 2.into()],
1734                [3.into(), 3.into()],
1735                [5.into(), 11.into()],
1736                [6.into(), 21.into()],
1737                [7.into(), 31.into()]
1738            )
1739        );
1740        let as_map = df.select_as_map(Some(&["feed_tag".into(), "group_id".into()]));
1741        trace!("{as_map:?}");
1742        assert_eq!(
1743            as_map,
1744            stdhashmap!(
1745                "feed_tag" => vec![1, 2, 3, 5, 6, 7],
1746                "group_id" => vec![1, 2, 3, 11, 21, 31]
1747            )
1748        );
1749
1750        let as_map = df.select_as_map(Some(&["feed_tag1".into()]));
1751        trace!("{as_map:?}");
1752        assert_eq!(as_map, HashMap::default());
1753    }
1754
1755    #[rstest]
1756    #[traced_test]
1757    fn extend_test_with_non_existing_cols() {
1758        let mut df = column_frame! {
1759            "group_id" => vec![1, 2, 3],
1760            "feed_tag" => vec![1, 2, 3]
1761        };
1762        let mut df2 = column_frame! {
1763            "group_id" => vec![11, 21, 31],
1764            "feed_tag" => vec![5, 6, 7],
1765            "clicks" => vec![100, 200, 300],
1766            "impressions" => vec![1000, 2000, 3000]
1767        };
1768        let df_bckp = df.clone();
1769        let join = JoinRelation::new(JoinBy::Extend);
1770        assert!(df.join(df2.clone(), &join).is_ok());
1771        let selected = df.select(None);
1772        trace!("{selected:?}");
1773        assert_eq!(
1774            selected,
1775            ndarray::array!(
1776                [1.into(), 1.into(), DataValue::Null, DataValue::Null],
1777                [2.into(), 2.into(), DataValue::Null, DataValue::Null],
1778                [3.into(), 3.into(), DataValue::Null, DataValue::Null],
1779                [11.into(), 5.into(), 100.into(), 1000.into()],
1780                [21.into(), 6.into(), 200.into(), 2000.into()],
1781                [31.into(), 7.into(), 300.into(), 3000.into()]
1782            )
1783        );
1784        let join = JoinRelation::new(JoinBy::Extend);
1785        let r = df2.join(df_bckp, &join);
1786        assert!(r.is_ok(), "{r:?}");
1787        let selected = df2.select(None);
1788        trace!("{selected:?}");
1789        assert_eq!(
1790            selected,
1791            ndarray::array!(
1792                [11.into(), 5.into(), 100.into(), 1000.into()],
1793                [21.into(), 6.into(), 200.into(), 2000.into()],
1794                [31.into(), 7.into(), 300.into(), 3000.into()],
1795                [1.into(), 1.into(), DataValue::Null, DataValue::Null],
1796                [2.into(), 2.into(), DataValue::Null, DataValue::Null],
1797                [3.into(), 3.into(), DataValue::Null, DataValue::Null]
1798            )
1799        );
1800    }
1801
1802    #[rstest]
1803    #[traced_test]
1804    fn extend_test_with_non_existing_cols_wrong_order() {
1805        let mut df = column_frame! {
1806            "group_id" => vec![1, 2, 3],
1807            "feed_tag" => vec![1, 2, 3]
1808        };
1809        let df2 = column_frame! {
1810            "feed_tag" => vec![5, 6, 7],
1811            "group_id" => vec![11, 21, 31]
1812        };
1813        let join = JoinRelation::new(JoinBy::Extend);
1814        let err = df.join(df2, &join);
1815        assert!(err.is_ok(), "{err:?}");
1816    }
1817
1818    #[rstest]
1819    #[traced_test]
1820    fn test_replace_not_compatible() {
1821        let mut df = column_frame! {
1822            "group_id" => vec![1, 2, 3],
1823            "feed_tag" => vec![1, 2, 3]
1824        };
1825        let df2 = column_frame! {
1826            "feed_tag" => vec![5, 6],
1827            "group_id" => vec![11, 21]
1828        };
1829        let join = JoinRelation::new(JoinBy::Replace);
1830        let err = df.join(df2, &join);
1831        assert!(err.is_err(), "{err:?}");
1832        let empty = ColumnFrame::default();
1833        let err = df.join(empty, &join);
1834        assert!(err.is_ok(), "{err:?}");
1835    }
1836
1837    #[rstest]
1838    #[traced_test]
1839    fn test_different_data() {
1840        let mut df = column_frame! {
1841            "group_id" => vec![1, 2, 3],
1842            "feed_tag" => vec![1, 2, 3]
1843        };
1844        let df2 = column_frame! {
1845            "group_id" => vec![11, 21],
1846            "a" => vec![5, 6]
1847        };
1848        let join = JoinRelation::new(JoinBy::Extend);
1849        let err = df.join(df2, &join);
1850        assert!(err.is_ok(), "{err:?}");
1851        println!("{df:?}");
1852        let expected_df = ColumnFrame::new(
1853            KeyIndex::from(vec!["group_id".into(), "feed_tag".into(), "a".into()]),
1854            ndarray::array!(
1855                [1.into(), 1.into(), DataValue::Null],
1856                [2.into(), 2.into(), DataValue::Null],
1857                [3.into(), 3.into(), DataValue::Null],
1858                [11.into(), DataValue::Null, 5.into()],
1859                [21.into(), DataValue::Null, 6.into()]
1860            ),
1861        );
1862        assert_eq!(df, expected_df)
1863    }
1864
1865    #[rstest]
1866    #[traced_test]
1867    fn serde_column_frame() {
1868        let df = column_frame! {
1869            "group_id" => vec![1u64, 2u64, 3u64],
1870            "feed_tag" => vec![1u64, 2u64, 3u64]
1871        };
1872        let key_idx = df.index.clone();
1873        let serialized = serde_json::to_string(&key_idx).expect("BUG: cannot serialize");
1874        let deserialized: KeyIndex =
1875            serde_json::from_str(&serialized).expect("BUG: cannot deserialize");
1876        assert_eq!(key_idx, deserialized);
1877        assert!(key_idx.get_key(0).is_some_and(|x| x == "group_id".into()));
1878        let serialized = serde_json::to_string(&df).expect("BUG: cannot serialize");
1879        let deserialized: ColumnFrame =
1880            serde_json::from_str(&serialized).expect("BUG: cannot deserialize");
1881        assert_eq!(df, deserialized);
1882    }
1883
1884    #[rstest]
1885    #[traced_test]
1886    fn update_value() {
1887        let mut df = column_frame! {
1888            "group_id" => vec![1, 2, 3],
1889            "feed_tag" => vec![1, 2, 3]
1890        };
1891        let group_id: Key = "group_id".into();
1892        let v = df.get_mut_by_row_index(&group_id, 1);
1893        assert!(v.is_some());
1894        let v = v.unwrap();
1895        assert_eq!(v, &DataValue::I32(2));
1896        *v = DataValue::U64(22);
1897        let v = df.get_by_row_index(&group_id, 1);
1898        assert!(v.is_some());
1899        let v = v.unwrap();
1900        assert_eq!(v, &DataValue::U64(22));
1901
1902        assert!(df.get_mut_by_row_index(&"group_id2".into(), 1).is_none());
1903    }
1904}