Skip to main content

trs_dataframe/dataframe/
column_store.rs

1use ndarray::{Array, Array1, Array2, ArrayView1, ArrayView2, Axis, ShapeBuilder};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4
5use crate::dataframe::column_store::typed_array::{TypedData, TypedDataArray};
6use crate::error::Error;
7use crate::{dataframe::index::Index, CandidateData, JoinBy, JoinRelation, Key};
8use data_value::{DataValue, Extract};
9use tracing::*;
10mod from;
11mod key_index;
12mod ops;
13/// Sorted-view wrapper over a [`ColumnFrame`] for top-N queries.
14pub mod sorted_df;
15pub use key_index::KeyIndex;
16/// Expression-based row filtering applied to a [`ColumnFrame`].
17pub mod filter_df;
18/// Typed column storage ([`TypedData`]) and nullable wrapper ([`TypedDataArray`]).
19pub mod typed_array;
20
/// Column-oriented in-memory store for candidate data.
///
/// Each column is stored as a [`TypedDataArray`], the nullable wrapper around
/// [`TypedData`] — a tagged union over the supported primitives (`bool`,
/// `u8`/`u32`/`u64`, `i32`/`i64`, `f32`/`f64`, `String`) with a `Generic`
/// fallback for heterogeneous values. Columns are looked up by [`Key`]
/// through the embedded [`KeyIndex`].
///
/// Most operations work directly against the typed columns (zero-copy where
/// possible). Use [`ColumnFrame::select`] to materialize a row-major
/// [`Array2<DataValue>`] when you need a 2-D view.
///
/// `Serialize` is derived; `Deserialize` is implemented by hand so that
/// several wire layouts of `data_frame` can be read.
#[derive(Debug, Clone, Default, PartialEq, Serialize)]
pub struct ColumnFrame {
    /// Maps each column [`Key`] to its position inside `data_frame`.
    pub index: KeyIndex,
    /// Column storage: one [`TypedDataArray`] per column. The number of rows
    /// reported by the frame is the length of the first entry.
    pub data_frame: Vec<TypedDataArray>,
}
36
37/// Custom Deserialize: accept row-major `Array2<DataValue>` and convert it to
38/// the column-oriented [`TypedData`] storage, promoting each column to its
39/// detected dtype where possible.
40impl<'de> Deserialize<'de> for ColumnFrame {
41    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
42    where
43        D: serde::Deserializer<'de>,
44    {
45        /// Serde tries each variant top-to-bottom, picks the first that matches
46        /// the structure of the incoming data. Order is load-bearing.
47        #[derive(Debug, Deserialize)]
48        #[serde(untagged)]
49        enum WireDataframe {
50            V3(Vec<TypedDataArray>),
51            V2(Vec<TypedData>),
52            V1(Array2<DataValue>),
53        }
54        #[derive(Debug, Deserialize)]
55        struct WireData {
56            index: KeyIndex,
57            data_frame: WireDataframe,
58        }
59
60        let helper = WireData::deserialize(deserializer)?;
61        match helper.data_frame {
62            WireDataframe::V1(data_frame) => {
63                let ncols = data_frame.ncols();
64                let data_frame: Vec<TypedDataArray> = (0..ncols)
65                    .map(|i| {
66                        let values: Vec<DataValue> = data_frame.column(i).iter().cloned().collect();
67                        let dtype = helper
68                            .index
69                            .get_keys()
70                            .get(i)
71                            .map(|k| k.ctype)
72                            .unwrap_or(crate::DataType::Unknown);
73                        TypedDataArray::new(dtype, values)
74                    })
75                    .collect();
76                Ok(ColumnFrame {
77                    index: helper.index,
78                    data_frame,
79                })
80            }
81            WireDataframe::V2(data_frame) => {
82                let data_frame = data_frame.into_iter().map(TypedDataArray::from).collect();
83                Ok(ColumnFrame {
84                    index: helper.index,
85                    data_frame,
86                })
87            }
88            WireDataframe::V3(data_frame) => Ok(ColumnFrame {
89                index: helper.index,
90                data_frame,
91            }),
92        }
93    }
94}
95
/// Two-state control-flow marker: keep going or stop.
enum Continue {
    Continue,
    End,
}

