Skip to main content

trs_dataframe/dataframe/
column_store.rs

1use ndarray::{concatenate, s, Array, Array1, Array2, ArrayView1, ArrayViewMut2, Axis};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4
5use crate::error::Error;
6use crate::{dataframe::index::Index, CandidateData, JoinBy, JoinRelation, Key};
7use data_value::{DataValue, Extract};
8use tracing::*;
9mod from;
10mod key_index;
11mod ops;
12pub mod sorted_df;
13pub use key_index::KeyIndex;
14pub mod filter_df;
15
/// [`ColumnFrame`] stores the candidate data as a two-dimensional table.
///
/// The values live in an [`Array2`] of [`DataValue`]s: every row of the array is one
/// record and every column of the array belongs to one column [`Key`].
/// The [`KeyIndex`] is used to find the column of the array that belongs to a [`Key`].
/// The memory layout is the same as in [`ndarray`] - by default the data is stored
/// in row-major order.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct ColumnFrame {
    /// Column keys together with the lookup from a key to its column position.
    pub index: KeyIndex,
    /// The stored values; axis 0 are the rows, axis 1 are the columns.
    pub data_frame: Array2<DataValue>,
}
26
/// Outcome of a preparation step: tells the caller whether work remains.
enum Continue {
    /// The caller should carry on with its own processing.
    Continue,
    /// Everything is already done; the caller can return early.
    End,
}

impl Continue {
    /// Returns `true` only for [`Continue::End`].
    pub fn should_end(&self) -> bool {
        match self {
            Self::End => true,
            Self::Continue => false,
        }
    }
}
37
38use std::fmt;
39
40impl fmt::Display for ColumnFrame {
41    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42        // Display keys and and indices
43        write!(f, "\n|")?;
44
45        for key in &self.index.keys {
46            write!(f, " {key} |")?;
47        }
48
49        if self.index.is_empty() {
50            writeln!(f, "|")?;
51        }
52
53        // Display type for each key
54        if let Some(row) = self.data_frame.axis_iter(Axis(0)).next() {
55            write!(f, "\n|")?;
56            for value in row.iter() {
57                // Display types during first iteration
58                write!(f, " {:10?} |", crate::detect_dtype(value))?;
59            }
60            writeln!(f)?;
61        }
62
63        writeln!(f, "---")?;
64
65        // Display items, limit output to 256 rows
66        for (n, row) in self.data_frame.axis_iter(Axis(0)).enumerate() {
67            write!(f, "|")?;
68
69            for value in row.iter() {
70                // Display types during first iteration
71                write!(f, " {value} |")?;
72            }
73            writeln!(f)?;
74
75            if n >= 256 {
76                writeln!(f, "... (dataframe is too long)")?;
77                break;
78            }
79        }
80
81        writeln!(f, "---")
82    }
83}
84/// Converts a [`DataValue`] to the canonical representation for the given [`crate::DataType`].
85///
86/// Uses the [`Extract`] trait for numeric coercion. If `dtype` is [`crate::DataType::Unknown`]
87/// and the value is not `Null`, the type is auto-detected and conversion retried.
88pub fn convert_data_value(item: DataValue, dtype: crate::DataType) -> DataValue {
89    let x = &item;
90    match dtype {
91        crate::DataType::Bool => DataValue::Bool(bool::extract(x)),
92        crate::DataType::U32 => DataValue::U32(u32::extract(x)),
93        crate::DataType::I32 => DataValue::I32(i32::extract(x)),
94        crate::DataType::U64 => DataValue::U64(u64::extract(x)),
95        crate::DataType::I64 => DataValue::I64(i64::extract(x)),
96        crate::DataType::F32 => DataValue::F32(f32::extract(x)),
97        crate::DataType::U128 => DataValue::U128(u128::extract(x)),
98        crate::DataType::I128 => DataValue::I128(i128::extract(x)),
99        crate::DataType::F64 => DataValue::F64(f64::extract(x)),
100        crate::DataType::U8 => DataValue::U8(u8::extract(x)),
101        crate::DataType::String => DataValue::String(String::extract(x).into()),
102        crate::DataType::Bytes => item,
103        crate::DataType::Map => item,
104        crate::DataType::Vec => item,
105        crate::DataType::Unknown => {
106            if matches!(item, DataValue::Null) {
107                return item;
108            }
109            let dtype = crate::detect_dtype(&item);
110            // this situation should not ever happen
111            if matches!(dtype, crate::DataType::Unknown) {
112                tracing::error!("Unknown datatype {dtype:?} - {item:?}");
113                return item;
114            }
115            convert_data_value(item, dtype)
116        }
117    }
118}
119/// Converts a [`DataValue`] to match the dtype declared by the column [`Key`].
120///
121/// Shorthand for `convert_data_value(item, key.ctype)`.
122pub fn convert_dv_to_dtype(key: &Key, item: DataValue) -> DataValue {
123    convert_data_value(item, key.ctype)
124}
125impl ColumnFrame {
126    /// Creates a new [`ColumnFrame`] from a column index and a 2D data array.
127    ///
128    /// The number of columns in `data_frame` must match the number of keys in `index`.
129    pub fn new<K: Into<KeyIndex>>(index: K, data_frame: Array2<DataValue>) -> Self {
130        Self {
131            index: index.into(),
132            data_frame,
133        }
134    }
135
136    /// Returns the ordered slice of column [`Key`]s in this frame.
137    pub fn keys(&self) -> &[Key] {
138        self.index.get_keys()
139    }
140
141    /// Returns the number of rows in this frame.
142    pub fn len(&self) -> usize {
143        self.data_frame.nrows()
144    }
145
146    /// Returns `true` if the frame contains no rows.
147    pub fn is_empty(&self) -> bool {
148        self.data_frame.nrows() == 0
149    }
150
151    /// Compacts internal storage to reduce memory usage after mutations
152    /// that may have left over-allocated capacity.
153    pub fn shrink(&mut self) {
154        let shape = self.data_frame.shape();
155        if shape[0] > 0 && shape[1] > 0 {
156            let mut new_data = Vec::with_capacity(shape[0] * shape[1]);
157            for elem in self.data_frame.iter() {
158                new_data.push(elem.clone());
159            }
160            if let Ok(arr) = Array2::from_shape_vec((shape[0], shape[1]), new_data) {
161                self.data_frame = arr;
162            }
163        }
164    }
165
166    /// This method will try to fix dtype based on data stored in each column. If dtype is [`crate::DataType::Unknown`]
167    /// this method will replace dtype for "correct" one based on [`DataValue`].
168    /// NOTE: flag `force` will enforce this dtype even if dtype is known
169    pub fn try_fix_dtype_for_keys(&mut self, force: bool) -> Result<(), Error> {
170        for i in 0..self.index.keys.len() {
171            let should_fix = force || matches!(self.index.keys[i].ctype, crate::DataType::Unknown);
172
173            if should_fix {
174                let column = self
175                    .get_single_column(&self.index.keys[i])
176                    .ok_or_else(|| Error::EmptyData)?;
177                let dtype = crate::detect_dtype(column.get(0).ok_or_else(|| Error::EmptyData)?);
178                self.index.keys[i].ctype = dtype;
179            }
180        }
181
182        Ok(())
183    }
184    /// Attempts to fix the dtype of every column by inspecting stored values.
185    ///
186    /// Columns whose data does not match their declared dtype are cast in-place.
187    /// Returns `Err` with a list of `(key, message)` pairs for columns that failed casting.
188    pub fn try_fix_dtype(&mut self) -> Result<(), Error> {
189        let mut errors = vec![];
190        let keys = self.index.keys.clone();
191        for key in keys {
192            tracing::trace!("key: {key:?}- {:?}", key.ctype);
193            if let Err(e) = self.try_fix_column_by_key(&key) {
194                errors.push((key, e.to_string()));
195            }
196        }
197        if errors.is_empty() {
198            Ok(())
199        } else {
200            Err(Error::CastFailed(errors))
201        }
202    }
203
204    /// Casts all values in the column identified by `key` to match the key's declared dtype.
205    ///
206    /// Returns an error if the key is not found in the frame.
207    pub fn try_fix_column_by_key(&mut self, key: &Key) -> Result<(), Error> {
208        let idx = self
209            .index
210            .get_column_index(key)
211            .ok_or(Error::MissingField(format!("{key}").into()))?;
212        let mut col = self.data_frame.column_mut(idx);
213
214        col.mapv_inplace(|item| convert_dv_to_dtype(key, item));
215        Ok(())
216    }
217
218    /// Forces all values in the named column to the specified `dtype`, updating
219    /// both the stored data and the column's [`Key`] metadata.
220    ///
221    /// Returns an error if the column name does not exist.
222    pub fn enforce_dtype_for_column(
223        &mut self,
224        key: &str,
225        dtype: crate::DataType,
226    ) -> Result<(), Error> {
227        if let Some(idx) = self.index.get_column_index_by_name(key) {
228            let new_key = Key::new(key, dtype);
229            let mut col = self.data_frame.column_mut(idx);
230
231            col.mapv_inplace(|item| convert_dv_to_dtype(&new_key, item));
232            self.index.rename_key(key, new_key)?;
233            Ok(())
234        } else {
235            Err(Error::NotFound(Key::new(key, crate::DataType::Unknown)))
236        }
237    }
238
239    /// Returns a mutable 2D view over the entire data array.
240    ///
241    /// This allows direct element-level mutation without going through
242    /// [`get_mut_by_row_index`](Self::get_mut_by_row_index).
243    pub fn get_mut_view(&mut self) -> ArrayViewMut2<'_, DataValue> {
244        self.data_frame.view_mut()
245    }
246
247    /// Renames a column from `old` (matched by name) to `new`.
248    ///
249    /// The new [`Key`] can carry a different [`DataType`](crate::DataType), which
250    /// effectively re-declares the column's type without casting existing values.
251    pub fn rename_key(&mut self, old: &str, new: Key) -> Result<(), Error> {
252        self.index.rename_key(old, new)
253    }
254
255    /// Registers `alias` as an alternative name for the column named `key`.
256    ///
257    /// After aliasing, `select` and friends accept both the original name and the alias.
258    pub fn add_alias(&mut self, key: &str, alias: &str) -> Result<(), Error> {
259        self.index.add_alias(key, alias)
260    }
261
262    /// Selects the data from the [`ColumnFrame`] by the given keys
263    /// If the keys are not provided, the data is selected by the [`KeyIndex`] keys
264    /// The data is returned as the [`Vec<Vec<DataValue>>`]
265    /// If the keys are not found, the empty [`Vec<Vec<DataValue>>`] is returned
266    /// Returns the data in the column-major order
267    pub fn select_transposed_typed<D: Extract>(&self, keys: &[Key]) -> Vec<Vec<D>> {
268        let selected = self.select(Some(keys));
269        let mut result = Vec::with_capacity(selected.nrows());
270        for row in selected.rows() {
271            let mut r = Vec::with_capacity(selected.ncols());
272            for value in row.iter() {
273                r.push(D::extract(value));
274            }
275            result.push(r);
276        }
277        result
278    }
279
280    /// Selects the data from the [`ColumnFrame`] by the given keys
281    /// If the keys are not provided, the data is selected by the [`KeyIndex`] keys
282    /// The data is returned as the [`Array2`] with the [`DataValue`] values
283    /// If the keys are not found, the empty [`Array2`] is returned
284    /// Returns the [`Array2`] with the data in row-major order (same as ndarray default)
285    /// Note: Despite the method name, the data is NOT transposed - it returns data in row-major order
286    /// Use this when you want row-oriented access to the selected columns
287    pub fn select_transposed(&self, keys: Option<&[Key]>) -> Result<Array2<DataValue>, Error> {
288        let keys = keys.unwrap_or_else(|| self.index.get_keys());
289        let key_indexes = self.index.select(keys);
290        if key_indexes.is_empty() {
291            return Ok(Array2::default((0, 0)));
292        }
293        let data_vec: Vec<Array1<DataValue>> = key_indexes
294            .indexes()
295            .iter()
296            .map(|x| self.data_frame.column(*x).to_owned())
297            .collect();
298        to_array2(data_vec)
299    }
300
301    /// Selects whole column from the [`ColumnFrame`] by the given key
302    /// If the key is not found, the None is returned
303    /// If the key is found, the [`ArrayView1`] with the [`DataValue`] values is returned
304    pub fn select_column(&self, key: &Key) -> Option<ArrayView1<'_, DataValue>> {
305        self.index
306            .get_column_index(key)
307            .map(|x| self.data_frame.column(x))
308    }
309
310    /// Applies a user-defined closure to this frame.
311    ///
312    /// The closure receives the provided `keys` and a mutable reference to the
313    /// frame, allowing arbitrary in-place transformations.
314    pub fn apply_function<F>(&mut self, keys: &[Key], mut func: F) -> Result<(), Error>
315    where
316        F: FnMut(&[Key], &mut ColumnFrame) -> Result<(), Error>,
317    {
318        func(keys, self)
319    }
320
321    /// Validates the access to the entry by the given column [`Key`] and row index
322    /// If the column is not found, an error is returned [`Error::NotFound`]
323    /// If the row index is out of bounds, an error is returned [`Error::IndexOutOfRange`]
324    /// Otherwise, the column index is returned
325    pub fn validate_entry_access(&self, column: &Key, row_index: usize) -> Result<usize, Error> {
326        if row_index >= self.data_frame.nrows() {
327            return Err(Error::IndexOutOfRange(row_index, self.data_frame.nrows()));
328        }
329        let Some(column_index) = self.index.get_column_index(column) else {
330            return Err(Error::NotFound(column.clone()));
331        };
332        Ok(column_index)
333    }
334
335    /// Returns the value [`DataValue`] for the given column defined by [`Key`] and row index
336    /// If the column is not found, None is returned
337    /// If the row index is out of bounds, None is returned
338    pub fn get_by_row_index(&self, column: &Key, row_index: usize) -> Option<&DataValue> {
339        trace!(
340            "Column: {column} row_index: {row_index} data_frame: cols:{}-rows:{}",
341            self.data_frame.len(),
342            self.data_frame.nrows()
343        );
344        trace!("{:?}", self.data_frame);
345        match self.validate_entry_access(column, row_index) {
346            Ok(column_index) => self.data_frame.get((row_index, column_index)),
347            Err(e) => {
348                trace!("Error: {e}");
349                None
350            }
351        }
352    }
353
354    /// Returns mutable reference for the value [`DataValue`] for the given column defined by [`Key`] and row index
355    /// If the column is not found, None is returned
356    /// If the row index is out of bounds, None is returned
357    pub fn get_mut_by_row_index(
358        &mut self,
359        column: &Key,
360        row_index: usize,
361    ) -> Option<&mut DataValue> {
362        trace!(
363            "Column: {column} row_index: {row_index} data_frame: cols:{}-rows:{}",
364            self.data_frame.len(),
365            self.data_frame.nrows()
366        );
367        trace!("{:?}", self.data_frame);
368        match self.validate_entry_access(column, row_index) {
369            Ok(column_index) => self.data_frame.get_mut((row_index, column_index)),
370            Err(e) => {
371                trace!("Error: {e}");
372                None
373            }
374        }
375    }
376
377    /// Returns the value [`HashMap<Key, Vec<DataValue>>`] for the given columns defined by [`Key`].
378    /// If the keys are not provided, the data is selected by the [`KeyIndex`] keys
379    /// If the keys are not found, the empty [`HashMap`] is returned
380    /// Returns the [`Array2`] with the data in the row-major order
381    pub fn select_as_map(&self, keys: Option<&[Key]>) -> HashMap<Key, Vec<DataValue>> {
382        let keys = keys.unwrap_or_else(|| self.index.get_keys());
383        let indexes = self.index.select(keys);
384        if indexes.is_empty() {
385            return Default::default();
386        }
387
388        let mut new_data_frame = HashMap::with_capacity(keys.len());
389
390        for key in keys.iter() {
391            if let Some(column_index_in_source) = indexes.get_column_index(key) {
392                let column = self.data_frame.column(column_index_in_source);
393                new_data_frame.insert(key.clone(), column.to_vec());
394            }
395        }
396
397        new_data_frame
398    }
399
400    /// Returns the value [`Array2<DataValue>`] for the given columns defined by [`Key`].
401    /// If the keys are not provided, the data is selected by the [`KeyIndex`] keys
402    /// If the keys are not found, the empty [`Array2`] is returned
403    /// Returns the [`Array2`] with the data in the row-major order
404    ///
405    /// This operation clones all selected data. For read-only access, consider using
406    /// [`Self::select_column`] to get a view of a single column without copying.
407    pub fn select(&self, keys: Option<&[Key]>) -> Array2<DataValue> {
408        let keys = keys.unwrap_or_else(|| self.index.get_keys());
409        let indexes = self.index.select(keys);
410        if indexes.is_empty() || keys.is_empty() {
411            return Array2::default((0, 0));
412        }
413
414        let nrows = self.data_frame.nrows();
415        let ncols = keys.len();
416
417        // Pre-allocate vector with exact capacity
418        let mut data = Vec::with_capacity(nrows * ncols);
419
420        // Collect valid column mappings
421        let col_mappings: Vec<(usize, usize)> = keys
422            .iter()
423            .enumerate()
424            .filter_map(|(dst_idx, key)| {
425                indexes
426                    .get_column_index(key)
427                    .map(|src_idx| (dst_idx, src_idx))
428            })
429            .collect();
430
431        // Fill with nulls first, then overwrite valid data
432        data.resize(nrows * ncols, DataValue::Null);
433
434        // Copy column by column - better for row-major layout
435        for row_idx in 0..nrows {
436            for (dst_col, src_col) in &col_mappings {
437                let dst_idx = row_idx * ncols + dst_col;
438                data[dst_idx] = self.data_frame[(row_idx, *src_col)].clone();
439            }
440        }
441
442        Array2::from_shape_vec((nrows, ncols), data).unwrap_or_else(|_| Array2::default((0, 0)))
443    }
444
445    /// Returns selected columns as a typed 2D array, converting each [`DataValue`]
446    /// via the [`Extract`] trait.
447    ///
448    /// This is the typed counterpart of [`select`](Self::select). If `keys` is `None`,
449    /// all columns are returned. The data is in row-major order (rows × columns).
450    ///
451    /// # Type coercion
452    ///
453    /// The [`Extract`] trait performs best-effort numeric coercion (e.g. `I32 -> f64`).
454    /// Values that cannot be meaningfully converted yield the type's default
455    /// (0 for numbers, `false` for bool, empty string for `String`).
456    pub fn select_typed<T: Extract>(&self, keys: Option<&[Key]>) -> Array2<T> {
457        self.select(keys).mapv(|x| T::extract(&x))
458    }
459
460    fn extend_dataframe_for_column(&mut self, key: Key) -> Result<(), Error> {
461        self.index.store_key(key);
462        let len = self.data_frame.nrows();
463        self.data_frame.push_column(Array1::default(len).view())?;
464        Ok(())
465    }
466
467    /// Pushes the row candidate into the [`ColumnFrame`]
468    /// If the column is not found this method will add the column to the [`ColumnFrame`]
469    ///
470    /// This operation clones all values from the candidate. For batch operations,
471    /// consider using [`Self::extend`] instead which can be more efficient.
472    pub fn push<C: CandidateData>(&mut self, row_candidate: C) -> Result<(), Error> {
473        // Pre-allocate with capacity to avoid reallocations
474        let num_keys = self.index.len();
475        let candidate_keys = row_candidate.keys();
476        let mut arr = Vec::with_capacity(num_keys.max(candidate_keys.len()));
477
478        // First pass: extend dataframe with any new columns
479        for key in &candidate_keys {
480            if self.index.get_column_index(key).is_none() {
481                self.extend_dataframe_for_column(key.clone())?;
482            }
483        }
484
485        // Second pass: collect values - capacity may have changed after extension
486        arr.reserve(self.index.len());
487        for index in self.index.get_keys() {
488            arr.push(
489                row_candidate
490                    .get_value_ref(index)
491                    .cloned()
492                    .unwrap_or(DataValue::Null),
493            );
494        }
495
496        self.data_frame.push_row(Array::from_vec(arr).view())?;
497        Ok(())
498    }
499
    /// Removes the specified columns from this frame and returns them as a new [`ColumnFrame`].
    ///
    /// The data of the requested keys is copied with [`Self::select`] first, then the
    /// keys are removed from the index and the frame is rebuilt from the keys that are left.
    /// The returned frame gets an index with the keys that were really removed.
    ///
    /// NOTE(review): `select` returns one column per requested key (null-filled when the
    /// key is unknown), while the returned index only holds keys that `remove_key` found.
    /// With unknown or duplicated keys the returned index and data presumably do not
    /// match in the number of columns - confirm and handle on the caller side.
    pub fn remove_column(&mut self, keys: &[Key]) -> Result<Self, Error> {
        // fixme this is naive approach: all data is copied twice
        let mut indexes = KeyIndex::default();
        // copy the data of the columns that are going to be removed
        let data = self.select(Some(keys));
        // remove the keys from the index and remember the ones that were present
        for key in keys {
            if let Some((current, _idx)) = self.index.remove_key(key) {
                indexes.store_key(current);
            }
        }
        // copy the rest of the data to the new data frame; this has to happen before
        // the index is rebuilt, because `select` looks the columns up in the current index
        let rest = self.select(Some(self.keys()));
        let keys = self.index.get_keys().to_vec();
        self.data_frame = rest;
        // a fresh index is built from the remaining keys
        self.index = KeyIndex::new(keys);

        Ok(Self::new(indexes, data))
    }
