1use std::collections::{BTreeMap, BTreeSet};
11use std::fmt;
12use std::rc::Rc;
13
14use crate::bitmask::BitMask;
15use crate::column::{Column, ColumnKeyRef, GroupKey};
16use crate::dataframe::DataFrame;
17use crate::expr::{self, DExpr, ExprValue};
18use crate::kahan::KahanAccumulator;
19
20#[derive(Debug, Clone)]
24pub enum TidyError {
25 ColumnNotFound(String),
27 DuplicateColumn(String),
29 PredicateNotBool { got: String },
31 TypeMismatch { expected: String, got: String },
33 LengthMismatch { expected: usize, got: usize },
35 Internal(String),
37 EmptyGroup,
39}
40
41impl fmt::Display for TidyError {
42 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43 match self {
44 TidyError::ColumnNotFound(n) => write!(f, "column `{}` not found", n),
45 TidyError::DuplicateColumn(n) => write!(f, "duplicate column `{}`", n),
46 TidyError::PredicateNotBool { got } => {
47 write!(f, "filter predicate must be Bool, got {}", got)
48 }
49 TidyError::TypeMismatch { expected, got } => {
50 write!(f, "type mismatch: expected {}, got {}", expected, got)
51 }
52 TidyError::LengthMismatch { expected, got } => {
53 write!(f, "length mismatch: expected {} rows, got {}", expected, got)
54 }
55 TidyError::Internal(msg) => write!(f, "internal error: {}", msg),
56 TidyError::EmptyGroup => write!(f, "aggregation on empty group"),
57 }
58 }
59}
60
61impl std::error::Error for TidyError {}
62
63#[derive(Debug, Clone)]
67pub struct ProjectionMap {
68 indices: Option<Vec<usize>>,
70}
71
72impl ProjectionMap {
73 pub fn all() -> Self {
75 Self { indices: None }
76 }
77
78 pub fn from_indices(indices: Vec<usize>) -> Self {
80 Self {
81 indices: Some(indices),
82 }
83 }
84
85 pub fn resolve(&self, ncols: usize) -> Vec<usize> {
87 match &self.indices {
88 Some(idx) => idx.clone(),
89 None => (0..ncols).collect(),
90 }
91 }
92
93 pub fn len(&self, ncols: usize) -> usize {
95 match &self.indices {
96 Some(idx) => idx.len(),
97 None => ncols,
98 }
99 }
100}
101
102#[derive(Debug, Clone)]
106pub struct ArrangeKey {
107 pub col_name: String,
108 pub descending: bool,
109}
110
111impl ArrangeKey {
112 pub fn asc(col_name: &str) -> Self {
113 Self {
114 col_name: col_name.to_string(),
115 descending: false,
116 }
117 }
118 pub fn desc(col_name: &str) -> Self {
119 Self {
120 col_name: col_name.to_string(),
121 descending: true,
122 }
123 }
124}
125
126#[derive(Debug, Clone)]
130pub enum TidyAgg {
131 Count,
133 Sum(String),
135 Mean(String),
137 Min(String),
139 Max(String),
141 Sd(String),
143 Var(String),
145 First(String),
147 Last(String),
149 NDistinct(String),
151}
152
153#[derive(Debug, Clone)]
157pub struct GroupMeta {
158 pub key_values: Vec<GroupKey>,
159 pub row_indices: Vec<usize>,
160}
161
162#[derive(Debug, Clone)]
164pub struct GroupIndex {
165 pub groups: Vec<GroupMeta>,
166 pub key_names: Vec<String>,
167}
168
169impl GroupIndex {
170 pub fn build_fast_typed(
173 base: &DataFrame,
174 key_col_indices: &[usize],
175 visible_rows: &[usize],
176 key_names: Vec<String>,
177 ) -> Self {
178 if key_col_indices.len() == 1 {
179 return Self::build_single(base, key_col_indices[0], visible_rows, key_names);
180 }
181 Self::build_multi(base, key_col_indices, visible_rows, key_names)
182 }
183
184 fn build_single(
186 base: &DataFrame,
187 key_col_idx: usize,
188 visible_rows: &[usize],
189 key_names: Vec<String>,
190 ) -> Self {
191 let col = &base.columns[key_col_idx].1;
192 let mut groups: Vec<GroupMeta> = Vec::new();
193 let mut key_to_slot: BTreeMap<ColumnKeyRef<'_>, usize> = BTreeMap::new();
194
195 for &row in visible_rows {
196 let key = ColumnKeyRef::from_column(col, row);
197 if let Some(&slot) = key_to_slot.get(&key) {
198 groups[slot].row_indices.push(row);
199 } else {
200 let slot = groups.len();
201 let key_values = vec![key.to_owned_key()];
202 key_to_slot.insert(key, slot);
203 groups.push(GroupMeta {
204 key_values,
205 row_indices: vec![row],
206 });
207 }
208 }
209 GroupIndex { groups, key_names }
210 }
211
212 fn build_multi(
214 base: &DataFrame,
215 key_col_indices: &[usize],
216 visible_rows: &[usize],
217 key_names: Vec<String>,
218 ) -> Self {
219 let mut groups: Vec<GroupMeta> = Vec::new();
220 let mut key_to_slot: BTreeMap<Vec<ColumnKeyRef<'_>>, usize> = BTreeMap::new();
221 let cols: Vec<&Column> = key_col_indices
222 .iter()
223 .map(|&ci| &base.columns[ci].1)
224 .collect();
225
226 for &row in visible_rows {
227 let key: Vec<ColumnKeyRef<'_>> =
228 cols.iter().map(|col| ColumnKeyRef::from_column(col, row)).collect();
229 if let Some(&slot) = key_to_slot.get(&key) {
230 groups[slot].row_indices.push(row);
231 } else {
232 let slot = groups.len();
233 let key_values: Vec<GroupKey> = key.iter().map(|k| k.to_owned_key()).collect();
234 key_to_slot.insert(key, slot);
235 groups.push(GroupMeta {
236 key_values,
237 row_indices: vec![row],
238 });
239 }
240 }
241 GroupIndex { groups, key_names }
242 }
243}
244
245#[derive(Debug, Clone)]
253pub struct TidyView {
254 pub(crate) base: Rc<DataFrame>,
255 pub(crate) mask: BitMask,
256 pub(crate) proj: ProjectionMap,
257 pub(crate) ordering: Option<Rc<Vec<usize>>>,
258}
259
260impl TidyView {
261 pub fn new(df: DataFrame) -> Self {
263 let nrows = df.nrows();
264 Self {
265 base: Rc::new(df),
266 mask: BitMask::all_true(nrows),
267 proj: ProjectionMap::all(),
268 ordering: None,
269 }
270 }
271
272 pub fn from_rc(base: Rc<DataFrame>) -> Self {
274 let nrows = base.nrows();
275 Self {
276 base,
277 mask: BitMask::all_true(nrows),
278 proj: ProjectionMap::all(),
279 ordering: None,
280 }
281 }
282
283 pub fn nrows(&self) -> usize {
285 self.mask.count_ones()
286 }
287
288 pub fn ncols(&self) -> usize {
290 self.proj.len(self.base.ncols())
291 }
292
293 pub fn mask(&self) -> &BitMask {
295 &self.mask
296 }
297
298 pub fn base(&self) -> &DataFrame {
300 &self.base
301 }
302
303 fn visible_rows_ordered(&self) -> Vec<usize> {
305 if let Some(ref ord) = self.ordering {
306 ord.as_ref().clone()
307 } else {
308 self.mask.iter_set().collect()
309 }
310 }
311
312 fn resolve_ordering(&self) -> Option<TidyView> {
315 let ord = self.ordering.as_ref()?;
316 let row_indices: &[usize] = ord.as_ref();
317 let mut all_cols = Vec::with_capacity(self.base.ncols());
318 for (name, col) in &self.base.columns {
319 all_cols.push((name.clone(), col.gather(row_indices)));
320 }
321 let new_base =
322 DataFrame::from_columns(all_cols).expect("resolve_ordering: column length mismatch");
323 let nrows = new_base.nrows();
324 Some(TidyView {
325 base: Rc::new(new_base),
326 mask: BitMask::all_true(nrows),
327 proj: self.proj.clone(),
328 ordering: None,
329 })
330 }
331
332 fn view_from_row_indices(&self, row_indices: Vec<usize>) -> TidyView {
334 let nrows_base = self.base.nrows();
335 let mut words = vec![0u64; crate::bitmask::nwords_for(nrows_base)];
336 for &r in &row_indices {
337 words[r / 64] |= 1u64 << (r % 64);
338 }
339 TidyView {
340 base: Rc::clone(&self.base),
341 mask: BitMask {
342 words,
343 nrows: nrows_base,
344 },
345 proj: self.proj.clone(),
346 ordering: None,
347 }
348 }
349
350 pub fn filter(&self, predicate: &DExpr) -> Result<TidyView, TidyError> {
355 if let Some(ref ord) = self.ordering {
357 let pred_mask =
358 if let Some(m) = expr::try_eval_predicate_columnar(&self.base, predicate, &self.mask)
359 {
360 m
361 } else {
362 let nrows_base = self.base.nrows();
363 let mut new_words = self.mask.words.clone();
364 for &row in ord.iter() {
365 let b = expr::eval_expr_row(&self.base, predicate, row)
366 .map_err(|e| TidyError::Internal(e))?;
367 let pass = match b {
368 ExprValue::Bool(v) => v,
369 _ => {
370 return Err(TidyError::PredicateNotBool {
371 got: b.type_name().to_string(),
372 })
373 }
374 };
375 if !pass {
376 new_words[row / 64] &= !(1u64 << (row % 64));
377 }
378 }
379 BitMask {
380 words: new_words,
381 nrows: nrows_base,
382 }
383 };
384 let new_ord: Vec<usize> = ord.iter().filter(|&&row| pred_mask.get(row)).copied().collect();
385 return Ok(TidyView {
386 base: Rc::clone(&self.base),
387 mask: pred_mask,
388 proj: self.proj.clone(),
389 ordering: Some(Rc::new(new_ord)),
390 });
391 }
392
393 if let Some(new_mask) =
395 expr::try_eval_predicate_columnar(&self.base, predicate, &self.mask)
396 {
397 return Ok(TidyView {
398 base: Rc::clone(&self.base),
399 mask: new_mask,
400 proj: self.proj.clone(),
401 ordering: None,
402 });
403 }
404
405 let nrows_base = self.base.nrows();
407 let mut new_words = self.mask.words.clone();
408 for row in self.mask.iter_set() {
409 let b = expr::eval_expr_row(&self.base, predicate, row)
410 .map_err(|e| TidyError::Internal(e))?;
411 let pass = match b {
412 ExprValue::Bool(v) => v,
413 _ => {
414 return Err(TidyError::PredicateNotBool {
415 got: b.type_name().to_string(),
416 })
417 }
418 };
419 if !pass {
420 new_words[row / 64] &= !(1u64 << (row % 64));
421 }
422 }
423 Ok(TidyView {
424 base: Rc::clone(&self.base),
425 mask: BitMask {
426 words: new_words,
427 nrows: nrows_base,
428 },
429 proj: self.proj.clone(),
430 ordering: None,
431 })
432 }
433
434 pub fn select(&self, cols: &[&str]) -> Result<TidyView, TidyError> {
439 let mut seen = BTreeSet::new();
440 for &name in cols {
441 if !seen.insert(name) {
442 return Err(TidyError::DuplicateColumn(name.to_string()));
443 }
444 }
445 let mut new_indices = Vec::with_capacity(cols.len());
446 for &name in cols {
447 let idx = self
448 .base
449 .columns
450 .iter()
451 .position(|(n, _)| n == name)
452 .ok_or_else(|| TidyError::ColumnNotFound(name.to_string()))?;
453 new_indices.push(idx);
454 }
455 Ok(TidyView {
456 base: Rc::clone(&self.base),
457 mask: self.mask.clone(),
458 proj: ProjectionMap::from_indices(new_indices),
459 ordering: self.ordering.clone(),
460 })
461 }
462
463 pub fn mutate(&self, assignments: &[(&str, DExpr)]) -> Result<DataFrame, TidyError> {
470 let mut seen = BTreeSet::new();
471 for &(name, _) in assignments {
472 if !seen.insert(name) {
473 return Err(TidyError::DuplicateColumn(name.to_string()));
474 }
475 }
476
477 let mut df = self.materialize()?;
478 let snapshot_names: Vec<String> = df.columns.iter().map(|(n, _)| n.clone()).collect();
479
480 for &(col_name, ref dexpr) in assignments {
481 validate_expr_columns(dexpr, &snapshot_names)?;
483
484 let nrows = df.nrows();
485 let new_col = eval_expr_column(&df, dexpr, nrows)?;
486
487 if let Some(pos) = df.columns.iter().position(|(n, _)| n == col_name) {
488 df.columns[pos].1 = new_col;
489 } else {
490 df.columns.push((col_name.to_string(), new_col));
491 }
492 }
493 Ok(df)
494 }
495
496 pub fn group_by(&self, keys: &[&str]) -> Result<GroupedTidyView, TidyError> {
502 let mut key_col_indices = Vec::with_capacity(keys.len());
503 for &key in keys {
504 let idx = self
505 .base
506 .columns
507 .iter()
508 .position(|(n, _)| n == key)
509 .ok_or_else(|| TidyError::ColumnNotFound(key.to_string()))?;
510 key_col_indices.push(idx);
511 }
512
513 let key_names: Vec<String> = keys.iter().map(|s| s.to_string()).collect();
514 let visible_rows: Vec<usize> = if self.ordering.is_none()
515 && self.mask.count_ones() == self.base.nrows()
516 {
517 (0..self.base.nrows()).collect()
518 } else {
519 self.visible_rows_ordered()
520 };
521
522 let index =
523 GroupIndex::build_fast_typed(&self.base, &key_col_indices, &visible_rows, key_names);
524
525 Ok(GroupedTidyView {
526 view: self.clone(),
527 index,
528 })
529 }
530
531 pub fn arrange(&self, keys: &[ArrangeKey]) -> Result<TidyView, TidyError> {
536 for key in keys {
537 if self.base.get_column(&key.col_name).is_none() {
538 return Err(TidyError::ColumnNotFound(key.col_name.clone()));
539 }
540 }
541 let mut row_indices: Vec<usize> = self.mask.iter_set().collect();
542 let key_cols: Vec<(&Column, bool)> = keys
543 .iter()
544 .map(|key| {
545 let col = self.base.get_column(&key.col_name).unwrap();
546 (col, key.descending)
547 })
548 .collect();
549
550 row_indices.sort_by(|&a, &b| {
551 for &(col, desc) in &key_cols {
552 let ord = col.compare_rows(a, b);
553 let ord = if desc { ord.reverse() } else { ord };
554 if ord != std::cmp::Ordering::Equal {
555 return ord;
556 }
557 }
558 std::cmp::Ordering::Equal
559 });
560
561 Ok(TidyView {
562 base: Rc::clone(&self.base),
563 mask: self.mask.clone(),
564 proj: self.proj.clone(),
565 ordering: Some(Rc::new(row_indices)),
566 })
567 }
568
569 pub fn slice_head(&self, n: usize) -> TidyView {
573 let rows: Vec<usize> = self.visible_rows_ordered().into_iter().take(n).collect();
574 self.view_from_row_indices(rows)
575 }
576
577 pub fn slice_tail(&self, n: usize) -> TidyView {
579 let all = self.visible_rows_ordered();
580 let start = all.len().saturating_sub(n);
581 let rows = all[start..].to_vec();
582 self.view_from_row_indices(rows)
583 }
584
585 pub fn slice_sample(&self, n: usize, seed: u64) -> TidyView {
587 let resolved = self.resolve_ordering();
588 let this = resolved.as_ref().unwrap_or(self);
589 let mut visible: Vec<usize> = this.mask.iter_set().collect();
590 let total = visible.len();
591 if n >= total {
592 return this.view_from_row_indices(visible);
593 }
594 let mut rng = seed;
596 for i in 0..n {
597 rng = rng
598 .wrapping_mul(6364136223846793005)
599 .wrapping_add(1442695040888963407);
600 let j = i + (rng as usize % (total - i));
601 visible.swap(i, j);
602 }
603 visible.truncate(n);
604 visible.sort_unstable();
605 this.view_from_row_indices(visible)
606 }
607
608 pub fn distinct(&self, cols: &[&str]) -> Result<TidyView, TidyError> {
612 let resolved = self.resolve_ordering();
613 let this = resolved.as_ref().unwrap_or(self);
614
615 let mut col_indices = Vec::with_capacity(cols.len());
616 for &name in cols {
617 let idx = this
618 .base
619 .columns
620 .iter()
621 .position(|(n, _)| n == name)
622 .ok_or_else(|| TidyError::ColumnNotFound(name.to_string()))?;
623 col_indices.push(idx);
624 }
625
626 let mut seen_keys: BTreeSet<Vec<String>> = BTreeSet::new();
627 let mut selected_rows: Vec<usize> = Vec::new();
628
629 for row in this.mask.iter_set() {
630 let key: Vec<String> = col_indices
631 .iter()
632 .map(|&ci| this.base.columns[ci].1.get_display(row))
633 .collect();
634 if seen_keys.insert(key) {
635 selected_rows.push(row);
636 }
637 }
638 Ok(this.view_from_row_indices(selected_rows))
639 }
640
641 pub fn inner_join(
645 &self,
646 right: &TidyView,
647 on: &[(&str, &str)],
648 ) -> Result<DataFrame, TidyError> {
649 let l = self.resolve_ordering();
650 let lref = l.as_ref().unwrap_or(self);
651 let r = right.resolve_ordering();
652 let rref = r.as_ref().unwrap_or(right);
653 let (left_rows, right_rows) = join_match_rows(lref, rref, on)?;
654 build_join_frame(lref, rref, &left_rows, &right_rows, on)
655 }
656
657 pub fn left_join(
659 &self,
660 right: &TidyView,
661 on: &[(&str, &str)],
662 ) -> Result<DataFrame, TidyError> {
663 let l = self.resolve_ordering();
664 let lref = l.as_ref().unwrap_or(self);
665 let r = right.resolve_ordering();
666 let rref = r.as_ref().unwrap_or(right);
667 let (left_rows, right_rows_opt) = join_match_rows_optional(lref, rref, on)?;
668 build_left_join_frame(lref, rref, &left_rows, &right_rows_opt, on)
669 }
670
671 pub fn materialize(&self) -> Result<DataFrame, TidyError> {
675 let resolved = self.resolve_ordering();
676 let this = resolved.as_ref().unwrap_or(self);
677
678 let visible: Vec<usize> = this.mask.iter_set().collect();
679 let proj_indices = this.proj.resolve(this.base.ncols());
680
681 let mut result_cols = Vec::with_capacity(proj_indices.len());
682 for &ci in &proj_indices {
683 let (name, col) = &this.base.columns[ci];
684 result_cols.push((name.clone(), col.gather(&visible)));
685 }
686
687 DataFrame::from_columns(result_cols).map_err(|e| TidyError::Internal(e.to_string()))
688 }
689
690 pub fn column_names(&self) -> Vec<String> {
692 let proj_indices = self.proj.resolve(self.base.ncols());
693 proj_indices
694 .iter()
695 .map(|&i| self.base.columns[i].0.clone())
696 .collect()
697 }
698}
699
700#[derive(Debug, Clone)]
704pub struct GroupedTidyView {
705 pub view: TidyView,
706 pub index: GroupIndex,
707}
708
709impl GroupedTidyView {
710 pub fn summarise(
712 &self,
713 assignments: &[(&str, TidyAgg)],
714 ) -> Result<DataFrame, TidyError> {
715 let mut seen = BTreeSet::new();
716 for &(name, _) in assignments {
717 if !seen.insert(name) {
718 return Err(TidyError::DuplicateColumn(name.to_string()));
719 }
720 }
721
722 let base = &self.view.base;
723 let n_groups = self.index.groups.len();
724 let mut result_columns: Vec<(String, Column)> = Vec::new();
725
726 for (ki, key_name) in self.index.key_names.iter().enumerate() {
728 let base_col = base
729 .get_column(key_name)
730 .ok_or_else(|| TidyError::ColumnNotFound(key_name.clone()))?;
731
732 let col = match base_col {
733 Column::Int(_) => {
734 let vals: Vec<i64> = self.index.groups.iter().map(|g| {
735 match &g.key_values[ki] {
736 GroupKey::Int(v) => *v,
737 _ => 0,
738 }
739 }).collect();
740 Column::Int(vals)
741 }
742 Column::Float(_) => {
743 let vals: Vec<f64> = self.index.groups.iter().map(|g| {
744 match &g.key_values[ki] {
745 GroupKey::Float(fk) => fk.0,
746 GroupKey::Int(v) => *v as f64,
747 _ => 0.0,
748 }
749 }).collect();
750 Column::Float(vals)
751 }
752 Column::Bool(_) => {
753 let vals: Vec<bool> = self.index.groups.iter().map(|g| {
754 match &g.key_values[ki] {
755 GroupKey::Bool(v) => *v,
756 _ => false,
757 }
758 }).collect();
759 Column::Bool(vals)
760 }
761 _ => {
762 let vals: Vec<String> = self.index.groups.iter().map(|g| {
763 g.key_values[ki].to_display()
764 }).collect();
765 Column::Str(vals)
766 }
767 };
768 result_columns.push((key_name.clone(), col));
769 }
770
771 for &(out_name, ref agg) in assignments {
773 let col = self.eval_agg(agg, n_groups, base)?;
774 result_columns.push((out_name.to_string(), col));
775 }
776
777 DataFrame::from_columns(result_columns).map_err(|e| TidyError::Internal(e.to_string()))
778 }
779
780 fn eval_agg(
781 &self,
782 agg: &TidyAgg,
783 _n_groups: usize,
784 base: &DataFrame,
785 ) -> Result<Column, TidyError> {
786 match agg {
787 TidyAgg::Count => {
788 let counts: Vec<i64> = self
789 .index
790 .groups
791 .iter()
792 .map(|g| g.row_indices.len() as i64)
793 .collect();
794 Ok(Column::Int(counts))
795 }
796 TidyAgg::Sum(col_name) => {
797 let col = base
798 .get_column(col_name)
799 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
800 let sums: Vec<f64> = self.index.groups.iter().map(|g| {
801 let mut acc = KahanAccumulator::new();
802 for &i in &g.row_indices {
803 if let Some(v) = col.get_f64(i) {
804 acc.add(v);
805 }
806 }
807 acc.finalize()
808 }).collect();
809 Ok(Column::Float(sums))
810 }
811 TidyAgg::Mean(col_name) => {
812 let col = base
813 .get_column(col_name)
814 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
815 let means: Vec<f64> = self.index.groups.iter().map(|g| {
816 if g.row_indices.is_empty() {
817 return f64::NAN;
818 }
819 let mut acc = KahanAccumulator::new();
820 for &i in &g.row_indices {
821 if let Some(v) = col.get_f64(i) {
822 acc.add(v);
823 }
824 }
825 acc.finalize() / g.row_indices.len() as f64
826 }).collect();
827 Ok(Column::Float(means))
828 }
829 TidyAgg::Min(col_name) => {
830 let col = base
831 .get_column(col_name)
832 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
833 let mins: Vec<f64> = self.index.groups.iter().map(|g| {
834 let mut min = f64::INFINITY;
835 for &i in &g.row_indices {
836 if let Some(v) = col.get_f64(i) {
837 if v < min { min = v; }
838 }
839 }
840 min
841 }).collect();
842 Ok(Column::Float(mins))
843 }
844 TidyAgg::Max(col_name) => {
845 let col = base
846 .get_column(col_name)
847 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
848 let maxs: Vec<f64> = self.index.groups.iter().map(|g| {
849 let mut max = f64::NEG_INFINITY;
850 for &i in &g.row_indices {
851 if let Some(v) = col.get_f64(i) {
852 if v > max { max = v; }
853 }
854 }
855 max
856 }).collect();
857 Ok(Column::Float(maxs))
858 }
859 TidyAgg::Sd(col_name) => {
860 let col = base
861 .get_column(col_name)
862 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
863 let sds: Vec<f64> = self.index.groups.iter().map(|g| {
864 let n = g.row_indices.len();
865 if n < 2 { return f64::NAN; }
866 let mut acc = KahanAccumulator::new();
867 for &i in &g.row_indices {
868 if let Some(v) = col.get_f64(i) { acc.add(v); }
869 }
870 let mean = acc.finalize() / n as f64;
871 let mut var_acc = KahanAccumulator::new();
872 for &i in &g.row_indices {
873 if let Some(v) = col.get_f64(i) {
874 let diff = v - mean;
875 var_acc.add(diff * diff);
876 }
877 }
878 (var_acc.finalize() / (n - 1) as f64).sqrt()
879 }).collect();
880 Ok(Column::Float(sds))
881 }
882 TidyAgg::Var(col_name) => {
883 let col = base
884 .get_column(col_name)
885 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
886 let vars: Vec<f64> = self.index.groups.iter().map(|g| {
887 let n = g.row_indices.len();
888 if n < 2 { return f64::NAN; }
889 let mut acc = KahanAccumulator::new();
890 for &i in &g.row_indices {
891 if let Some(v) = col.get_f64(i) { acc.add(v); }
892 }
893 let mean = acc.finalize() / n as f64;
894 let mut var_acc = KahanAccumulator::new();
895 for &i in &g.row_indices {
896 if let Some(v) = col.get_f64(i) {
897 let diff = v - mean;
898 var_acc.add(diff * diff);
899 }
900 }
901 var_acc.finalize() / (n - 1) as f64
902 }).collect();
903 Ok(Column::Float(vars))
904 }
905 TidyAgg::First(col_name) => {
906 let col = base
907 .get_column(col_name)
908 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
909 let vals: Result<Vec<String>, _> = self.index.groups.iter().map(|g| {
910 g.row_indices.first()
911 .map(|&i| col.get_display(i))
912 .ok_or(TidyError::EmptyGroup)
913 }).collect();
914 Ok(Column::Str(vals?))
915 }
916 TidyAgg::Last(col_name) => {
917 let col = base
918 .get_column(col_name)
919 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
920 let vals: Result<Vec<String>, _> = self.index.groups.iter().map(|g| {
921 g.row_indices.last()
922 .map(|&i| col.get_display(i))
923 .ok_or(TidyError::EmptyGroup)
924 }).collect();
925 Ok(Column::Str(vals?))
926 }
927 TidyAgg::NDistinct(col_name) => {
928 let col = base
929 .get_column(col_name)
930 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
931 let counts: Vec<i64> = self.index.groups.iter().map(|g| {
932 let mut uniq = BTreeSet::new();
933 for &i in &g.row_indices {
934 uniq.insert(col.get_display(i));
935 }
936 uniq.len() as i64
937 }).collect();
938 Ok(Column::Int(counts))
939 }
940 }
941 }
942
943 pub fn group_index(&self) -> &GroupIndex {
945 &self.index
946 }
947}
948
949fn resolve_join_keys(
952 left: &TidyView,
953 right: &TidyView,
954 on: &[(&str, &str)],
955) -> Result<(Vec<usize>, Vec<usize>), TidyError> {
956 let mut left_indices = Vec::with_capacity(on.len());
957 let mut right_indices = Vec::with_capacity(on.len());
958 for &(lk, rk) in on {
959 let li = left
960 .base
961 .column_index(lk)
962 .ok_or_else(|| TidyError::ColumnNotFound(lk.to_string()))?;
963 let ri = right
964 .base
965 .column_index(rk)
966 .ok_or_else(|| TidyError::ColumnNotFound(rk.to_string()))?;
967 left_indices.push(li);
968 right_indices.push(ri);
969 }
970 Ok((left_indices, right_indices))
971}
972
973fn join_match_rows(
974 left: &TidyView,
975 right: &TidyView,
976 on: &[(&str, &str)],
977) -> Result<(Vec<usize>, Vec<usize>), TidyError> {
978 let (left_key_cols, right_key_cols) = resolve_join_keys(left, right, on)?;
979 let mut out_left = Vec::new();
980 let mut out_right = Vec::new();
981
982 if left_key_cols.len() == 1 {
984 let r_col = &right.base.columns[right_key_cols[0]].1;
985 let l_col = &left.base.columns[left_key_cols[0]].1;
986 let mut lookup: BTreeMap<ColumnKeyRef<'_>, Vec<usize>> = BTreeMap::new();
987 for r in right.mask.iter_set() {
988 let key = ColumnKeyRef::from_column(r_col, r);
989 lookup.entry(key).or_default().push(r);
990 }
991 for l_row in left.mask.iter_set() {
992 let key = ColumnKeyRef::from_column(l_col, l_row);
993 if let Some(matches) = lookup.get(&key) {
994 for &r_row in matches {
995 out_left.push(l_row);
996 out_right.push(r_row);
997 }
998 }
999 }
1000 return Ok((out_left, out_right));
1001 }
1002
1003 let r_cols: Vec<&Column> = right_key_cols
1005 .iter()
1006 .map(|&ci| &right.base.columns[ci].1)
1007 .collect();
1008 let l_cols: Vec<&Column> = left_key_cols
1009 .iter()
1010 .map(|&ci| &left.base.columns[ci].1)
1011 .collect();
1012 let mut lookup: BTreeMap<Vec<ColumnKeyRef<'_>>, Vec<usize>> = BTreeMap::new();
1013 for r in right.mask.iter_set() {
1014 let key: Vec<ColumnKeyRef<'_>> = r_cols.iter().map(|col| ColumnKeyRef::from_column(col, r)).collect();
1015 lookup.entry(key).or_default().push(r);
1016 }
1017 for l_row in left.mask.iter_set() {
1018 let key: Vec<ColumnKeyRef<'_>> = l_cols.iter().map(|col| ColumnKeyRef::from_column(col, l_row)).collect();
1019 if let Some(matches) = lookup.get(&key) {
1020 for &r_row in matches {
1021 out_left.push(l_row);
1022 out_right.push(r_row);
1023 }
1024 }
1025 }
1026 Ok((out_left, out_right))
1027}
1028
1029fn join_match_rows_optional(
1030 left: &TidyView,
1031 right: &TidyView,
1032 on: &[(&str, &str)],
1033) -> Result<(Vec<usize>, Vec<Option<usize>>), TidyError> {
1034 let (left_key_cols, right_key_cols) = resolve_join_keys(left, right, on)?;
1035 let mut out_left = Vec::new();
1036 let mut out_right: Vec<Option<usize>> = Vec::new();
1037
1038 if left_key_cols.len() == 1 {
1039 let r_col = &right.base.columns[right_key_cols[0]].1;
1040 let l_col = &left.base.columns[left_key_cols[0]].1;
1041 let mut lookup: BTreeMap<ColumnKeyRef<'_>, Vec<usize>> = BTreeMap::new();
1042 for r in right.mask.iter_set() {
1043 let key = ColumnKeyRef::from_column(r_col, r);
1044 lookup.entry(key).or_default().push(r);
1045 }
1046 for l_row in left.mask.iter_set() {
1047 let key = ColumnKeyRef::from_column(l_col, l_row);
1048 match lookup.get(&key) {
1049 Some(matches) if !matches.is_empty() => {
1050 for &r_row in matches {
1051 out_left.push(l_row);
1052 out_right.push(Some(r_row));
1053 }
1054 }
1055 _ => {
1056 out_left.push(l_row);
1057 out_right.push(None);
1058 }
1059 }
1060 }
1061 return Ok((out_left, out_right));
1062 }
1063
1064 let r_cols: Vec<&Column> = right_key_cols.iter().map(|&ci| &right.base.columns[ci].1).collect();
1065 let l_cols: Vec<&Column> = left_key_cols.iter().map(|&ci| &left.base.columns[ci].1).collect();
1066 let mut lookup: BTreeMap<Vec<ColumnKeyRef<'_>>, Vec<usize>> = BTreeMap::new();
1067 for r in right.mask.iter_set() {
1068 let key: Vec<ColumnKeyRef<'_>> = r_cols.iter().map(|col| ColumnKeyRef::from_column(col, r)).collect();
1069 lookup.entry(key).or_default().push(r);
1070 }
1071 for l_row in left.mask.iter_set() {
1072 let key: Vec<ColumnKeyRef<'_>> = l_cols.iter().map(|col| ColumnKeyRef::from_column(col, l_row)).collect();
1073 match lookup.get(&key) {
1074 Some(matches) if !matches.is_empty() => {
1075 for &r_row in matches {
1076 out_left.push(l_row);
1077 out_right.push(Some(r_row));
1078 }
1079 }
1080 _ => {
1081 out_left.push(l_row);
1082 out_right.push(None);
1083 }
1084 }
1085 }
1086 Ok((out_left, out_right))
1087}
1088
1089fn build_join_frame(
1090 left: &TidyView,
1091 right: &TidyView,
1092 left_rows: &[usize],
1093 right_rows: &[usize],
1094 on: &[(&str, &str)],
1095) -> Result<DataFrame, TidyError> {
1096 let right_key_names: BTreeSet<&str> = on.iter().map(|&(_, rk)| rk).collect();
1097 let mut result_cols: Vec<(String, Column)> = Vec::new();
1098
1099 for (name, col) in &left.base.columns {
1101 result_cols.push((name.clone(), col.gather(left_rows)));
1102 }
1103
1104 for (name, col) in &right.base.columns {
1106 if !right_key_names.contains(name.as_str()) {
1107 result_cols.push((name.clone(), col.gather(right_rows)));
1108 }
1109 }
1110
1111 DataFrame::from_columns(result_cols).map_err(|e| TidyError::Internal(e.to_string()))
1112}
1113
1114fn build_left_join_frame(
1115 left: &TidyView,
1116 right: &TidyView,
1117 left_rows: &[usize],
1118 right_rows_opt: &[Option<usize>],
1119 on: &[(&str, &str)],
1120) -> Result<DataFrame, TidyError> {
1121 let right_key_names: BTreeSet<&str> = on.iter().map(|&(_, rk)| rk).collect();
1122 let mut result_cols: Vec<(String, Column)> = Vec::new();
1123
1124 for (name, col) in &left.base.columns {
1126 result_cols.push((name.clone(), col.gather(left_rows)));
1127 }
1128
1129 for (name, col) in &right.base.columns {
1131 if right_key_names.contains(name.as_str()) {
1132 continue;
1133 }
1134 let gathered = match col {
1135 Column::Int(v) => Column::Int(
1136 right_rows_opt.iter().map(|opt| opt.map(|i| v[i]).unwrap_or(0)).collect(),
1137 ),
1138 Column::Float(v) => Column::Float(
1139 right_rows_opt.iter().map(|opt| opt.map(|i| v[i]).unwrap_or(0.0)).collect(),
1140 ),
1141 Column::Str(v) => Column::Str(
1142 right_rows_opt
1143 .iter()
1144 .map(|opt| opt.map(|i| v[i].clone()).unwrap_or_default())
1145 .collect(),
1146 ),
1147 Column::Bool(v) => Column::Bool(
1148 right_rows_opt.iter().map(|opt| opt.map(|i| v[i]).unwrap_or(false)).collect(),
1149 ),
1150 };
1151 result_cols.push((name.clone(), gathered));
1152 }
1153
1154 DataFrame::from_columns(result_cols).map_err(|e| TidyError::Internal(e.to_string()))
1155}
1156
1157fn validate_expr_columns(expr: &DExpr, valid_names: &[String]) -> Result<(), TidyError> {
1160 match expr {
1161 DExpr::Col(name) => {
1162 if !valid_names.iter().any(|n| n == name) {
1163 return Err(TidyError::ColumnNotFound(name.clone()));
1164 }
1165 Ok(())
1166 }
1167 DExpr::BinOp { left, right, .. } => {
1168 validate_expr_columns(left, valid_names)?;
1169 validate_expr_columns(right, valid_names)
1170 }
1171 DExpr::Not(inner) => validate_expr_columns(inner, valid_names),
1172 DExpr::And(a, b) | DExpr::Or(a, b) => {
1173 validate_expr_columns(a, valid_names)?;
1174 validate_expr_columns(b, valid_names)
1175 }
1176 _ => Ok(()),
1177 }
1178}
1179
1180fn eval_expr_column(
1181 df: &DataFrame,
1182 dexpr: &DExpr,
1183 nrows: usize,
1184) -> Result<Column, TidyError> {
1185 let mut floats = Vec::with_capacity(nrows);
1187 let mut ints = Vec::with_capacity(nrows);
1188 let mut strings = Vec::with_capacity(nrows);
1189 let mut bools = Vec::with_capacity(nrows);
1190 let mut first_type: Option<&str> = None;
1191
1192 for row in 0..nrows {
1193 let val = expr::eval_expr_row(df, dexpr, row)
1194 .map_err(|e| TidyError::Internal(e))?;
1195 match &val {
1196 ExprValue::Float(v) => {
1197 if first_type.is_none() { first_type = Some("Float"); }
1198 floats.push(*v);
1199 }
1200 ExprValue::Int(v) => {
1201 if first_type.is_none() { first_type = Some("Int"); }
1202 ints.push(*v);
1203 }
1204 ExprValue::Str(v) => {
1205 if first_type.is_none() { first_type = Some("Str"); }
1206 strings.push(v.clone());
1207 }
1208 ExprValue::Bool(v) => {
1209 if first_type.is_none() { first_type = Some("Bool"); }
1210 bools.push(*v);
1211 }
1212 }
1213 }
1214
1215 match first_type {
1216 Some("Float") => Ok(Column::Float(floats)),
1217 Some("Int") => Ok(Column::Int(ints)),
1218 Some("Str") => Ok(Column::Str(strings)),
1219 Some("Bool") => Ok(Column::Bool(bools)),
1220 _ => Ok(Column::Float(Vec::new())),
1221 }
1222}
1223
1224#[cfg(test)]
1227mod tests {
1228 use super::*;
1229 use crate::expr::{binop, col, BinOp};
1230
1231 fn make_test_df() -> DataFrame {
1232 DataFrame::from_columns(vec![
1233 ("id".into(), Column::Int(vec![1, 2, 3, 4, 5])),
1234 (
1235 "region".into(),
1236 Column::Str(vec![
1237 "West".into(), "East".into(), "West".into(),
1238 "East".into(), "West".into(),
1239 ]),
1240 ),
1241 ("value".into(), Column::Float(vec![10.0, 20.0, 30.0, 40.0, 50.0])),
1242 ])
1243 .unwrap()
1244 }
1245
1246 #[test]
1247 fn test_filter_no_copy() {
1248 let df = make_test_df();
1249 let view = TidyView::new(df);
1250 let filtered = view
1251 .filter(&binop(BinOp::Gt, col("value"), DExpr::LitFloat(25.0)))
1252 .unwrap();
1253 assert_eq!(filtered.nrows(), 3); assert_eq!(view.nrows(), 5); }
1256
1257 #[test]
1258 fn test_chained_filter() {
1259 let df = make_test_df();
1260 let view = TidyView::new(df);
1261 let filtered = view
1262 .filter(&binop(BinOp::Gt, col("value"), DExpr::LitFloat(15.0)))
1263 .unwrap()
1264 .filter(&binop(BinOp::Lt, col("value"), DExpr::LitFloat(45.0)))
1265 .unwrap();
1266 assert_eq!(filtered.nrows(), 3); }
1268
1269 #[test]
1270 fn test_select() {
1271 let df = make_test_df();
1272 let view = TidyView::new(df);
1273 let selected = view.select(&["id", "value"]).unwrap();
1274 assert_eq!(selected.ncols(), 2);
1275 assert_eq!(view.ncols(), 3); }
1277
1278 #[test]
1279 fn test_group_by_summarise() {
1280 let df = make_test_df();
1281 let view = TidyView::new(df);
1282 let grouped = view.group_by(&["region"]).unwrap();
1283 let summary = grouped
1284 .summarise(&[
1285 ("n", TidyAgg::Count),
1286 ("total", TidyAgg::Sum("value".into())),
1287 ("avg", TidyAgg::Mean("value".into())),
1288 ])
1289 .unwrap();
1290 assert_eq!(summary.nrows(), 2);
1292 assert_eq!(summary.ncols(), 4); }
1294
1295 #[test]
1296 fn test_group_order_first_occurrence() {
1297 let df = make_test_df();
1298 let view = TidyView::new(df);
1299 let grouped = view.group_by(&["region"]).unwrap();
1300 assert_eq!(grouped.index.groups[0].key_values[0].to_display(), "West");
1302 assert_eq!(grouped.index.groups[1].key_values[0].to_display(), "East");
1303 }
1304
1305 #[test]
1306 fn test_arrange() {
1307 let df = make_test_df();
1308 let view = TidyView::new(df);
1309 let sorted = view.arrange(&[ArrangeKey::desc("value")]).unwrap();
1310 let mat = sorted.materialize().unwrap();
1311 if let Column::Float(vals) = &mat.columns[2].1 {
1312 assert_eq!(vals, &[50.0, 40.0, 30.0, 20.0, 10.0]);
1313 }
1314 }
1315
1316 #[test]
1317 fn test_inner_join() {
1318 let left = DataFrame::from_columns(vec![
1319 ("id".into(), Column::Int(vec![1, 2, 3])),
1320 ("name".into(), Column::Str(vec!["a".into(), "b".into(), "c".into()])),
1321 ]).unwrap();
1322 let right = DataFrame::from_columns(vec![
1323 ("id".into(), Column::Int(vec![1, 3, 4])),
1324 ("dept".into(), Column::Str(vec!["eng".into(), "sales".into(), "hr".into()])),
1325 ]).unwrap();
1326 let lv = TidyView::new(left);
1327 let rv = TidyView::new(right);
1328 let joined = lv.inner_join(&rv, &[("id", "id")]).unwrap();
1329 assert_eq!(joined.nrows(), 2); }
1331
1332 #[test]
1333 fn test_deterministic_sample() {
1334 let df = make_test_df();
1335 let view = TidyView::new(df);
1336 let s1 = view.slice_sample(3, 42);
1337 let s2 = view.slice_sample(3, 42);
1338 let r1: Vec<usize> = s1.mask.iter_set().collect();
1339 let r2: Vec<usize> = s2.mask.iter_set().collect();
1340 assert_eq!(r1, r2); }
1342
1343 #[test]
1344 fn test_distinct() {
1345 let df = DataFrame::from_columns(vec![
1346 ("x".into(), Column::Str(vec!["a".into(), "b".into(), "a".into(), "c".into()])),
1347 ]).unwrap();
1348 let view = TidyView::new(df);
1349 let unique = view.distinct(&["x"]).unwrap();
1350 assert_eq!(unique.nrows(), 3); }
1352
1353 #[test]
1354 fn test_kahan_summation_in_summarise() {
1355 let n = 10_000;
1357 let values: Vec<f64> = (0..n).map(|_| 0.1).collect();
1358 let grps: Vec<String> = (0..n).map(|_| "a".to_string()).collect();
1359 let df = DataFrame::from_columns(vec![
1360 ("grp".into(), Column::Str(grps)),
1361 ("val".into(), Column::Float(values)),
1362 ]).unwrap();
1363 let view = TidyView::new(df);
1364 let grouped = view.group_by(&["grp"]).unwrap();
1365 let summary = grouped.summarise(&[("total", TidyAgg::Sum("val".into()))]).unwrap();
1366 if let Column::Float(v) = &summary.columns[1].1 {
1367 assert!(
1368 (v[0] - 1000.0).abs() < 1e-6,
1369 "Kahan sum {} should be close to 1000.0", v[0]
1370 );
1371 }
1372 }
1373
1374 #[test]
1375 fn test_snapshot_semantics() {
1376 let df = DataFrame::from_columns(vec![
1377 ("x".into(), Column::Int(vec![1, 2, 3])),
1378 ]).unwrap();
1379 let view = TidyView::new(df);
1380 let result = view.mutate(&[
1382 ("a", binop(BinOp::Add, col("x"), DExpr::LitInt(1))),
1383 ("b", binop(BinOp::Mul, col("a"), DExpr::LitInt(2))),
1384 ]);
1385 assert!(result.is_err()); }
1387}