impl Continue {
    /// Returns `true` for [`Continue::End`] and `false` for
    /// [`Continue::Continue`].
    pub fn should_end(&self) -> bool {
        match self {
            Self::End => true,
            Self::Continue => false,
        }
    }
}
106
107use std::fmt;
108
109impl fmt::Display for ColumnFrame {
110    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111        // Display keys and indices
112        write!(f, "\n|")?;
113
114        for key in &self.index.keys {
115            write!(f, " {key} |")?;
116        }
117
118        if self.index.is_empty() {
119            writeln!(f, "|")?;
120        }
121
122        // Display type for each key
123        write!(f, "\n|")?;
124        for value in self.index.keys.iter() {
125            // Display types during first iteration
126            write!(f, " {:10?} |", value.ctype)?;
127        }
128        writeln!(f)?;
129
130        writeln!(f, "---")?;
131
132        // Display items, limit output to 256 rows
133        for row_idx in 0..std::cmp::min(self.nrows(), 257) {
134            write!(f, "|")?;
135            for col in self.data_frame.iter() {
136                write!(f, " {} |", col.get(row_idx).unwrap_or_default())?;
137            }
138            writeln!(f)?;
139            if row_idx >= 256 {
140                writeln!(f, "... (dataframe is too long)")?;
141                break;
142            }
143        }
144
145        writeln!(f, "---")
146    }
147}
148/// Converts a [`DataValue`] to the canonical representation for the given [`crate::DataType`].
149///
150/// Uses the [`Extract`] trait for numeric coercion. If `dtype` is [`crate::DataType::Unknown`]
151/// and the value is not `Null`, the type is auto-detected and conversion retried.
152pub fn convert_data_value(item: DataValue, dtype: crate::DataType) -> DataValue {
153    let x = &item;
154    match dtype {
155        crate::DataType::Bool => DataValue::Bool(bool::extract(x)),
156        crate::DataType::U32 => DataValue::U32(u32::extract(x)),
157        crate::DataType::I32 => DataValue::I32(i32::extract(x)),
158        crate::DataType::U64 => DataValue::U64(u64::extract(x)),
159        crate::DataType::I64 => DataValue::I64(i64::extract(x)),
160        crate::DataType::F32 => DataValue::F32(f32::extract(x)),
161        crate::DataType::U128 => DataValue::U128(u128::extract(x)),
162        crate::DataType::I128 => DataValue::I128(i128::extract(x)),
163        crate::DataType::F64 => DataValue::F64(f64::extract(x)),
164        crate::DataType::U8 => DataValue::U8(u8::extract(x)),
165        crate::DataType::String => DataValue::String(String::extract(x).into()),
166        crate::DataType::Bytes => item,
167        crate::DataType::Map => item,
168        crate::DataType::Vec => item,
169        crate::DataType::Unknown => {
170            if matches!(item, DataValue::Null) {
171                return item;
172            }
173            let dtype = crate::detect_dtype(&item);
174            // this situation should not ever happen
175            if matches!(dtype, crate::DataType::Unknown) {
176                tracing::error!("Unknown datatype {dtype:?} - {item:?}");
177                return item;
178            }
179            convert_data_value(item, dtype)
180        }
181    }
182}
/// Converts a [`DataValue`] to match the dtype declared by the column [`Key`].
///
/// Shorthand for `convert_data_value(item, key.ctype)`; only the key's
/// `ctype` is read.
pub fn convert_dv_to_dtype(key: &Key, item: DataValue) -> DataValue {
    convert_data_value(item, key.ctype)
}
189
/// Either a borrowed [`ArrayView2`] or an owned [`Array2`] of [`DataValue`].
///
/// Returned by [`ColumnFrame::select_view`]. The two variants do not share a
/// layout: [`row_view`](MaybeView::row_view) hands a `View` out unchanged but
/// transposes an `Array`. Call `row_view` to obtain a unified read-only
/// [`ArrayView2`] regardless of which variant is held.
pub enum MaybeView<'v> {
    /// A borrowed view into the underlying storage; `row_view` returns it
    /// as-is.
    View(ArrayView2<'v, DataValue>),
    /// An owned array (produced when data must be re-shaped or stacked);
    /// `row_view` returns its transpose.
    Array(Array2<DataValue>),
}
204impl MaybeView<'_> {
205    /// Returns a read-only 2-D view over the data in row-major order.
206    ///
207    /// For the [`Array`](MaybeView::Array) variant the underlying storage is
208    /// transposed so that the resulting view always has shape `(nrows, ncols)`.
209    pub fn row_view(&self) -> ArrayView2<'_, DataValue> {
210        match self {
211            Self::View(v) => v.view(),
212            Self::Array(a) => a.t(),
213        }
214    }
215}
216impl ColumnFrame {
217    /// Creates a new [`ColumnFrame`] from a column index and column-oriented
218    /// data.
219    ///
220    /// Each element of `data_frame` becomes one column and must convert into a
221    /// [`TypedData`]. The number of entries must match the number of keys in
222    /// `index`.
223    ///
224    /// Accepted column shapes out of the box include `Vec<DataValue>`,
225    /// `Vec<T>` for every supported primitive, `Array1<DataValue>`, and raw
226    /// [`TypedData`] values.
227    ///
228    /// The column data is stored as-is — no coercion between the value type
229    /// and the key's declared dtype happens here. Use
230    /// [`ColumnFrame::new_coerced`] or [`Self::try_fix_dtype`] if you want the
231    /// column promoted to the key's dtype.
232    pub fn new<K, V>(index: K, data_frame: Vec<V>) -> Self
233    where
234        K: Into<KeyIndex>,
235        V: Into<TypedDataArray>,
236    {
237        let index = index.into();
238        let data_frame = data_frame.into_iter().map(Into::into).collect();
239        Self { data_frame, index }
240    }
241
242    /// Creates a new [`ColumnFrame`] and coerces each column to the dtype
243    /// declared by its matching [`Key`].
244    ///
245    /// This is a convenience constructor for the common case where the input
246    /// data is generic (`Vec<DataValue>`) but the keys carry authoritative
247    /// dtypes.
248    pub fn new_coerced<K, V>(index: K, data_frame: Vec<V>) -> Self
249    where
250        K: Into<KeyIndex>,
251        V: Into<TypedDataArray>,
252    {
253        let index = index.into();
254        let data_frame = data_frame
255            .into_iter()
256            .zip(index.keys.iter())
257            .map(|(value, key)| {
258                let mut col: TypedDataArray = value.into();
259                if !matches!(key.ctype, crate::DataType::Unknown) && col.data_type() != key.ctype {
260                    let _ = col.try_convert_to_dtype(key.ctype);
261                }
262                col
263            })
264            .collect();
265        Self { data_frame, index }
266    }
267
    /// Returns the ordered slice of column [`Key`]s in this frame, as
    /// reported by the embedded [`KeyIndex`].
    pub fn keys(&self) -> &[Key] {
        self.index.get_keys()
    }
272
273    /// Returns the number of rows in this frame.
274    pub fn nrows(&self) -> usize {
275        self.data_frame.first().map(|x| x.len()).unwrap_or_default()
276    }
    /// Returns the number of stored columns (the length of `data_frame`).
    pub fn ncolumns(&self) -> usize {
        self.data_frame.len()
    }
281
    /// Returns `true` if the frame contains no rows, i.e. [`Self::nrows`]
    /// is `0`. A frame without any column is therefore empty as well.
    pub fn is_empty(&self) -> bool {
        self.nrows() == 0
    }
286
    /// Intended to compact internal storage to reduce memory usage after
    /// mutations that may have left over-allocated capacity.
    ///
    /// NOTE(review): the body is currently empty, so calling this method has
    /// no effect.
    pub fn shrink(&mut self) {}
290
291    /// This method will try to fix dtype based on data stored in each column. If dtype is [`crate::DataType::Unknown`]
292    /// this method will replace dtype for "correct" one based on [`DataValue`].
293    /// NOTE: flag `force` will enforce this dtype even if dtype is known
294    pub fn try_fix_dtype_for_keys(&mut self, force: bool) -> Result<(), Error> {
295        for i in 0..self.index.keys.len() {
296            let should_fix = force || matches!(self.index.keys[i].ctype, crate::DataType::Unknown);
297
298            if should_fix {
299                let column = self
300                    .get_column(&self.index.keys[i])
301                    .map_err(|_| Error::EmptyData)?;
302                let first = column.get(0).ok_or(Error::EmptyData)?;
303                let dtype = crate::detect_dtype(&first);
304                self.index.keys[i].ctype = dtype;
305            }
306        }
307
308        Ok(())
309    }
310    /// Attempts to fix the dtype of every column by inspecting stored values.
311    ///
312    /// Columns whose data does not match their declared dtype are cast in-place.
313    /// Returns `Err` with a list of `(key, message)` pairs for columns that failed casting.
314    pub fn try_fix_dtype(&mut self) -> Result<(), Error> {
315        let mut errors = vec![];
316        let keys = self.index.keys.clone();
317        for key in keys {
318            tracing::trace!("key: {key:?}- {:?}", key.ctype);
319            if let Err(e) = self.try_fix_column_by_key(&key) {
320                errors.push((key, e.to_string()));
321            }
322        }
323        if errors.is_empty() {
324            Ok(())
325        } else {
326            Err(Error::CastFailed(errors))
327        }
328    }
329
330    /// Returns a shared reference to the column identified by `key`.
331    pub fn get_column(&self, key: &Key) -> Result<&TypedDataArray, Error> {
332        let idx = self
333            .index
334            .get_column_index(key)
335            .ok_or(Error::MissingField(format!("{key}").into()))?;
336        self.get_column_by_idx(idx)
337    }
338
339    /// Returns a mutable reference to the column identified by `key`.
340    pub fn get_column_mut(&mut self, key: &Key) -> Result<&mut TypedDataArray, Error> {
341        let idx = self
342            .index
343            .get_column_index(key)
344            .ok_or(Error::MissingField(format!("{key}").into()))?;
345        self.get_column_by_idx_mut(idx)
346    }
347    /// Returns a shared reference to the column at positional `idx`.
348    pub fn get_column_by_idx(&self, idx: usize) -> Result<&TypedDataArray, Error> {
349        self.data_frame
350            .get(idx)
351            .ok_or_else(|| Error::IndexOutOfRange(idx, self.nrows()))
352    }
353
354    /// Returns a mutable reference to the column at positional `idx`.
355    pub fn get_column_by_idx_mut(&mut self, idx: usize) -> Result<&mut TypedDataArray, Error> {
356        let n_cols = self.ncolumns();
357        self.data_frame
358            .get_mut(idx)
359            .ok_or_else(|| Error::IndexOutOfRange(idx, n_cols))
360    }
361
362    /// Returns a single row as a `Vec<DataValue>`, one element per column.
363    pub fn get_row(&self, idx: usize) -> Result<Vec<DataValue>, Error> {
364        let mut row = Vec::with_capacity(self.ncolumns());
365        for col in self.data_frame.iter() {
366            row.push(col.get(idx).unwrap_or_default())
367        }
368        Ok(row)
369    }
370
371    /// Materializes the column-oriented storage into a row-major Array2.
372    /// Returns shape (nrows, ncols).
373    fn finish(&self) -> Result<Array2<DataValue>, Error> {
374        let ncols = self.ncolumns();
375        if ncols == 0 {
376            return Ok(Array2::default((0, 0)));
377        }
378        let nrows = self.nrows();
379        let mut data = Vec::with_capacity(nrows * ncols);
380        let mut selected_cols: Vec<Box<dyn Iterator<Item = DataValue>>> = self
381            .index
382            .indexes()
383            .iter()
384            .map(|col_idx| {
385                self.get_column_by_idx(*col_idx)
386                    .expect("Cannot get column on index")
387                    .iter_values()
388            })
389            .collect::<Vec<_>>();
390
391        for _ in 0..nrows {
392            for col in selected_cols.iter_mut() {
393                data.push(col.next().unwrap_or(DataValue::Null));
394            }
395        }
396
397        Array2::from_shape_vec((nrows, ncols), data)
398            .map_err(|e| Error::UnknownError(format!("finish reshape: {e}")))
399    }
400
401    fn push_row(&mut self, values: Vec<DataValue>) -> Result<(), Error> {
402        let n_cols = self.ncolumns();
403        for (idx, value) in values.into_iter().enumerate() {
404            let current_ptr = self
405                .data_frame
406                .get_mut(idx)
407                .ok_or(Error::IndexOutOfRange(idx, n_cols))?;
408            current_ptr.push(value)?;
409        }
410        Ok(())
411    }
412
413    /// Casts all values in the column identified by `key` to match the key's declared dtype.
414    ///
415    /// Returns an error if the key is not found in the frame.
416    pub fn try_fix_column_by_key(&mut self, key: &Key) -> Result<(), Error> {
417        let col = self.get_column_mut(key)?;
418        col.try_convert_to_dtype(key.ctype)?;
419        Ok(())
420    }
421
422    /// Forces all values in the named column to the specified `dtype`, updating
423    /// both the stored data and the column's [`Key`] metadata.
424    ///
425    /// Returns an error if the column name does not exist.
426    pub fn enforce_dtype_for_column(
427        &mut self,
428        key: &str,
429        dtype: crate::DataType,
430    ) -> Result<(), Error> {
431        if let Some(idx) = self.index.get_column_index_by_name(key) {
432            let new_key = Key::new(key, dtype);
433            let col = self.get_column_by_idx_mut(idx)?;
434            col.try_convert_to_dtype(new_key.ctype)?;
435            self.index.rename_key(key, new_key)?;
436            Ok(())
437        } else {
438            Err(Error::NotFound(Key::new(key, crate::DataType::Unknown)))
439        }
440    }
441
    /// Renames a column from `old` (matched by name) to `new`.
    ///
    /// The new [`Key`] can carry a different [`DataType`](crate::DataType), which
    /// effectively re-declares the column's type without casting existing values.
    ///
    /// Delegates to the index's `rename_key` and returns its result; the
    /// stored column data is not touched here.
    pub fn rename_key(&mut self, old: &str, new: Key) -> Result<(), Error> {
        self.index.rename_key(old, new)
    }
449
    /// Registers `alias` as an alternative name for the column named `key`.
    ///
    /// After aliasing, `select` and friends accept both the original name and the alias.
    ///
    /// Delegates to the index's `add_alias` and returns its result.
    pub fn add_alias(&mut self, key: &str, alias: &str) -> Result<(), Error> {
        self.index.add_alias(key, alias)
    }
456
457    /// Returns the requested columns as `Vec<Vec<D>>` in row-major order —
458    /// each outer `Vec` is one row, each inner `Vec` holds one cell per
459    /// selected key. Values are coerced to `D` via [`Extract`].
460    ///
461    /// Despite the name, the result is **not** transposed. Returns an empty
462    /// `Vec` if no key resolves.
463    pub fn select_transposed_typed<D: Extract>(&self, keys: &[Key]) -> Vec<Vec<D>> {
464        let selected = self.select(Some(keys));
465        let mut result = Vec::with_capacity(selected.nrows());
466        for row in selected.rows() {
467            let mut r = Vec::with_capacity(selected.ncols());
468            for value in row.iter() {
469                r.push(D::extract(value));
470            }
471            result.push(r);
472        }
473        result
474    }
475
476    /// Selects columns and stacks them into a 2-D array of shape
477    /// `(ncols, nrows)` — i.e. each row of the output is one column from the
478    /// frame.
479    ///
480    /// Despite the name, this is the only method that returns the
481    /// column-as-row layout; [`Self::select`] returns the row-major
482    /// `(nrows, ncols)` form.
483    ///
484    /// When `keys` is `None`, every column from the [`KeyIndex`] is included.
485    /// Returns an empty `Array2` if no key resolves.
486    pub fn select_transposed(&self, keys: Option<&[Key]>) -> Result<Array2<DataValue>, Error> {
487        let keys = keys.unwrap_or_else(|| self.index.get_keys());
488        let key_indexes = self.index.select(keys);
489        if key_indexes.is_empty() {
490            return Ok(Array2::default((0, 0)));
491        }
492        let data_vec: Result<Vec<Array1<DataValue>>, Error> = key_indexes
493            .indexes()
494            .iter()
495            .map(|x| self.get_column_by_idx(*x).map(|col| col.as_generic_array()))
496            .collect();
497        let data_vec = data_vec?;
498        let views: Vec<ArrayView1<DataValue>> = data_vec.iter().map(|a| a.view()).collect();
499        Ok(ndarray::stack(Axis(0), &views)?)
500    }
501
502    /// Selects a whole column from the [`ColumnFrame`] by the given key as an
503    /// owned [`Array1<DataValue>`].
504    ///
505    /// Returns `None` if the key is missing. Typed columns are materialized on
506    /// the fly; for zero-copy access to typed data, use
507    /// [`Self::get_column`] to obtain the [`TypedDataArray`] directly.
508    #[deprecated(note = "allocates O(n); use get_column() for zero-copy typed access")]
509    pub fn select_column(&self, key: &Key) -> Option<Array1<DataValue>> {
510        self.index
511            .get_column_index(key)
512            .and_then(|x| self.get_column_by_idx(x).ok())
513            .map(|col| col.as_generic_array())
514    }
515
    /// Applies a user-defined closure to this frame.
    ///
    /// The closure is called exactly once with the provided `keys` and a
    /// mutable reference to the frame, allowing arbitrary in-place
    /// transformations. `keys` is forwarded as given; it is not checked
    /// against the frame here.
    ///
    /// # Errors
    ///
    /// Returns whatever `func` returns.
    pub fn apply_function<F>(&mut self, keys: &[Key], mut func: F) -> Result<(), Error>
    where
        F: FnMut(&[Key], &mut ColumnFrame) -> Result<(), Error>,
    {
        func(keys, self)
    }
526
527    /// Validates that `(column, row_index)` is in range and returns the
528    /// corresponding column position.
529    ///
530    /// # Errors
531    /// - [`Error::NotFound`] if `column` is not present in the frame.
532    /// - [`Error::IndexOutOfRange`] if `row_index >= nrows`.
533    pub fn validate_entry_access(&self, column: &Key, row_index: usize) -> Result<usize, Error> {
534        if row_index >= self.nrows() {
535            return Err(Error::IndexOutOfRange(row_index, self.nrows()));
536        }
537        let Some(column_index) = self.index.get_column_index(column) else {
538            return Err(Error::NotFound(column.clone()));
539        };
540        Ok(column_index)
541    }
542
543    /// Returns an owned [`DataValue`] for the given column and row index, or
544    /// `None` if either is out of range.
545    ///
546    /// Typed columns allocate a [`DataValue`] on the fly. For zero-copy typed
547    /// access see [`Self::get_column`] + [`TypedData::as_slice_i32`] (and its
548    /// siblings for other primitive types).
549    pub fn get_by_row_index(&self, column: &Key, row_index: usize) -> Option<DataValue> {
550        trace!(
551            "Column: {column} row_index: {row_index} data_frame: cols:{}-rows:{}",
552            self.ncolumns(),
553            self.nrows()
554        );
555        trace!("{:?}", self.data_frame);
556        match self.validate_entry_access(column, row_index) {
557            Ok(column_index) => self.data_frame.get(column_index)?.get(row_index),
558            Err(e) => {
559                trace!("Error: {e}");
560                None
561            }
562        }
563    }
564
565    /// Sets the value at `(column, row_index)`, coercing `value` to the
566    /// column's dtype via [`Extract`].
567    ///
568    /// Returns [`Error::NotFound`] if the column is missing or
569    /// [`Error::IndexOutOfRange`] if the row index is out of range.
570    pub fn set_by_row_index(
571        &mut self,
572        column: &Key,
573        row_index: usize,
574        value: DataValue,
575    ) -> Result<(), Error> {
576        let column_index = self.validate_entry_access(column, row_index)?;
577        let ncols = self.ncolumns();
578        self.data_frame
579            .get_mut(column_index)
580            .ok_or(Error::IndexOutOfRange(column_index, ncols))?
581            .set(row_index, value)
582    }
583
584    /// Returns the requested columns as a `HashMap<Key, Vec<DataValue>>`.
585    ///
586    /// When `keys` is `None`, every column from the [`KeyIndex`] is included.
587    /// Keys absent from the frame are simply omitted from the result; an empty
588    /// map is returned when no key resolves.
589    pub fn select_as_map(&self, keys: Option<&[Key]>) -> HashMap<Key, Vec<DataValue>> {
590        let keys = keys.unwrap_or_else(|| self.index.get_keys());
591        let indexes = self.index.select(keys);
592        if indexes.is_empty() {
593            return Default::default();
594        }
595
596        let mut new_data_frame = HashMap::with_capacity(keys.len());
597
598        for key in keys.iter() {
599            if let Some(column_index_in_source) = indexes.get_column_index(key) {
600                let column = self
601                    .data_frame
602                    .get(column_index_in_source)
603                    .map(|x| x.to_vec())
604                    .unwrap_or_else(|| vec![DataValue::Null; self.nrows()]);
605                new_data_frame.insert(key.clone(), column);
606            }
607        }
608
609        new_data_frame
610    }
611
612    /// Materializes the requested columns into a row-major
613    /// `Array2<DataValue>` of shape `(nrows, ncols)`.
614    ///
615    /// When `keys` is `None`, every column from the [`KeyIndex`] is included.
616    /// Keys absent from the frame produce a `Null`-filled column at the
617    /// corresponding output position. Returns an empty `Array2` if no key
618    /// resolves.
619    ///
620    /// This always allocates a fresh buffer — values are cloned into the
621    /// output array. For zero-copy single-column access, use
622    /// [`Self::select_column`] (or [`Self::get_column`] for direct
623    /// [`TypedData`] access).
624    pub fn select(&self, keys: Option<&[Key]>) -> Array2<DataValue> {
625        if keys.is_none() && !self.is_empty() {
626            return self.finish().expect("BUG: There has to be some data");
627        }
628        let keys = keys.unwrap_or_else(|| self.index.get_keys());
629        let indexes = self.index.select(keys);
630        if indexes.is_empty() || keys.is_empty() {
631            return Array2::default((0, 0));
632        }
633
634        let nrows = self.nrows();
635        let ncols = keys.len();
636
637        // Build per-requested-key column iterators, substituting an empty
638        // iterator for keys that are not present so the row layout still
639        // matches `keys` and missing values surface as `Null`.
640        let mut data = Vec::with_capacity(nrows * ncols);
641        let mut selected_cols: Vec<Box<dyn Iterator<Item = DataValue> + '_>> = keys
642            .iter()
643            .map(|key| match indexes.get_column_index(key) {
644                Some(col_idx) => self
645                    .get_column_by_idx(col_idx)
646                    .expect("Cannot get column on index")
647                    .iter_values(),
648                None => Box::new(std::iter::empty()) as Box<dyn Iterator<Item = DataValue> + '_>,
649            })
650            .collect();
651
652        for _ in 0..nrows {
653            for col in selected_cols.iter_mut() {
654                data.push(col.next().unwrap_or(DataValue::Null));
655            }
656        }
657
658        Array::from_shape_vec((nrows, ncols), data).unwrap_or_else(|_| Array2::default((0, 0)))
659    }
660
661    /// Returns selected columns as a `Vec` of owned [`Array1`] columns.
662    ///
663    /// Each entry of the returned `Vec` corresponds to one requested column,
664    /// in the same order as `keys`. Missing keys yield `None`; present keys
665    /// yield `Some(&TypedData)` borrowed directly from the column store. When
666    /// `keys` is `None` every column is borrowed in storage order.
667    ///
668    /// # Errors
669    ///
670    /// Returns [`Error::EmptyData`] when `keys` resolves to an empty or
671    /// entirely unknown key set.
672    ///
673    /// # Examples
674    ///
675    /// ```
676    /// use trs_dataframe::{column_frame, Key};
677    ///
678    /// let cf = column_frame! { "a" => [1i32, 2i32], "b" => [3i32, 4i32] };
679    /// let cols = cf.select_vec_view(Some(&["a".into()])).unwrap();
680    /// assert_eq!(cols.len(), 1);
681    /// assert_eq!(cols[0].as_ref().unwrap().len(), 2);
682    /// ```
683    pub fn select_vec_view(
684        &self,
685        keys: Option<&[Key]>,
686    ) -> Result<Vec<Option<&TypedDataArray>>, Error> {
687        if keys.is_none() && !self.is_empty() {
688            return Ok(self.data_frame.iter().map(Some).collect());
689        }
690        let keys = keys.unwrap_or_else(|| self.index.get_keys());
691        let indexes = self.index.select(keys);
692        if indexes.is_empty() || keys.is_empty() {
693            return Err(Error::EmptyData);
694        }
695        let ncols = keys.len();
696
697        let mut views = Vec::with_capacity(ncols);
698        for col_key in keys {
699            if let Some(col_idx) = self.index.get_column_index(col_key) {
700                views.push(Some(self.get_column_by_idx(col_idx)?));
701            } else {
702                views.push(None);
703            }
704        }
705        Ok(views)
706    }
707
708    /// Returns selected columns as a `Vec<TypedDataArray>`, preserving the
709    /// native primitive storage and null bitmap when available.
710    pub fn select_typed_columns(&self, keys: Option<&[Key]>) -> Result<Vec<TypedDataArray>, Error> {
711        if keys.is_none() && !self.is_empty() {
712            return Ok(self.data_frame.clone());
713        }
714        let keys = keys.unwrap_or_else(|| self.index.get_keys());
715        let indexes = self.index.select(keys);
716        if indexes.is_empty() || keys.is_empty() {
717            return Err(Error::EmptyData);
718        }
719        let mut out = Vec::with_capacity(keys.len());
720        for col_key in keys {
721            match indexes.get_column_index(col_key) {
722                Some(col_idx) => out.push(self.get_column_by_idx(col_idx)?.clone()),
723                None => out.push(TypedDataArray::default_init(col_key, self.nrows())),
724            }
725        }
726        Ok(out)
727    }
728
729    /// Returns selected columns wrapped in a [`MaybeView`].
730    ///
731    /// Each column is materialized as an owned [`Array1<DataValue>`] and then
732    /// stacked along axis 0, producing an owned [`Array2`] of shape
733    /// `(ncols, nrows)`. When `keys` is `None`, every column from the
734    /// [`KeyIndex`] is included.
735    ///
736    /// Use [`MaybeView::row_view`] on the result to obtain a uniform
737    /// `(nrows, ncols)` view regardless of which variant was returned.
738    ///
739    /// # Errors
740    ///
741    /// Returns [`Error::EmptyData`] when `keys` resolves to an empty or
742    /// entirely unknown key set.
743    ///
744    /// # Examples
745    ///
746    /// ```
747    /// use trs_dataframe::{column_frame, Key};
748    ///
749    /// let cf = column_frame! { "x" => [10i32, 20i32], "y" => [30i32, 40i32] };
750    /// let view = cf.select_view(Some(&["x".into(), "y".into()])).unwrap();
751    /// // row_view() gives a (nrows, ncols) view — shape is [2, 2] here.
752    /// assert_eq!(view.row_view().nrows(), 2);
753    /// ```
754    pub fn select_view(&self, keys: Option<&[Key]>) -> Result<MaybeView<'_>, Error> {
755        let keys = keys.unwrap_or_else(|| self.index.get_keys());
756        let indexes = self.index.select(keys);
757        if indexes.is_empty() || keys.is_empty() {
758            return Err(Error::EmptyData);
759        }
760        let ncols = keys.len();
761        let mut owned_cols: Vec<Array1<DataValue>> = Vec::with_capacity(ncols);
762        for col_idx in indexes.indexes() {
763            owned_cols.push(self.get_column_by_idx(col_idx)?.as_generic_array());
764        }
765        let views: Vec<ArrayView1<DataValue>> = owned_cols.iter().map(|a| a.view()).collect();
766        Ok(MaybeView::Array(ndarray::stack(Axis(0), &views)?))
767    }
768
769    /// Returns selected columns as a typed 2D array, converting each [`DataValue`]
770    /// via the [`Extract`] trait.
771    ///
772    /// This is the typed counterpart of [`select`](Self::select). If `keys` is `None`,
773    /// all columns are returned. The data is in column-major (Fortran) order
774    /// internally but the shape is `(nrows, ncols)` — indexing and row
775    /// iteration work identically to a row-major array.
776    ///
777    /// # Type coercion
778    ///
779    /// The [`Extract`] trait performs best-effort numeric coercion (e.g. `I32 -> f64`).
780    /// Values that cannot be meaningfully converted yield the type's default
781    /// (0 for numbers, `false` for bool, empty string for `String`).
782    pub fn select_typed<T: Extract + Clone>(&self, keys: Option<&[Key]>) -> Array2<T> {
783        let keys = keys.unwrap_or_else(|| self.index.get_keys());
784        let indexes = self.index.select(keys);
785        if indexes.is_empty() || keys.is_empty() {
786            return Array2::from_shape_vec((0, 0), vec![]).unwrap();
787        }
788
789        let nrows = self.nrows();
790        let ncols = keys.len();
791        let null_default = T::extract(&DataValue::Null);
792        let mut data = Vec::with_capacity(nrows * ncols);
793
794        for key in keys {
795            match indexes.get_column_index(key) {
796                Some(col_idx) => {
797                    let col = self
798                        .get_column_by_idx(col_idx)
799                        .expect("Cannot get column on index");
800                    for v in col.iter_values() {
801                        data.push(T::extract(&v));
802                    }
803                }
804                None => {
805                    data.resize(data.len() + nrows, null_default.clone());
806                }
807            }
808        }
809
810        Array2::from_shape_vec((nrows, ncols).f(), data)
811            .unwrap_or_else(|_| Array2::from_shape_vec((0, 0), vec![]).unwrap())
812    }
813
814    fn extend_dataframe_for_column(&mut self, key: Key) -> Result<(), Error> {
815        let len = self.nrows();
816        let column = TypedDataArray::default_init(&key, len);
817        self.index.store_key(key);
818        self.data_frame.push(column);
819        Ok(())
820    }
821
822    /// Pushes the row candidate into the [`ColumnFrame`]
823    /// If the column is not found this method will add the column to the [`ColumnFrame`]
824    ///
825    /// This operation clones all values from the candidate. For batch operations,
826    /// consider using [`Self::extend`] instead which can be more efficient.
827    pub fn push<C: CandidateData>(&mut self, row_candidate: C) -> Result<(), Error> {
828        // Pre-allocate with capacity to avoid reallocations
829        let num_keys = self.index.len();
830        let candidate_keys = row_candidate.keys();
831        let mut arr = Vec::with_capacity(num_keys.max(candidate_keys.len()));
832
833        // First pass: extend dataframe with any new columns
834        for key in &candidate_keys {
835            if self.index.get_column_index(key).is_none() {
836                self.extend_dataframe_for_column(key.clone())?;
837            }
838        }
839
840        // Second pass: collect values - capacity may have changed after extension
841        arr.reserve(self.index.len());
842        for index in self.index.get_keys() {
843            arr.push(
844                row_candidate
845                    .get_value_ref(index)
846                    .cloned()
847                    .unwrap_or(DataValue::Null),
848            );
849        }
850
851        self.push_row(arr)?;
852        Ok(())
853    }
854
855    /// Removes the specified columns from this frame and returns them as a new [`ColumnFrame`].
856    ///
857    /// The remaining columns stay in `self` with their data intact.
858    pub fn remove_column(&mut self, keys: &[Key]) -> Result<Self, Error> {
859        let mut removed_index = KeyIndex::default();
860        let removed_data = self.select_typed_columns(Some(keys))?;
861
862        // Collect column indices to remove before modifying the index
863        let mut indices_to_remove: Vec<usize> = keys
864            .iter()
865            .filter_map(|key| self.index.get_column_index(key))
866            .collect();
867
868        // Remove keys from index
869        for key in keys {
870            if let Some((current, _)) = self.index.remove_key(key) {
871                removed_index.store_key(current);
872            }
873        }
874
875        // Remove data columns in reverse order to preserve indices
876        indices_to_remove.sort_unstable();
877        indices_to_remove.dedup();
878        for idx in indices_to_remove.into_iter().rev() {
879            self.data_frame.remove(idx);
880        }
881
882        // Rebuild index with correct sequential indices
883        let remaining_keys = self.index.get_keys().to_vec();
884        self.index = KeyIndex::new(remaining_keys);
885
886        Ok(Self::new(removed_index, removed_data))
887    }
888
889    fn check_or_init_frame(&mut self, other: &Self) -> Result<Continue, Error> {
890        if self.index.is_empty() {
891            self.index = other.index.clone();
892            self.data_frame = other.data_frame.clone();
893            return Ok(Continue::End);
894        }
895        if other.index.is_empty() {
896            return Ok(Continue::End);
897        }
898        if self.is_empty() {
899            let n = other.nrows();
900            self.data_frame = self
901                .index
902                .get_keys()
903                .iter()
904                .map(|k| TypedDataArray::default_init(k, n))
905                .collect();
906        }
907
908        Ok(Continue::Continue)
909    }
910
911    fn extend_columns_from_other(&mut self, other: &Self) -> Result<(), Error> {
912        let missing_keys: Vec<Key> = other
913            .index
914            .get_keys()
915            .iter()
916            .filter(|key| self.index.get_column_index(key).is_none())
917            .cloned()
918            .collect();
919
920        if missing_keys.is_empty() {
921            return Ok(());
922        }
923
924        let nrows = self.nrows();
925        for key in missing_keys {
926            let column = TypedDataArray::default_init(&key, nrows);
927            self.data_frame.push(column);
928            self.index.store_key(key);
929        }
930
931        Ok(())
932    }
933
934    fn try_extend(&mut self, mut other: Self) -> Result<(), Error> {
935        let mut joined_keys = self.index.clone();
936        for key in other.keys() {
937            if self.index.get_column_index(key).is_none() {
938                joined_keys.store_key(key.clone());
939            }
940        }
941
942        let self_nrows = self.nrows();
943        let other_nrows = other.nrows();
944        let mut new_data: Vec<TypedDataArray> = Vec::with_capacity(joined_keys.len());
945
946        for key in joined_keys.get_keys() {
947            let self_col = self
948                .index
949                .get_column_index(key)
950                .map(|i| std::mem::take(&mut self.data_frame[i]));
951            let other_col = other
952                .index
953                .get_column_index(key)
954                .map(|i| std::mem::take(&mut other.data_frame[i]));
955
956            let col = match (self_col, other_col) {
957                (Some(mut s), Some(o)) => {
958                    s.extend_from(&o);
959                    s
960                }
961                (Some(mut s), None) => {
962                    let filler = TypedDataArray::default_init(key, other_nrows);
963                    s.extend_from(&filler);
964                    s
965                }
966                (None, Some(o)) => {
967                    let mut base = TypedDataArray::default_init(key, self_nrows);
968                    base.extend_from(&o);
969                    base
970                }
971                (None, None) => TypedDataArray::default_init(key, self_nrows + other_nrows),
972            };
973            new_data.push(col);
974        }
975
976        *self = ColumnFrame {
977            index: joined_keys,
978            data_frame: new_data,
979        };
980        Ok(())
981    }
982
983    /// Extends the [`ColumnFrame`] with the data from the other [`ColumnFrame`]
984    /// If the [`KeyIndex`] is empty, the [`ColumnFrame`] is replaced with the other [`ColumnFrame`]
985    /// If the other [`KeyIndex`] is empty, nothing happens
986    /// If the length of the [`KeyIndex`] of the other data frame is greater then current,
987    /// an error is returned [`Error::DataSetSizeDoesntMatch`]
988    /// If [`Key`] from other data frame - extends the [`KeyIndex`] and add column to the current [`ColumnFrame`]
989    ///
990    pub fn extend(&mut self, mut other: Self) -> Result<(), Error> {
991        if self.check_or_init_frame(&other)?.should_end() {
992            return Ok(());
993        }
994
995        if self.index.check_order_of_indexes(&other.index).is_err() {
996            return self.try_extend(other);
997        }
998
999        trace!(
1000            "Extend columns from other {:?} vs {:?}",
1001            other.index.get_keys(),
1002            self.index.get_keys()
1003        );
1004
1005        // Ensure both frames have the same columns (null-filled for missing)
1006        self.extend_columns_from_other(&other)?;
1007        other.extend_columns_from_other(self)?;
1008
1009        // Append other's rows to self's columns (move, not clone — other is owned)
1010        let keys = self.index.get_keys().to_vec();
1011        for key in &keys {
1012            let self_idx = self.index.get_column_index(key).unwrap();
1013            let other_idx = other.index.get_column_index(key).unwrap();
1014            let other_col = std::mem::take(&mut other.data_frame[other_idx]);
1015            let self_col = &mut self.data_frame[self_idx];
1016            self_col.extend_from(&other_col);
1017        }
1018
1019        Ok(())
1020    }
1021
1022    /// Replace the [`ColumnFrame`] with the other [`ColumnFrame`]
1023    /// If the current [`KeyIndex`] is empty, the [`ColumnFrame`] is replaced with the other [`ColumnFrame`]
1024    /// If the other [`KeyIndex`] is empty, nothing happens
1025    /// If the [`KeyIndex`] of the other data frame and current doesn't match an error is returned [`Error::DataSetSizeDoesntMatch`]
1026    /// If the [`Key`] from other data frame is not present in the current [`ColumnFrame`] - extends the [`KeyIndex`] and add column to the current [`ColumnFrame`]
1027    pub fn replace(&mut self, other: Self) -> Result<(), Error> {
1028        if self.check_or_init_frame(&other)?.should_end() {
1029            return Ok(());
1030        }
1031
1032        if self.nrows() > other.nrows() {
1033            return Err(Error::DataSetSizeDoesntMatch(self.nrows(), other.nrows()));
1034        }
1035
1036        self.index = other.index;
1037        self.data_frame = other.data_frame;
1038
1039        Ok(())
1040    }
1041
1042    /// Joins the candidates by the keys in the `JoinRelation::JoinById` struct.
1043    /// This function creates [`Index`] for the keys and then joins the candidates by the keys.
1044    ///
1045    pub fn join_by_id_inner(&mut self, right: Self, keys: &[Key]) -> Result<(), Error> {
1046        if self.check_or_init_frame(&right)?.should_end() {
1047            return Ok(());
1048        }
1049
1050        let timer = std::time::Instant::now();
1051        let new_columns = right.index.get_complement_keys(self.index.get_keys());
1052
1053        // add new columns and keys into the column frame index
1054        self.extend_columns_from_other(&right)?;
1055        tracing::debug!("Extend took {}ns", timer.elapsed().as_nanos());
1056
1057        // Pre-compute column index mappings AFTER extension to get correct indices
1058        let column_mappings: Vec<(usize, usize)> = new_columns
1059            .iter()
1060            .filter_map(|key| {
1061                let left_idx = self.index.get_column_index(key)?;
1062                let right_idx = right.index.get_column_index(key)?;
1063                Some((left_idx, right_idx))
1064            })
1065            .collect();
1066
1067        // get the indexes for the keys
1068        let timer = std::time::Instant::now();
1069        let index = Index::new(keys.to_vec(), self);
1070        tracing::debug!("Left index build took: {}ns", timer.elapsed().as_nanos());
1071        tracing::trace!("Index {index:?}");
1072
1073        let timer = std::time::Instant::now();
1074        let right_index = Index::new(keys.to_vec(), &right);
1075        let joined_idx = index.join(right_index);
1076        tracing::debug!(
1077            "Right index build and join took: {}ns",
1078            timer.elapsed().as_nanos()
1079        );
1080
1081        // Instead of copying all data, we only fill in the new columns
1082        // The existing columns are already correct from extend_columns_from_other
1083        let timer = std::time::Instant::now();
1084        let joined_idx_len = joined_idx.len();
1085
1086        for (left_col_idx, right_col_idx) in &column_mappings {
1087            let right_col = right.get_column_by_idx(*right_col_idx)?;
1088            let left_col = self.get_column_by_idx_mut(*left_col_idx)?;
1089
1090            for (left_indices, right_indices) in &joined_idx {
1091                for right_row_idx in right_indices {
1092                    let value = right_col.get_or_null(*right_row_idx);
1093                    for left_idx in left_indices {
1094                        let _ = left_col.set(*left_idx, value.clone());
1095                    }
1096                }
1097            }
1098        }
1099
1100        let elapsed = timer.elapsed();
1101        tracing::debug!(
1102            "Filled {} rows in {}ms|{}s",
1103            joined_idx_len,
1104            elapsed.as_millis(),
1105            elapsed.as_secs()
1106        );
1107
1108        Ok(())
1109    }
1110
1111    /// Adds a single column to the current [`ColumnFrame`].
1112    ///
1113    /// The column may be provided as anything that converts into a
1114    /// [`TypedData`] — e.g. `Vec<DataValue>`, a typed primitive `Vec<T>`, an
1115    /// `Array1<DataValue>`, or a raw `TypedData`. If the column's dtype is
1116    /// Generic but `key` declares a concrete dtype, the column is coerced to
1117    /// match.
1118    ///
1119    /// Errors:
1120    /// - [`Error::ColumnAlreadyExists`] if `key` is already present.
1121    /// - [`Error::DataSetSizeDoesntMatch`] if the column length doesn't match
1122    ///   the current row count (and the frame is non-empty).
1123    pub fn add_single_column<K, V>(&mut self, key: K, column: V) -> Result<(), Error>
1124    where
1125        K: Into<Key>,
1126        V: Into<TypedDataArray>,
1127    {
1128        let key = key.into();
1129        let mut column: TypedDataArray = column.into();
1130        if self.index.get_column_index(&key).is_some() {
1131            return Err(Error::ColumnAlreadyExists(key));
1132        }
1133        if self.nrows() != column.len() && !self.is_empty() {
1134            return Err(Error::DataSetSizeDoesntMatch(self.nrows(), column.len()));
1135        }
1136
1137        // If existing columns are empty but new column has data, resize existing columns
1138        if self.is_empty() && !column.is_empty() {
1139            let new_len = column.len();
1140            let keys = self.index.get_keys().to_vec();
1141            for (i, existing_col) in self.data_frame.iter_mut().enumerate() {
1142                *existing_col = TypedDataArray::default_init(&keys[i], new_len);
1143            }
1144        }
1145
1146        if matches!(column.data_type(), crate::DataType::Unknown)
1147            && !matches!(key.ctype, crate::DataType::Unknown)
1148        {
1149            let _ = column.try_convert_to_dtype(key.ctype);
1150        }
1151
1152        self.index.store_key(key);
1153        self.data_frame.push(column);
1154        Ok(())
1155    }
1156    /// Adds the columns from the other [`ColumnFrame`] to the current [`ColumnFrame`]
1157    /// If the current [`KeyIndex`] is empty, the [`ColumnFrame`] is replaced with the other [`ColumnFrame`]
1158    /// If the other [`KeyIndex`] is empty, nothing happens
1159    pub fn add_columns(&mut self, other: Self) -> Result<(), Error> {
1160        if self.check_or_init_frame(&other)?.should_end() {
1161            return Ok(());
1162        }
1163
1164        self.extend_columns_from_other(&other)?;
1165        let nrows = self.nrows();
1166        for (idx, key) in other.index.get_keys().iter().enumerate() {
1167            if let Some(index) = self.index.get_column_index(key) {
1168                let arr = match other.get_column_by_idx(idx) {
1169                    Ok(arr) => arr.clone(),
1170                    Err(_) => continue,
1171                };
1172                trace!(
1173                    "Adding column {key:?} at index {idx} vs {index} datasize: self:{} vs other:{}",
1174                    nrows,
1175                    arr.len()
1176                );
1177                let dst = self.get_column_by_idx_mut(index)?;
1178                if arr.len() != nrows {
1179                    dst.fill(DataValue::Null);
1180                } else {
1181                    dst.assign(&arr);
1182                }
1183            }
1184        }
1185        Ok(())
1186    }
1187
1188    /// Broadcasts the data from the other [`ColumnFrame`] to the current [`ColumnFrame`]
1189    /// If the current [`KeyIndex`] is empty, the [`ColumnFrame`] is replaced with the other [`ColumnFrame`]
1190    /// If the other [`KeyIndex`] is empty, nothing happens
1191    /// If the length (number of rows) of the other data frame is greater then 1 an error is returned [`Error::CannotBroadcast`]
1192    pub fn broadcast(&mut self, other: Self) -> Result<(), Error> {
1193        if self.check_or_init_frame(&other)?.should_end() {
1194            return Ok(());
1195        }
1196        if other.nrows() != 1 {
1197            return Err(Error::CannotBroadcast);
1198        }
1199
1200        // Get all keys (current + new from other)
1201        let other_keys: Vec<_> = other
1202            .index
1203            .get_keys()
1204            .iter()
1205            .filter(|k| self.index.get_column_index(k).is_none())
1206            .cloned()
1207            .collect();
1208
1209        let nrows = self.nrows();
1210        // Add new keys to index
1211        for key in &other_keys {
1212            self.index.store_key(key.clone());
1213            let value = other.get_column(key)?;
1214            let first = value.get_or_null(0);
1215            let mut new_col = TypedDataArray::default_init(key, nrows);
1216            new_col.fill(first);
1217            self.data_frame.push(new_col);
1218        }
1219
1220        Ok(())
1221    }
1222
1223    /// Replaces `self` with the Cartesian product of `self` and `other` —
1224    /// every row of `self` paired with every row of `other`.
1225    ///
1226    /// Existing column order in `self` is preserved; columns that exist only
1227    /// in `other` are appended at the end.
1228    pub fn cartesian_product(&mut self, other: Self) -> Result<(), Error> {
1229        if self.check_or_init_frame(&other)?.should_end() {
1230            return Ok(());
1231        }
1232
1233        let self_nrows = self.nrows();
1234        let other_nrows = other.nrows();
1235        let max_rows = self_nrows * other_nrows;
1236
1237        // Build combined index
1238        for other_key in other.keys() {
1239            if self.index.get_column_index(other_key).is_none() {
1240                self.index.store_key(other_key.clone());
1241            } else {
1242                self.index.store_key(Key::new(
1243                    format!("{}-{}", other_key, other_key.id()).as_str(),
1244                    other_key.ctype,
1245                ));
1246            }
1247        }
1248
1249        let mut df: Vec<TypedDataArray> = Vec::with_capacity(self.index.len());
1250        // swap pointers to data to be able update dataset
1251        std::mem::swap(&mut df, &mut self.data_frame);
1252        // Self columns: repeat each value other_nrows times
1253        for col in df.into_iter() {
1254            let mut new_col = vec![DataValue::Null; max_rows];
1255            for (idx, value) in col.iter_values().enumerate() {
1256                for self_idx in 0..other_nrows {
1257                    new_col[self_idx + (idx * other_nrows)] = value.clone();
1258                }
1259            }
1260            self.data_frame
1261                .push(TypedDataArray::new(col.data_type(), new_col));
1262        }
1263
1264        // Other columns: tile the column self_nrows times
1265        for col in other.data_frame.into_iter() {
1266            let mut new_col: Vec<DataValue> = Vec::with_capacity(max_rows);
1267            let tile = col.to_vec();
1268            for _ in 0..self_nrows {
1269                new_col.extend(tile.iter().cloned());
1270            }
1271            self.data_frame
1272                .push(TypedDataArray::new(col.data_type(), new_col));
1273        }
1274
1275        Ok(())
1276    }
1277
1278    /// Joins the candidates with the other candidates by the [`JoinRelation`] policy.
1279    /// For [`JoinBy::AddColumns`] the columns are added to the existing structure via [`Self::add_columns`]
1280    /// For [`JoinBy::Replace`] the columns are replaced with the new columns
1281    /// For [`JoinBy::Extend`] the candidates are extended via [`Self::extend`]
1282    /// For [`JoinBy::Broadcast`] each candidate is extended with the values of the other candidates `Self::broadcast`
1283    /// For [`JoinBy::CartesianProduct`] the candidates are multiplied by the other candidates
1284    /// For [`JoinBy::JoinById`] the candidates are joined by the keys in the `JoinRelation::JoinById` struct see [`Self::join_by_id_inner`]
1285    pub fn join(&mut self, right: Self, join_type: &JoinRelation) -> Result<(), Error> {
1286        use JoinBy::*;
1287        match &join_type.join_type {
1288            AddColumns => self.add_columns(right),
1289            Replace => self.replace(right),
1290            Extend => self.extend(right),
1291            Broadcast => self.broadcast(right),
1292            CartesianProduct => self.cartesian_product(right),
1293            JoinById(join) => self.join_by_id_inner(right, &join.keys),
1294        }
1295    }
1296
1297    /// Returns a single column materialized as an owned [`Array1<DataValue>`].
1298    ///
1299    /// Typed columns allocate a [`DataValue`] per element on the fly. For
1300    /// zero-copy typed access, use [`Self::get_column`] to obtain the
1301    /// underlying [`TypedData`] directly.
1302    #[deprecated(note = "allocates O(n); use get_column() for zero-copy typed access")]
1303    pub fn get_single_column(&self, key: &Key) -> Option<Array1<DataValue>> {
1304        self.index
1305            .get_column_index(key)
1306            .and_then(|x| self.get_column_by_idx(x).ok())
1307            .map(|col| col.as_generic_array())
1308    }
1309
1310    /// Returns a column extracted into a typed [`Array1<T>`], where each
1311    /// [`DataValue`] is converted via the [`Extract`] trait. Returns `None` if
1312    /// the key does not exist in the frame.
1313    ///
1314    /// # Type coercion
1315    ///
1316    /// The [`Extract`] trait performs best-effort numeric coercion (e.g.
1317    /// `I32 -> f64`). Values that cannot be meaningfully converted yield the
1318    /// type's default (0 for numbers, `false` for bool, empty string for
1319    /// `String`).
1320    pub fn get_single_column_typed<T: Extract>(&self, key: &Key) -> Option<Array1<T>> {
1321        self.index
1322            .get_column_index(key)
1323            .and_then(|x| self.get_column_by_idx(x).ok())
1324            .map(|col| col.to_typed_array())
1325    }
1326
1327    /// Returns a [`SortedDataFrame`](sorted_df::SortedDataFrame) view sorted ascending by the given column.
1328    ///
1329    /// `Null` values are pushed to the end. Ties are broken by original row order.
1330    /// Use [`SortedDataFrame::topn`](sorted_df::SortedDataFrame::topn) to efficiently
1331    /// extract the first/last N rows.
1332    pub fn sorted(&self, key: &Key) -> Result<sorted_df::SortedDataFrame<'_>, Error> {
1333        let index = self
1334            .index
1335            .get_column_index(key)
1336            .ok_or(Error::NotFound(key.clone()))?;
1337        let column = self.get_column_by_idx(index)?;
1338        let values: Vec<DataValue> = column.to_vec();
1339        let mut data_with_index: Vec<(usize, &DataValue)> = values.iter().enumerate().collect();
1340        tracing::trace!("Sorting by key: {key:?} vals {data_with_index:?}");
1341        data_with_index.sort_by(
1342            |(a_idx, a_val), (b_idx, b_val)| match a_val.partial_cmp(b_val) {
1343                Some(ordering) => ordering.then_with(|| a_idx.cmp(b_idx)),
1344                None => {
1345                    let a_null = matches!(a_val, DataValue::Null);
1346                    let b_null = matches!(b_val, DataValue::Null);
1347                    match (a_null, b_null) {
1348                        (true, true) => std::cmp::Ordering::Equal.then_with(|| a_idx.cmp(b_idx)),
1349                        (true, false) => std::cmp::Ordering::Greater.then_with(|| a_idx.cmp(b_idx)),
1350                        (false, true) => std::cmp::Ordering::Less.then_with(|| a_idx.cmp(b_idx)),
1351                        (false, false) => std::cmp::Ordering::Equal.then_with(|| a_idx.cmp(b_idx)),
1352                    }
1353                }
1354            },
1355        );
1356
1357        tracing::trace!("Sorted by key: {key:?} vals {data_with_index:?}");
1358        let indicies = data_with_index
1359            .into_iter()
1360            .map(|(idx, _)| idx)
1361            .collect::<Vec<_>>();
1362
1363        Ok(sorted_df::SortedDataFrame::new(self, indicies))
1364    }
1365
1366    /// Returns a new [`ColumnFrame`] containing only rows that match the filter expression.
1367    ///
1368    /// The filter is applied against each row and matching row indices are collected,
1369    /// de-duplicated, and used to build the result.
1370    pub fn filter(&self, filter: &crate::filter::FilterRules) -> Result<Self, Error> {
1371        let mut final_indices = Vec::new();
1372        let filter_df = filter_df::ColumnFrameFiltering { column_frame: self };
1373        for rule in &filter.rules {
1374            final_indices.extend(crate::filter::filter_combination(&filter_df, rule)?);
1375        }
1376
1377        final_indices.sort_unstable();
1378        final_indices.dedup();
1379
1380        // let new_data: Vec<Array1<DataValue>> = self
1381        //     .data_frame
1382        //     .iter()
1383        //     .map(|col| {
1384        //         Array1::from_vec(final_indices.iter().map(|&idx| col[idx].clone()).collect())
1385        //     })
1386        //     .collect();
1387        let new_data: Vec<Vec<DataValue>> = self
1388            .data_frame
1389            .iter()
1390            .map(|col| {
1391                final_indices
1392                    .iter()
1393                    .map(|&idx| col.get_or_null(idx))
1394                    .collect::<Vec<DataValue>>()
1395            })
1396            .collect();
1397
1398        Ok(ColumnFrame::new(self.index.clone(), new_data))
1399    }
1400}
1401
/// Builds a `DataFrame` from the same syntax that [`column_frame!`] accepts.
///
/// All tokens are forwarded verbatim to [`column_frame!`] and the resulting
/// column frame is passed to `DataFrame::new`.
#[macro_export]
macro_rules! df {
    ($($tokens:tt)*) => {
        $crate::DataFrame::new($crate::column_frame!($($tokens)*))
    };
}
1408
/// Builds a `ColumnFrame` from `key => values` pairs.
///
/// Three input shapes are recognised; the arms are tried top to bottom:
///
/// 1. `"a" => vec![1, 2, 3], ...` — rewritten into the bracket form below.
/// 2. `"a" => [1, 2, 3], ...` — one column per key; every value is converted
///    with `.into()` into a `DataValue`.
/// 3. `"a" => 1, ...` — each key gets a single-row column holding the value
///    converted with `.into()`. This arm also emits a `tracing::trace!` with
///    the collected keys and data.
///
/// Keys are converted with `.into()` and handed to `KeyIndex::new`.
///
/// In the two list forms the columns are separated by commas and the
/// patterns do not accept a trailing comma after the last column; the scalar
/// form allows an optional comma after each pair.
#[macro_export]
macro_rules! column_frame {
    // Disabled rule, kept for reference: scalar pairs with a trailing comma,
    // e.g. { "a" => 1, }
    // ($($key:expr => $value:expr $(,)?)+) => { $crate::column_frame!($($key => $value),*) };
    // case { "a" => vec![1, 2, 3] } — forwarded to the bracket-list arm
    ($($key:expr => vec![$($value:expr),*]),*) => {
        $crate::column_frame!($($key => [$($value),*]),*)
    };
    // case { "a" => [1, 2, 3] } — one column per key
    ($($key:expr => [$($value:expr),*]),*) => {
        {
           // One inner Vec per column, in the order the keys were written.
           let data: ::std::vec::Vec<::std::vec::Vec<$crate::data_value::DataValue>> = vec!($(
                vec![$($value.into(),)*],
            )*);

           let _keys = vec![$($key.into(),)*];

            $crate::ColumnFrame::new(
                $crate::KeyIndex::new(_keys),
                data
            )

        }
    };
    // case { "a" => 1 } — every key gets a single-row column
    ($($key:expr => $value:expr $(,)?)*) => {
        {
           let _data: ::std::vec::Vec<::std::vec::Vec<$crate::data_value::DataValue>> =
               vec![$(vec![$value.into()],)*];
           let _keys = vec![$($key.into(),)*];
            // NOTE(review): this path is not `$crate`-qualified, so it resolves
            // at the call site — confirm callers have `tracing` in scope.
            tracing::trace!("{_keys:?}, {_data:?}");
            $crate::ColumnFrame::new(
                $crate::KeyIndex::new(_keys),
                _data,
            )
        }
    };
}
1447
1448#[cfg(test)]
1449mod test {
1450    use crate::{filter::FilterRules, JoinById};
1451
1452    use super::*;
1453    use data_value::stdhashmap;
1454    use rstest::*;
1455    use tracing_test::traced_test;
1456
1457    #[rstest]
1458    #[case(
1459        column_frame! {
1460            "t" => [1751001987000000u64, 1752001987000000u64, 1753001987000000u64],
1461            "b" => [4, 5, 6],
1462            "c" => [7, 8, 9]
1463        },
1464        column_frame! {
1465            "t" => [1752001987000000u64],
1466            "b" => [5],
1467            "c" => [8]
1468        },
1469        FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
1470    )]
1471    #[case(
1472        column_frame! {
1473            "t" => [1751001987000000f64, 1752001987000000f64, 1753001987000000f64],
1474            "b" => [4, 5, 6],
1475            "c" => [7, 8, 9]
1476        },
1477        column_frame! {
1478            "t" => [1752001987000000f64],
1479            "b" => [5],
1480            "c" => [8]
1481        },
1482        FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
1483    )]
1484    #[case(
1485        column_frame! {
1486            "t" => [1751001987000000i64, 1752001987000000i64, 1753001987000000i64],
1487            "b" => [4, 5, 6],
1488            "c" => [7, 8, 9]
1489        },
1490        column_frame! {
1491            "t" => [1752001987000000i64],
1492            "b" => [5],
1493            "c" => [8]
1494        },
1495        FilterRules::try_from("t.to_datetime_us() == '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
1496    )]
1497    #[case(
1498        column_frame! {
1499            "t" => [1751001987000000u64, 1752001987000000u64, 1753001987000000u64],
1500            "b" => [4, 5, 6],
1501            "c" => [7, 8, 9]
1502        },
1503        column_frame! {
1504            "t" => [1751001987000000u64],
1505            "b" => [4],
1506            "c" => [7]
1507        },
1508        FilterRules::try_from("t.to_datetime_us() < '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
1509    )]
1510    #[case(
1511        column_frame! {
1512            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
1513            "b" => [4, 5, 6],
1514            "c" => [7, 8, 9]
1515        },
1516        column_frame! {
1517            "t" => ["2025-07-08 18:13:07"],
1518            "b" => [4],
1519            "c" => [7]
1520        },
1521        FilterRules::try_from("t.to_datetime_us() < '2025-07-08 19:13:07'").expect("BUG: cannot create filter rules"),
1522    )]
1523    #[case(
1524        column_frame! {
1525            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
1526            "b" => [4, 5, 6],
1527            "c" => [7, 8, 9]
1528        },
1529        column_frame! {
1530            "t" => [],
1531            "b" => [],
1532            "c" => []
1533        },
1534        FilterRules::try_from("t.len() < 10u64").expect("BUG: cannot create filter rules"),
1535    )]
1536    #[case(
1537        column_frame! {
1538            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
1539            "b" => [4, 5, 6],
1540            "c" => [7, 8, 9]
1541        },
1542        column_frame! {
1543            "t" => ["2025-07-08 18:13:07", "2025-07-08 19:13:07", "2025-07-08 20:13:07"],
1544            "b" => [4, 5, 6],
1545            "c" => [7, 8, 9]
1546        },
1547        FilterRules::try_from("t.len() > 10u64").expect("BUG: cannot create filter rules"),
1548    )]
1549    #[case(
1550        column_frame! {
1551            "t" => [DataValue::Vec(vec![1.into(), 2.into(), 3.into()]), DataValue::Vec(vec![]), DataValue::Vec(vec![1.into()])],
1552            "b" => [4, 5, 6],
1553            "c" => [7, 8, 9]
1554        },
1555        column_frame! {
1556            "t" => [DataValue::Vec(vec![])],
1557            "b" => [5],
1558            "c" => [ 8]
1559        },
1560        FilterRules::try_from("t.len() == 0u64").expect("BUG: cannot create filter rules"),
1561    )]
1562    #[case(
1563        column_frame! {
1564            "t" => [DataValue::Vec(vec![1.into(), 2.into(), 3.into()]), DataValue::Vec(vec![]), DataValue::Vec(vec![1.into()])],
1565            "b" => [4, 5, 6],
1566            "c" => [7, 8, 9]
1567        },
1568        column_frame! {
1569            "t" => [DataValue::Vec(vec![1.into()])],
1570            "b" => [6],
1571            "c" => [9]
1572        },
1573        FilterRules::try_from("t.len() == 1u64").expect("BUG: cannot create filter rules"),
1574    )]
1575    #[case(
1576        column_frame! {
1577            "a" => [1, 2, 3],
1578            "b" => [4, 5, 6],
1579            "c" => [7, 8, 9]
1580        },
1581        column_frame! {
1582            "a" => [1, 2],
1583            "b" => [4, 5],
1584            "c" => [7, 8]
1585        },
1586        FilterRules::try_from("a <= 2i32").expect("BUG: cannot create filter rules"),
1587    )]
1588    #[case(
1589        column_frame! {
1590            "a" => [1, 2, 3],
1591            "b" => [4, 5, 6],
1592            "c" => [7, 8, 9]
1593        },
1594        column_frame! {
1595            "a" => [2],
1596            "b" => [5],
1597            "c" => [8]
1598        },
1599        FilterRules::try_from("a <= 2i32 && c > 7i32").expect("BUG: cannot create filter rules"),
1600    )]
1601    #[case(
1602        column_frame! {
1603            "a" => [1, 2, 3],
1604            "b" => [4, 5, 6],
1605            "c" => [7, 8, 9]
1606        },
1607        column_frame! {
1608            "a" => [],
1609            "b" => [],
1610            "c" => []
1611        },
1612        FilterRules::try_from("a <= 2i32 && c > 9i32").expect("BUG: cannot create filter rules"),
1613    )]
1614    #[case(
1615        column_frame! {
1616            "a" => [1, 2, 3],
1617            "b" => [4, 5, 6],
1618            "c" => [7, 8, 9]
1619        },
1620        column_frame! {
1621            "a" => [1, 2],
1622            "b" => [4, 5],
1623            "c" => [7, 8]
1624        },
1625        FilterRules::try_from("a <= 2i32 || c > 9i32").expect("BUG: cannot create filter rules"),
1626    )]
1627    #[case(
1628        column_frame! {
1629            "a" => [1, 2, 3],
1630            "b" => [4, 5, 6],
1631            "c" => [7, 8, 9]
1632        },
1633        column_frame! {
1634            "a" => [2],
1635            "b" => [5],
1636            "c" => [8]
1637        },
1638        FilterRules::try_from("a <= 2i32 && (c > 9i32 || b == 5i32)").expect("BUG: cannot create filter rules"),
1639    )]
1640    #[case(
1641        column_frame! {
1642            "a" => ["abcd", "ab", "abcdefg"],
1643            "b" => [4, 5, 6],
1644            "c" => [7, 8, 9]
1645        },
1646        column_frame! {
1647            "a" => ["abcd","abcdefg"],
1648            "b" => [4, 6],
1649            "c" => [7, 9]
1650        },
1651        FilterRules::try_from("a ~= 'abcd.*'").expect("BUG: cannot create filter rules"),
1652    )]
1653    #[case(
1654        column_frame! {
1655            "a" => [1, 2, 3],
1656            "b" => [4, 5, 6],
1657            "c" => [7, 8, 9]
1658        },
1659        column_frame! {
1660            "a" => [1],
1661            "b" => [4],
1662            "c" => [7]
1663        },
1664        FilterRules::try_from("a in [1u32, 1i32]'").expect("BUG: cannot create filter rules"),
1665    )]
1666    #[case(
1667        column_frame! {
1668            "a" => [1, 2, 3],
1669            "b" => [4, 5, 6],
1670            "c" => [7, 8, 9]
1671        },
1672        column_frame! {
1673            "a" => [2, 3],
1674            "b" => [5, 6],
1675            "c" => [8, 9]
1676        },
1677        FilterRules::try_from("a notIn [1u32, 1i32]'").expect("BUG: cannot create filter rules"),
1678    )]
1679    #[case(
1680        column_frame! {
1681            "a" => [1f64, 2f64, 3f64],
1682            "b" => [4, 5, 6],
1683            "c" => [7, 8, 9]
1684        },
1685        column_frame! {
1686            "a" => [1f64, 2f64],
1687            "b" => [4, 5],
1688            "c" => [7, 8]
1689        },
1690        FilterRules::try_from("a < 3f64 || (a < 3f64 && b <= 5i32)").expect("BUG: cannot create filter rules"),
1691    )]
1692    #[case(
1693        column_frame! {
1694            "a" => [1f64, 2f64, 3f64],
1695            "b" => [4i64, 5i64, 6i64],
1696            "c" => [7i64, 8i64, 9i64]
1697        },
1698        column_frame! {
1699            "a" => [1f64, 2f64],
1700            "b" => [4i64, 5i64],
1701            "c" => [7i64, 8i64]
1702        },
1703        FilterRules::try_from("a >= 1f64 && (b <= 5 || c <= 8) && b >= 4").expect("BUG: cannot create filter rules"),
1704    )]
1705    #[traced_test]
1706    fn filter_test(
1707        #[case] df: ColumnFrame,
1708        #[case] expected: ColumnFrame,
1709        #[case] filter: FilterRules,
1710    ) {
1711        let filtered = df.filter(&filter).expect("BUG: cannot filter");
1712        assert_eq!(filtered, expected);
1713    }
1714
1715    #[rstest]
1716    #[traced_test]
1717    fn test_macro() {
1718        let df = column_frame! {
1719            "a" => 1,
1720            "b" => 2,
1721            "c" => 3,
1722            "d" => 4,
1723        };
1724
1725        assert_eq!(df.nrows(), 1);
1726        assert_eq!(df.keys(), &["a".into(), "b".into(), "c".into(), "d".into()]);
1727        let f = Array2::from_shape_vec((1, 4), vec![1.into(), 2.into(), 3.into(), 4.into()])
1728            .expect("BUG: cannot create array");
1729        assert_eq!(df.select(None), f);
1730
1731        let df = column_frame! {
1732            "a" => [1, 2, 3],
1733            "b" => [4, 5, 6],
1734            "c" => [7, 8, 9]
1735        };
1736
1737        assert_eq!(df.nrows(), 3);
1738        assert_eq!(df.keys(), &["a".into(), "b".into(), "c".into()]);
1739        let f = Array2::from_shape_vec(
1740            (3, 3),
1741            vec![
1742                1.into(),
1743                4.into(),
1744                7.into(),
1745                2.into(),
1746                5.into(),
1747                8.into(),
1748                3.into(),
1749                6.into(),
1750                9.into(),
1751            ],
1752        )
1753        .expect("BUG: cannot create array");
1754        let selected = df.select(None);
1755        trace!("{selected:?}");
1756        assert_eq!(selected, f);
1757
1758        let df1 = df! {
1759            "a" => [1, 2, 3],
1760            "b" => [4, 5, 6],
1761            "c" => [7, 8, 9]
1762        };
1763
1764        // just to make sure we've tested Display
1765        let formatted = format!("{}", df);
1766        debug!("{}", formatted);
1767
1768        assert_eq!(df1, crate::DataFrame::from(df));
1769    }
1770
1771    #[rstest]
1772    #[case(
1773        column_frame! {
1774            "a" => [1, 2, 3],
1775            "b" => [4, 5, 6],
1776            "c" => [7, 8, 9]
1777        },
1778        column_frame! {
1779            "a_new" => [1, 2, 3],
1780            "b" => [4, 5, 6],
1781            "c" => [7, 8, 9]
1782        },
1783        vec!["a_new", "b", "c"].into_iter().map(|x| x.into()).collect(),
1784        vec![("a", "a_new".into())]
1785    )]
1786    #[traced_test]
1787    fn rename_test(
1788        #[case] df: ColumnFrame,
1789        #[case] expected: ColumnFrame,
1790        #[case] keys: Vec<Key>,
1791        #[case] renames: Vec<(&str, Key)>,
1792    ) {
1793        let mut df = df;
1794        for (old, new) in renames {
1795            df.rename_key(old, new).expect("BUG: cannot rename key");
1796        }
1797        assert_eq!(df, expected);
1798        assert_eq!(df.keys(), keys.as_slice());
1799    }
1800
1801    #[rstest]
1802    #[case(
1803        column_frame!("a" => [1, 2, 3]),
1804        Key::new("a", crate::DataType::I32),
1805        column_frame!("a" => [1i32, 2i32, 3i32])
1806    )]
1807    #[case(
1808        column_frame!("a" => [1, 2, 3]),
1809        Key::new("a", crate::DataType::U32),
1810        column_frame!("a" => [1u32, 2u32, 3u32])
1811    )]
1812    #[case(
1813        column_frame!("a" => [1, 2, 3]),
1814        Key::new("a", crate::DataType::I64),
1815        column_frame!("a" => [1i64, 2i64, 3i64])
1816    )]
1817    #[case(
1818        column_frame!("a" => [1, 2, 3]),
1819        Key::new("a", crate::DataType::U64),
1820        column_frame!("a" => [1u64, 2u64, 3u64])
1821    )]
1822    #[case(
1823        column_frame!("a" => [1, 2, 3]),
1824        Key::new("a", crate::DataType::F64),
1825        column_frame!("a" => [1f64, 2f64, 3f64])
1826    )]
1827    #[case(
1828        column_frame!("a" => [1, 2, 3]),
1829        Key::new("a", crate::DataType::F32),
1830        column_frame!("a" => [1f32, 2f32, 3f32])
1831    )]
1832    // #[case(
1833    //     column_frame!("a" => [1, 2, 3]),
1834    //     Key::new("a", crate::DataType::String),
1835    //     column_frame!("a" => ["1", "2", "3"])
1836    // )]
1837    fn test_try_fix_dtype(
1838        #[case] mut df: ColumnFrame,
1839        #[case] key: Key,
1840        #[case] expected: ColumnFrame,
1841    ) {
1842        assert!(df.try_fix_column_by_key(&key).is_ok());
1843        assert_eq!(
1844            df.select(Some(&[key.clone()])),
1845            expected.select(Some(&[key.clone()]))
1846        );
1847    }
1848
1849    #[fixture]
1850    fn unknown_df() -> ColumnFrame {
1851        let mut hm: HashMap<String, Vec<DataValue>> = HashMap::new();
1852
1853        hm.insert("a".into(), vec![1u32.into()]);
1854        hm.insert("b".into(), vec![3i64.into()]);
1855        hm.insert("c".into(), vec![1f64.into()]);
1856        hm.insert("d".into(), vec![1u64.into()]);
1857
1858        hm.into()
1859    }
1860    #[rstest]
1861    #[case(stdhashmap!(
1862        "a" => crate::DataType::U32,
1863        "b" => crate::DataType::I64,
1864        "c" => crate::DataType::F64,
1865        "d" => crate::DataType::U64)
1866    )]
1867    fn test_try_fix_dtype_unknown(
1868        mut unknown_df: ColumnFrame,
1869        #[case] dtypes: HashMap<String, crate::DataType>,
1870    ) {
1871        for dtype in dtypes.iter() {
1872            let t: &Key = unknown_df
1873                .keys()
1874                .iter()
1875                .find(|x| x.name() == dtype.0)
1876                .unwrap();
1877            assert_ne!(t.ctype, crate::DataType::Unknown);
1878        }
1879        assert!(unknown_df.try_fix_dtype_for_keys(false).is_ok());
1880        for dtype in dtypes.iter() {
1881            let t: &Key = unknown_df
1882                .keys()
1883                .iter()
1884                .find(|x| x.name() == dtype.0)
1885                .unwrap();
1886            assert_eq!(t.ctype, *dtype.1);
1887            assert!(unknown_df.try_fix_dtype_for_keys(false).is_ok());
1888        }
1889        assert!(unknown_df.try_fix_dtype_for_keys(true).is_ok());
1890    }
1891
1892    #[rstest]
1893    #[case(
1894        column_frame!(Key::new("a", crate::DataType::F32) => [1, 2, 3]),
1895        Key::new("a", crate::DataType::F32),
1896        column_frame!("a" => [1f32, 2f32, 3f32])
1897    )]
1898    #[traced_test]
1899    fn test_try_fix(#[case] mut df: ColumnFrame, #[case] key: Key, #[case] expected: ColumnFrame) {
1900        assert!(df.try_fix_dtype().is_ok());
1901        assert_eq!(
1902            df.select(Some(&[key.clone()])),
1903            expected.select(Some(&[key]))
1904        )
1905    }
1906
1907    #[rstest]
1908    #[traced_test]
1909    fn test_not_key_fix() {
1910        let mut cf = column_frame!("a" => [1]);
1911        let non_existing = Key::new("b", crate::DataType::I32);
1912        assert!(cf.try_fix_column_by_key(&non_existing).is_err());
1913    }
1914
1915    #[rstest]
1916    #[case(
1917        column_frame! {
1918            "a" => [1, 2, 3],
1919            "b" => [4, 5, 6],
1920            "c" => [7, 8, 9]
1921        },
1922        vec!["a_alias", "b", "c"].into_iter().map(|x| x.into()).collect(),
1923        vec![("a", "a_alias")]
1924    )]
1925    #[traced_test]
1926    fn alias_test(
1927        #[case] df: ColumnFrame,
1928        #[case] keys: Vec<Key>,
1929        #[case] aliases: Vec<(&str, &str)>,
1930    ) {
1931        let mut df = df;
1932        for (old, new) in aliases {
1933            df.add_alias(old, new).expect("BUG: cannot rename key");
1934        }
1935        let origin_keys = df.keys().to_vec();
1936        let selected_aliases = df.select(Some(keys.as_slice()));
1937        let selected = df.select(Some(origin_keys.as_slice()));
1938        assert_eq!(selected, selected_aliases);
1939    }
1940
1941    #[rstest]
1942    #[traced_test]
1943    fn test_mut_view() {
1944        // Column "a": [1f64, 2f64, f64::NAN]
1945        // Column "b": [4f32, f32::NAN, f32::INFINITY]
1946        let keys: Vec<Key> = vec!["a".into(), "b".into()];
1947        let index = KeyIndex::new(keys.clone());
1948        let data_frame = vec![
1949            Array1::from_vec(vec![
1950                DataValue::from(1f64),
1951                DataValue::from(2f64),
1952                DataValue::from(f64::NAN),
1953            ]),
1954            Array1::from_vec(vec![
1955                DataValue::from(4f32),
1956                DataValue::from(f32::NAN),
1957                DataValue::from(f32::INFINITY),
1958            ]),
1959        ];
1960        let mut df = ColumnFrame::new(index.clone(), data_frame);
1961        for col in &mut df.data_frame {
1962            col.mapv_inplace(|x| match x {
1963                DataValue::F32(f) if f.is_infinite() || f.is_nan() => DataValue::F32(0f32),
1964                DataValue::F64(f) if f.is_infinite() || f.is_nan() => DataValue::F64(0f64),
1965                e => e,
1966            });
1967        }
1968        let expected = ColumnFrame::new(
1969            index,
1970            vec![
1971                Array1::from_vec(vec![
1972                    DataValue::from(1f64),
1973                    DataValue::from(2f64),
1974                    DataValue::from(0f64),
1975                ]),
1976                Array1::from_vec(vec![
1977                    DataValue::from(4f32),
1978                    DataValue::from(0f32),
1979                    DataValue::from(0f32),
1980                ]),
1981            ],
1982        );
1983        assert_eq!(df, expected);
1984    }
1985
1986    #[rstest]
1987    #[traced_test]
1988    fn dummy_test() {
1989        let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
1990        let index = KeyIndex::new(keys.clone());
1991        let data_frame = vec![
1992            Array1::from_vec(vec![DataValue::U32(1)]),
1993            Array1::from_vec(vec![DataValue::I32(2)]),
1994            Array1::from_vec(vec![DataValue::I64(3)]),
1995            Array1::from_vec(vec![DataValue::U64(4)]),
1996        ];
1997
1998        let frame = ColumnFrame::new(index, data_frame);
1999        assert_eq!(
2000            frame.get_by_row_index(&"a".into(), 0),
2001            Some(DataValue::U32(1))
2002        );
2003        assert_eq!(frame.get_by_row_index(&"aa".into(), 0), None);
2004        assert_eq!(frame.get_by_row_index(&"a".into(), 1), None);
2005        assert_eq!(
2006            frame.select(Some(&["a".into(), "b".into()])),
2007            Array2::from_shape_vec((1, 2), vec![DataValue::U32(1), DataValue::I32(2)])
2008                .expect("BUG: cannot create array")
2009        );
2010    }
2011
2012    #[rstest]
2013    #[traced_test]
2014    fn dummy_test_multiple_rows() {
2015        // Row-major: [1, 2, 3, 4], [12, 22, 32, 42]
2016        // Column-oriented:
2017        let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
2018        let index = KeyIndex::new(keys.clone());
2019        let data_frame = vec![
2020            Array1::from_vec(vec![DataValue::U32(1), DataValue::U32(12)]),
2021            Array1::from_vec(vec![DataValue::I32(2), DataValue::I32(22)]),
2022            Array1::from_vec(vec![DataValue::I64(3), DataValue::I64(32)]),
2023            Array1::from_vec(vec![DataValue::U64(4), DataValue::U64(42)]),
2024        ];
2025
2026        let frame = ColumnFrame::new(index, data_frame);
2027        assert_eq!(
2028            frame.get_by_row_index(&"a".into(), 0),
2029            Some(DataValue::U32(1))
2030        );
2031        assert_eq!(frame.get_by_row_index(&"aa".into(), 0), None);
2032        assert_eq!(frame.get_by_row_index(&"a".into(), 3), None);
2033        let arr = Array2::from_shape_vec(
2034            (2, 2),
2035            vec![
2036                DataValue::U32(1),
2037                DataValue::I32(2),
2038                DataValue::U32(12),
2039                DataValue::I32(22),
2040            ],
2041        )
2042        .expect("BUG: cannot create array");
2043        trace!("{arr:?}");
2044        assert_eq!(frame.select(Some(&["a".into(), "b".into()])), arr);
2045    }
2046
2047    #[rstest]
2048    #[traced_test]
2049    fn dummy_test_multiple_rows_push() {
2050        let keys: Vec<Key> = vec!["a".into(), "b".into(), "c".into(), "d".into()];
2051        let index = KeyIndex::new(keys.clone());
2052        let data_frame = vec![
2053            Array1::from_vec(vec![DataValue::U32(1), DataValue::U32(12)]),
2054            Array1::from_vec(vec![DataValue::I32(2), DataValue::I32(22)]),
2055            Array1::from_vec(vec![DataValue::I64(3), DataValue::I64(32)]),
2056            Array1::from_vec(vec![DataValue::U64(4), DataValue::U64(42)]),
2057        ];
2058
2059        let mut frame = ColumnFrame::new(index, data_frame);
2060        assert!(frame
2061            .push(data_value::stdhashmap!(
2062                "a" => DataValue::U32(2),
2063                "b" => DataValue::I32(3),
2064                "c" => DataValue::I64(4),
2065                "d" => DataValue::U64(5)
2066            ))
2067            .is_ok());
2068        let arr = Array2::from_shape_vec(
2069            (3, 2),
2070            vec![
2071                DataValue::U32(1),
2072                DataValue::I32(2),
2073                DataValue::U32(12),
2074                DataValue::I32(22),
2075                DataValue::U32(2),
2076                DataValue::I32(3),
2077            ],
2078        )
2079        .expect("BUG: cannot create array");
2080        trace!("{arr:?}");
2081        assert_eq!(frame.select(Some(&["a".into(), "b".into()])), arr);
2082        let result = frame.push(data_value::stdhashmap!(
2083            "a" => DataValue::U32(34),
2084            "b" => DataValue::I32(44),
2085            "c" => DataValue::I64(54),
2086            "e" => DataValue::F32(6f32)
2087        ));
2088        assert!(result.is_ok(), "{result:?}");
2089        let arr = Array2::from_shape_vec(
2090            (4, 2),
2091            vec![
2092                DataValue::U64(4),
2093                DataValue::Null,
2094                DataValue::U64(42),
2095                DataValue::Null,
2096                DataValue::U64(5),
2097                DataValue::Null,
2098                DataValue::Null,
2099                DataValue::F32(6f32),
2100            ],
2101        )
2102        .expect("BUG: cannot create array");
2103        trace!("{arr:?}");
2104        assert_eq!(frame.select(Some(&["d".into(), "e".into()])), arr);
2105    }
2106
2107    #[rstest]
2108    #[case(
2109        column_frame! {
2110            "group_id" => vec![1, 2],
2111            "feed_tag" => vec![3, 4]
2112        },
2113        Some(vec![Key::from("group_id")]),
2114        ndarray::array!([1.into()], [2.into()])
2115    )]
2116    #[case(
2117        column_frame! {
2118            "group_id" => vec![1, 2],
2119            "feed_tag" => vec![3, 4]
2120        },
2121        Some(vec!["group_id".into(), "feed_tag".into()]),
2122        ndarray::array!([1.into(), 3.into()], [2.into(), 4.into()])
2123    )]
2124    #[case(
2125        column_frame! {
2126            "group_id" => vec![1, 2],
2127            "feed_tag" => vec![3, DataValue::Null]
2128        },
2129        Some(vec!["feed_tag".into()]),
2130        ndarray::array![[3.into()], [DataValue::Null]]
2131    )]
2132    #[case(
2133        column_frame! {
2134            "group_id" => vec![1, 2],
2135            "feed_tag" => vec![1, DataValue::Null]
2136        },
2137        Some(vec!["feed_tag2".into()]),
2138       Array2::<DataValue>::default((0, 0))
2139    )]
2140    #[traced_test]
2141    fn test_select(
2142        #[case] input: ColumnFrame,
2143        #[case] keys: Option<Vec<Key>>,
2144        #[case] expected: Array2<DataValue>,
2145    ) {
2146        trace!("input={input:?}");
2147        let keys_slice = keys.as_deref();
2148        let selected = input.select(keys_slice);
2149        trace!("selected={selected:?}");
2150        assert_eq!(selected, expected);
2151        let selected = input.select_transposed(keys_slice);
2152        trace!("selected_transposed={selected:?}");
2153        assert!(selected.is_ok());
2154        assert_eq!(selected.unwrap(), expected.t());
2155    }
2156
2157    #[rstest]
2158    #[case(
2159        column_frame! {
2160            "group_id" => vec![1, 2],
2161            "feed_tag" => vec![3, 4]
2162        },
2163        Key::from("group_id"),
2164        Some(ndarray::array!(1.into(), 2.into()))
2165    )]
2166    #[case(
2167        column_frame! {
2168            "group_id" => vec![1, 2, 5, 6],
2169            "feed_tag" => vec![3, 4, 7, 8]
2170        },
2171        Key::from("group_id"),
2172        Some(ndarray::array!(1.into(), 2.into(), 5.into(), 6.into()))
2173    )]
2174    #[case(
2175        column_frame! {
2176            "group_id" => vec![1, 2],
2177            "feed_tag" => vec![1, 1]
2178        },
2179        Key::from("feed_tag1"),
2180        None
2181    )]
2182    #[traced_test]
2183    #[allow(deprecated)]
2184    fn test_select_column(
2185        #[case] input: ColumnFrame,
2186        #[case] key: Key,
2187        #[case] expected: Option<Array1<DataValue>>,
2188    ) {
2189        let selected = input.select_column(&key);
2190        trace!("selected={selected:?}");
2191        match expected {
2192            Some(expected) => {
2193                assert!(selected.is_some());
2194                assert_eq!(selected.expect("BUG: checked above"), expected);
2195            }
2196            None => assert!(selected.is_none()),
2197        }
2198    }
2199
2200    #[test]
2201    #[traced_test]
2202    fn empty_join_test() {
2203        let join = JoinRelation::add_columns();
2204        let mut column_frame = ColumnFrame::default();
2205        column_frame
2206            .add_single_column("group_id", Vec::<DataValue>::new())
2207            .expect("BUG: cannot add column");
2208        let column_frame2 = column_frame! {
2209            "group_id" => vec![2, 1, 3],
2210            "feed_tag" => vec![1, 1, 1],
2211            "clicks" => vec![100, 10, 10],
2212            "imps" => vec![1000, 200, 200]
2213        };
2214        assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
2215
2216        let joined = column_frame.join(column_frame2, &join);
2217        assert!(joined.is_ok(), "{joined:?}");
2218
2219        trace!("{column_frame:?}");
2220        assert_eq!(
2221            column_frame.select(Some(&[
2222                "group_id".into(),
2223                "feed_tag".into(),
2224                "clicks".into(),
2225                "imps".into()
2226            ])),
2227            ndarray::array!(
2228                [2.into(), 1.into(), 100.into(), 1000.into()],
2229                [1.into(), 1.into(), 10.into(), 200.into()],
2230                [3.into(), 1.into(), 10.into(), 200.into()],
2231            )
2232        );
2233
2234        let mut column_frame2 = column_frame! {
2235            "feed_tag" => vec![1, 1, 1],
2236            "clicks" => vec![100, 10, 10],
2237            "imps" => vec![1000, 200, 200]
2238        };
2239        let mut column_frame = ColumnFrame::default();
2240        column_frame
2241            .add_single_column("group_id", Array1::from_vec(Vec::<DataValue>::new()))
2242            .expect("BUG: cannot add column");
2243        let joined = column_frame2.join(column_frame, &join);
2244        assert!(joined.is_ok(), "{joined:?}");
2245
2246        trace!("{column_frame2:?}");
2247        assert_eq!(
2248            column_frame2.select(Some(&[
2249                "group_id".into(),
2250                "feed_tag".into(),
2251                "clicks".into(),
2252                "imps".into()
2253            ])),
2254            ndarray::array!(
2255                [DataValue::Null, 1.into(), 100.into(), 1000.into()],
2256                [DataValue::Null, 1.into(), 10.into(), 200.into()],
2257                [DataValue::Null, 1.into(), 10.into(), 200.into()],
2258            )
2259        );
2260
2261        let mut column_frame = ColumnFrame::default();
2262        column_frame.index = KeyIndex::new(vec!["group_id2".into()]);
2263        let joined = column_frame2.join(column_frame, &join);
2264        assert!(joined.is_ok(), "{joined:?}");
2265
2266        trace!("{column_frame2:?}");
2267        assert_eq!(
2268            column_frame2.select(Some(&[
2269                "group_id2".into(),
2270                "feed_tag".into(),
2271                "clicks".into(),
2272                "imps".into()
2273            ])),
2274            ndarray::array!(
2275                [DataValue::Null, 1.into(), 100.into(), 1000.into()],
2276                [DataValue::Null, 1.into(), 10.into(), 200.into()],
2277                [DataValue::Null, 1.into(), 10.into(), 200.into()],
2278            )
2279        );
2280    }
2281
2282    #[test]
2283    #[traced_test]
2284    fn join_test_multiple() {
2285        let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec!["group_id".into()])));
2286        let mut column_frame = column_frame! {
2287            "group_id" => vec![1, 1, 3]
2288        };
2289        let column_frame2 = column_frame! {
2290            "group_id" => vec![2, 1, 1],
2291            "clicks" => vec![100, 10, 10],
2292            "imps" => vec![1000, 200, 200]
2293        };
2294
2295        let joined = column_frame.join(column_frame2, &join);
2296        assert!(joined.is_ok(), "{joined:?}");
2297
2298        trace!("{column_frame:?}");
2299        assert_eq!(
2300            column_frame.select(Some(&["group_id".into(), "clicks".into(), "imps".into(),])),
2301            ndarray::array!(
2302                [1.into(), 10.into(), 200.into()],
2303                [1.into(), 10.into(), 200.into()],
2304                [3.into(), DataValue::Null, DataValue::Null],
2305            )
2306        )
2307    }
2308
2309    #[test]
2310    #[traced_test]
2311    fn join_test_no_matches() {
2312        let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec!["group_id".into()])));
2313        let mut column_frame = column_frame! {
2314            "group_id" => vec![DataValue::I32(1), DataValue::I32(2), DataValue::I32(3)]
2315        };
2316        let column_frame2 = column_frame! {
2317            "group_id" => vec![DataValue::I32(4), DataValue::I32(5), DataValue::I32(6)],
2318            "clicks" => vec![DataValue::I32(100), DataValue::I32(200), DataValue::I32(300)],
2319        };
2320
2321        let joined = column_frame.join(column_frame2, &join);
2322        assert!(joined.is_ok(), "{joined:?}");
2323
2324        trace!("{column_frame:?}");
2325        assert_eq!(
2326            column_frame.select(Some(&["group_id".into(), "clicks".into()])),
2327            ndarray::array!(
2328                [DataValue::I32(1), DataValue::Null],
2329                [DataValue::I32(2), DataValue::Null],
2330                [DataValue::I32(3), DataValue::Null],
2331            )
2332        )
2333    }
2334    #[test]
2335    #[traced_test]
2336    fn join_test() {
2337        let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
2338            "group_id".into(),
2339            "feed_tag".into(),
2340        ])));
2341        let mut column_frame = column_frame! {
2342            "group_id" => vec![1, 2, 8],
2343            "feed_tag" => vec![1, 1, 10]
2344        };
2345        let column_frame2 = column_frame! {
2346            "group_id" => vec![2, 1, 3],
2347            "feed_tag" => vec![1, 1, 1],
2348            "clicks" => vec![100, 10, 10],
2349            "imps" => vec![1000, 200, 200]
2350        };
2351        assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
2352
2353        let joined = column_frame.join(column_frame2, &join);
2354        assert!(joined.is_ok(), "{joined:?}");
2355
2356        trace!("{column_frame:?}");
2357        assert_eq!(
2358            column_frame.select(Some(&[
2359                "group_id".into(),
2360                "feed_tag".into(),
2361                "clicks".into(),
2362                "imps".into()
2363            ])),
2364            ndarray::array!(
2365                [1.into(), 1.into(), 10.into(), 200.into()],
2366                [2.into(), 1.into(), 100.into(), 1000.into()],
2367                [8.into(), 10.into(), DataValue::Null, DataValue::Null]
2368            ),
2369            "DF {column_frame:?}"
2370        );
2371        assert_eq!(
2372            column_frame
2373                .select_view(Some(&[
2374                    "group_id".into(),
2375                    "feed_tag".into(),
2376                    "clicks".into(),
2377                    "imps".into()
2378                ]))
2379                .unwrap()
2380                .row_view(),
2381            ndarray::array!(
2382                [1.into(), 1.into(), 10.into(), 200.into()],
2383                [2.into(), 1.into(), 100.into(), 1000.into()],
2384                [8.into(), 10.into(), DataValue::Null, DataValue::Null]
2385            )
2386            .view(),
2387            "DF {column_frame:?}"
2388        )
2389    }
2390
2391    #[test]
2392    #[traced_test]
2393    fn join_test_with_additional() {
2394        let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
2395            "group_id".into(),
2396            "feed_tag".into(),
2397        ])));
2398        let mut column_frame = column_frame! {
2399            "group_id" => vec![1, 2, 8],
2400            "feed_tag" => vec![1, 1, 10],
2401            "clicked" => vec![0, 0, 1]
2402        };
2403        let column_frame2 = column_frame! {
2404            "group_id" => vec![2, 1, 3],
2405            "feed_tag" => vec![1, 1, 1],
2406            "clicks" => vec![100, 10, 10],
2407            "imps" => vec![1000, 200, 200]
2408        };
2409        assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
2410
2411        let joined = column_frame.join(column_frame2, &join);
2412        assert!(joined.is_ok(), "{joined:?}");
2413
2414        trace!("{column_frame:?}");
2415        assert_eq!(
2416            column_frame.select(Some(&[
2417                "group_id".into(),
2418                "feed_tag".into(),
2419                "clicks".into(),
2420                "imps".into(),
2421                "clicked".into()
2422            ])),
2423            ndarray::array!(
2424                [1.into(), 1.into(), 10.into(), 200.into(), 0.into()],
2425                [2.into(), 1.into(), 100.into(), 1000.into(), 0.into()],
2426                [
2427                    8.into(),
2428                    10.into(),
2429                    DataValue::Null,
2430                    DataValue::Null,
2431                    1.into()
2432                ]
2433            )
2434        );
2435        assert_eq!(
2436            column_frame
2437                .select_view(Some(&[
2438                    "group_id".into(),
2439                    "feed_tag".into(),
2440                    "clicks".into(),
2441                    "imps".into(),
2442                    "clicked".into()
2443                ]))
2444                .unwrap()
2445                .row_view(),
2446            ndarray::array!(
2447                [1.into(), 1.into(), 10.into(), 200.into(), 0.into()],
2448                [2.into(), 1.into(), 100.into(), 1000.into(), 0.into()],
2449                [
2450                    8.into(),
2451                    10.into(),
2452                    DataValue::Null,
2453                    DataValue::Null,
2454                    1.into()
2455                ]
2456            )
2457            .view(),
2458            "DF {column_frame:?}"
2459        );
2460    }
2461
2462    #[test]
2463    #[traced_test]
2464    fn join_test_with_additional_single() {
2465        let join = JoinRelation::new(JoinBy::JoinById(JoinById::new(vec![
2466            "group_id".into(),
2467            "feed_tag".into(),
2468        ])));
2469        let mut column_frame = column_frame! {
2470            "group_id" => vec![1, 2, 8],
2471            "feed_tag" => vec![1, 1, 10],
2472            "clicked" => vec![0, 0, 1]
2473        };
2474        let column_frame2 = column_frame! {
2475            "a" => vec![1],
2476            "group_id" => vec![2],
2477            "feed_tag" => vec![1],
2478            "clicks" => vec![10],
2479            "imps" => vec![200]
2480        };
2481        assert!(column_frame.join(ColumnFrame::default(), &join).is_ok());
2482
2483        let joined = column_frame.join(column_frame2, &join);
2484        assert!(joined.is_ok(), "{joined:?}");
2485
2486        trace!("{column_frame:?}");
2487        assert_eq!(
2488            column_frame.select(Some(&[
2489                "group_id".into(),
2490                "feed_tag".into(),
2491                "clicks".into(),
2492                "imps".into(),
2493                "clicked".into()
2494            ])),
2495            ndarray::array!(
2496                [
2497                    1.into(),
2498                    1.into(),
2499                    DataValue::Null,
2500                    DataValue::Null,
2501                    0.into(),
2502                ],
2503                [2.into(), 1.into(), 10.into(), 200.into(), 0.into()],
2504                [
2505                    8.into(),
2506                    10.into(),
2507                    DataValue::Null,
2508                    DataValue::Null,
2509                    1.into()
2510                ]
2511            )
2512        )
2513    }
2514
2515    #[rstest]
2516    #[traced_test]
2517    fn cartesian_product_join() {
2518        let mut df = column_frame! {
2519            "group_id" => vec![1, 2, 3],
2520            "feed_tag" => vec![1, 2, 3]
2521        };
2522        let df2 = column_frame! {
2523            "zone_id" => vec![111111, 111133],
2524            "zone_avg_ctr" => vec![0.1, 0.001]
2525        };
2526        assert!(df
2527            .join(
2528                ColumnFrame::default(),
2529                &JoinRelation::new(JoinBy::CartesianProduct)
2530            )
2531            .is_ok());
2532        let join = JoinRelation::new(JoinBy::CartesianProduct);
2533        let result = df.join(df2, &join);
2534        assert!(result.is_ok(), "{result:?}");
2535        let selected = df.select(None);
2536        trace!("{selected:?}");
2537        assert_eq!(
2538            selected,
2539            ndarray::array!(
2540                [1.into(), 1.into(), 111111.into(), 0.1.into()],
2541                [1.into(), 1.into(), 111133.into(), 0.001.into()],
2542                [2.into(), 2.into(), 111111.into(), 0.1.into()],
2543                [2.into(), 2.into(), 111133.into(), 0.001.into()],
2544                [3.into(), 3.into(), 111111.into(), 0.1.into()],
2545                [3.into(), 3.into(), 111133.into(), 0.001.into()],
2546            )
2547        );
2548
2549        let df2 = column_frame! {
2550            "zone_id" => vec![111]
2551        };
2552        let result = df.join(df2, &join);
2553        assert!(result.is_ok(), "{result:?}");
2554        let selected = df.select(None);
2555        trace!("{selected:?}");
2556        assert_eq!(
2557            selected,
2558            ndarray::array!(
2559                [1.into(), 1.into(), 111111.into(), 0.1.into(), 111.into()],
2560                [1.into(), 1.into(), 111133.into(), 0.001.into(), 111.into()],
2561                [2.into(), 2.into(), 111111.into(), 0.1.into(), 111.into()],
2562                [2.into(), 2.into(), 111133.into(), 0.001.into(), 111.into()],
2563                [3.into(), 3.into(), 111111.into(), 0.1.into(), 111.into()],
2564                [3.into(), 3.into(), 111133.into(), 0.001.into(), 111.into()],
2565            )
2566        );
2567    }
2568
2569    #[rstest]
2570    #[traced_test]
2571    fn broadcast_join() {
2572        let mut df = column_frame! {
2573            "group_id" => vec![1, 2, 3],
2574            "feed_tag" => vec![1, 2, 3]
2575        };
2576        let df2 = column_frame! {
2577            "zone_id" => vec![111111]
2578        };
2579        assert!(df
2580            .join(
2581                ColumnFrame::default(),
2582                &JoinRelation::new(JoinBy::Broadcast)
2583            )
2584            .is_ok());
2585        let join = JoinRelation::new(JoinBy::Broadcast);
2586        assert!(df.join(df2, &join).is_ok());
2587        let selected = df.select(None);
2588        trace!("{selected:?}");
2589        assert_eq!(
2590            selected,
2591            ndarray::array!(
2592                [1.into(), 1.into(), 111111.into()],
2593                [2.into(), 2.into(), 111111.into()],
2594                [3.into(), 3.into(), 111111.into()]
2595            )
2596        );
2597    }
2598    #[rstest]
2599    #[traced_test]
2600    fn merge_test() {
2601        let mut df = column_frame! {
2602            "group_id" => vec![1, 2, 3],
2603            "feed_tag" => vec![1, 2, 3]
2604        };
2605        let df2 = column_frame! {
2606            "group_id" => vec![11, 21, 31],
2607            "feed_tag" => vec![12, 22, 32]
2608        };
2609
2610        let join = JoinRelation::new(JoinBy::Replace);
2611        assert!(df.join(df2, &join).is_ok());
2612        let selected = df.select(None);
2613        trace!("{selected:?}");
2614        assert_eq!(
2615            selected,
2616            ndarray::array!(
2617                [11.into(), 12.into()],
2618                [21.into(), 22.into()],
2619                [31.into(), 32.into()]
2620            )
2621        );
2622    }
2623
2624    #[rstest]
2625    #[traced_test]
2626    fn extend_test() {
2627        let mut df = column_frame! {
2628            "group_id" => vec![1, 2, 3],
2629            "feed_tag" => vec![1, 2, 3]
2630        };
2631        let df2 = column_frame! {
2632            "group_id" => vec![11, 21, 31],
2633            "feed_tag" => vec![5, 6, 7]
2634        };
2635        assert!(df
2636            .join(ColumnFrame::default(), &JoinRelation::new(JoinBy::Extend))
2637            .is_ok());
2638
2639        let join = JoinRelation::new(JoinBy::Extend);
2640        assert!(df.join(df2, &join).is_ok());
2641        let selected = df.select(Some(&["feed_tag".into(), "group_id".into()]));
2642        trace!("{selected:?}");
2643        assert_eq!(
2644            selected,
2645            ndarray::array!(
2646                [1.into(), 1.into()],
2647                [2.into(), 2.into()],
2648                [3.into(), 3.into()],
2649                [5.into(), 11.into()],
2650                [6.into(), 21.into()],
2651                [7.into(), 31.into()]
2652            )
2653        );
2654        let as_map = df.select_as_map(Some(&["feed_tag".into(), "group_id".into()]));
2655        trace!("{as_map:?}");
2656        assert_eq!(
2657            as_map,
2658            stdhashmap!(
2659                "feed_tag" => vec![1, 2, 3, 5, 6, 7],
2660                "group_id" => vec![1, 2, 3, 11, 21, 31]
2661            )
2662        );
2663
2664        let as_map = df.select_as_map(Some(&["feed_tag1".into()]));
2665        trace!("{as_map:?}");
2666        assert_eq!(as_map, HashMap::default());
2667    }
2668
2669    #[rstest]
2670    #[traced_test]
2671    fn extend_test_with_non_existing_cols() {
2672        let mut df = column_frame! {
2673            "group_id" => vec![1, 2, 3],
2674            "feed_tag" => vec![1, 2, 3]
2675        };
2676        let mut df2 = column_frame! {
2677            "group_id" => vec![11, 21, 31],
2678            "feed_tag" => vec![5, 6, 7],
2679            "clicks" => vec![100, 200, 300],
2680            "impressions" => vec![1000, 2000, 3000]
2681        };
2682        let df_bckp = df.clone();
2683        let join = JoinRelation::new(JoinBy::Extend);
2684        assert!(df.join(df2.clone(), &join).is_ok());
2685        let selected = df.select(None);
2686        trace!("{selected:?}");
2687        assert_eq!(
2688            selected,
2689            ndarray::array!(
2690                [1.into(), 1.into(), DataValue::Null, DataValue::Null],
2691                [2.into(), 2.into(), DataValue::Null, DataValue::Null],
2692                [3.into(), 3.into(), DataValue::Null, DataValue::Null],
2693                [11.into(), 5.into(), 100.into(), 1000.into()],
2694                [21.into(), 6.into(), 200.into(), 2000.into()],
2695                [31.into(), 7.into(), 300.into(), 3000.into()]
2696            )
2697        );
2698        let join = JoinRelation::new(JoinBy::Extend);
2699        let r = df2.join(df_bckp, &join);
2700        assert!(r.is_ok(), "{r:?}");
2701        let selected = df2.select(None);
2702        trace!("{selected:?}");
2703        assert_eq!(
2704            selected,
2705            ndarray::array!(
2706                [11.into(), 5.into(), 100.into(), 1000.into()],
2707                [21.into(), 6.into(), 200.into(), 2000.into()],
2708                [31.into(), 7.into(), 300.into(), 3000.into()],
2709                [1.into(), 1.into(), DataValue::Null, DataValue::Null],
2710                [2.into(), 2.into(), DataValue::Null, DataValue::Null],
2711                [3.into(), 3.into(), DataValue::Null, DataValue::Null]
2712            )
2713        );
2714    }
2715
2716    #[rstest]
2717    #[traced_test]
2718    fn extend_test_with_non_existing_cols_wrong_order() {
2719        let mut df = column_frame! {
2720            "group_id" => vec![1, 2, 3],
2721            "feed_tag" => vec![1, 2, 3]
2722        };
2723        let df2 = column_frame! {
2724            "feed_tag" => vec![5, 6, 7],
2725            "group_id" => vec![11, 21, 31]
2726        };
2727        let join = JoinRelation::new(JoinBy::Extend);
2728        let err = df.join(df2, &join);
2729        assert!(err.is_ok(), "{err:?}");
2730
2731        assert_eq!(df.nrows(), 6);
2732        assert_eq!(
2733            df.select(Some(&["group_id".into(), "feed_tag".into()])),
2734            ndarray::array![
2735                [1.into(), 1.into()],
2736                [2.into(), 2.into()],
2737                [3.into(), 3.into()],
2738                [11.into(), 5.into()],
2739                [21.into(), 6.into()],
2740                [31.into(), 7.into()],
2741            ]
2742        );
2743    }
2744
2745    #[rstest]
2746    #[traced_test]
2747    fn extend_test_wrong_order_with_extra_columns() {
2748        let mut df = column_frame! {
2749            "a" => vec![1, 2],
2750            "b" => vec![10, 20]
2751        };
2752        let df2 = column_frame! {
2753            "c" => vec![100, 200],
2754            "a" => vec![3, 4]
2755        };
2756        let join = JoinRelation::new(JoinBy::Extend);
2757        assert!(df.join(df2, &join).is_ok());
2758
2759        assert_eq!(df.nrows(), 4);
2760        assert_eq!(
2761            df.select(Some(&["a".into(), "b".into(), "c".into()])),
2762            ndarray::array![
2763                [1.into(), 10.into(), DataValue::Null],
2764                [2.into(), 20.into(), DataValue::Null],
2765                [3.into(), DataValue::Null, 100.into()],
2766                [4.into(), DataValue::Null, 200.into()],
2767            ]
2768        );
2769    }
2770
2771    #[rstest]
2772    #[traced_test]
2773    fn test_replace_not_compatible() {
2774        let mut df = column_frame! {
2775            "group_id" => vec![1, 2, 3],
2776            "feed_tag" => vec![1, 2, 3]
2777        };
2778        let df2 = column_frame! {
2779            "feed_tag" => vec![5, 6],
2780            "group_id" => vec![11, 21]
2781        };
2782        let join = JoinRelation::new(JoinBy::Replace);
2783        let err = df.join(df2, &join);
2784        assert!(err.is_err(), "{err:?}");
2785        let empty = ColumnFrame::default();
2786        let err = df.join(empty, &join);
2787        assert!(err.is_ok(), "{err:?}");
2788    }
2789
2790    #[rstest]
2791    #[traced_test]
2792    fn test_different_data() {
2793        let mut df = column_frame! {
2794            "group_id" => vec![1, 2, 3],
2795            "feed_tag" => vec![1, 2, 3]
2796        };
2797        let df2 = column_frame! {
2798            "group_id" => vec![11, 21],
2799            "a" => vec![5, 6]
2800        };
2801        let join = JoinRelation::new(JoinBy::Extend);
2802        let err = df.join(df2, &join);
2803        assert!(err.is_ok(), "{err:?}");
2804        println!("{df:?}");
2805        let expected_df = ColumnFrame::new(
2806            KeyIndex::from(vec!["group_id".into(), "feed_tag".into(), "a".into()]),
2807            vec![
2808                Array1::from_vec(vec![1.into(), 2.into(), 3.into(), 11.into(), 21.into()]),
2809                Array1::from_vec(vec![
2810                    1.into(),
2811                    2.into(),
2812                    3.into(),
2813                    DataValue::Null,
2814                    DataValue::Null,
2815                ]),
2816                Array1::from_vec(vec![
2817                    DataValue::Null,
2818                    DataValue::Null,
2819                    DataValue::Null,
2820                    5.into(),
2821                    6.into(),
2822                ]),
2823            ],
2824        );
2825        assert_eq!(df, expected_df)
2826    }
2827
2828    #[rstest]
2829    #[traced_test]
2830    fn serde_column_frame() {
2831        let df = column_frame! {
2832            "group_id" => vec![1u64, 2u64, 3u64],
2833            "feed_tag" => vec![1u64, 2u64, 3u64]
2834        };
2835        let key_idx = df.index.clone();
2836        let serialized = serde_json::to_string(&key_idx).expect("BUG: cannot serialize");
2837        let deserialized: KeyIndex =
2838            serde_json::from_str(&serialized).expect("BUG: cannot deserialize");
2839        assert_eq!(key_idx, deserialized);
2840        assert!(key_idx.get_key(0).is_some_and(|x| x == "group_id".into()));
2841        let serialized = serde_json::to_string(&df).expect("BUG: cannot serialize");
2842        let deserialized: ColumnFrame =
2843            serde_json::from_str(&serialized).expect("BUG: cannot deserialize");
2844        assert_eq!(df, deserialized);
2845    }
2846
2847    #[rstest]
2848    #[traced_test]
2849    fn update_value() {
2850        let mut df = column_frame! {
2851            "group_id" => vec![1, 2, 3],
2852            "feed_tag" => vec![1, 2, 3]
2853        };
2854        let group_id: Key = "group_id".into();
2855        let v = df.get_by_row_index(&group_id, 1);
2856        assert_eq!(v, Some(DataValue::I32(2)));
2857        df.set_by_row_index(&group_id, 1, DataValue::I32(22))
2858            .expect("set must succeed");
2859        let v = df.get_by_row_index(&group_id, 1);
2860        assert_eq!(v, Some(DataValue::I32(22)));
2861
2862        assert!(df
2863            .set_by_row_index(&"group_id2".into(), 1, DataValue::Null)
2864            .is_err());
2865    }
2866
2867    #[rstest]
2868    fn get_single_column_typed_f64() {
2869        let df = column_frame! {
2870            "a" => [1i32, 2i32, 3i32],
2871            "b" => [10u64, 20u64, 30u64]
2872        };
2873        let key: Key = "a".into();
2874        let col = df.get_single_column_typed::<f64>(&key).unwrap();
2875        assert_eq!(col, ndarray::arr1(&[1.0, 2.0, 3.0]));
2876    }
2877
2878    #[rstest]
2879    fn get_single_column_typed_i64() {
2880        let df = column_frame! {
2881            "x" => [10u32, 20u32, 30u32]
2882        };
2883        let key: Key = "x".into();
2884        let col = df.get_single_column_typed::<i64>(&key).unwrap();
2885        assert_eq!(col, ndarray::arr1(&[10i64, 20i64, 30i64]));
2886    }
2887
2888    #[rstest]
2889    fn get_single_column_typed_string() {
2890        let df = column_frame! {
2891            "name" => ["alice", "bob", "carol"]
2892        };
2893        let key: Key = "name".into();
2894        let col = df.get_single_column_typed::<String>(&key).unwrap();
2895        assert_eq!(
2896            col,
2897            ndarray::arr1(&["alice".to_string(), "bob".to_string(), "carol".to_string()])
2898        );
2899    }
2900
2901    #[rstest]
2902    fn get_single_column_typed_bool() {
2903        let df = column_frame! {
2904            "flag" => [1i32, 0i32, 1i32]
2905        };
2906        let key: Key = "flag".into();
2907        let col = df.get_single_column_typed::<bool>(&key).unwrap();
2908        assert_eq!(col, ndarray::arr1(&[true, false, true]));
2909    }
2910
2911    #[rstest]
2912    fn get_single_column_typed_missing_key_returns_none() {
2913        let df = column_frame! {
2914            "a" => [1, 2, 3]
2915        };
2916        let missing: Key = "nonexistent".into();
2917        assert!(df.get_single_column_typed::<f64>(&missing).is_none());
2918    }
2919
2920    #[rstest]
2921    fn get_single_column_typed_numeric_coercion_from_mixed() {
2922        let df = column_frame! {
2923            "vals" => [1.5f64, 2.7f64, 3.9f64]
2924        };
2925        let key: Key = "vals".into();
2926        // f64 -> i32 truncates via `as`
2927        let col = df.get_single_column_typed::<i32>(&key).unwrap();
2928        assert_eq!(col, ndarray::arr1(&[1i32, 2i32, 3i32]));
2929    }
2930
2931    #[rstest]
2932    fn get_single_column_typed_selects_correct_column() {
2933        let df = column_frame! {
2934            "x" => [1, 2, 3],
2935            "y" => [10, 20, 30],
2936            "z" => [100, 200, 300]
2937        };
2938        let key: Key = "y".into();
2939        let col = df.get_single_column_typed::<i64>(&key).unwrap();
2940        assert_eq!(col, ndarray::arr1(&[10i64, 20i64, 30i64]));
2941    }
2942
2943    #[rstest]
2944    fn get_single_column_typed_u64_identity() {
2945        let df = column_frame! {
2946            "id" => [100u64, 200u64, 300u64]
2947        };
2948        let key: Key = "id".into();
2949        let col = df.get_single_column_typed::<u64>(&key).unwrap();
2950        assert_eq!(col, ndarray::arr1(&[100u64, 200u64, 300u64]));
2951    }
2952
2953    #[rstest]
2954    fn get_single_column_typed_single_row() {
2955        let df = column_frame! {
2956            "x" => [42i32]
2957        };
2958        let key: Key = "x".into();
2959        let col = df.get_single_column_typed::<f64>(&key).unwrap();
2960        assert_eq!(col, ndarray::arr1(&[42.0f64]));
2961    }
2962
2963    #[rstest]
2964    fn get_single_column_typed_empty_frame() {
2965        let df = ColumnFrame::default();
2966        let key: Key = "x".into();
2967        let col = df.get_single_column_typed::<f64>(&key);
2968        assert!(col.is_none());
2969    }
2970
2971    #[rstest]
2972    fn select_typed_all_columns() {
2973        let df = column_frame! {
2974            "a" => [1i32, 2i32],
2975            "b" => [3i32, 4i32]
2976        };
2977        let result = df.select_typed::<f64>(None);
2978        assert_eq!(result.nrows(), 2);
2979        assert_eq!(result.ncols(), 2);
2980        assert_eq!(result[[0, 0]], 1.0);
2981        assert_eq!(result[[0, 1]], 3.0);
2982        assert_eq!(result[[1, 0]], 2.0);
2983        assert_eq!(result[[1, 1]], 4.0);
2984    }
2985
2986    #[rstest]
2987    fn select_typed_subset_of_columns() {
2988        let df = column_frame! {
2989            "a" => [10u64, 20u64],
2990            "b" => [30u64, 40u64],
2991            "c" => [50u64, 60u64]
2992        };
2993        let keys: Vec<Key> = vec!["a".into(), "c".into()];
2994        let result = df.select_typed::<i64>(Some(&keys));
2995        assert_eq!(result.nrows(), 2);
2996        assert_eq!(result.ncols(), 2);
2997        assert_eq!(result[[0, 0]], 10i64);
2998        assert_eq!(result[[0, 1]], 50i64);
2999        assert_eq!(result[[1, 0]], 20i64);
3000        assert_eq!(result[[1, 1]], 60i64);
3001    }
3002
3003    #[rstest]
3004    fn select_typed_nonexistent_keys_returns_empty() {
3005        let df = column_frame! {
3006            "a" => [1i32, 2i32]
3007        };
3008        let keys: Vec<Key> = vec!["z".into()];
3009        let result = df.select_typed::<f64>(Some(&keys));
3010        assert_eq!(result.shape(), &[0, 0]);
3011    }
3012
3013    #[rstest]
3014    fn select_typed_string_extraction() {
3015        let df = column_frame! {
3016            "name" => ["hello", "world"]
3017        };
3018        let result = df.select_typed::<String>(None);
3019        assert_eq!(result[[0, 0]], "hello");
3020        assert_eq!(result[[1, 0]], "world");
3021    }
3022
3023    #[rstest]
3024    fn select_typed_matches_manual_mapv() {
3025        let df = column_frame! {
3026            "x" => [1i32, 2i32, 3i32],
3027            "y" => [4i32, 5i32, 6i32]
3028        };
3029        let typed = df.select_typed::<f64>(None);
3030        let manual = df.select(None).mapv(|v| f64::extract(&v));
3031        assert_eq!(typed, manual);
3032    }
3033
3034    // ── select_vec_view ──────────────────────────────────────────────────────
3035
3036    /// `None` keys → fast-path that borrows every column.
3037    #[rstest]
3038    #[traced_test]
3039    fn select_vec_view_all_columns() {
3040        let cf = column_frame! {
3041            "a" => [1i32, 2i32, 3i32],
3042            "b" => [4i32, 5i32, 6i32]
3043        };
3044        let cols = cf.select_vec_view(None).expect("should succeed");
3045        assert_eq!(cols.len(), 2, "must return one borrow per column");
3046        let a = cols[0].expect("column 'a' present");
3047        let b = cols[1].expect("column 'b' present");
3048        assert_eq!(a.len(), 3, "each column has 3 rows");
3049        assert_eq!(b.len(), 3);
3050        // Ordering matches the internal column order ("a" first, "b" second).
3051        assert_eq!(
3052            a.to_vec(),
3053            vec![
3054                DataValue::from(1i32),
3055                DataValue::from(2i32),
3056                DataValue::from(3i32),
3057            ]
3058        );
3059        assert_eq!(
3060            b.to_vec(),
3061            vec![
3062                DataValue::from(4i32),
3063                DataValue::from(5i32),
3064                DataValue::from(6i32),
3065            ]
3066        );
3067    }
3068
3069    /// Explicit key subset is returned in the requested order.
3070    #[rstest]
3071    #[traced_test]
3072    fn select_vec_view_subset_in_order() {
3073        let cf = column_frame! {
3074            "a" => [10i32, 20i32],
3075            "b" => [30i32, 40i32],
3076            "c" => [50i32, 60i32]
3077        };
3078        // Ask for "c" then "a" — order must be preserved.
3079        let cols = cf
3080            .select_vec_view(Some(&["c".into(), "a".into()]))
3081            .expect("should succeed");
3082        assert_eq!(cols.len(), 2);
3083        assert_eq!(
3084            cols[0].expect("first col is 'c'").to_vec(),
3085            vec![DataValue::from(50i32), DataValue::from(60i32)],
3086        );
3087        assert_eq!(
3088            cols[1].expect("second col is 'a'").to_vec(),
3089            vec![DataValue::from(10i32), DataValue::from(20i32)],
3090        );
3091    }
3092
3093    /// A single existing key is returned as a one-element Vec.
3094    #[rstest]
3095    #[traced_test]
3096    fn select_vec_view_single_column() {
3097        let cf = column_frame! {
3098            "x" => [7i32, 8i32, 9i32],
3099            "y" => [1i32, 2i32, 3i32]
3100        };
3101        let cols = cf
3102            .select_vec_view(Some(&["x".into()]))
3103            .expect("should succeed");
3104        assert_eq!(cols.len(), 1);
3105        assert_eq!(
3106            cols[0].expect("column 'x' present").to_vec(),
3107            vec![
3108                DataValue::from(7i32),
3109                DataValue::from(8i32),
3110                DataValue::from(9i32),
3111            ]
3112        );
3113    }
3114
3115    /// Entirely unknown keys return `Err(EmptyData)`.
3116    #[rstest]
3117    #[traced_test]
3118    fn select_vec_view_unknown_keys_returns_err() {
3119        let cf = column_frame! {
3120            "a" => [1i32, 2i32]
3121        };
3122        let result = cf.select_vec_view(Some(&["nonexistent".into()]));
3123        assert!(
3124            result.is_err(),
3125            "should error on unknown key, got {result:?}"
3126        );
3127    }
3128
3129    /// Empty key slice returns `Err(EmptyData)`.
3130    #[rstest]
3131    #[traced_test]
3132    fn select_vec_view_empty_keys_returns_err() {
3133        let cf = column_frame! {
3134            "a" => [1i32, 2i32]
3135        };
3136        let result = cf.select_vec_view(Some(&[]));
3137        assert!(result.is_err(), "empty slice should return an error");
3138    }
3139
3140    /// Nulls survive the round-trip through select_vec_view.
3141    #[rstest]
3142    #[traced_test]
3143    fn select_vec_view_preserves_nulls() {
3144        let cf = column_frame! {
3145            "v" => [DataValue::Null, DataValue::from(42i32), DataValue::Null]
3146        };
3147        let cols = cf.select_vec_view(None).expect("should succeed");
3148        assert_eq!(cols.len(), 1);
3149        assert_eq!(
3150            cols[0].expect("column 'v' present").to_vec(),
3151            vec![DataValue::Null, DataValue::from(42i32), DataValue::Null]
3152        );
3153    }
3154
3155    // ── select_view ──────────────────────────────────────────────────────────
3156
3157    /// `None` keys → owned `Array2` of shape `(ncols, nrows)` covering all columns.
3158    #[rstest]
3159    #[traced_test]
3160    fn select_view_all_columns_shape() {
3161        let cf = column_frame! {
3162            "a" => [1i32, 2i32, 3i32],
3163            "b" => [4i32, 5i32, 6i32]
3164        };
3165        let mv = cf.select_view(None).expect("should succeed");
3166        // row_view transposes to (nrows × ncols) so shape == [3, 2].
3167        let rv = mv.row_view();
3168        assert_eq!(rv.nrows(), 3, "nrows should be 3");
3169        assert_eq!(rv.ncols(), 2, "ncols should be 2");
3170    }
3171
3172    /// Explicit key subset produces the correct row_view shape.
3173    #[rstest]
3174    #[traced_test]
3175    fn select_view_subset_shape() {
3176        let cf = column_frame! {
3177            "a" => [10i32, 20i32],
3178            "b" => [30i32, 40i32],
3179            "c" => [50i32, 60i32]
3180        };
3181        let mv = cf
3182            .select_view(Some(&["a".into(), "c".into()]))
3183            .expect("should succeed");
3184        let rv = mv.row_view();
3185        assert_eq!(rv.nrows(), 2);
3186        assert_eq!(rv.ncols(), 2);
3187    }
3188
3189    /// row_view data matches select() for the same keys.
3190    ///
3191    /// `select_view` stacks columns on Axis(0) → shape (ncols × nrows).
3192    /// `row_view()` transposes back to (nrows × ncols), which must equal
3193    /// what `select()` returns directly.
3194    #[rstest]
3195    #[traced_test]
3196    fn select_view_data_matches_select() {
3197        let cf = column_frame! {
3198            "p" => [1i32, 2i32],
3199            "q" => [3i32, 4i32]
3200        };
3201        let keys: &[Key] = &["p".into(), "q".into()];
3202        let mv = cf.select_view(Some(keys)).expect("should succeed");
3203        let view_data = mv.row_view().to_owned();
3204        let select_data = cf.select(Some(keys));
3205        assert_eq!(view_data, select_data);
3206    }
3207
3208    /// Entirely unknown keys return an error.
3209    #[rstest]
3210    #[traced_test]
3211    fn select_view_unknown_keys_returns_err() {
3212        let cf = column_frame! {
3213            "a" => [1i32, 2i32]
3214        };
3215        let result = cf.select_view(Some(&["does_not_exist".into()]));
3216        assert!(result.is_err(), "unknown key should return an error");
3217    }
3218
3219    /// Empty key slice returns an error.
3220    #[rstest]
3221    #[traced_test]
3222    fn select_view_empty_keys_returns_err() {
3223        let cf = column_frame! {
3224            "a" => [1i32, 2i32]
3225        };
3226        let result = cf.select_view(Some(&[]));
3227        assert!(result.is_err(), "empty slice should return an error");
3228    }
3229
3230    /// Single-column selection: row_view produces (nrows × 1).
3231    ///
3232    /// `select_view` stacks on Axis(0) → shape (1 × 4).
3233    /// `row_view()` transposes to (4 × 1) — 4 rows, 1 column.
3234    #[rstest]
3235    #[traced_test]
3236    fn select_view_single_column() {
3237        let cf = column_frame! {
3238            "only" => [5i32, 6i32, 7i32, 8i32]
3239        };
3240        let mv = cf
3241            .select_view(Some(&["only".into()]))
3242            .expect("should succeed");
3243        let rv = mv.row_view();
3244        assert_eq!(rv.nrows(), 4, "four rows after transposing single column");
3245        assert_eq!(rv.ncols(), 1, "one column");
3246    }
3247
3248    // ══════════════════════════════════════════════════════════════════════
3249    // Serde backward-compatibility fixture tests (V1, V2)
3250    // ══════════════════════════════════════════════════════════════════════
3251
3252    #[test]
3253    fn serde_v1_array2_roundtrip() {
3254        #[derive(Serialize)]
3255        struct WireV1 {
3256            index: KeyIndex,
3257            data_frame: Array2<DataValue>,
3258        }
3259
3260        let index = KeyIndex::new(vec![
3261            Key::new("a", crate::DataType::I32),
3262            Key::new("b", crate::DataType::String),
3263        ]);
3264        let data_frame = Array2::from_shape_vec(
3265            (2, 2),
3266            vec![
3267                DataValue::I32(1),
3268                DataValue::String("x".into()),
3269                DataValue::I32(2),
3270                DataValue::String("y".into()),
3271            ],
3272        )
3273        .unwrap();
3274        let wire = WireV1 { index, data_frame };
3275        let bytes = rmp_serde::to_vec(&wire).unwrap();
3276        let deserialized: ColumnFrame = rmp_serde::from_slice(&bytes).unwrap();
3277
3278        assert_eq!(deserialized.nrows(), 2);
3279        assert_eq!(deserialized.ncolumns(), 2);
3280        assert_eq!(
3281            deserialized.get_column(&"a".into()).unwrap().get(0),
3282            Some(DataValue::I32(1))
3283        );
3284        assert_eq!(
3285            deserialized.get_column(&"a".into()).unwrap().get(1),
3286            Some(DataValue::I32(2))
3287        );
3288        assert_eq!(
3289            deserialized.get_column(&"b".into()).unwrap().get(0),
3290            Some(DataValue::String("x".into()))
3291        );
3292    }
3293
3294    #[test]
3295    fn serde_v2_vec_typed_data_roundtrip() {
3296        #[derive(Serialize)]
3297        struct WireV2 {
3298            index: KeyIndex,
3299            data_frame: Vec<TypedData>,
3300        }
3301
3302        let index = KeyIndex::new(vec![
3303            Key::new("c", crate::DataType::F64),
3304            Key::new("d", crate::DataType::Bool),
3305        ]);
3306        let data_frame = vec![
3307            TypedData::from(vec![1.5f64, 2.5]),
3308            TypedData::from(vec![true, false]),
3309        ];
3310        let wire = WireV2 { index, data_frame };
3311        let bytes = rmp_serde::to_vec(&wire).unwrap();
3312        let deserialized: ColumnFrame = rmp_serde::from_slice(&bytes).unwrap();
3313
3314        assert_eq!(deserialized.nrows(), 2);
3315        assert_eq!(deserialized.ncolumns(), 2);
3316        assert_eq!(
3317            deserialized.get_column(&"c".into()).unwrap().get(0),
3318            Some(DataValue::F64(1.5))
3319        );
3320        assert_eq!(
3321            deserialized.get_column(&"d".into()).unwrap().get(1),
3322            Some(DataValue::Bool(false))
3323        );
3324    }
3325
3326    #[test]
3327    fn serde_v3_current_format_roundtrip() {
3328        let cf = column_frame! {
3329            "x" => [100i32, 200i32, 300i32],
3330            "y" => [true, false, true]
3331        };
3332        let bytes = rmp_serde::to_vec(&cf).unwrap();
3333        let deserialized: ColumnFrame = rmp_serde::from_slice(&bytes).unwrap();
3334
3335        assert_eq!(deserialized.nrows(), 3);
3336        assert_eq!(deserialized.ncolumns(), 2);
3337        let col_x = deserialized.get_column(&"x".into()).unwrap();
3338        assert_eq!(col_x.len(), 3);
3339        let col_y = deserialized.get_column(&"y".into()).unwrap();
3340        assert_eq!(col_y.get(0), Some(DataValue::Bool(true)));
3341        assert_eq!(col_y.get(1), Some(DataValue::Bool(false)));
3342    }
3343}