523
524    fn check_or_init_frame(&mut self, other: &Self) -> Result<Continue, Error> {
525        if self.index.is_empty() {
526            self.index = other.index.clone();
527            self.data_frame = other.data_frame.clone();
528            return Ok(Continue::End);
529        }
530        if other.index.is_empty() {
531            return Ok(Continue::End);
532        }
533        if self.is_empty() {
534            self.data_frame = Array2::default((other.data_frame.nrows(), self.index.len()));
535        }
536
537        Ok(Continue::Continue)
538    }
539
540    fn extend_columns_from_other(&mut self, other: &Self) -> Result<(), Error> {
541        let missing_keys: Vec<Key> = other
542            .index
543            .get_keys()
544            .iter()
545            .filter(|key| self.index.get_column_index(key).is_none())
546            .cloned()
547            .collect();
548
549        if missing_keys.is_empty() {
550            return Ok(());
551        }
552
553        for key in missing_keys {
554            self.index.store_key(key);
555        }
556
557        let nrows = self.data_frame.nrows();
558        let new_cols = self.index.len() - self.data_frame.ncols();
559
560        if new_cols > 0 {
561            // Use ndarray's optimized concatenate - faster than manual copying
562            let new_data = Array2::default((nrows, new_cols));
563            self.data_frame = concatenate(Axis(1), &[self.data_frame.view(), new_data.view()])?;
564        }
565
566        Ok(())
567    }
568
569    fn try_extend(&mut self, other: Self) -> Result<(), Error> {
570        let mut joined_keys = self.index.clone();
571        for key in other.keys() {
572            if self.index.get_column_index(key).is_none() {
573                joined_keys.store_key(key.clone());
574            }
575        }
576
577        let sum_len = self.data_frame.nrows() + other.data_frame.nrows();
578        let mut arr = Array2::default((sum_len, joined_keys.len()));
579        let increment = self.data_frame.nrows();
580
581        for key in joined_keys.get_keys() {
582            let index_result = joined_keys.get_column_index(key).ok_or_else(|| {
583                Error::UnknownError(format!(
584                    "Index lookup failed for key '{}' in try_extend",
585                    key.name()
586                ))
587            })?;
588
589            let mut col = arr.column_mut(index_result);
590
591            if let Some(index) = self.index.get_column_index(key) {
592                let src_col = self.data_frame.column(index);
593                col.slice_mut(s![..increment]).assign(&src_col);
594            }
595
596            if let Some(index) = other.index.get_column_index(key) {
597                let src_col = other.data_frame.column(index);
598                col.slice_mut(s![increment..]).assign(&src_col);
599            }
600        }
601
602        *self = ColumnFrame::new(joined_keys, arr);
603        Ok(())
604    }
605
606    /// Extends the [`ColumnFrame`] with the data from the other [`ColumnFrame`]
607    /// If the [`KeyIndex`] is empty, the [`ColumnFrame`] is replaced with the other [`ColumnFrame`]
608    /// If the other [`KeyIndex`] is empty, nothing happens
609    /// If the length of the [`KeyIndex`] of the other data frame is greater then current,
610    /// an error is returned [`Error::DataSetSizeDoesntMatch`]
611    /// If [`Key`] from other data frame - extends the [`KeyIndex`] and add column to the current [`ColumnFrame`]
612    ///
613    pub fn extend(&mut self, mut other: Self) -> Result<(), Error> {
614        if self.check_or_init_frame(&other)?.should_end() {
615            return Ok(());
616        }
617
618        if self.index.check_order_of_indexes(&other.index).is_err() {
619            return self.try_extend(other);
620        }
621
622        trace!(
623            "Extend columns from other {:?} vs {:?}",
624            other.index.get_keys(),
625            self.index.get_keys()
626        );
627
628        if other.data_frame.ncols() < self.data_frame.ncols() {
629            other.extend_columns_from_other(self)?;
630        } else {
631            self.extend_columns_from_other(&other)?;
632        }
633        self.data_frame = concatenate(Axis(0), &[self.data_frame.view(), other.data_frame.view()])?;
634
635        Ok(())
636    }
637
638    /// Replace the [`ColumnFrame`] with the other [`ColumnFrame`]
639    /// If the current [`KeyIndex`] is empty, the [`ColumnFrame`] is replaced with the other [`ColumnFrame`]
640    /// If the other [`KeyIndex`] is empty, nothing happens
641    /// If the [`KeyIndex`] of the other data frame and current doesn't match an error is returned [`Error::DataSetSizeDoesntMatch`]
642    /// If the [`Key`] from other data frame is not present in the current [`ColumnFrame`] - extends the [`KeyIndex`] and add column to the current [`ColumnFrame`]
643    pub fn replace(&mut self, other: Self) -> Result<(), Error> {
644        if self.check_or_init_frame(&other)?.should_end() {
645            return Ok(());
646        }
647
648        if self.data_frame.len() > other.data_frame.len() {
649            return Err(Error::DataSetSizeDoesntMatch(
650                self.data_frame.len(),
651                other.data_frame.len(),
652            ));
653        }
654        self.index = other.index;
655        self.data_frame = other.data_frame;
656
657        Ok(())
658    }
659
    /// Joins the `right` frame into this frame by the given `keys`.
    ///
    /// An [`Index`] is created over `keys` for both frames and the two indexes are joined.
    /// The columns of `right` that this frame does not have are added, and for every
    /// matched pair of row groups the values of these columns are copied into the
    /// matching rows of this frame.
    ///
    /// * Columns that exist in both frames are not overwritten.
    /// * Rows of this frame without a match keep the default value in the new columns;
    ///   no row is removed by this function.
    /// * If several rows of `right` match, the value of the last one is kept.
    /// * If this frame has no keys, it becomes a copy of `right`; if `right` has no keys,
    ///   nothing happens.
    ///
    pub fn join_by_id_inner(&mut self, right: Self, keys: &[Key]) -> Result<(), Error> {
        if self.check_or_init_frame(&right)?.should_end() {
            return Ok(());
        }

        let timer = std::time::Instant::now();
        // has to be collected before the extension below, afterwards this frame
        // already contains these keys
        // presumably the keys of `right` that are missing in this frame - confirm
        // against `get_complement_keys`
        let new_columns = right.index.get_complement_keys(self.index.get_keys());

        // add new columns and keys into the column frame index
        self.extend_columns_from_other(&right)?;
        tracing::debug!("Extend took {}ns", timer.elapsed().as_nanos());

        // Pre-compute column index mappings AFTER extension to get correct indices
        // each pair is (column in this frame, column in the right frame)
        let column_mappings: Vec<(usize, usize)> = new_columns
            .iter()
            .filter_map(|key| {
                let left_idx = self.index.get_column_index(key)?;
                let right_idx = right.index.get_column_index(key)?;
                Some((left_idx, right_idx))
            })
            .collect();

        // get the indexes for the keys
        let timer = std::time::Instant::now();
        let index = Index::new(keys.to_vec(), self);
        tracing::debug!("Left index build took: {}ns", timer.elapsed().as_nanos());
        tracing::trace!("Index {index:?}");

        let timer = std::time::Instant::now();
        let right_index = Index::new(keys.to_vec(), &right);
        // the join result is iterated below as pairs of (left rows, right rows)
        let joined_idx = index.join(right_index);
        tracing::debug!(
            "Right index build and join took: {}ns",
            timer.elapsed().as_nanos()
        );

        // Instead of copying all data, we only fill in the new columns
        // The existing columns are already correct from extend_columns_from_other
        let timer = std::time::Instant::now();
        let joined_idx_len = joined_idx.len();

        for (left_col_idx, right_col_idx) in &column_mappings {
            let mut left_col = self.data_frame.column_mut(*left_col_idx);
            let right_col = right.data_frame.column(*right_col_idx);

            for (left_indices, right_indices) in &joined_idx {
                // every matching left row is written once per matching right row,
                // so the last right row decides the final value
                for right_row_idx in right_indices {
                    for left_idx in left_indices {
                        left_col[*left_idx] = right_col[*right_row_idx].clone();
                    }
                }
            }
        }

        let elapsed = timer.elapsed();
        tracing::debug!(
            "Filled {} rows in {}ms|{}s",
            joined_idx_len,
            elapsed.as_millis(),
            elapsed.as_secs()
        );

        Ok(())
    }
