Skip to main content

virtual_frame/
tidyview.rs

1//! TidyView — the virtual frame engine.
2//!
3//! A TidyView is a zero-copy lens over shared columnar data: an `Rc<DataFrame>`
4//! base, a `BitMask` recording which rows are visible, a `ProjectionMap` for
5//! column selection, and an optional ordering permutation.
6//!
7//! Filters don't copy data — they flip bits. Selects don't copy data — they
8//! narrow the projection. The original data is never modified.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::fmt;
12use std::rc::Rc;
13
14use crate::bitmask::BitMask;
15use crate::column::{Column, ColumnKeyRef, GroupKey};
16use crate::dataframe::DataFrame;
17use crate::expr::{self, DExpr, ExprValue};
18use crate::kahan::KahanAccumulator;
19
20// ── Error type ──────────────────────────────────────────────────────────────
21
22/// Error type for TidyView operations.
23#[derive(Debug, Clone)]
24pub enum TidyError {
25    /// Column not found in the base DataFrame.
26    ColumnNotFound(String),
27    /// Duplicate column name in select/mutate.
28    DuplicateColumn(String),
29    /// Filter predicate didn't evaluate to Bool.
30    PredicateNotBool { got: String },
31    /// Type mismatch in expression evaluation.
32    TypeMismatch { expected: String, got: String },
33    /// Length mismatch (e.g., broadcast failure).
34    LengthMismatch { expected: usize, got: usize },
35    /// Internal error.
36    Internal(String),
37    /// Aggregation on an empty group.
38    EmptyGroup,
39}
40
41impl fmt::Display for TidyError {
42    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43        match self {
44            TidyError::ColumnNotFound(n) => write!(f, "column `{}` not found", n),
45            TidyError::DuplicateColumn(n) => write!(f, "duplicate column `{}`", n),
46            TidyError::PredicateNotBool { got } => {
47                write!(f, "filter predicate must be Bool, got {}", got)
48            }
49            TidyError::TypeMismatch { expected, got } => {
50                write!(f, "type mismatch: expected {}, got {}", expected, got)
51            }
52            TidyError::LengthMismatch { expected, got } => {
53                write!(f, "length mismatch: expected {} rows, got {}", expected, got)
54            }
55            TidyError::Internal(msg) => write!(f, "internal error: {}", msg),
56            TidyError::EmptyGroup => write!(f, "aggregation on empty group"),
57        }
58    }
59}
60
61impl std::error::Error for TidyError {}
62
63// ── Projection map ─────────────────────────────────────────────────────────
64
65/// Tracks which columns are visible after select().
66#[derive(Debug, Clone)]
67pub struct ProjectionMap {
68    /// Column indices into the base DataFrame. None = all columns visible.
69    indices: Option<Vec<usize>>,
70}
71
72impl ProjectionMap {
73    /// All columns visible.
74    pub fn all() -> Self {
75        Self { indices: None }
76    }
77
78    /// Specific column indices visible.
79    pub fn from_indices(indices: Vec<usize>) -> Self {
80        Self {
81            indices: Some(indices),
82        }
83    }
84
85    /// Get the visible indices (or all 0..ncols if None).
86    pub fn resolve(&self, ncols: usize) -> Vec<usize> {
87        match &self.indices {
88            Some(idx) => idx.clone(),
89            None => (0..ncols).collect(),
90        }
91    }
92
93    /// Number of visible columns.
94    pub fn len(&self, ncols: usize) -> usize {
95        match &self.indices {
96            Some(idx) => idx.len(),
97            None => ncols,
98        }
99    }
100}
101
102// ── Sort key ────────────────────────────────────────────────────────────────
103
104/// A sort key for arrange().
105#[derive(Debug, Clone)]
106pub struct ArrangeKey {
107    pub col_name: String,
108    pub descending: bool,
109}
110
111impl ArrangeKey {
112    pub fn asc(col_name: &str) -> Self {
113        Self {
114            col_name: col_name.to_string(),
115            descending: false,
116        }
117    }
118    pub fn desc(col_name: &str) -> Self {
119        Self {
120            col_name: col_name.to_string(),
121            descending: true,
122        }
123    }
124}
125
126// ── Aggregation enum ────────────────────────────────────────────────────────
127
128/// Aggregation functions for summarise().
129#[derive(Debug, Clone)]
130pub enum TidyAgg {
131    /// Row count (no column argument).
132    Count,
133    /// Kahan-compensated sum of a numeric column.
134    Sum(String),
135    /// Mean (NaN for empty groups).
136    Mean(String),
137    /// Minimum value.
138    Min(String),
139    /// Maximum value.
140    Max(String),
141    /// Sample standard deviation (Kahan-based).
142    Sd(String),
143    /// Sample variance (Kahan-based).
144    Var(String),
145    /// First row's value.
146    First(String),
147    /// Last row's value.
148    Last(String),
149    /// Count of distinct values.
150    NDistinct(String),
151}
152
153// ── Group index ─────────────────────────────────────────────────────────────
154
155/// Metadata for one group: key values + row indices.
156#[derive(Debug, Clone)]
157pub struct GroupMeta {
158    pub key_values: Vec<GroupKey>,
159    pub row_indices: Vec<usize>,
160}
161
162/// Index over groups, in first-occurrence order.
163#[derive(Debug, Clone)]
164pub struct GroupIndex {
165    pub groups: Vec<GroupMeta>,
166    pub key_names: Vec<String>,
167}
168
169impl GroupIndex {
170    /// Build group index using borrowed keys. Single-key fast path avoids
171    /// Vec wrapper per row (100K fewer heap allocations).
172    pub fn build_fast_typed(
173        base: &DataFrame,
174        key_col_indices: &[usize],
175        visible_rows: &[usize],
176        key_names: Vec<String>,
177    ) -> Self {
178        if key_col_indices.len() == 1 {
179            return Self::build_single(base, key_col_indices[0], visible_rows, key_names);
180        }
181        Self::build_multi(base, key_col_indices, visible_rows, key_names)
182    }
183
184    /// Single-key: BTreeMap<ColumnKeyRef, usize> — no Vec per row.
185    fn build_single(
186        base: &DataFrame,
187        key_col_idx: usize,
188        visible_rows: &[usize],
189        key_names: Vec<String>,
190    ) -> Self {
191        let col = &base.columns[key_col_idx].1;
192        let mut groups: Vec<GroupMeta> = Vec::new();
193        let mut key_to_slot: BTreeMap<ColumnKeyRef<'_>, usize> = BTreeMap::new();
194
195        for &row in visible_rows {
196            let key = ColumnKeyRef::from_column(col, row);
197            if let Some(&slot) = key_to_slot.get(&key) {
198                groups[slot].row_indices.push(row);
199            } else {
200                let slot = groups.len();
201                let key_values = vec![key.to_owned_key()];
202                key_to_slot.insert(key, slot);
203                groups.push(GroupMeta {
204                    key_values,
205                    row_indices: vec![row],
206                });
207            }
208        }
209        GroupIndex { groups, key_names }
210    }
211
212    /// Multi-key: BTreeMap<Vec<ColumnKeyRef>, usize>.
213    fn build_multi(
214        base: &DataFrame,
215        key_col_indices: &[usize],
216        visible_rows: &[usize],
217        key_names: Vec<String>,
218    ) -> Self {
219        let mut groups: Vec<GroupMeta> = Vec::new();
220        let mut key_to_slot: BTreeMap<Vec<ColumnKeyRef<'_>>, usize> = BTreeMap::new();
221        let cols: Vec<&Column> = key_col_indices
222            .iter()
223            .map(|&ci| &base.columns[ci].1)
224            .collect();
225
226        for &row in visible_rows {
227            let key: Vec<ColumnKeyRef<'_>> =
228                cols.iter().map(|col| ColumnKeyRef::from_column(col, row)).collect();
229            if let Some(&slot) = key_to_slot.get(&key) {
230                groups[slot].row_indices.push(row);
231            } else {
232                let slot = groups.len();
233                let key_values: Vec<GroupKey> = key.iter().map(|k| k.to_owned_key()).collect();
234                key_to_slot.insert(key, slot);
235                groups.push(GroupMeta {
236                    key_values,
237                    row_indices: vec![row],
238                });
239            }
240        }
241        GroupIndex { groups, key_names }
242    }
243}
244
245// ── TidyView ────────────────────────────────────────────────────────────────
246
247/// Zero-copy virtual view over a shared DataFrame.
248///
249/// The base data is behind an `Rc` and never modified. Filters create bitmasks,
250/// selects narrow projections, sorts store permutation vectors. Data is only
251/// copied when you explicitly materialize or mutate.
252#[derive(Debug, Clone)]
253pub struct TidyView {
254    pub(crate) base: Rc<DataFrame>,
255    pub(crate) mask: BitMask,
256    pub(crate) proj: ProjectionMap,
257    pub(crate) ordering: Option<Rc<Vec<usize>>>,
258}
259
260impl TidyView {
261    /// Create a new TidyView over the entire DataFrame.
262    pub fn new(df: DataFrame) -> Self {
263        let nrows = df.nrows();
264        Self {
265            base: Rc::new(df),
266            mask: BitMask::all_true(nrows),
267            proj: ProjectionMap::all(),
268            ordering: None,
269        }
270    }
271
272    /// Create from a shared Rc<DataFrame>.
273    pub fn from_rc(base: Rc<DataFrame>) -> Self {
274        let nrows = base.nrows();
275        Self {
276            base,
277            mask: BitMask::all_true(nrows),
278            proj: ProjectionMap::all(),
279            ordering: None,
280        }
281    }
282
283    /// Number of visible rows.
284    pub fn nrows(&self) -> usize {
285        self.mask.count_ones()
286    }
287
288    /// Number of visible columns.
289    pub fn ncols(&self) -> usize {
290        self.proj.len(self.base.ncols())
291    }
292
293    /// Get the bitmask (for auditing).
294    pub fn mask(&self) -> &BitMask {
295        &self.mask
296    }
297
298    /// Get the base DataFrame (for auditing).
299    pub fn base(&self) -> &DataFrame {
300        &self.base
301    }
302
303    /// Get visible row indices in order.
304    fn visible_rows_ordered(&self) -> Vec<usize> {
305        if let Some(ref ord) = self.ordering {
306            ord.as_ref().clone()
307        } else {
308            self.mask.iter_set().collect()
309        }
310    }
311
312    /// Resolve pending ordering by materializing into a new base.
313    /// Returns None if no ordering exists (caller should use self).
314    fn resolve_ordering(&self) -> Option<TidyView> {
315        let ord = self.ordering.as_ref()?;
316        let row_indices: &[usize] = ord.as_ref();
317        let mut all_cols = Vec::with_capacity(self.base.ncols());
318        for (name, col) in &self.base.columns {
319            all_cols.push((name.clone(), col.gather(row_indices)));
320        }
321        let new_base =
322            DataFrame::from_columns(all_cols).expect("resolve_ordering: column length mismatch");
323        let nrows = new_base.nrows();
324        Some(TidyView {
325            base: Rc::new(new_base),
326            mask: BitMask::all_true(nrows),
327            proj: self.proj.clone(),
328            ordering: None,
329        })
330    }
331
332    /// Build a TidyView from explicit row indices.
333    fn view_from_row_indices(&self, row_indices: Vec<usize>) -> TidyView {
334        let nrows_base = self.base.nrows();
335        let mut words = vec![0u64; crate::bitmask::nwords_for(nrows_base)];
336        for &r in &row_indices {
337            words[r / 64] |= 1u64 << (r % 64);
338        }
339        TidyView {
340            base: Rc::clone(&self.base),
341            mask: BitMask {
342                words,
343                nrows: nrows_base,
344            },
345            proj: self.proj.clone(),
346            ordering: None,
347        }
348    }
349
350    // ── filter ──────────────────────────────────────────────────────────
351
352    /// Filter rows by predicate. Returns a new TidyView with an updated bitmask.
353    /// No data is copied — just bits are flipped.
354    pub fn filter(&self, predicate: &DExpr) -> Result<TidyView, TidyError> {
355        // Handle ordering case: filter the permutation vector directly
356        if let Some(ref ord) = self.ordering {
357            let pred_mask =
358                if let Some(m) = expr::try_eval_predicate_columnar(&self.base, predicate, &self.mask)
359                {
360                    m
361                } else {
362                    let nrows_base = self.base.nrows();
363                    let mut new_words = self.mask.words.clone();
364                    for &row in ord.iter() {
365                        let b = expr::eval_expr_row(&self.base, predicate, row)
366                            .map_err(|e| TidyError::Internal(e))?;
367                        let pass = match b {
368                            ExprValue::Bool(v) => v,
369                            _ => {
370                                return Err(TidyError::PredicateNotBool {
371                                    got: b.type_name().to_string(),
372                                })
373                            }
374                        };
375                        if !pass {
376                            new_words[row / 64] &= !(1u64 << (row % 64));
377                        }
378                    }
379                    BitMask {
380                        words: new_words,
381                        nrows: nrows_base,
382                    }
383                };
384            let new_ord: Vec<usize> = ord.iter().filter(|&&row| pred_mask.get(row)).copied().collect();
385            return Ok(TidyView {
386                base: Rc::clone(&self.base),
387                mask: pred_mask,
388                proj: self.proj.clone(),
389                ordering: Some(Rc::new(new_ord)),
390            });
391        }
392
393        // No ordering — columnar fast path first
394        if let Some(new_mask) =
395            expr::try_eval_predicate_columnar(&self.base, predicate, &self.mask)
396        {
397            return Ok(TidyView {
398                base: Rc::clone(&self.base),
399                mask: new_mask,
400                proj: self.proj.clone(),
401                ordering: None,
402            });
403        }
404
405        // Row-wise fallback
406        let nrows_base = self.base.nrows();
407        let mut new_words = self.mask.words.clone();
408        for row in self.mask.iter_set() {
409            let b = expr::eval_expr_row(&self.base, predicate, row)
410                .map_err(|e| TidyError::Internal(e))?;
411            let pass = match b {
412                ExprValue::Bool(v) => v,
413                _ => {
414                    return Err(TidyError::PredicateNotBool {
415                        got: b.type_name().to_string(),
416                    })
417                }
418            };
419            if !pass {
420                new_words[row / 64] &= !(1u64 << (row % 64));
421            }
422        }
423        Ok(TidyView {
424            base: Rc::clone(&self.base),
425            mask: BitMask {
426                words: new_words,
427                nrows: nrows_base,
428            },
429            proj: self.proj.clone(),
430            ordering: None,
431        })
432    }
433
434    // ── select ──────────────────────────────────────────────────────────
435
436    /// Select specific columns by name. Returns a new TidyView with a
437    /// narrowed projection. No data is copied.
438    pub fn select(&self, cols: &[&str]) -> Result<TidyView, TidyError> {
439        let mut seen = BTreeSet::new();
440        for &name in cols {
441            if !seen.insert(name) {
442                return Err(TidyError::DuplicateColumn(name.to_string()));
443            }
444        }
445        let mut new_indices = Vec::with_capacity(cols.len());
446        for &name in cols {
447            let idx = self
448                .base
449                .columns
450                .iter()
451                .position(|(n, _)| n == name)
452                .ok_or_else(|| TidyError::ColumnNotFound(name.to_string()))?;
453            new_indices.push(idx);
454        }
455        Ok(TidyView {
456            base: Rc::clone(&self.base),
457            mask: self.mask.clone(),
458            proj: ProjectionMap::from_indices(new_indices),
459            ordering: self.ordering.clone(),
460        })
461    }
462
463    // ── mutate ──────────────────────────────────────────────────────────
464
465    /// Add or replace columns. Materializes the view and returns a new DataFrame.
466    ///
467    /// Uses snapshot semantics: all column references resolve against the
468    /// column list frozen *before* any assignments execute.
469    pub fn mutate(&self, assignments: &[(&str, DExpr)]) -> Result<DataFrame, TidyError> {
470        let mut seen = BTreeSet::new();
471        for &(name, _) in assignments {
472            if !seen.insert(name) {
473                return Err(TidyError::DuplicateColumn(name.to_string()));
474            }
475        }
476
477        let mut df = self.materialize()?;
478        let snapshot_names: Vec<String> = df.columns.iter().map(|(n, _)| n.clone()).collect();
479
480        for &(col_name, ref dexpr) in assignments {
481            // Validate column refs exist in snapshot
482            validate_expr_columns(dexpr, &snapshot_names)?;
483
484            let nrows = df.nrows();
485            let new_col = eval_expr_column(&df, dexpr, nrows)?;
486
487            if let Some(pos) = df.columns.iter().position(|(n, _)| n == col_name) {
488                df.columns[pos].1 = new_col;
489            } else {
490                df.columns.push((col_name.to_string(), new_col));
491            }
492        }
493        Ok(df)
494    }
495
496    // ── group_by ────────────────────────────────────────────────────────
497
498    /// Group rows by one or more key columns.
499    ///
500    /// Groups appear in first-occurrence order (deterministic, not hash-dependent).
501    pub fn group_by(&self, keys: &[&str]) -> Result<GroupedTidyView, TidyError> {
502        let mut key_col_indices = Vec::with_capacity(keys.len());
503        for &key in keys {
504            let idx = self
505                .base
506                .columns
507                .iter()
508                .position(|(n, _)| n == key)
509                .ok_or_else(|| TidyError::ColumnNotFound(key.to_string()))?;
510            key_col_indices.push(idx);
511        }
512
513        let key_names: Vec<String> = keys.iter().map(|s| s.to_string()).collect();
514        let visible_rows: Vec<usize> = if self.ordering.is_none()
515            && self.mask.count_ones() == self.base.nrows()
516        {
517            (0..self.base.nrows()).collect()
518        } else {
519            self.visible_rows_ordered()
520        };
521
522        let index =
523            GroupIndex::build_fast_typed(&self.base, &key_col_indices, &visible_rows, key_names);
524
525        Ok(GroupedTidyView {
526            view: self.clone(),
527            index,
528        })
529    }
530
531    // ── arrange ─────────────────────────────────────────────────────────
532
533    /// Sort visible rows by one or more keys. Stores the result as a lazy
534    /// permutation vector — no data is materialized.
535    pub fn arrange(&self, keys: &[ArrangeKey]) -> Result<TidyView, TidyError> {
536        for key in keys {
537            if self.base.get_column(&key.col_name).is_none() {
538                return Err(TidyError::ColumnNotFound(key.col_name.clone()));
539            }
540        }
541        let mut row_indices: Vec<usize> = self.mask.iter_set().collect();
542        let key_cols: Vec<(&Column, bool)> = keys
543            .iter()
544            .map(|key| {
545                let col = self.base.get_column(&key.col_name).unwrap();
546                (col, key.descending)
547            })
548            .collect();
549
550        row_indices.sort_by(|&a, &b| {
551            for &(col, desc) in &key_cols {
552                let ord = col.compare_rows(a, b);
553                let ord = if desc { ord.reverse() } else { ord };
554                if ord != std::cmp::Ordering::Equal {
555                    return ord;
556                }
557            }
558            std::cmp::Ordering::Equal
559        });
560
561        Ok(TidyView {
562            base: Rc::clone(&self.base),
563            mask: self.mask.clone(),
564            proj: self.proj.clone(),
565            ordering: Some(Rc::new(row_indices)),
566        })
567    }
568
569    // ── slice ───────────────────────────────────────────────────────────
570
571    /// Take the first N visible rows.
572    pub fn slice_head(&self, n: usize) -> TidyView {
573        let rows: Vec<usize> = self.visible_rows_ordered().into_iter().take(n).collect();
574        self.view_from_row_indices(rows)
575    }
576
577    /// Take the last N visible rows.
578    pub fn slice_tail(&self, n: usize) -> TidyView {
579        let all = self.visible_rows_ordered();
580        let start = all.len().saturating_sub(n);
581        let rows = all[start..].to_vec();
582        self.view_from_row_indices(rows)
583    }
584
585    /// Deterministic random sample of N rows. Same seed = same rows, always.
586    pub fn slice_sample(&self, n: usize, seed: u64) -> TidyView {
587        let resolved = self.resolve_ordering();
588        let this = resolved.as_ref().unwrap_or(self);
589        let mut visible: Vec<usize> = this.mask.iter_set().collect();
590        let total = visible.len();
591        if n >= total {
592            return this.view_from_row_indices(visible);
593        }
594        // Partial Fisher-Yates with LCG (deterministic)
595        let mut rng = seed;
596        for i in 0..n {
597            rng = rng
598                .wrapping_mul(6364136223846793005)
599                .wrapping_add(1442695040888963407);
600            let j = i + (rng as usize % (total - i));
601            visible.swap(i, j);
602        }
603        visible.truncate(n);
604        visible.sort_unstable();
605        this.view_from_row_indices(visible)
606    }
607
608    // ── distinct ────────────────────────────────────────────────────────
609
610    /// Keep only unique rows by the given column subset.
611    pub fn distinct(&self, cols: &[&str]) -> Result<TidyView, TidyError> {
612        let resolved = self.resolve_ordering();
613        let this = resolved.as_ref().unwrap_or(self);
614
615        let mut col_indices = Vec::with_capacity(cols.len());
616        for &name in cols {
617            let idx = this
618                .base
619                .columns
620                .iter()
621                .position(|(n, _)| n == name)
622                .ok_or_else(|| TidyError::ColumnNotFound(name.to_string()))?;
623            col_indices.push(idx);
624        }
625
626        let mut seen_keys: BTreeSet<Vec<String>> = BTreeSet::new();
627        let mut selected_rows: Vec<usize> = Vec::new();
628
629        for row in this.mask.iter_set() {
630            let key: Vec<String> = col_indices
631                .iter()
632                .map(|&ci| this.base.columns[ci].1.get_display(row))
633                .collect();
634            if seen_keys.insert(key) {
635                selected_rows.push(row);
636            }
637        }
638        Ok(this.view_from_row_indices(selected_rows))
639    }
640
641    // ── joins ───────────────────────────────────────────────────────────
642
643    /// Inner join: rows where keys match in both tables.
644    pub fn inner_join(
645        &self,
646        right: &TidyView,
647        on: &[(&str, &str)],
648    ) -> Result<DataFrame, TidyError> {
649        let l = self.resolve_ordering();
650        let lref = l.as_ref().unwrap_or(self);
651        let r = right.resolve_ordering();
652        let rref = r.as_ref().unwrap_or(right);
653        let (left_rows, right_rows) = join_match_rows(lref, rref, on)?;
654        build_join_frame(lref, rref, &left_rows, &right_rows, on)
655    }
656
657    /// Left join: all left rows, matched right rows or defaults.
658    pub fn left_join(
659        &self,
660        right: &TidyView,
661        on: &[(&str, &str)],
662    ) -> Result<DataFrame, TidyError> {
663        let l = self.resolve_ordering();
664        let lref = l.as_ref().unwrap_or(self);
665        let r = right.resolve_ordering();
666        let rref = r.as_ref().unwrap_or(right);
667        let (left_rows, right_rows_opt) = join_match_rows_optional(lref, rref, on)?;
668        build_left_join_frame(lref, rref, &left_rows, &right_rows_opt, on)
669    }
670
671    // ── materialize ─────────────────────────────────────────────────────
672
673    /// Materialize the view into a new DataFrame (mask + projection applied).
674    pub fn materialize(&self) -> Result<DataFrame, TidyError> {
675        let resolved = self.resolve_ordering();
676        let this = resolved.as_ref().unwrap_or(self);
677
678        let visible: Vec<usize> = this.mask.iter_set().collect();
679        let proj_indices = this.proj.resolve(this.base.ncols());
680
681        let mut result_cols = Vec::with_capacity(proj_indices.len());
682        for &ci in &proj_indices {
683            let (name, col) = &this.base.columns[ci];
684            result_cols.push((name.clone(), col.gather(&visible)));
685        }
686
687        DataFrame::from_columns(result_cols).map_err(|e| TidyError::Internal(e.to_string()))
688    }
689
690    /// Column names visible through the current projection.
691    pub fn column_names(&self) -> Vec<String> {
692        let proj_indices = self.proj.resolve(self.base.ncols());
693        proj_indices
694            .iter()
695            .map(|&i| self.base.columns[i].0.clone())
696            .collect()
697    }
698}
699
700// ── GroupedTidyView ─────────────────────────────────────────────────────────
701
702/// A TidyView with an associated group index.
703#[derive(Debug, Clone)]
704pub struct GroupedTidyView {
705    pub view: TidyView,
706    pub index: GroupIndex,
707}
708
709impl GroupedTidyView {
710    /// Compute aggregate statistics per group.
711    pub fn summarise(
712        &self,
713        assignments: &[(&str, TidyAgg)],
714    ) -> Result<DataFrame, TidyError> {
715        let mut seen = BTreeSet::new();
716        for &(name, _) in assignments {
717            if !seen.insert(name) {
718                return Err(TidyError::DuplicateColumn(name.to_string()));
719            }
720        }
721
722        let base = &self.view.base;
723        let n_groups = self.index.groups.len();
724        let mut result_columns: Vec<(String, Column)> = Vec::new();
725
726        // Build key columns (one value per group)
727        for (ki, key_name) in self.index.key_names.iter().enumerate() {
728            let base_col = base
729                .get_column(key_name)
730                .ok_or_else(|| TidyError::ColumnNotFound(key_name.clone()))?;
731
732            let col = match base_col {
733                Column::Int(_) => {
734                    let vals: Vec<i64> = self.index.groups.iter().map(|g| {
735                        match &g.key_values[ki] {
736                            GroupKey::Int(v) => *v,
737                            _ => 0,
738                        }
739                    }).collect();
740                    Column::Int(vals)
741                }
742                Column::Float(_) => {
743                    let vals: Vec<f64> = self.index.groups.iter().map(|g| {
744                        match &g.key_values[ki] {
745                            GroupKey::Float(fk) => fk.0,
746                            GroupKey::Int(v) => *v as f64,
747                            _ => 0.0,
748                        }
749                    }).collect();
750                    Column::Float(vals)
751                }
752                Column::Bool(_) => {
753                    let vals: Vec<bool> = self.index.groups.iter().map(|g| {
754                        match &g.key_values[ki] {
755                            GroupKey::Bool(v) => *v,
756                            _ => false,
757                        }
758                    }).collect();
759                    Column::Bool(vals)
760                }
761                _ => {
762                    let vals: Vec<String> = self.index.groups.iter().map(|g| {
763                        g.key_values[ki].to_display()
764                    }).collect();
765                    Column::Str(vals)
766                }
767            };
768            result_columns.push((key_name.clone(), col));
769        }
770
771        // Build aggregation columns
772        for &(out_name, ref agg) in assignments {
773            let col = self.eval_agg(agg, n_groups, base)?;
774            result_columns.push((out_name.to_string(), col));
775        }
776
777        DataFrame::from_columns(result_columns).map_err(|e| TidyError::Internal(e.to_string()))
778    }
779
780    fn eval_agg(
781        &self,
782        agg: &TidyAgg,
783        _n_groups: usize,
784        base: &DataFrame,
785    ) -> Result<Column, TidyError> {
786        match agg {
787            TidyAgg::Count => {
788                let counts: Vec<i64> = self
789                    .index
790                    .groups
791                    .iter()
792                    .map(|g| g.row_indices.len() as i64)
793                    .collect();
794                Ok(Column::Int(counts))
795            }
796            TidyAgg::Sum(col_name) => {
797                let col = base
798                    .get_column(col_name)
799                    .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
800                let sums: Vec<f64> = self.index.groups.iter().map(|g| {
801                    let mut acc = KahanAccumulator::new();
802                    for &i in &g.row_indices {
803                        if let Some(v) = col.get_f64(i) {
804                            acc.add(v);
805                        }
806                    }
807                    acc.finalize()
808                }).collect();
809                Ok(Column::Float(sums))
810            }
811            TidyAgg::Mean(col_name) => {
812                let col = base
813                    .get_column(col_name)
814                    .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
815                let means: Vec<f64> = self.index.groups.iter().map(|g| {
816                    if g.row_indices.is_empty() {
817                        return f64::NAN;
818                    }
819                    let mut acc = KahanAccumulator::new();
820                    for &i in &g.row_indices {
821                        if let Some(v) = col.get_f64(i) {
822                            acc.add(v);
823                        }
824                    }
825                    acc.finalize() / g.row_indices.len() as f64
826                }).collect();
827                Ok(Column::Float(means))
828            }
829            TidyAgg::Min(col_name) => {
830                let col = base
831                    .get_column(col_name)
832                    .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
833                let mins: Vec<f64> = self.index.groups.iter().map(|g| {
834                    let mut min = f64::INFINITY;
835                    for &i in &g.row_indices {
836                        if let Some(v) = col.get_f64(i) {
837                            if v < min { min = v; }
838                        }
839                    }
840                    min
841                }).collect();
842                Ok(Column::Float(mins))
843            }
844            TidyAgg::Max(col_name) => {
845                let col = base
846                    .get_column(col_name)
847                    .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
848                let maxs: Vec<f64> = self.index.groups.iter().map(|g| {
849                    let mut max = f64::NEG_INFINITY;
850                    for &i in &g.row_indices {
851                        if let Some(v) = col.get_f64(i) {
852                            if v > max { max = v; }
853                        }
854                    }
855                    max
856                }).collect();
857                Ok(Column::Float(maxs))
858            }
859            TidyAgg::Sd(col_name) => {
860                let col = base
861                    .get_column(col_name)
862                    .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
863                let sds: Vec<f64> = self.index.groups.iter().map(|g| {
864                    let n = g.row_indices.len();
865                    if n < 2 { return f64::NAN; }
866                    let mut acc = KahanAccumulator::new();
867                    for &i in &g.row_indices {
868                        if let Some(v) = col.get_f64(i) { acc.add(v); }
869                    }
870                    let mean = acc.finalize() / n as f64;
871                    let mut var_acc = KahanAccumulator::new();
872                    for &i in &g.row_indices {
873                        if let Some(v) = col.get_f64(i) {
874                            let diff = v - mean;
875                            var_acc.add(diff * diff);
876                        }
877                    }
878                    (var_acc.finalize() / (n - 1) as f64).sqrt()
879                }).collect();
880                Ok(Column::Float(sds))
881            }
882            TidyAgg::Var(col_name) => {
883                let col = base
884                    .get_column(col_name)
885                    .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
886                let vars: Vec<f64> = self.index.groups.iter().map(|g| {
887                    let n = g.row_indices.len();
888                    if n < 2 { return f64::NAN; }
889                    let mut acc = KahanAccumulator::new();
890                    for &i in &g.row_indices {
891                        if let Some(v) = col.get_f64(i) { acc.add(v); }
892                    }
893                    let mean = acc.finalize() / n as f64;
894                    let mut var_acc = KahanAccumulator::new();
895                    for &i in &g.row_indices {
896                        if let Some(v) = col.get_f64(i) {
897                            let diff = v - mean;
898                            var_acc.add(diff * diff);
899                        }
900                    }
901                    var_acc.finalize() / (n - 1) as f64
902                }).collect();
903                Ok(Column::Float(vars))
904            }
905            TidyAgg::First(col_name) => {
906                let col = base
907                    .get_column(col_name)
908                    .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
909                let vals: Result<Vec<String>, _> = self.index.groups.iter().map(|g| {
910                    g.row_indices.first()
911                        .map(|&i| col.get_display(i))
912                        .ok_or(TidyError::EmptyGroup)
913                }).collect();
914                Ok(Column::Str(vals?))
915            }
916            TidyAgg::Last(col_name) => {
917                let col = base
918                    .get_column(col_name)
919                    .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
920                let vals: Result<Vec<String>, _> = self.index.groups.iter().map(|g| {
921                    g.row_indices.last()
922                        .map(|&i| col.get_display(i))
923                        .ok_or(TidyError::EmptyGroup)
924                }).collect();
925                Ok(Column::Str(vals?))
926            }
927            TidyAgg::NDistinct(col_name) => {
928                let col = base
929                    .get_column(col_name)
930                    .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
931                let counts: Vec<i64> = self.index.groups.iter().map(|g| {
932                    let mut uniq = BTreeSet::new();
933                    for &i in &g.row_indices {
934                        uniq.insert(col.get_display(i));
935                    }
936                    uniq.len() as i64
937                }).collect();
938                Ok(Column::Int(counts))
939            }
940        }
941    }
942
943    /// Get the group index (for auditing).
944    pub fn group_index(&self) -> &GroupIndex {
945        &self.index
946    }
947}
948
949// ── Join helpers ────────────────────────────────────────────────────────────
950
951fn resolve_join_keys(
952    left: &TidyView,
953    right: &TidyView,
954    on: &[(&str, &str)],
955) -> Result<(Vec<usize>, Vec<usize>), TidyError> {
956    let mut left_indices = Vec::with_capacity(on.len());
957    let mut right_indices = Vec::with_capacity(on.len());
958    for &(lk, rk) in on {
959        let li = left
960            .base
961            .column_index(lk)
962            .ok_or_else(|| TidyError::ColumnNotFound(lk.to_string()))?;
963        let ri = right
964            .base
965            .column_index(rk)
966            .ok_or_else(|| TidyError::ColumnNotFound(rk.to_string()))?;
967        left_indices.push(li);
968        right_indices.push(ri);
969    }
970    Ok((left_indices, right_indices))
971}
972
973fn join_match_rows(
974    left: &TidyView,
975    right: &TidyView,
976    on: &[(&str, &str)],
977) -> Result<(Vec<usize>, Vec<usize>), TidyError> {
978    let (left_key_cols, right_key_cols) = resolve_join_keys(left, right, on)?;
979    let mut out_left = Vec::new();
980    let mut out_right = Vec::new();
981
982    // Single-key fast path
983    if left_key_cols.len() == 1 {
984        let r_col = &right.base.columns[right_key_cols[0]].1;
985        let l_col = &left.base.columns[left_key_cols[0]].1;
986        let mut lookup: BTreeMap<ColumnKeyRef<'_>, Vec<usize>> = BTreeMap::new();
987        for r in right.mask.iter_set() {
988            let key = ColumnKeyRef::from_column(r_col, r);
989            lookup.entry(key).or_default().push(r);
990        }
991        for l_row in left.mask.iter_set() {
992            let key = ColumnKeyRef::from_column(l_col, l_row);
993            if let Some(matches) = lookup.get(&key) {
994                for &r_row in matches {
995                    out_left.push(l_row);
996                    out_right.push(r_row);
997                }
998            }
999        }
1000        return Ok((out_left, out_right));
1001    }
1002
1003    // Multi-key path
1004    let r_cols: Vec<&Column> = right_key_cols
1005        .iter()
1006        .map(|&ci| &right.base.columns[ci].1)
1007        .collect();
1008    let l_cols: Vec<&Column> = left_key_cols
1009        .iter()
1010        .map(|&ci| &left.base.columns[ci].1)
1011        .collect();
1012    let mut lookup: BTreeMap<Vec<ColumnKeyRef<'_>>, Vec<usize>> = BTreeMap::new();
1013    for r in right.mask.iter_set() {
1014        let key: Vec<ColumnKeyRef<'_>> = r_cols.iter().map(|col| ColumnKeyRef::from_column(col, r)).collect();
1015        lookup.entry(key).or_default().push(r);
1016    }
1017    for l_row in left.mask.iter_set() {
1018        let key: Vec<ColumnKeyRef<'_>> = l_cols.iter().map(|col| ColumnKeyRef::from_column(col, l_row)).collect();
1019        if let Some(matches) = lookup.get(&key) {
1020            for &r_row in matches {
1021                out_left.push(l_row);
1022                out_right.push(r_row);
1023            }
1024        }
1025    }
1026    Ok((out_left, out_right))
1027}
1028
1029fn join_match_rows_optional(
1030    left: &TidyView,
1031    right: &TidyView,
1032    on: &[(&str, &str)],
1033) -> Result<(Vec<usize>, Vec<Option<usize>>), TidyError> {
1034    let (left_key_cols, right_key_cols) = resolve_join_keys(left, right, on)?;
1035    let mut out_left = Vec::new();
1036    let mut out_right: Vec<Option<usize>> = Vec::new();
1037
1038    if left_key_cols.len() == 1 {
1039        let r_col = &right.base.columns[right_key_cols[0]].1;
1040        let l_col = &left.base.columns[left_key_cols[0]].1;
1041        let mut lookup: BTreeMap<ColumnKeyRef<'_>, Vec<usize>> = BTreeMap::new();
1042        for r in right.mask.iter_set() {
1043            let key = ColumnKeyRef::from_column(r_col, r);
1044            lookup.entry(key).or_default().push(r);
1045        }
1046        for l_row in left.mask.iter_set() {
1047            let key = ColumnKeyRef::from_column(l_col, l_row);
1048            match lookup.get(&key) {
1049                Some(matches) if !matches.is_empty() => {
1050                    for &r_row in matches {
1051                        out_left.push(l_row);
1052                        out_right.push(Some(r_row));
1053                    }
1054                }
1055                _ => {
1056                    out_left.push(l_row);
1057                    out_right.push(None);
1058                }
1059            }
1060        }
1061        return Ok((out_left, out_right));
1062    }
1063
1064    let r_cols: Vec<&Column> = right_key_cols.iter().map(|&ci| &right.base.columns[ci].1).collect();
1065    let l_cols: Vec<&Column> = left_key_cols.iter().map(|&ci| &left.base.columns[ci].1).collect();
1066    let mut lookup: BTreeMap<Vec<ColumnKeyRef<'_>>, Vec<usize>> = BTreeMap::new();
1067    for r in right.mask.iter_set() {
1068        let key: Vec<ColumnKeyRef<'_>> = r_cols.iter().map(|col| ColumnKeyRef::from_column(col, r)).collect();
1069        lookup.entry(key).or_default().push(r);
1070    }
1071    for l_row in left.mask.iter_set() {
1072        let key: Vec<ColumnKeyRef<'_>> = l_cols.iter().map(|col| ColumnKeyRef::from_column(col, l_row)).collect();
1073        match lookup.get(&key) {
1074            Some(matches) if !matches.is_empty() => {
1075                for &r_row in matches {
1076                    out_left.push(l_row);
1077                    out_right.push(Some(r_row));
1078                }
1079            }
1080            _ => {
1081                out_left.push(l_row);
1082                out_right.push(None);
1083            }
1084        }
1085    }
1086    Ok((out_left, out_right))
1087}
1088
1089fn build_join_frame(
1090    left: &TidyView,
1091    right: &TidyView,
1092    left_rows: &[usize],
1093    right_rows: &[usize],
1094    on: &[(&str, &str)],
1095) -> Result<DataFrame, TidyError> {
1096    let right_key_names: BTreeSet<&str> = on.iter().map(|&(_, rk)| rk).collect();
1097    let mut result_cols: Vec<(String, Column)> = Vec::new();
1098
1099    // All left columns
1100    for (name, col) in &left.base.columns {
1101        result_cols.push((name.clone(), col.gather(left_rows)));
1102    }
1103
1104    // Right columns excluding join keys (already in left)
1105    for (name, col) in &right.base.columns {
1106        if !right_key_names.contains(name.as_str()) {
1107            result_cols.push((name.clone(), col.gather(right_rows)));
1108        }
1109    }
1110
1111    DataFrame::from_columns(result_cols).map_err(|e| TidyError::Internal(e.to_string()))
1112}
1113
1114fn build_left_join_frame(
1115    left: &TidyView,
1116    right: &TidyView,
1117    left_rows: &[usize],
1118    right_rows_opt: &[Option<usize>],
1119    on: &[(&str, &str)],
1120) -> Result<DataFrame, TidyError> {
1121    let right_key_names: BTreeSet<&str> = on.iter().map(|&(_, rk)| rk).collect();
1122    let mut result_cols: Vec<(String, Column)> = Vec::new();
1123
1124    // All left columns
1125    for (name, col) in &left.base.columns {
1126        result_cols.push((name.clone(), col.gather(left_rows)));
1127    }
1128
1129    // Right columns with default values for unmatched rows
1130    for (name, col) in &right.base.columns {
1131        if right_key_names.contains(name.as_str()) {
1132            continue;
1133        }
1134        let gathered = match col {
1135            Column::Int(v) => Column::Int(
1136                right_rows_opt.iter().map(|opt| opt.map(|i| v[i]).unwrap_or(0)).collect(),
1137            ),
1138            Column::Float(v) => Column::Float(
1139                right_rows_opt.iter().map(|opt| opt.map(|i| v[i]).unwrap_or(0.0)).collect(),
1140            ),
1141            Column::Str(v) => Column::Str(
1142                right_rows_opt
1143                    .iter()
1144                    .map(|opt| opt.map(|i| v[i].clone()).unwrap_or_default())
1145                    .collect(),
1146            ),
1147            Column::Bool(v) => Column::Bool(
1148                right_rows_opt.iter().map(|opt| opt.map(|i| v[i]).unwrap_or(false)).collect(),
1149            ),
1150        };
1151        result_cols.push((name.clone(), gathered));
1152    }
1153
1154    DataFrame::from_columns(result_cols).map_err(|e| TidyError::Internal(e.to_string()))
1155}
1156
1157// ── Expression helpers ──────────────────────────────────────────────────────
1158
1159fn validate_expr_columns(expr: &DExpr, valid_names: &[String]) -> Result<(), TidyError> {
1160    match expr {
1161        DExpr::Col(name) => {
1162            if !valid_names.iter().any(|n| n == name) {
1163                return Err(TidyError::ColumnNotFound(name.clone()));
1164            }
1165            Ok(())
1166        }
1167        DExpr::BinOp { left, right, .. } => {
1168            validate_expr_columns(left, valid_names)?;
1169            validate_expr_columns(right, valid_names)
1170        }
1171        DExpr::Not(inner) => validate_expr_columns(inner, valid_names),
1172        DExpr::And(a, b) | DExpr::Or(a, b) => {
1173            validate_expr_columns(a, valid_names)?;
1174            validate_expr_columns(b, valid_names)
1175        }
1176        _ => Ok(()),
1177    }
1178}
1179
1180fn eval_expr_column(
1181    df: &DataFrame,
1182    dexpr: &DExpr,
1183    nrows: usize,
1184) -> Result<Column, TidyError> {
1185    // Evaluate per-row and collect into a column
1186    let mut floats = Vec::with_capacity(nrows);
1187    let mut ints = Vec::with_capacity(nrows);
1188    let mut strings = Vec::with_capacity(nrows);
1189    let mut bools = Vec::with_capacity(nrows);
1190    let mut first_type: Option<&str> = None;
1191
1192    for row in 0..nrows {
1193        let val = expr::eval_expr_row(df, dexpr, row)
1194            .map_err(|e| TidyError::Internal(e))?;
1195        match &val {
1196            ExprValue::Float(v) => {
1197                if first_type.is_none() { first_type = Some("Float"); }
1198                floats.push(*v);
1199            }
1200            ExprValue::Int(v) => {
1201                if first_type.is_none() { first_type = Some("Int"); }
1202                ints.push(*v);
1203            }
1204            ExprValue::Str(v) => {
1205                if first_type.is_none() { first_type = Some("Str"); }
1206                strings.push(v.clone());
1207            }
1208            ExprValue::Bool(v) => {
1209                if first_type.is_none() { first_type = Some("Bool"); }
1210                bools.push(*v);
1211            }
1212        }
1213    }
1214
1215    match first_type {
1216        Some("Float") => Ok(Column::Float(floats)),
1217        Some("Int") => Ok(Column::Int(ints)),
1218        Some("Str") => Ok(Column::Str(strings)),
1219        Some("Bool") => Ok(Column::Bool(bools)),
1220        _ => Ok(Column::Float(Vec::new())),
1221    }
1222}
1223
1224// ── Tests ───────────────────────────────────────────────────────────────────
1225
1226#[cfg(test)]
1227mod tests {
1228    use super::*;
1229    use crate::expr::{binop, col, BinOp};
1230
1231    fn make_test_df() -> DataFrame {
1232        DataFrame::from_columns(vec![
1233            ("id".into(), Column::Int(vec![1, 2, 3, 4, 5])),
1234            (
1235                "region".into(),
1236                Column::Str(vec![
1237                    "West".into(), "East".into(), "West".into(),
1238                    "East".into(), "West".into(),
1239                ]),
1240            ),
1241            ("value".into(), Column::Float(vec![10.0, 20.0, 30.0, 40.0, 50.0])),
1242        ])
1243        .unwrap()
1244    }
1245
1246    #[test]
1247    fn test_filter_no_copy() {
1248        let df = make_test_df();
1249        let view = TidyView::new(df);
1250        let filtered = view
1251            .filter(&binop(BinOp::Gt, col("value"), DExpr::LitFloat(25.0)))
1252            .unwrap();
1253        assert_eq!(filtered.nrows(), 3); // 30, 40, 50
1254        assert_eq!(view.nrows(), 5); // original unchanged
1255    }
1256
1257    #[test]
1258    fn test_chained_filter() {
1259        let df = make_test_df();
1260        let view = TidyView::new(df);
1261        let filtered = view
1262            .filter(&binop(BinOp::Gt, col("value"), DExpr::LitFloat(15.0)))
1263            .unwrap()
1264            .filter(&binop(BinOp::Lt, col("value"), DExpr::LitFloat(45.0)))
1265            .unwrap();
1266        assert_eq!(filtered.nrows(), 3); // 20, 30, 40
1267    }
1268
1269    #[test]
1270    fn test_select() {
1271        let df = make_test_df();
1272        let view = TidyView::new(df);
1273        let selected = view.select(&["id", "value"]).unwrap();
1274        assert_eq!(selected.ncols(), 2);
1275        assert_eq!(view.ncols(), 3); // original unchanged
1276    }
1277
1278    #[test]
1279    fn test_group_by_summarise() {
1280        let df = make_test_df();
1281        let view = TidyView::new(df);
1282        let grouped = view.group_by(&["region"]).unwrap();
1283        let summary = grouped
1284            .summarise(&[
1285                ("n", TidyAgg::Count),
1286                ("total", TidyAgg::Sum("value".into())),
1287                ("avg", TidyAgg::Mean("value".into())),
1288            ])
1289            .unwrap();
1290        // West: 3 rows (10+30+50=90, mean=30), East: 2 rows (20+40=60, mean=30)
1291        assert_eq!(summary.nrows(), 2);
1292        assert_eq!(summary.ncols(), 4); // region, n, total, avg
1293    }
1294
1295    #[test]
1296    fn test_group_order_first_occurrence() {
1297        let df = make_test_df();
1298        let view = TidyView::new(df);
1299        let grouped = view.group_by(&["region"]).unwrap();
1300        // First occurrence: West (row 0), East (row 1)
1301        assert_eq!(grouped.index.groups[0].key_values[0].to_display(), "West");
1302        assert_eq!(grouped.index.groups[1].key_values[0].to_display(), "East");
1303    }
1304
1305    #[test]
1306    fn test_arrange() {
1307        let df = make_test_df();
1308        let view = TidyView::new(df);
1309        let sorted = view.arrange(&[ArrangeKey::desc("value")]).unwrap();
1310        let mat = sorted.materialize().unwrap();
1311        if let Column::Float(vals) = &mat.columns[2].1 {
1312            assert_eq!(vals, &[50.0, 40.0, 30.0, 20.0, 10.0]);
1313        }
1314    }
1315
1316    #[test]
1317    fn test_inner_join() {
1318        let left = DataFrame::from_columns(vec![
1319            ("id".into(), Column::Int(vec![1, 2, 3])),
1320            ("name".into(), Column::Str(vec!["a".into(), "b".into(), "c".into()])),
1321        ]).unwrap();
1322        let right = DataFrame::from_columns(vec![
1323            ("id".into(), Column::Int(vec![1, 3, 4])),
1324            ("dept".into(), Column::Str(vec!["eng".into(), "sales".into(), "hr".into()])),
1325        ]).unwrap();
1326        let lv = TidyView::new(left);
1327        let rv = TidyView::new(right);
1328        let joined = lv.inner_join(&rv, &[("id", "id")]).unwrap();
1329        assert_eq!(joined.nrows(), 2); // id=1 and id=3
1330    }
1331
1332    #[test]
1333    fn test_deterministic_sample() {
1334        let df = make_test_df();
1335        let view = TidyView::new(df);
1336        let s1 = view.slice_sample(3, 42);
1337        let s2 = view.slice_sample(3, 42);
1338        let r1: Vec<usize> = s1.mask.iter_set().collect();
1339        let r2: Vec<usize> = s2.mask.iter_set().collect();
1340        assert_eq!(r1, r2); // same seed = same rows
1341    }
1342
1343    #[test]
1344    fn test_distinct() {
1345        let df = DataFrame::from_columns(vec![
1346            ("x".into(), Column::Str(vec!["a".into(), "b".into(), "a".into(), "c".into()])),
1347        ]).unwrap();
1348        let view = TidyView::new(df);
1349        let unique = view.distinct(&["x"]).unwrap();
1350        assert_eq!(unique.nrows(), 3); // a, b, c
1351    }
1352
1353    #[test]
1354    fn test_kahan_summation_in_summarise() {
1355        // Sum many small values — Kahan compensates rounding drift
1356        let n = 10_000;
1357        let values: Vec<f64> = (0..n).map(|_| 0.1).collect();
1358        let grps: Vec<String> = (0..n).map(|_| "a".to_string()).collect();
1359        let df = DataFrame::from_columns(vec![
1360            ("grp".into(), Column::Str(grps)),
1361            ("val".into(), Column::Float(values)),
1362        ]).unwrap();
1363        let view = TidyView::new(df);
1364        let grouped = view.group_by(&["grp"]).unwrap();
1365        let summary = grouped.summarise(&[("total", TidyAgg::Sum("val".into()))]).unwrap();
1366        if let Column::Float(v) = &summary.columns[1].1 {
1367            assert!(
1368                (v[0] - 1000.0).abs() < 1e-6,
1369                "Kahan sum {} should be close to 1000.0", v[0]
1370            );
1371        }
1372    }
1373
1374    #[test]
1375    fn test_snapshot_semantics() {
1376        let df = DataFrame::from_columns(vec![
1377            ("x".into(), Column::Int(vec![1, 2, 3])),
1378        ]).unwrap();
1379        let view = TidyView::new(df);
1380        // Referencing a column created in the same mutate call should fail
1381        let result = view.mutate(&[
1382            ("a", binop(BinOp::Add, col("x"), DExpr::LitInt(1))),
1383            ("b", binop(BinOp::Mul, col("a"), DExpr::LitInt(2))),
1384        ]);
1385        assert!(result.is_err()); // "a" not found in snapshot
1386    }
1387}