727
728    /// Adds the single column to the current [`ColumnFrame`]
729    /// If the column is already present, an error is returned [`Error::ColumnAlreadyExists`]
730    /// If the length of the column is different from the current data frame, an error is returned [`Error::DataSetSizeDoesntMatch`]
731    pub fn add_single_column<K: Into<Key>>(
732        &mut self,
733        key: K,
734        column: Array1<DataValue>,
735    ) -> Result<(), Error> {
736        let key = key.into();
737        if self.index.get_column_index(&key).is_some() {
738            return Err(Error::ColumnAlreadyExists(key));
739        }
740        if self.len() != column.len() && !self.is_empty() {
741            return Err(Error::DataSetSizeDoesntMatch(self.len(), column.len()));
742        }
743
744        self.index.store_key(key.clone());
745        let rows = column.len();
746        let column_index = self
747            .index
748            .get_column_index(&key)
749            .ok_or(Error::UnknownError(format!("Column {key} should exists")))?;
750        if self.is_empty() && self.index.len() == 1 {
751            self.data_frame = column.into_shape_clone((rows, 1))?;
752            assert_eq!(self.data_frame.column(column_index).len(), rows);
753        } else if self.is_empty() {
754            self.data_frame = Array2::default((column.len(), self.index.len() - 1));
755            self.data_frame.push_column(column.view())?;
756            assert_eq!(self.data_frame.column(column_index).len(), rows);
757        } else {
758            self.data_frame.push_column(column.view())?;
759        }
760        assert_eq!(self.data_frame.column(column_index).len(), rows);
761
762        Ok(())
763    }
764    /// Adds the columns from the other [`ColumnFrame`] to the current [`ColumnFrame`]
765    /// If the current [`KeyIndex`] is empty, the [`ColumnFrame`] is replaced with the other [`ColumnFrame`]
766    /// If the other [`KeyIndex`] is empty, nothing happens
767    pub fn add_columns(&mut self, other: Self) -> Result<(), Error> {
768        if self.check_or_init_frame(&other)?.should_end() {
769            return Ok(());
770        }
771
772        self.extend_columns_from_other(&other)?;
773        for (idx, key) in other.index.get_keys().iter().enumerate() {
774            if let Some(index) = self.index.get_column_index(key) {
775                trace!("Other array = {:?}", other.data_frame.dim());
776                if other.data_frame.dim() == (0, 0) {
777                    self.data_frame.column_mut(index).fill(DataValue::Null);
778                    continue;
779                }
780                let arr = other.data_frame.column(idx);
781                trace!(
782                    "Adding column {key:?} at index {idx} vs {index} datasize: self:{} vs other:{}",
783                    self.data_frame.nrows(),
784                    arr.len()
785                );
786                if arr.len() != self.data_frame.nrows() {
787                    self.data_frame.column_mut(index).fill(DataValue::Null);
788                } else {
789                    self.data_frame.column_mut(index).assign(&arr);
790                }
791            }
792        }
793        Ok(())
794    }
795
796    /// Broadcasts the data from the other [`ColumnFrame`] to the current [`ColumnFrame`]
797    /// If the current [`KeyIndex`] is empty, the [`ColumnFrame`] is replaced with the other [`ColumnFrame`]
798    /// If the other [`KeyIndex`] is empty, nothing happens
799    /// If the length (number of rows) of the other data frame is greater then 1 an error is returned [`Error::CannotBroadcast`]
800    pub fn broadcast(&mut self, other: Self) -> Result<(), Error> {
801        if self.check_or_init_frame(&other)?.should_end() {
802            return Ok(());
803        }
804        if other.data_frame.nrows() != 1 {
805            return Err(Error::CannotBroadcast);
806        }
807
808        // Get all keys (current + new from other)
809        let all_keys = self.index.get_keys().to_vec();
810        let other_keys: Vec<_> = other
811            .index
812            .get_keys()
813            .iter()
814            .filter(|k| self.index.get_column_index(k).is_none())
815            .cloned()
816            .collect();
817
818        // Add new keys to index
819        for key in &other_keys {
820            self.index.store_key(key.clone());
821        }
822
823        let nrows = self.len();
824        let ncols = self.index.len();
825        let ncols_old = all_keys.len();
826
827        // Pre-allocate result buffer
828        let mut data = Vec::with_capacity(nrows * ncols);
829
830        // Build result row by row - single pass allocation
831        for row_idx in 0..nrows {
832            // Copy existing columns
833            for col_idx in 0..ncols_old {
834                data.push(self.data_frame[(row_idx, col_idx)].clone());
835            }
836            // Broadcast values from other dataframe (only 1 row in other)
837            for key in &other_keys {
838                if let Some(other_idx) = other.index.get_column_index(key) {
839                    data.push(other.data_frame[(0, other_idx)].clone());
840                }
841            }
842        }
843
844        self.data_frame = Array2::from_shape_vec((nrows, ncols), data)
845            .map_err(|e| Error::UnknownError(format!("Broadcast reshape failed: {e}")))?;
846
847        Ok(())
848    }
849
850    /// Computes the Cartesian product of the input structures,
851    /// resulting in all possible combinations of elements.
852    /// The data is stored in the row-major order
853    /// The keys are stored in the order they are added - the order is preserved - new keys from the `other` [`ColumnFrame`] are added to the end
854    pub fn cartesian_product(&mut self, other: Self) -> Result<(), Error> {
855        if self.check_or_init_frame(&other)?.should_end() {
856            return Ok(());
857        }
858        // extend the columns
859        // let mut new_index = self.index.clone();
860        for other_key in other.keys() {
861            if self.index.get_column_index(other_key).is_none() {
862                self.index.store_key(other_key.clone());
863            } else {
864                self.index.store_key(Key::new(
865                    format!("{}-{}", other_key, other_key.id()).as_str(),
866                    other_key.ctype,
867                ));
868            }
869        }
870        let max_rows = self.len() * other.len();
871        let ncols = self.index.len();
872        // create new data frame
873        let mut new_df = Array2::default((max_rows, ncols));
874
875        let mut cur_idx = 0;
876        for cur_row in self.data_frame.rows() {
877            for other_row in other.data_frame.rows() {
878                new_df
879                    .slice_mut(s![cur_idx, ..])
880                    .assign(&concatenate(Axis(0), &[cur_row, other_row])?);
881                cur_idx += 1;
882            }
883        }
884        self.data_frame = new_df;
885        Ok(())
886    }
887
888    /// Joins the candidates with the other candidates by the [`JoinRelation`] policy.
889    /// For [`JoinBy::AddColumns`] the columns are added to the existing structure via [`Self::add_columns`]
890    /// For [`JoinBy::Replace`] the columns are replaced with the new columns
891    /// For [`JoinBy::Extend`] the candidates are extended via [`Self::extend`]
892    /// For [`JoinBy::Broadcast`] each candidate is extended with the values of the other candidates `Self::broadcast`
893    /// For [`JoinBy::CartesianProduct`] the candidates are multiplied by the other candidates
894    /// For [`JoinBy::JoinById`] the candidates are joined by the keys in the `JoinRelation::JoinById` struct see [`Self::join_by_id_inner`]
895    pub fn join(&mut self, right: Self, join_type: &JoinRelation) -> Result<(), Error> {
896        use JoinBy::*;
897        match &join_type.join_type {
898            AddColumns => self.add_columns(right),
899            Replace => self.replace(right),
900            Extend => self.extend(right),
901            Broadcast => self.broadcast(right),
902            CartesianProduct => self.cartesian_product(right),
903            JoinById(join) => self.join_by_id_inner(right, &join.keys),
904        }
905    }
906
907    /// Returns a read-only view of a single column, or `None` if the key is absent.
908    ///
909    /// This is a zero-copy operation — the returned [`ArrayView1`] borrows directly
910    /// from the underlying storage.
911    pub fn get_single_column(&self, key: &Key) -> Option<ArrayView1<'_, DataValue>> {
912        self.index
913            .get_column_index(key)
914            .map(|x| self.data_frame.column(x))
915    }
916
917    /// Returns a column extracted into a typed [`Array1<T>`], where each [`DataValue`]
918    /// is converted via the [`Extract`] trait.
919    ///
920    /// This is a convenience wrapper around [`get_single_column`](Self::get_single_column)
921    /// that maps every element through `T::extract`, producing an owned array of the
922    /// target type. Returns `None` if the key does not exist in the frame.
923    ///
924    /// # Type coercion
925    ///
926    /// The [`Extract`] trait performs best-effort numeric coercion (e.g. `I32 -> f64`).
927    /// Values that cannot be meaningfully converted yield the type's default
928    /// (0 for numbers, `false` for bool, empty string for `String`).
929    pub fn get_single_column_typed<T: Extract>(&self, key: &Key) -> Option<Array1<T>> {
930        self.get_single_column(key)
931            .map(|x| x.mapv(|x| T::extract(&x)))
932    }
933
934    /// Returns a [`SortedDataFrame`](sorted_df::SortedDataFrame) view sorted ascending by the given column.
935    ///
936    /// `Null` values are pushed to the end. Ties are broken by original row order.
937    /// Use [`SortedDataFrame::topn`](sorted_df::SortedDataFrame::topn) to efficiently
938    /// extract the first/last N rows.
939    pub fn sorted(&self, key: &Key) -> Result<sorted_df::SortedDataFrame<'_>, Error> {
940        let index = self
941            .index
942            .get_column_index(key)
943            .ok_or(Error::NotFound(key.clone()))?;
944        let column = self.data_frame.column(index);
945        let mut data_with_index = column.iter().enumerate().collect::<Vec<_>>();
946        tracing::trace!("Sorting by key: {key:?} vals {data_with_index:?}");
947        data_with_index.sort_by(
948            |(a_idx, a_val), (b_idx, b_val)| match a_val.partial_cmp(b_val) {
949                Some(ordering) => ordering.then_with(|| a_idx.cmp(b_idx)),
950                None => {
951                    let a_null = matches!(a_val, DataValue::Null);
952                    let b_null = matches!(b_val, DataValue::Null);
953                    match (a_null, b_null) {
954                        (true, true) => std::cmp::Ordering::Equal.then_with(|| a_idx.cmp(b_idx)),
955                        (true, false) => std::cmp::Ordering::Greater.then_with(|| a_idx.cmp(b_idx)),
956                        (false, true) => std::cmp::Ordering::Less.then_with(|| a_idx.cmp(b_idx)),
957                        (false, false) => std::cmp::Ordering::Equal.then_with(|| a_idx.cmp(b_idx)),
958                    }
959                }
960            },
961        );
962
963        tracing::trace!("Sorted by key: {key:?} vals {data_with_index:?}");
964        let indicies = data_with_index
965            .into_iter()
966            .map(|(idx, _)| idx)
967            .collect::<Vec<_>>();
968
969        Ok(sorted_df::SortedDataFrame::new(self, indicies))
970    }
971
972    /// Returns a new [`ColumnFrame`] containing only rows that match the filter expression.
973    ///
974    /// The filter is applied against each row and matching row indices are collected,
975    /// de-duplicated, and used to build the result.
976    pub fn filter(&self, filter: &crate::filter::FilterRules) -> Result<Self, Error> {
977        let mut final_indices = Vec::new();
978        let filter_df = filter_df::ColumnFrameFiltering { column_frame: self };
979        for rule in &filter.rules {
980            final_indices.extend(crate::filter::filter_combination(&filter_df, rule)?);
981        }
982
983        final_indices.sort_unstable();
984        final_indices.dedup();
985        let mut new_df = ColumnFrame::new(
986            self.index.clone(),
987            Array2::default((final_indices.len(), self.index.len())),
988        );
989        final_indices
990            .iter()
991            .enumerate()
992            .for_each(|(cur_idx, row_idx)| {
993                new_df
994                    .data_frame
995                    .slice_mut(s![cur_idx, ..])
996                    .assign(&self.data_frame.slice(s![*row_idx, ..]));
997            });
998
999        Ok(new_df)
1000    }
1001}
1002
1003/// Converts a `Vec` of 1D arrays into a 2D array.
1004///
1005/// Each [`Array1`] in `source` becomes one row of the resulting [`Array2`].
1006pub fn to_array2<T: Clone>(source: Vec<Array1<T>>) -> Result<Array2<T>, Error> {
1007    let width = source.len();
1008    let flattened: Array1<T> = source.into_iter().flat_map(|row| row.to_vec()).collect();
1009    let height = flattened.len() / width;
1010    Ok(flattened.into_shape_with_order((width, height))?)
1011}
/// Builds a `DataFrame` from `key => value` pairs.
///
/// All tokens are forwarded unchanged to `column_frame!` and the resulting
/// `ColumnFrame` is wrapped with `DataFrame::new`.
#[macro_export]
macro_rules! df {
    ($($tokens:tt)*) => {
        $crate::DataFrame::new($crate::column_frame!($($tokens)*))
    };
}
1018
/// Builds a `ColumnFrame` from `key => value` pairs.
///
/// Supported forms (a trailing comma after the last pair is accepted):
///
/// ```ignore
/// column_frame!();                                    // empty frame
/// column_frame! { "a" => 1, "b" => 2 };               // one row, scalar per key
/// column_frame! { "a" => [1, 2], "b" => [3, 4] };     // one column per key
/// column_frame! { "a" => vec![1, 2], "b" => vec![3, 4] };
/// ```
///
/// Keys and values are converted with `.into()`. In the array forms every key must
/// list the same number of values, since the values are assembled into a
/// fixed-size nested array.
#[macro_export]
macro_rules! column_frame {
    // case {} - without this arm the `vec![..]` arm matches an empty input and
    // expands to itself until the recursion limit is hit
    () => {
        $crate::ColumnFrame::default()
    };
    // case { "a" => 1, } - strip the trailing comma and retry
    ($($key:expr => $value:expr,)+) => { $crate::column_frame!($($key => $value),+) };
    // case { "a" => vec![1, 2, 3] } - forwarded to the array form
    ($($key:expr => vec![$($value:expr),*]),*) => {
        $crate::column_frame!($($key => [$($value),*]),*)
    };
    // case { "a" => [1, 2, 3] }
    ($($key:expr => [$($value:expr),*]),*) => {
        {
            // One inner array per key, i.e. shape (keys, values) ...
            let _data = ::ndarray::arr2(&[$(
                [$($value.into(),)*],
            )*]);
            let _keys = vec![$($key.into(),)*];

            // ... transposed so that every key becomes a column.
            $crate::ColumnFrame::new(
                $crate::KeyIndex::new(_keys),
                _data.reversed_axes()
            )
        }
    };
    // case { "a" => 1 } - a single row with one scalar per key
    ($($key:expr => $value:expr),*) => {
        {
            let _data = ::ndarray::arr2(&[[$($value.into(),)*]]);
            let _keys = vec![$($key.into(),)*];

            $crate::ColumnFrame::new(
                $crate::KeyIndex::new(_keys),
                _data,
            )
        }
    };
}
1055
1056#[cfg(test)]
1057mod test {
1058    use crate::{filter::FilterRules, JoinById};
1059
1060    use super::*;
1061    use data_value::stdhashmap;
1062    use ndarray::ArrayView;
1063    use rstest::*;
1064    use tracing_test::traced_test;
1065
1066    #[rstest]
1067    #[case(
1068        column_frame! {
1069            "t" => [1751001987000000u64, 1752001987000000u64, 1753001987000000u64],
1070            "b" => [4, 5, 6],
1071            "c" => [7, 8, 9]
1072        },
1073        column_frame! {
1074            "t" => [1752001987000000u64],
1075            "b" => [5],
1076            "c" => [8]
1077        },
1078        FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
1079    )]
1080    #[case(
1081        column_frame! {
1082            "t" => [1751001987000000f64, 1752001987000000f64, 1753001987000000f64],
1083            "b" => [4, 5, 6],
1084            "c" => [7, 8, 9]
1085        },
1086        column_frame! {
1087            "t" => [1752001987000000f64],
1088            "b" => [5],
1089            "c" => [8]
1090        },
1091        FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
1092    )]
1093    #[case(
1094        column_frame! {
1095            "t" => [1751001987000000i64, 1752001987000000i64, 1753001987000000i64],
1096            "b" => [4, 5, 6],
1097            "c" => [7, 8, 9]
1098        },
1099        column_frame! {
1100            "t" => [1752001987000000i64],
1101            "b" => [5],
1102            "c" => [8]
1103        },
1104        FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
1105    )]
1106    #[case(
1107        column_frame! {
1108            "t" => [1751001987000000u64, 1752001987000000u64, 1753001987000000u64],
1109            "b" => [4, 5, 6],
1110            "c" => [7, 8, 9]
1111        },
1112        column_frame! {
1113            "t" => [1751001987000000u64],
1114            "b" => [4],
1115            "c" => [7]
1116        },
1117        FilterRules::try_from("t.to_datetime_us() < '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
1118    )]
1119    #[case(
1120        column_frame! {
1121            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
1122            "b" => [4, 5, 6],
1123            "c" => [7, 8, 9]
1124        },
1125        column_frame! {
1126            "t" => ["2025-07-08 18:13:07"],
1127            "b" => [4],
1128            "c" => [7]
1129        },
1130        FilterRules::try_from("t.to_datetime_us() < '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
1131    )]
1132    #[case(
1133        column_frame! {
1134            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
1135            "b" => [4, 5, 6],
1136            "c" => [7, 8, 9]
1137        },
1138        column_frame! {
1139            "t" => [],
1140            "b" => [],
1141            "c" => []
1142        },
1143        FilterRules::try_from("t.len() < 10u64").expect("BUG: cannot create filter rules"),
1144    )]
1145    #[case(
1146        column_frame! {
1147            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
1148            "b" => [4, 5, 6],
1149            "c" => [7, 8, 9]
1150        },
1151        column_frame! {
1152            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
1153            "b" => [4, 5, 6],
1154            "c" => [7, 8, 9]
1155        },
1156        FilterRules::try_from("t.len() > 10u64").expect("BUG: cannot create filter rules"),
1157    )]
1158    #[case(
1159        column_frame! {
1160            "t" => [DataValue::Vec(vec![1.into(), 2.into(), 3.into()]), DataValue::Vec(vec![]), DataValue::Vec(vec![1.into()])],
1161            "b" => [4, 5, 6],
1162            "c" => [7, 8, 9]
1163        },
1164        column_frame! {
1165            "t" => [DataValue::Vec(vec![])],
1166            "b" => [5],
1167            "c" => [ 8]
1168        },
1169        FilterRules::try_from("t.len() == 0u64").expect("BUG: cannot create filter rules"),
1170    )]
1171    #[case(
1172        column_frame! {
1173            "t" => [DataValue::Vec(vec![1.into(), 2.into(), 3.into()]), DataValue::Vec(vec![]), DataValue::Vec(vec![1.into()])],
1174            "b" => [4, 5, 6],
1175            "c" => [7, 8, 9]
1176        },
1177        column_frame! {
1178            "t" => [DataValue::Vec(vec![1.into()])],
1179            "b" => [6],
1180            "c" => [9]
1181        },
1182        FilterRules::try_from("t.len() == 1u64").expect("BUG: cannot create filter rules"),
1183    )]
1184    #[case(
1185        column_frame! {
1186            "a" => [1, 2, 3],
1187            "b" => [4, 5, 6],
1188            "c" => [7, 8, 9]
1189        },
1190        column_frame! {
1191            "a" => [1, 2],
1192            "b" => [4, 5],
1193            "c" => [7, 8]
1194        },
1195        FilterRules::try_from("a <= 2i32").expect("BUG: cannot create filter rules"),
1196    )]
1197    #[case(
1198        column_frame! {
1199            "a" => [1, 2, 3],
1200            "b" => [4, 5, 6],
1201            "c" => [7, 8, 9]
1202        },
1203        column_frame! {
1204            "a" => [2],
1205            "b" => [5],
1206            "c" => [8]
1207        },
1208        FilterRules::try_from("a <= 2i32 && c > 7i32").expect("BUG: cannot create filter rules"),
1209    )]
1210    #[case(
1211        column_frame! {
1212            "a" => [1, 2, 3],
1213            "b" => [4, 5, 6],
1214            "c" => [7, 8, 9]
1215        },
1216        column_frame! {
1217            "a" => [],
1218            "b" => [],
1219            "c" => []
1220        },
1221        FilterRules::try_from("a <= 2i32 && c > 9i32").expect("BUG: cannot create filter rules"),
1222    )]
1223    #[case(
1224        column_frame! {
1225            "a" => [1, 2, 3],
1226            "b" => [4, 5, 6],
1227            "c" => [7, 8, 9]
1228        },
1229        column_frame! {
1230            "a" => [1, 2],
1231            "b" => [4, 5],
1232            "c" => [7, 8]
1233        },
1234        FilterRules::try_from("a <= 2i32 || c > 9i32").expect("BUG: cannot create filter rules"),
1235    )]
1236    #[case(
1237        column_frame! {
1238            "a" => [1, 2, 3],
1239            "b" => [4, 5, 6],
1240            "c" => [7, 8, 9]
1241        },
1242        column_frame! {
1243            "a" => [2],
1244            "b" => [5],
1245            "c" => [8]
1246        },
1247        FilterRules::try_from("a <= 2i32 && (c > 9i32 || b == 5i32)").expect("BUG: cannot create filter rules"),
1248    )]
1249    #[case(
1250        column_frame! {
1251            "a" => ["abcd", "ab", "abcdefg"],
1252            "b" => [4, 5, 6],
1253            "c" => [7, 8, 9]
1254        },
1255        column_frame! {
1256            "a" => ["abcd","abcdefg"],
1257            "b" => [4, 6],
1258            "c" => [7, 9]
1259        },
1260        FilterRules::try_from("a ~= 'abcd.*'").expect("BUG: cannot create filter rules"),
1261    )]
1262    #[case(
1263        column_frame! {
1264            "a" => [1, 2, 3],
1265            "b" => [4, 5, 6],
1266            "c" => [7, 8, 9]
1267        },
1268        column_frame! {
1269            "a" => [1],
1270            "b" => [4],
1271            "c" => [7]
1272        },
1273        FilterRules::try_from("a in [1u32, 1i32]'").expect("BUG: cannot create filter rules"),
1274    )]
1275    #[case(
1276        column_frame! {
1277            "a" => [1, 2, 3],
1278            "b" => [4, 5, 6],
1279            "c" => [7, 8, 9]
1280        },
1281        column_frame! {
1282            "a" => [2, 3],
1283            "b" => [5, 6],
1284            "c" => [8, 9]
1285        },
1286        FilterRules::try_from("a notIn [1u32, 1i32]'").expect("BUG: cannot create filter rules"),
1287    )]
1288    #[case(
1289        column_frame! {
1290            "a" => [1f64, 2f64, 3f64],
1291            "b" => [4, 5, 6],
1292            "c" => [7, 8, 9]
1293        },
1294        column_frame! {
1295            "a" => [1f64, 2f64],
1296            "b" => [4, 5],
1297            "c" => [7, 8]
1298        },
1299        FilterRules::try_from("a < 3f64 || (a < 3f64 && b <= 5i32)").expect("BUG: cannot create filter rules"),
1300    )]
1301    #[case(
1302        column_frame! {
1303            "a" => [1f64, 2f64, 3f64],
1304            "b" => [4i64, 5i64, 6i64],
1305            "c" => [7i64, 8i64, 9i64]
1306        },
1307        column_frame! {
1308            "a" => [1f64, 2f64],
1309            "b" => [4i64, 5i64],
1310            "c" => [7i64, 8i64]
1311        },
1312        FilterRules::try_from("a >= 1f64 && (b <= 5 || c <= 8) && b >= 4").expect("BUG: cannot create filter rules"),
1313    )]
1314    #[traced_test]
1315    fn filter_test(
1316        #[case] df: ColumnFrame,
1317        #[case] expected: ColumnFrame,
1318        #[case] filter: FilterRules,
1319    ) {
1320        let filtered = df.filter(&filter).expect("BUG: cannot filter");
1321        assert_eq!(filtered, expected);
1322    }
1323
1324    #[rstest]
1325    #[traced_test]
1326    fn test_macro() {
1327        let df = column_frame! {
1328            "a" => 1,
1329            "b" => 2,
1330            "c" => 3,
1331            "d" => 4,
1332        };
1333
1334        assert_eq!(df.len(), 1);
1335        assert_eq!(df.keys(), &["a".into(), "b".into(), "c".into(), "d".into()]);
1336        let f = Array2::from_shape_vec((1, 4), vec![1.into(), 2.into(), 3.into(), 4.into()])
1337            .expect("BUG: cannot create array");
1338        assert_eq!(df.select(None), f);
1339
1340        let df = column_frame! {
1341            "a" => [1, 2, 3],
1342            "b" => [4, 5, 6],
1343            "c" => [7, 8, 9]
1344        };
1345
1346        assert_eq!(df.len(), 3);
1347        assert_eq!(df.keys(), &["a".into(), "b".into(), "c".into()]);
1348        let f = Array2::from_shape_vec(
1349            (3, 3),
1350            vec![
1351                1.into(),
1352                4.into(),
1353                7.into(),
1354                2.into(),
1355                5.into(),
1356                8.into(),
1357                3.into(),
1358                6.into(),
1359                9.into(),
1360            ],
1361        )
1362        .expect("BUG: cannot create array");
1363        let selected = df.select(None);
1364        trace!("{selected:?}");
1365        assert_eq!(selected, f);
1366
1367        let df1 = df! {
1368            "a" => [1, 2, 3],
1369            "b" => [4, 5, 6],
1370            "c" => [7, 8, 9]
1371        };
1372
1373        // just to make sure we've tested Display
1374        let formatted = format!("{}", df);
1375        debug!("{}", formatted);
1376
1377        assert_eq!(df1, crate::DataFrame::from(df));
1378    }
1379
1380    #[rstest]
1381    #[case(
1382        column_frame! {
1383            "a" => [1, 2, 3],
1384            "b" => [4, 5, 6],
1385            "c" => [7, 8, 9]
1386        },
1387        column_frame! {
1388            "a_new" => [1, 2, 3],
1389            "b" => [4, 5, 6],
1390            "c" => [7, 8, 9]
1391        },
1392        vec!["a_new", "b", "c"].into_iter().map(|x| x.into()).collect(),
1393        vec![("a", "a_new".into())]
1394    )]
1395    #[traced_test]
1396    fn rename_test(
1397        #[case] df: ColumnFrame,
1398        #[case] expected: ColumnFrame,
1399        #[case] keys: Vec<Key>,
1400        #[case] renames: Vec<(&str, Key)>,
1401    ) {
1402        let mut df = df;
1403        for (old, new) in renames {
1404            df.rename_key(old, new).expect("BUG: cannot rename key");
1405        }
1406        assert_eq!(df, expected);
1407        assert_eq!(df.keys(), keys.as_slice());
1408    }
1409
1410    #[rstest]
1411    #[case(
1412        column_frame!("a" => [1, 2, 3]),
1413        Key::new("a", crate::DataType::I32),
1414        column_frame!("a" => [1i32, 2i32, 3i32])
1415    )]
1416    #[case(
1417        column_frame!("a" => [1, 2, 3]),
1418        Key::new("a", crate::DataType::U32),
1419        column_frame!("a" => [1u32, 2u32, 3u32])
1420    )]
1421    #[case(
1422        column_frame!("a" => [1, 2, 3]),
1423        Key::new("a", crate::DataType::I64),
1424        column_frame!("a" => [1i64, 2i64, 3i64])
1425    )]
1426    #[case(
1427        column_frame!("a" => [1, 2, 3]),
1428        Key::new("a", crate::DataType::U64),
1429        column_frame!("a" => [1u64, 2u64, 3u64])
1430    )]
1431    #[case(
1432        column_frame!("a" => [1, 2, 3]),
1433        Key::new("a", crate::DataType::F64),
1434        column_frame!("a" => [1f64, 2f64, 3f64])
1435    )]
1436    #[case(
1437        column_frame!("a" => [1, 2, 3]),
1438        Key::new("a", crate::DataType::F32),
1439        column_frame!("a" => [1f32, 2f32, 3f32])
1440    )]
1441    // #[case(
1442    //     column_frame!("a" => [1, 2, 3]),
1443    //     Key::new("a", crate::DataType::String),
1444    //     column_frame!("a" => ["1", "2", "3"])
1445    // )]
1446    fn test_try_fix_dtype(
1447        #[case] mut df: ColumnFrame,
1448        #[case] key: Key,
1449        #[case] expected: ColumnFrame,
1450    ) {
1451        assert!(df.try_fix_column_by_key(&key).is_ok());
1452        assert_eq!(
1453            df.select(Some(&[key.clone()])),
1454            expected.select(Some(&[key.clone()]))
1455        );
1456    }
1457
1458    #[fixture]
1459    fn unknown_df() -> ColumnFrame {
1460        let mut hm: HashMap<String, Vec<DataValue>> = HashMap::new();
1461
1462        hm.insert("a".into(), vec![1u32.into()]);
1463        hm.insert("b".into(), vec![3i64.into()]);
1464        hm.insert("c".into(), vec![1f64.into()]);
1465        hm.insert("d".into(), vec![1u64.into()]);
1466
1467        hm.into()
1468    }
1469    #[rstest]
1470    #[case(stdhashmap!(
1471        "a" => crate::DataType::U32,
1472        "b" => crate::DataType::I64,
1473        "c" => crate::DataType::F64,
1474        "d" => crate::DataType::U64)
1475    )]
1476    fn test_try_fix_dtype_unknown(
1477        mut unknown_df: ColumnFrame,
1478        #[case] dtypes: HashMap<String, crate::DataType>,
1479    ) {
1480        for dtype in dtypes.iter() {
1481            let t: &Key = unknown_df
1482                .keys()
1483                .iter()
1484                .find(|x| x.name() == dtype.0)
1485                .unwrap();
1486            assert_ne!(t.ctype, crate::DataType::Unknown);
1487        }
1488        assert!(unknown_df.try_fix_dtype_for_keys(false).is_ok());
1489        for dtype in dtypes.iter() {
1490            let t: &Key = unknown_df
1491                .keys()
1492                .iter()
1493                .find(|x| x.name() == dtype.0)
1494                .unwrap();
1495            assert_eq!(t.ctype, *dtype.1);
1496            assert!(unknown_df.try_fix_dtype_for_keys(false).is_ok());
1497        }
1498        assert!(unknown_df.try_fix_dtype_for_keys(true).is_ok());
1499    }
1500
1501    #[rstest]
1502    #[case(
1503        column_frame!(Key::new("a", crate::DataType::F32) => [1, 2, 3]),
1504        Key::new("a", crate::DataType::F32),
1505        column_frame!("a" => [1f32, 2f32, 3f32])
1506    )]
1507    #[traced_test]
1508    fn test_try_fix(#[case] mut df: ColumnFrame, #[case] key: Key, #[case] expected: ColumnFrame) {
1509        assert!(df.try_fix_dtype().is_ok());
1510        assert_eq!(
1511            df.select(Some(&[key.clone()])),
1512            expected.select(Some(&[key]))
1513        )
1514    }
1515
1516    #[rstest]
1517    #[traced_test]
1518    fn test_not_key_fix() {
1519        let mut cf = column_frame!("a" => [1]);
1520        let non_existing = Key::new("b", crate::DataType::I32);
1521        assert!(cf.try_fix_column_by_key(&non_existing).is_err());
1522    }
1523
1524    #[rstest]
1525    #[case(
1526        column_frame! {
1527            "a" => [1, 2, 3],
1528            "b" => [4, 5, 6],
1529            "c" => [7, 8, 9]
1530        },
1531        vec!["a_alias", "b", "c"].into_iter().map(|x| x.into()).collect(),
1532        vec![("a", "a_alias")]
1533    )]
1534    #[traced_test]
1535    fn alias_test(
1536        #[case] df: ColumnFrame,
1537        #[case] keys: Vec<Key>,
1538        #[case] aliases: Vec<(&str, &str)>,
1539    ) {
1540        let mut df = df;
1541        for (old, new) in aliases {
1542            df.add_alias(old, new).expect("BUG: cannot rename key");
1543        }
1544        let origin_keys = df.keys().to_vec();
1545        let selected_aliases = df.select(Some(keys.as_slice()));
1546        let selected = df.select(Some(origin_keys.as_slice()));
1547        assert_eq!(selected, selected_aliases);
1548    }
1549
1550    #[rstest]
1551    #[traced_test]
1552    fn test_mut_view() {
1553        let data = vec![
1554            DataValue::from(1f64),
1555            DataValue::from(4f32),
1556            DataValue::from(2f64),
1557            DataValue::from(f32::NAN),
1558            DataValue::from(f64::NAN),
1559            DataValue::from(f32::INFINITY),
1560        ];
1561        let keys: Vec<Key> = vec!["a".into(), "b".into()];
1562
1563        let index = KeyIndex::new(keys.clone());
1564        let df = Array2::from_shape_vec((3, keys.len()), data).expect("BUG: cannot create array");
1565        let mut df = ColumnFrame::new(index.clone(), df);
1566        df.get_mut_view().mapv_inplace(|x| match x {
1567            DataValue::F32(f) if f.is_infinite() || f.is_nan() => DataValue::F32(0f32),
1568            DataValue::F64(f) if f.is_infinite() || f.is_nan() => DataValue::F64(0f64),
1569            e => e,
1570        });
1571        let data = vec![
1572            DataValue::from(1f64),
1573            DataValue::from(4f32),
1574            DataValue::from(2f64),
1575            DataValue::from(0f32),
1576            DataValue::from(0f64),
1577            DataValue::from(0f32),
1578        ];
1579        let expected = ColumnFrame::new(
1580            index,
1581            Array2::from_shape_vec((3, keys.len()), data).expect("BUG: cannot create ndarray"),
1582        );
1583        assert_eq!(df, expected);
1584    }
1585
1586    #[rstest]
1587    #[traced_test]
1588    fn dummy_test() {
1589        let data = vec![
1590            DataValue::U32(1),
1591            DataValue::I32(2),
1592            DataValue::I64(3),
1593            DataValue::U64(4),
1594        ];
1595
1596        let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1597
1598        let index = KeyIndex::new(keys.clone());
1599        let mut data_frame = Array2::default((1, keys.len()));
1600        for (idx, entry) in data.iter().enumerate() {
1601            data_frame
1602                .column_mut(idx)
1603                .assign(&ArrayView::from(&[entry.clone()]));
1604        }
1605
1606        let frame = ColumnFrame::new(index, data_frame);
1607        assert_eq!(
1608            frame.get_by_row_index(&"a".into(), 0),
1609            Some(&DataValue::U32(1))
1610        );
1611        assert_eq!(frame.get_by_row_index(&"aa".into(), 0), None);
1612        assert_eq!(frame.get_by_row_index(&"a".into(), 1), None);
1613        assert_eq!(
1614            frame.select(Some(&["a".into(), "b".into()])),
1615            Array2::from_shape_vec((1, 2), vec![DataValue::U32(1), DataValue::I32(2)])
1616                .expect("BUG: cannot create array")
1617        );
1618    }
1619
1620    #[rstest]
1621    #[traced_test]
1622    fn dummy_test_multiple_rows() {
1623        let data = vec![
1624            DataValue::U32(1),
1625            DataValue::I32(2),
1626            DataValue::I64(3),
1627            DataValue::U64(4),
1628            DataValue::U32(12),
1629            DataValue::I32(22),
1630            DataValue::I64(32),
1631            DataValue::U64(42),
1632        ];
1633
1634        let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1635
1636        let index = KeyIndex::new(keys.clone());
1637        let data_frame =
1638            Array2::from_shape_vec((2, keys.len()), data).expect("BUG: cannot create array");
1639
1640        let frame = ColumnFrame::new(index, data_frame);
1641        assert_eq!(
1642            frame.get_by_row_index(&"a".into(), 0),
1643            Some(&DataValue::U32(1))
1644        );
1645        assert_eq!(frame.get_by_row_index(&"aa".into(), 0), None);
1646        assert_eq!(frame.get_by_row_index(&"a".into(), 3), None);
1647        let arr = Array2::from_shape_vec(
1648            (2, 2),
1649            vec![
1650                DataValue::U32(1),
1651                DataValue::I32(2),
1652                DataValue::U32(12),
1653                DataValue::I32(22),
1654            ],
1655        )
1656        .expect("BUG: cannot create array");
1657        trace!("{arr:?}");
1658        assert_eq!(frame.select(Some(&["a".into(), "b".into()])), arr);
1659    }
1660
1661    #[rstest]
1662    #[traced_test]
1663    fn dummy_test_multiple_rows_push() {
1664        let data = vec![
1665            DataValue::U32(1),
1666            DataValue::I32(2),
1667            DataValue::I64(3),
1668            DataValue::U64(4),
1669            DataValue::U32(12),
1670            DataValue::I32(22),
1671            DataValue::I64(32),
1672            DataValue::U64(42),
1673        ];
1674        let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1675
1676        let index = KeyIndex::new(keys.clone());
1677        let data_frame =
1678            Array2::from_shape_vec((2, keys.len()), data).expect("BUG: cannot create array");
1679
1680        let mut frame = ColumnFrame::new(index, data_frame);
1681        assert!(frame
1682            .push(data_value::stdhashmap!(
1683                "a" => DataValue::U32(2),
1684                "b" => DataValue::I32(3),
1685                "c" => DataValue::I64(4),
1686                "d" => DataValue::U64(5)
1687            ))
1688            .is_ok());
1689        let arr = Array2::from_shape_vec(
1690            (3, 2),
1691            vec![
1692                DataValue::U32(1),
1693                DataValue::I32(2),
1694                DataValue::U32(12),
1695                DataValue::I32(22),
1696                DataValue::U32(2),
1697                DataValue::I32(3),
1698            ],
1699        )
1700        .expect("BUG: cannot create array");
1701        trace!("{arr:?}");
1702        assert_eq!(frame.select(Some(&["a".into(), "b".into()])), arr);
1703        let result = frame.push(data_value::stdhashmap!(
1704            "a" => DataValue::U32(34),
1705            "b" => DataValue::I32(44),
1706            "c" => DataValue::I64(54),
1707            "e" => DataValue::F32(6f32)
1708        ));
1709        assert!(result.is_ok(), "{result:?}");
1710        let arr = Array2::from_shape_vec(
1711            (4, 2),
1712            vec![
1713                DataValue::U64(4),
1714                DataValue::Null,
1715                DataValue::U64(42),
1716                DataValue::Null,
1717                DataValue::U64(5),
1718                DataValue::Null,
1719                DataValue::Null,
1720                DataValue::F32(6f32),
1721            ],
1722        )
1723        .expect("BUG: cannot create array");
1724        trace!("{arr:?}");
1725        assert_eq!(frame.select(Some(&["d".into(), "e".into()])), arr);
1726    }
1727
1728    #[rstest]
1729    #[case(
1730        column_frame! {
1731            "group_id" => vec![1, 2],
1732            "feed_tag" => vec![3, 4]
1733        },
1734        Some(vec![Key::from("group_id")]),
1735        ndarray::array!([1.into()], [2.into()])
1736    )]
1737    #[case(
1738        column_frame! {
1739            "group_id" => vec![1, 2],
1740            "feed_tag" => vec![3, 4]
1741        },
1742        Some(vec!["group_id".into(), "feed_tag".into()]),
1743        ndarray::array!([1.into(), 3.into()], [2.into(), 4.into()])
1744    )]
1745    #[case(
1746        column_frame! {
1747            "group_id" => vec![1, 2],
1748            "feed_tag" => vec![3, DataValue::Null]
1749        },
1750        Some(vec!["feed_tag".into()]),
1751        ndarray::array![[3.into()], [DataValue::Null]]
1752    )]
1753    #[case(
1754        column_frame! {
1755            "group_id" => vec![1, 2],
1756            "feed_tag" => vec![1, DataValue::Null]
1757        },
1758        Some(vec!["feed_tag2".into()]),
1759       Array2::<DataValue>::default((0, 0))
1760    )]
1761    #[traced_test]
1762    fn test_select(
1763        #[case] input: ColumnFrame,
1764        #[case] keys: Option<Vec<Key>>,
1765        #[case] expected: Array2<DataValue>,
1766    ) {
1767        trace!("input={input:?}");
1768        let keys_slice = keys.as_deref();
1769        let selected = input.select(keys_slice);
1770        trace!("selected={selected:?}");
1771        assert_eq!(selected, expected);
1772        let selected = input.select_transposed(keys_slice);
1773        trace!("selected_transposed={selected:?}");
1774        assert!(selected.is_ok());
1775        assert_eq!(selected.unwrap(), expected.t());
1776    }
1777
1778    #[rstest]
1779    #[case(
1780        column_frame! {
1781            "group_id" => vec![1, 2],
1782            "feed_tag" => vec![3, 4]
1783        },
1784        Key::from("group_id"),
1785        Some(ndarray::array!(1.into(), 2.into()))
1786    )]
1787    #[case(
1788        column_frame! {
1789            "group_id" => vec![1, 2, 5, 6],
1790            "feed_tag" => vec![3, 4, 7, 8]
1791        },
1792        Key::from("group_id"),
1793        Some(ndarray::array!(1.into(), 2.into(), 5.into(), 6.into()))
1794    )]
1795    #[case(
1796        column_frame! {
1797            "group_id" => vec![1, 2],
1798            "feed_tag" => vec![1, 1]
1799        },
1800        Key::from("feed_tag1"),
1801        None
1802    )]
1803    #[traced_test]
1804    fn test_select_column(
1805        #[case] input: ColumnFrame,
1806        #[case] key: Key,
1807        #[case] expected: Option<Array1<DataValue>>,
1808    ) {
1809        let selected = input.select_column(&key);
1810        trace!("selected={selected:?}");
1811        match expected {
1812            Some(expected) => {
1813                assert!(selected.is_some());
1814                assert_eq!(selected.expect("BUG: checked above"), expected);
1815            }
1816            None => assert!(selected.is_none()),
1817        }
1818    }
1819
1820    #[test]
1821    #[traced_test]
1822    fn empty_join_test() {
1823        let join = JoinRelation::add_columns();
1824        let mut column_frame = ColumnFrame::default();
1825        column_frame
1826            .add_single_column("group_id", Array1::from_vec(vec![]))
1827            .expect("BUG: cannot add column");
1828        let column_frame2 = column_frame! {
1829            "group_id" => vec![2, 1, 3],
1830            "feed_tag" => vec![1, 1, 1],
1831            "clicks" => vec![100, 10, 10],
1832            "imps" => vec![1000, 200, 200]
1833        };
1834        assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
1835
1836        let joined = column_frame.join(column_frame2, &join);
1837        assert!(joined.is_ok(), "{joined:?}");
1838
1839        trace!("{column_frame:?}");
1840        assert_eq!(
1841            column_frame.select(Some(&[
1842                "group_id".into(),
1843                "feed_tag".into(),
1844                "clicks".into(),
1845                "imps".into()
1846            ])),
1847            ndarray::array!(
1848                [2.into(), 1.into(), 100.into(), 1000.into()],
1849                [1.into(), 1.into(), 10.into(), 200.into()],
1850                [3.into(), 1.into(), 10.into(), 200.into()],
1851            )
1852        );
1853
1854        let mut column_frame2 = column_frame! {
1855            "feed_tag" => vec![1, 1, 1],
1856            "clicks" => vec![100, 10, 10],
1857            "imps" => vec![1000, 200, 200]
1858        };
1859        let mut column_frame = ColumnFrame::default();
1860        column_frame
1861            .add_single_column("group_id", Array1::from_vec(vec![]))
1862            .expect("BUG: cannot add column");
1863        let joined = column_frame2.join(column_frame, &join);
1864        assert!(joined.is_ok(), "{joined:?}");
1865
1866        trace!("{column_frame2:?}");
1867        assert_eq!(
1868            column_frame2.select(Some(&[
1869                "group_id".into(),
1870                "feed_tag".into(),
1871                "clicks".into(),
1872                "imps".into()
1873            ])),
1874            ndarray::array!(
1875                [DataValue::Null, 1.into(), 100.into(), 1000.into()],
1876                [DataValue::Null, 1.into(), 10.into(), 200.into()],
1877                [DataValue::Null, 1.into(), 10.into(), 200.into()],
1878            )
1879        );
1880
1881        let mut column_frame = ColumnFrame::default();
1882        column_frame.index = KeyIndex::new(vec!["group_id2".into()]);
1883        let joined = column_frame2.join(column_frame, &join);
1884        assert!(joined.is_ok(), "{joined:?}");
1885
1886        trace!("{column_frame2:?}");
1887        assert_eq!(
1888            column_frame2.select(Some(&[
1889                "group_id2".into(),
1890                "feed_tag".into(),
1891                "clicks".into(),
1892                "imps".into()
1893            ])),
1894            ndarray::array!(
1895                [DataValue::Null, 1.into(), 100.into(), 1000.into()],
1896                [DataValue::Null, 1.into(), 10.into(), 200.into()],
1897                [DataValue::Null, 1.into(), 10.into(), 200.into()],
1898            )
1899        );
1900    }
1901
1902    #[test]
1903    #[traced_test]
1904    fn join_test_multiple() {
1905        let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec!["group_id".into()])));
1906        let mut column_frame = column_frame! {
1907            "group_id" => vec![1, 1, 3]
1908        };
1909        let column_frame2 = column_frame! {
1910            "group_id" => vec![2, 1, 1],
1911            "clicks" => vec![100, 10, 10],
1912            "imps" => vec![1000, 200, 200]
1913        };
1914
1915        let joined = column_frame.join(column_frame2, &join);
1916        assert!(joined.is_ok(), "{joined:?}");
1917
1918        trace!("{column_frame:?}");
1919        assert_eq!(
1920            column_frame.select(Some(&["group_id".into(), "clicks".into(), "imps".into(),])),
1921            ndarray::array!(
1922                [1.into(), 10.into(), 200.into()],
1923                [1.into(), 10.into(), 200.into()],
1924                [3.into(), DataValue::Null, DataValue::Null],
1925            )
1926        )
1927    }
1928
1929    #[test]
1930    #[traced_test]
1931    fn join_test_no_matches() {
1932        let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec!["group_id".into()])));
1933        let mut column_frame = column_frame! {
1934            "group_id" => vec![DataValue::I32(1), DataValue::I32(2), DataValue::I32(3)]
1935        };
1936        let column_frame2 = column_frame! {
1937            "group_id" => vec![DataValue::I32(4), DataValue::I32(5), DataValue::I32(6)],
1938            "clicks" => vec![DataValue::I32(100), DataValue::I32(200), DataValue::I32(300)],
1939        };
1940
1941        let joined = column_frame.join(column_frame2, &join);
1942        assert!(joined.is_ok(), "{joined:?}");
1943
1944        trace!("{column_frame:?}");
1945        assert_eq!(
1946            column_frame.select(Some(&["group_id".into(), "clicks".into()])),
1947            ndarray::array!(
1948                [DataValue::I32(1), DataValue::Null],
1949                [DataValue::I32(2), DataValue::Null],
1950                [DataValue::I32(3), DataValue::Null],
1951            )
1952        )
1953    }
1954    #[test]
1955    #[traced_test]
1956    fn join_test() {
1957        let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
1958            "group_id".into(),
1959            "feed_tag".into(),
1960        ])));
1961        let mut column_frame = column_frame! {
1962            "group_id" => vec![1, 2, 8],
1963            "feed_tag" => vec![1, 1, 10]
1964        };
1965        let column_frame2 = column_frame! {
1966            "group_id" => vec![2, 1, 3],
1967            "feed_tag" => vec![1, 1, 1],
1968            "clicks" => vec![100, 10, 10],
1969            "imps" => vec![1000, 200, 200]
1970        };
1971        assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
1972
1973        let joined = column_frame.join(column_frame2, &join);
1974        assert!(joined.is_ok(), "{joined:?}");
1975
1976        trace!("{column_frame:?}");
1977        assert_eq!(
1978            column_frame.select(Some(&[
1979                "group_id".into(),
1980                "feed_tag".into(),
1981                "clicks".into(),
1982                "imps".into()
1983            ])),
1984            ndarray::array!(
1985                [1.into(), 1.into(), 10.into(), 200.into()],
1986                [2.into(), 1.into(), 100.into(), 1000.into()],
1987                [8.into(), 10.into(), DataValue::Null, DataValue::Null]
1988            )
1989        )
1990    }
1991
1992    #[test]
1993    #[traced_test]
1994    fn join_test_with_additional() {
1995        let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
1996            "group_id".into(),
1997            "feed_tag".into(),
1998        ])));
1999        let mut column_frame = column_frame! {
2000            "group_id" => vec![1, 2, 8],
2001            "feed_tag" => vec![1, 1, 10],
2002            "clicked" => vec![0, 0, 1]
2003        };
2004        let column_frame2 = column_frame! {
2005            "group_id" => vec![2, 1, 3],
2006            "feed_tag" => vec![1, 1, 1],
2007            "clicks" => vec![100, 10, 10],
2008            "imps" => vec![1000, 200, 200]
2009        };
2010        assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
2011
2012        let joined = column_frame.join(column_frame2, &join);
2013        assert!(joined.is_ok(), "{joined:?}");
2014
2015        trace!("{column_frame:?}");
2016        assert_eq!(
2017            column_frame.select(Some(&[
2018                "group_id".into(),
2019                "feed_tag".into(),
2020                "clicks".into(),
2021                "imps".into(),
2022                "clicked".into()
2023            ])),
2024            ndarray::array!(
2025                [1.into(), 1.into(), 10.into(), 200.into(), 0.into()],
2026                [2.into(), 1.into(), 100.into(), 1000.into(), 0.into()],
2027                [
2028                    8.into(),
2029                    10.into(),
2030                    DataValue::Null,
2031                    DataValue::Null,
2032                    1.into()
2033                ]
2034            )
2035        )
2036    }
2037
2038    #[test]
2039    #[traced_test]
2040    fn join_test_with_additional_single() {
2041        let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
2042            "group_id".into(),
2043            "feed_tag".into(),
2044        ])));
2045        let mut column_frame = column_frame! {
2046            "group_id" => vec![1, 2, 8],
2047            "feed_tag" => vec![1, 1, 10],
2048            "clicked" => vec![0, 0, 1]
2049        };
2050        let column_frame2 = column_frame! {
2051            "a" => vec![1],
2052            "group_id" => vec![2],
2053            "feed_tag" => vec![1],
2054            "clicks" => vec![10],
2055            "imps" => vec![200]
2056        };
2057        assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
2058
2059        let joined = column_frame.join(column_frame2, &join);
2060        assert!(joined.is_ok(), "{joined:?}");
2061
2062        trace!("{column_frame:?}");
2063        assert_eq!(
2064            column_frame.select(Some(&[
2065                "group_id".into(),
2066                "feed_tag".into(),
2067                "clicks".into(),
2068                "imps".into(),
2069                "clicked".into()
2070            ])),
2071            ndarray::array!(
2072                [
2073                    1.into(),
2074                    1.into(),
2075                    DataValue::Null,
2076                    DataValue::Null,
2077                    0.into(),
2078                ],
2079                [2.into(), 1.into(), 10.into(), 200.into(), 0.into()],
2080                [
2081                    8.into(),
2082                    10.into(),
2083                    DataValue::Null,
2084                    DataValue::Null,
2085                    1.into()
2086                ]
2087            )
2088        )
2089    }
2090
2091    #[rstest]
2092    #[traced_test]
2093    fn cartesian_product_join() {
2094        let mut df = column_frame! {
2095            "group_id" => vec![1, 2, 3],
2096            "feed_tag" => vec![1, 2, 3]
2097        };
2098        let df2 = column_frame! {
2099            "zone_id" => vec![111111, 111133],
2100            "zone_avg_ctr" => vec![0.1, 0.001]
2101        };
2102        assert!(df
2103            .join(
2104                ColumnFrame::default(),
2105                &JoinRelation::new(JoinBy::CartesianProduct)
2106            )
2107            .is_ok());
2108        let join = JoinRelation::new(JoinBy::CartesianProduct);
2109        let result = df.join(df2, &join);
2110        assert!(result.is_ok(), "{result:?}");
2111        let selected = df.select(None);
2112        trace!("{selected:?}");
2113        assert_eq!(
2114            selected,
2115            ndarray::array!(
2116                [1.into(), 1.into(), 111111.into(), 0.1.into()],
2117                [1.into(), 1.into(), 111133.into(), 0.001.into()],
2118                [2.into(), 2.into(), 111111.into(), 0.1.into()],
2119                [2.into(), 2.into(), 111133.into(), 0.001.into()],
2120                [3.into(), 3.into(), 111111.into(), 0.1.into()],
2121                [3.into(), 3.into(), 111133.into(), 0.001.into()],
2122            )
2123        );
2124
2125        let df2 = column_frame! {
2126            "zone_id" => vec![111]
2127        };
2128        let result = df.join(df2, &join);
2129        assert!(result.is_ok(), "{result:?}");
2130        let selected = df.select(None);
2131        trace!("{selected:?}");
2132        assert_eq!(
2133            selected,
2134            ndarray::array!(
2135                [1.into(), 1.into(), 111111.into(), 0.1.into(), 111.into()],
2136                [1.into(), 1.into(), 111133.into(), 0.001.into(), 111.into()],
2137                [2.into(), 2.into(), 111111.into(), 0.1.into(), 111.into()],
2138                [2.into(), 2.into(), 111133.into(), 0.001.into(), 111.into()],
2139                [3.into(), 3.into(), 111111.into(), 0.1.into(), 111.into()],
2140                [3.into(), 3.into(), 111133.into(), 0.001.into(), 111.into()],
2141            )
2142        );
2143    }
2144
2145    #[rstest]
2146    #[traced_test]
2147    fn broadcast_join() {
2148        let mut df = column_frame! {
2149            "group_id" => vec![1, 2, 3],
2150            "feed_tag" => vec![1, 2, 3]
2151        };
2152        let df2 = column_frame! {
2153            "zone_id" => vec![111111]
2154        };
2155        assert!(df
2156            .join(
2157                ColumnFrame::default(),
2158                &JoinRelation::new(JoinBy::Broadcast)
2159            )
2160            .is_ok());
2161        let join = JoinRelation::new(JoinBy::Broadcast);
2162        assert!(df.join(df2, &join).is_ok());
2163        let selected = df.select(None);
2164        trace!("{selected:?}");
2165        assert_eq!(
2166            selected,
2167            ndarray::array!(
2168                [1.into(), 1.into(), 111111.into()],
2169                [2.into(), 2.into(), 111111.into()],
2170                [3.into(), 3.into(), 111111.into()]
2171            )
2172        );
2173    }
2174    #[rstest]
2175    #[traced_test]
2176    fn merge_test() {
2177        let mut df = column_frame! {
2178            "group_id" => vec![1, 2, 3],
2179            "feed_tag" => vec![1, 2, 3]
2180        };
2181        let df2 = column_frame! {
2182            "group_id" => vec![11, 21, 31],
2183            "feed_tag" => vec![12, 22, 32]
2184        };
2185
2186        let join = JoinRelation::new(JoinBy::Replace);
2187        assert!(df.join(df2, &join).is_ok());
2188        let selected = df.select(None);
2189        trace!("{selected:?}");
2190        assert_eq!(
2191            selected,
2192            ndarray::array!(
2193                [11.into(), 12.into()],
2194                [21.into(), 22.into()],
2195                [31.into(), 32.into()]
2196            )
2197        );
2198    }
2199
2200    #[rstest]
2201    #[traced_test]
2202    fn extend_test() {
2203        let mut df = column_frame! {
2204            "group_id" => vec![1, 2, 3],
2205            "feed_tag" => vec![1, 2, 3]
2206        };
2207        let df2 = column_frame! {
2208            "group_id" => vec![11, 21, 31],
2209            "feed_tag" => vec![5, 6, 7]
2210        };
2211        assert!(df
2212            .join(ColumnFrame::default(), &JoinRelation::new(JoinBy::Extend))
2213            .is_ok());
2214
2215        let join = JoinRelation::new(JoinBy::Extend);
2216        assert!(df.join(df2, &join).is_ok());
2217        let selected = df.select(Some(&["feed_tag".into(), "group_id".into()]));
2218        trace!("{selected:?}");
2219        assert_eq!(
2220            selected,
2221            ndarray::array!(
2222                [1.into(), 1.into()],
2223                [2.into(), 2.into()],
2224                [3.into(), 3.into()],
2225                [5.into(), 11.into()],
2226                [6.into(), 21.into()],
2227                [7.into(), 31.into()]
2228            )
2229        );
2230        let as_map = df.select_as_map(Some(&["feed_tag".into(), "group_id".into()]));
2231        trace!("{as_map:?}");
2232        assert_eq!(
2233            as_map,
2234            stdhashmap!(
2235                "feed_tag" => vec![1, 2, 3, 5, 6, 7],
2236                "group_id" => vec![1, 2, 3, 11, 21, 31]
2237            )
2238        );
2239
2240        let as_map = df.select_as_map(Some(&["feed_tag1".into()]));
2241        trace!("{as_map:?}");
2242        assert_eq!(as_map, HashMap::default());
2243    }
2244
2245    #[rstest]
2246    #[traced_test]
2247    fn extend_test_with_non_existing_cols() {
2248        let mut df = column_frame! {
2249            "group_id" => vec![1, 2, 3],
2250            "feed_tag" => vec![1, 2, 3]
2251        };
2252        let mut df2 = column_frame! {
2253            "group_id" => vec![11, 21, 31],
2254            "feed_tag" => vec![5, 6, 7],
2255            "clicks" => vec![100, 200, 300],
2256            "impressions" => vec![1000, 2000, 3000]
2257        };
2258        let df_bckp = df.clone();
2259        let join = JoinRelation::new(JoinBy::Extend);
2260        assert!(df.join(df2.clone(), &join).is_ok());
2261        let selected = df.select(None);
2262        trace!("{selected:?}");
2263        assert_eq!(
2264            selected,
2265            ndarray::array!(
2266                [1.into(), 1.into(), DataValue::Null, DataValue::Null],
2267                [2.into(), 2.into(), DataValue::Null, DataValue::Null],
2268                [3.into(), 3.into(), DataValue::Null, DataValue::Null],
2269                [11.into(), 5.into(), 100.into(), 1000.into()],
2270                [21.into(), 6.into(), 200.into(), 2000.into()],
2271                [31.into(), 7.into(), 300.into(), 3000.into()]
2272            )
2273        );
2274        let join = JoinRelation::new(JoinBy::Extend);
2275        let r = df2.join(df_bckp, &join);
2276        assert!(r.is_ok(), "{r:?}");
2277        let selected = df2.select(None);
2278        trace!("{selected:?}");
2279        assert_eq!(
2280            selected,
2281            ndarray::array!(
2282                [11.into(), 5.into(), 100.into(), 1000.into()],
2283                [21.into(), 6.into(), 200.into(), 2000.into()],
2284                [31.into(), 7.into(), 300.into(), 3000.into()],
2285                [1.into(), 1.into(), DataValue::Null, DataValue::Null],
2286                [2.into(), 2.into(), DataValue::Null, DataValue::Null],
2287                [3.into(), 3.into(), DataValue::Null, DataValue::Null]
2288            )
2289        );
2290    }
2291
2292    #[rstest]
2293    #[traced_test]
2294    fn extend_test_with_non_existing_cols_wrong_order() {
2295        let mut df = column_frame! {
2296            "group_id" => vec![1, 2, 3],
2297            "feed_tag" => vec![1, 2, 3]
2298        };
2299        let df2 = column_frame! {
2300            "feed_tag" => vec![5, 6, 7],
2301            "group_id" => vec![11, 21, 31]
2302        };
2303        let join = JoinRelation::new(JoinBy::Extend);
2304        let err = df.join(df2, &join);
2305        assert!(err.is_ok(), "{err:?}");
2306    }
2307
2308    #[rstest]
2309    #[traced_test]
2310    fn test_replace_not_compatible() {
2311        let mut df = column_frame! {
2312            "group_id" => vec![1, 2, 3],
2313            "feed_tag" => vec![1, 2, 3]
2314        };
2315        let df2 = column_frame! {
2316            "feed_tag" => vec![5, 6],
2317            "group_id" => vec![11, 21]
2318        };
2319        let join = JoinRelation::new(JoinBy::Replace);
2320        let err = df.join(df2, &join);
2321        assert!(err.is_err(), "{err:?}");
2322        let empty = ColumnFrame::default();
2323        let err = df.join(empty, &join);
2324        assert!(err.is_ok(), "{err:?}");
2325    }
2326
2327    #[rstest]
2328    #[traced_test]
2329    fn test_different_data() {
2330        let mut df = column_frame! {
2331            "group_id" => vec![1, 2, 3],
2332            "feed_tag" => vec![1, 2, 3]
2333        };
2334        let df2 = column_frame! {
2335            "group_id" => vec![11, 21],
2336            "a" => vec![5, 6]
2337        };
2338        let join = JoinRelation::new(JoinBy::Extend);
2339        let err = df.join(df2, &join);
2340        assert!(err.is_ok(), "{err:?}");
2341        println!("{df:?}");
2342        let expected_df = ColumnFrame::new(
2343            KeyIndex::from(vec!["group_id".into(), "feed_tag".into(), "a".into()]),
2344            ndarray::array!(
2345                [1.into(), 1.into(), DataValue::Null],
2346                [2.into(), 2.into(), DataValue::Null],
2347                [3.into(), 3.into(), DataValue::Null],
2348                [11.into(), DataValue::Null, 5.into()],
2349                [21.into(), DataValue::Null, 6.into()]
2350            ),
2351        );
2352        assert_eq!(df, expected_df)
2353    }
2354
2355    #[rstest]
2356    #[traced_test]
2357    fn serde_column_frame() {
2358        let df = column_frame! {
2359            "group_id" => vec![1u64, 2u64, 3u64],
2360            "feed_tag" => vec![1u64, 2u64, 3u64]
2361        };
2362        let key_idx = df.index.clone();
2363        let serialized = serde_json::to_string(&key_idx).expect("BUG: cannot serialize");
2364        let deserialized: KeyIndex =
2365            serde_json::from_str(&serialized).expect("BUG: cannot deserialize");
2366        assert_eq!(key_idx, deserialized);
2367        assert!(key_idx.get_key(0).is_some_and(|x| x == "group_id".into()));
2368        let serialized = serde_json::to_string(&df).expect("BUG: cannot serialize");
2369        let deserialized: ColumnFrame =
2370            serde_json::from_str(&serialized).expect("BUG: cannot deserialize");
2371        assert_eq!(df, deserialized);
2372    }
2373
2374    #[rstest]
2375    #[traced_test]
2376    fn update_value() {
2377        let mut df = column_frame! {
2378            "group_id" => vec![1, 2, 3],
2379            "feed_tag" => vec![1, 2, 3]
2380        };
2381        let group_id: Key = "group_id".into();
2382        let v = df.get_mut_by_row_index(&group_id, 1);
2383        assert!(v.is_some());
2384        let v = v.unwrap();
2385        assert_eq!(v, &DataValue::I32(2));
2386        *v = DataValue::U64(22);
2387        let v = df.get_by_row_index(&group_id, 1);
2388        assert!(v.is_some());
2389        let v = v.unwrap();
2390        assert_eq!(v, &DataValue::U64(22));
2391
2392        assert!(df.get_mut_by_row_index(&"group_id2".into(), 1).is_none());
2393    }
2394
2395    #[rstest]
2396    fn get_single_column_typed_f64() {
2397        let df = column_frame! {
2398            "a" => [1i32, 2i32, 3i32],
2399            "b" => [10u64, 20u64, 30u64]
2400        };
2401        let key: Key = "a".into();
2402        let col = df.get_single_column_typed::<f64>(&key).unwrap();
2403        assert_eq!(col, ndarray::arr1(&[1.0, 2.0, 3.0]));
2404    }
2405
2406    #[rstest]
2407    fn get_single_column_typed_i64() {
2408        let df = column_frame! {
2409            "x" => [10u32, 20u32, 30u32]
2410        };
2411        let key: Key = "x".into();
2412        let col = df.get_single_column_typed::<i64>(&key).unwrap();
2413        assert_eq!(col, ndarray::arr1(&[10i64, 20i64, 30i64]));
2414    }
2415
2416    #[rstest]
2417    fn get_single_column_typed_string() {
2418        let df = column_frame! {
2419            "name" => ["alice", "bob", "carol"]
2420        };
2421        let key: Key = "name".into();
2422        let col = df.get_single_column_typed::<String>(&key).unwrap();
2423        assert_eq!(
2424            col,
2425            ndarray::arr1(&["alice".to_string(), "bob".to_string(), "carol".to_string()])
2426        );
2427    }
2428
2429    #[rstest]
2430    fn get_single_column_typed_bool() {
2431        let df = column_frame! {
2432            "flag" => [1i32, 0i32, 1i32]
2433        };
2434        let key: Key = "flag".into();
2435        let col = df.get_single_column_typed::<bool>(&key).unwrap();
2436        assert_eq!(col, ndarray::arr1(&[true, false, true]));
2437    }
2438
2439    #[rstest]
2440    fn get_single_column_typed_missing_key_returns_none() {
2441        let df = column_frame! {
2442            "a" => [1, 2, 3]
2443        };
2444        let missing: Key = "nonexistent".into();
2445        assert!(df.get_single_column_typed::<f64>(&missing).is_none());
2446    }
2447
2448    #[rstest]
2449    fn get_single_column_typed_numeric_coercion_from_mixed() {
2450        let df = column_frame! {
2451            "vals" => [1.5f64, 2.7f64, 3.9f64]
2452        };
2453        let key: Key = "vals".into();
2454        // f64 -> i32 truncates via `as`
2455        let col = df.get_single_column_typed::<i32>(&key).unwrap();
2456        assert_eq!(col, ndarray::arr1(&[1i32, 2i32, 3i32]));
2457    }
2458
2459    #[rstest]
2460    fn get_single_column_typed_selects_correct_column() {
2461        let df = column_frame! {
2462            "x" => [1, 2, 3],
2463            "y" => [10, 20, 30],
2464            "z" => [100, 200, 300]
2465        };
2466        let key: Key = "y".into();
2467        let col = df.get_single_column_typed::<i64>(&key).unwrap();
2468        assert_eq!(col, ndarray::arr1(&[10i64, 20i64, 30i64]));
2469    }
2470
2471    #[rstest]
2472    fn get_single_column_typed_u64_identity() {
2473        let df = column_frame! {
2474            "id" => [100u64, 200u64, 300u64]
2475        };
2476        let key: Key = "id".into();
2477        let col = df.get_single_column_typed::<u64>(&key).unwrap();
2478        assert_eq!(col, ndarray::arr1(&[100u64, 200u64, 300u64]));
2479    }
2480
2481    #[rstest]
2482    fn get_single_column_typed_single_row() {
2483        let df = column_frame! {
2484            "x" => [42i32]
2485        };
2486        let key: Key = "x".into();
2487        let col = df.get_single_column_typed::<f64>(&key).unwrap();
2488        assert_eq!(col, ndarray::arr1(&[42.0f64]));
2489    }
2490
2491    #[rstest]
2492    fn get_single_column_typed_empty_frame() {
2493        let df = ColumnFrame::default();
2494        let key: Key = "x".into();
2495        let col = df.get_single_column_typed::<f64>(&key);
2496        assert!(col.is_none());
2497    }
2498
2499    #[rstest]
2500    fn select_typed_all_columns() {
2501        let df = column_frame! {
2502            "a" => [1i32, 2i32],
2503            "b" => [3i32, 4i32]
2504        };
2505        let result = df.select_typed::<f64>(None);
2506        assert_eq!(result.nrows(), 2);
2507        assert_eq!(result.ncols(), 2);
2508        assert_eq!(result[[0, 0]], 1.0);
2509        assert_eq!(result[[0, 1]], 3.0);
2510        assert_eq!(result[[1, 0]], 2.0);
2511        assert_eq!(result[[1, 1]], 4.0);
2512    }
2513
2514    #[rstest]
2515    fn select_typed_subset_of_columns() {
2516        let df = column_frame! {
2517            "a" => [10u64, 20u64],
2518            "b" => [30u64, 40u64],
2519            "c" => [50u64, 60u64]
2520        };
2521        let keys: Vec<Key> = vec!["a".into(), "c".into()];
2522        let result = df.select_typed::<i64>(Some(&keys));
2523        assert_eq!(result.nrows(), 2);
2524        assert_eq!(result.ncols(), 2);
2525        assert_eq!(result[[0, 0]], 10i64);
2526        assert_eq!(result[[0, 1]], 50i64);
2527        assert_eq!(result[[1, 0]], 20i64);
2528        assert_eq!(result[[1, 1]], 60i64);
2529    }
2530
2531    #[rstest]
2532    fn select_typed_nonexistent_keys_returns_empty() {
2533        let df = column_frame! {
2534            "a" => [1i32, 2i32]
2535        };
2536        let keys: Vec<Key> = vec!["z".into()];
2537        let result = df.select_typed::<f64>(Some(&keys));
2538        assert_eq!(result.shape(), &[0, 0]);
2539    }
2540
2541    #[rstest]
2542    fn select_typed_string_extraction() {
2543        let df = column_frame! {
2544            "name" => ["hello", "world"]
2545        };
2546        let result = df.select_typed::<String>(None);
2547        assert_eq!(result[[0, 0]], "hello");
2548        assert_eq!(result[[1, 0]], "world");
2549    }
2550
2551    #[rstest]
2552    fn select_typed_matches_manual_mapv() {
2553        let df = column_frame! {
2554            "x" => [1i32, 2i32, 3i32],
2555            "y" => [4i32, 5i32, 6i32]
2556        };
2557        let typed = df.select_typed::<f64>(None);
2558        let manual = df.select(None).mapv(|v| f64::extract(&v));
2559        assert_eq!(typed, manual);
2560    }
2561}