1use cjc_repro::kahan_sum_f64;
10use std::cell::RefCell;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::fmt;
13use std::rc::Rc;
14
15mod csv;
16pub use csv::{CsvConfig, CsvReader, StreamingCsvProcessor};
17
18pub mod adaptive_selection;
19pub mod agg_kernels;
20pub mod byte_dict;
21pub mod column_meta;
22pub mod dataset_plan;
23pub mod detcoll;
24pub mod dict_encoding;
25pub mod lazy;
26pub mod predicate_bytecode;
27pub mod tidy_dispatch;
28
29pub use adaptive_selection::{AdaptiveSelection, SelectionIndices};
30pub use dataset_plan::{
31 BatchIterator, BatchSpec, DatasetError, DatasetPlan, EncodingSpec, MaterializedBatch,
32 Split, SplitSpec,
33};
34
/// A single typed column of a [`DataFrame`].
///
/// Every variant stores one value per row; [`Column::len`] reports the row count.
#[derive(Debug, Clone)]
pub enum Column {
    /// 64-bit signed integers.
    Int(Vec<i64>),
    /// 64-bit floating-point values.
    Float(Vec<f64>),
    /// Owned strings.
    Str(Vec<String>),
    /// Booleans.
    Bool(Vec<bool>),
    /// Legacy dictionary-encoded strings: row `i` holds `levels[codes[i] as usize]`.
    Categorical {
        /// Level strings, indexed by code.
        levels: Vec<String>,
        /// One code per row, indexing into `levels`.
        codes: Vec<u32>,
    },
    /// Byte-dictionary categorical column from `crate::byte_dict`. Its `get` may return
    /// `None` for a row; the helpers in this file render such rows as an empty string.
    CategoricalAdaptive(Box<crate::byte_dict::CategoricalColumn>),
    /// Timestamps stored as `i64`. Displayed with an `ms` suffix, so presumably
    /// milliseconds — NOTE(review): confirm unit and epoch with the producers.
    DateTime(Vec<i64>),
}
67
68impl Column {
69 pub fn len(&self) -> usize {
71 match self {
72 Column::Int(v) => v.len(),
73 Column::Float(v) => v.len(),
74 Column::Str(v) => v.len(),
75 Column::Bool(v) => v.len(),
76 Column::Categorical { codes, .. } => codes.len(),
77 Column::CategoricalAdaptive(cc) => cc.len(),
78 Column::DateTime(v) => v.len(),
79 }
80 }
81
82 pub fn is_empty(&self) -> bool {
84 self.len() == 0
85 }
86
87 pub fn type_name(&self) -> &'static str {
89 match self {
90 Column::Int(_) => "Int",
91 Column::Float(_) => "Float",
92 Column::Str(_) => "Str",
93 Column::Bool(_) => "Bool",
94 Column::Categorical { .. } => "Categorical",
95 Column::CategoricalAdaptive(_) => "CategoricalAdaptive",
96 Column::DateTime(_) => "DateTime",
97 }
98 }
99
100 pub fn get_display(&self, idx: usize) -> String {
102 match self {
103 Column::Int(v) => format!("{}", v[idx]),
104 Column::Float(v) => format!("{}", v[idx]),
105 Column::Str(v) => v[idx].clone(),
106 Column::Bool(v) => format!("{}", v[idx]),
107 Column::Categorical { levels, codes } => levels[codes[idx] as usize].clone(),
108 Column::CategoricalAdaptive(cc) => match cc.get(idx) {
109 None => String::new(),
110 Some(bytes) => String::from_utf8_lossy(bytes).into_owned(),
111 },
112 Column::DateTime(v) => format!("{}ms", v[idx]),
113 }
114 }
115
116 pub fn categorical_adaptive(cc: crate::byte_dict::CategoricalColumn) -> Self {
119 Column::CategoricalAdaptive(Box::new(cc))
120 }
121
122 pub fn to_legacy_categorical(&self) -> Column {
133 match self {
134 Column::CategoricalAdaptive(cc) => {
135 if let Some(legacy) = Column::from_categorical_column(cc) {
137 return legacy;
138 }
139 let n = cc.len();
142 let mut out: Vec<String> = Vec::with_capacity(n);
143 for i in 0..n {
144 out.push(match cc.get(i) {
145 None => String::new(),
146 Some(b) => String::from_utf8_lossy(b).into_owned(),
147 });
148 }
149 Column::Str(out)
150 }
151 _ => self.clone(),
152 }
153 }
154
155 pub fn to_categorical_column(&self) -> Option<crate::byte_dict::CategoricalColumn> {
175 use crate::byte_dict::{ByteDictionary, CategoricalColumn};
176 match self {
177 Column::Categorical { levels, codes } => {
178 let explicit: Vec<Vec<u8>> =
179 levels.iter().map(|s| s.as_bytes().to_vec()).collect();
180 let dict = ByteDictionary::from_explicit(explicit).ok()?;
184 let mut col = CategoricalColumn::with_dictionary(dict);
185 for &c in codes {
190 let bytes = levels[c as usize].as_bytes();
191 col.push(bytes).ok()?;
194 }
195 Some(col)
196 }
197 _ => None,
198 }
199 }
200
201 pub fn from_categorical_column(
211 cat: &crate::byte_dict::CategoricalColumn,
212 ) -> Option<Self> {
213 if cat.nulls().is_some() {
214 return None;
215 }
216 let dict = cat.dictionary();
217 let mut levels: Vec<String> = Vec::with_capacity(dict.len());
218 for (_, bytes) in dict.iter() {
219 match std::str::from_utf8(bytes) {
220 Ok(s) => levels.push(s.to_string()),
221 Err(_) => return None,
222 }
223 }
224 let mut codes: Vec<u32> = Vec::with_capacity(cat.len());
225 for c in cat.codes().iter() {
226 if c > u32::MAX as u64 {
227 return None;
228 }
229 codes.push(c as u32);
230 }
231 Some(Column::Categorical { levels, codes })
232 }
233}
234
/// A column-oriented table: an ordered list of `(name, column)` pairs.
///
/// [`DataFrame::from_columns`] checks that all columns have equal length, but the
/// `columns` field is public, so frames built directly are not checked. Names are not
/// required to be unique; lookups by name return the first match.
#[derive(Debug, Clone)]
pub struct DataFrame {
    /// Columns in display/iteration order.
    pub columns: Vec<(String, Column)>,
}
242
243impl DataFrame {
244 pub fn new() -> Self {
246 Self {
247 columns: Vec::new(),
248 }
249 }
250
251 pub fn from_columns(columns: Vec<(String, Column)>) -> Result<Self, DataError> {
255 if columns.is_empty() {
256 return Ok(Self { columns });
257 }
258 let len = columns[0].1.len();
259 for (name, col) in &columns {
260 if col.len() != len {
261 return Err(DataError::ColumnLengthMismatch {
262 expected: len,
263 got: col.len(),
264 column: name.clone(),
265 });
266 }
267 }
268 Ok(Self { columns })
269 }
270
271 pub fn nrows(&self) -> usize {
273 self.columns.first().map(|(_, c)| c.len()).unwrap_or(0)
274 }
275
276 pub fn ncols(&self) -> usize {
278 self.columns.len()
279 }
280
281 pub fn column_names(&self) -> Vec<&str> {
283 self.columns.iter().map(|(n, _)| n.as_str()).collect()
284 }
285
286 pub fn get_column(&self, name: &str) -> Option<&Column> {
288 self.columns
289 .iter()
290 .find(|(n, _)| n == name)
291 .map(|(_, c)| c)
292 }
293
294 pub fn to_tensor_data(&self, col_names: &[&str]) -> Result<(Vec<f64>, Vec<usize>), DataError> {
296 let nrows = self.nrows();
297 let ncols = col_names.len();
298 let mut data = Vec::with_capacity(nrows * ncols);
299
300 for row in 0..nrows {
301 for &col_name in col_names {
302 let col = self
303 .get_column(col_name)
304 .ok_or_else(|| DataError::ColumnNotFound(col_name.to_string()))?;
305 let val = match col {
306 Column::Float(v) => v[row],
307 Column::Int(v) => v[row] as f64,
308 _ => {
309 return Err(DataError::InvalidOperation(format!(
310 "column `{}` is not numeric",
311 col_name
312 )))
313 }
314 };
315 data.push(val);
316 }
317 }
318
319 Ok((data, vec![nrows, ncols]))
320 }
321}
322
323impl Default for DataFrame {
324 fn default() -> Self {
325 Self::new()
326 }
327}
328
329impl fmt::Display for DataFrame {
330 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
331 if self.columns.is_empty() {
332 return write!(f, "(empty DataFrame)");
333 }
334
335 let names: Vec<&str> = self.columns.iter().map(|(n, _)| n.as_str()).collect();
337 let mut col_widths: Vec<usize> = names.iter().map(|n| n.len()).collect();
338
339 let nrows = self.nrows();
341 for (col_idx, (_, col)) in self.columns.iter().enumerate() {
342 for row in 0..nrows {
343 let s = col.get_display(row);
344 col_widths[col_idx] = col_widths[col_idx].max(s.len());
345 }
346 }
347
348 for (i, name) in names.iter().enumerate() {
350 if i > 0 {
351 write!(f, " | ")?;
352 }
353 write!(f, "{:>width$}", name, width = col_widths[i])?;
354 }
355 writeln!(f)?;
356
357 for (i, &w) in col_widths.iter().enumerate() {
359 if i > 0 {
360 write!(f, "-+-")?;
361 }
362 write!(f, "{}", "-".repeat(w))?;
363 }
364 writeln!(f)?;
365
366 for row in 0..nrows {
368 for (col_idx, (_, col)) in self.columns.iter().enumerate() {
369 if col_idx > 0 {
370 write!(f, " | ")?;
371 }
372 let s = col.get_display(row);
373 write!(f, "{:>width$}", s, width = col_widths[col_idx])?;
374 }
375 writeln!(f)?;
376 }
377
378 Ok(())
379 }
380}
381
/// A column expression used by filters, aggregations and window functions.
#[derive(Debug, Clone)]
pub enum DExpr {
    /// Reference to a column by name.
    Col(String),
    /// Integer literal.
    LitInt(i64),
    /// Floating-point literal.
    LitFloat(f64),
    /// Boolean literal.
    LitBool(bool),
    /// String literal.
    LitStr(String),
    /// Binary operation `left op right`.
    BinOp {
        /// Operator to apply.
        op: DBinOp,
        /// Left operand.
        left: Box<DExpr>,
        /// Right operand.
        right: Box<DExpr>,
    },
    /// Aggregation of an inner expression over a group of rows.
    Agg(AggFunc, Box<DExpr>),
    /// Number of rows in the group.
    Count,
    /// Named scalar function applied to its arguments, e.g. `log(x)`.
    FnCall(String, Vec<DExpr>),
    /// Window function: running sum of the inner expression.
    CumSum(Box<DExpr>),
    /// Window function: running product.
    CumProd(Box<DExpr>),
    /// Window function: running maximum.
    CumMax(Box<DExpr>),
    /// Window function: running minimum.
    CumMin(Box<DExpr>),
    /// Window function: lagged value; the `usize` is presumably the offset in rows.
    Lag(Box<DExpr>, usize),
    /// Window function: leading value; the `usize` is presumably the offset in rows.
    Lead(Box<DExpr>, usize),
    /// Window function: rank of the inner expression.
    Rank(Box<DExpr>),
    /// Window function: dense rank of the inner expression.
    DenseRank(Box<DExpr>),
    /// Window function: row number.
    RowNumber,
    /// Rolling sum over a named column; the `usize` is presumably the window length.
    RollingSum(String, usize),
    /// Rolling mean over a named column.
    RollingMean(String, usize),
    /// Rolling minimum over a named column.
    RollingMin(String, usize),
    /// Rolling maximum over a named column.
    RollingMax(String, usize),
    /// Rolling variance over a named column.
    RollingVar(String, usize),
    /// Rolling standard deviation over a named column.
    RollingSd(String, usize),
}

/// Binary operators usable in [`DExpr::BinOp`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DBinOp {
    /// `+`
    Add,
    /// `-`
    Sub,
    /// `*`
    Mul,
    /// `/`
    Div,
    /// `>`
    Gt,
    /// `<`
    Lt,
    /// `>=`
    Ge,
    /// `<=`
    Le,
    /// `==`
    Eq,
    /// `!=`
    Ne,
    /// Logical `&&` (boolean operands only).
    And,
    /// Logical `||` (boolean operands only).
    Or,
}

/// Aggregation functions usable in [`DExpr::Agg`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AggFunc {
    /// Sum of values.
    Sum,
    /// Arithmetic mean of values.
    Mean,
    /// Minimum value.
    Min,
    /// Maximum value.
    Max,
    /// Number of values.
    Count,
}

impl fmt::Display for DExpr {
    /// Renders the expression in a function-call / infix surface syntax, e.g.
    /// `(col("a") + 1)`, `mean(col("x"))` or `rolling_sd("x", 3)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `name(arg)` form shared by the single-argument functions.
        fn unary(f: &mut fmt::Formatter<'_>, name: &str, arg: &DExpr) -> fmt::Result {
            write!(f, "{}({})", name, arg)
        }
        // `name("column", window)` form shared by the rolling-window functions.
        fn rolling(
            f: &mut fmt::Formatter<'_>,
            name: &str,
            column: &str,
            window: usize,
        ) -> fmt::Result {
            write!(f, "{}(\"{}\", {})", name, column, window)
        }

        match self {
            DExpr::Col(name) => write!(f, "col(\"{}\")", name),
            DExpr::LitInt(v) => write!(f, "{}", v),
            DExpr::LitFloat(v) => write!(f, "{}", v),
            DExpr::LitBool(b) => write!(f, "{}", b),
            DExpr::LitStr(s) => write!(f, "\"{}\"", s),
            DExpr::BinOp { op, left, right } => {
                let symbol = match op {
                    DBinOp::Add => "+",
                    DBinOp::Sub => "-",
                    DBinOp::Mul => "*",
                    DBinOp::Div => "/",
                    DBinOp::Gt => ">",
                    DBinOp::Lt => "<",
                    DBinOp::Ge => ">=",
                    DBinOp::Le => "<=",
                    DBinOp::Eq => "==",
                    DBinOp::Ne => "!=",
                    DBinOp::And => "&&",
                    DBinOp::Or => "||",
                };
                write!(f, "({} {} {})", left, symbol, right)
            }
            DExpr::Agg(func, inner) => {
                let fname = match func {
                    AggFunc::Sum => "sum",
                    AggFunc::Mean => "mean",
                    AggFunc::Min => "min",
                    AggFunc::Max => "max",
                    AggFunc::Count => "count",
                };
                unary(f, fname, inner)
            }
            DExpr::Count => f.write_str("count()"),
            DExpr::FnCall(name, args) => {
                write!(f, "{}(", name)?;
                for (i, arg) in args.iter().enumerate() {
                    if i > 0 {
                        f.write_str(", ")?;
                    }
                    write!(f, "{}", arg)?;
                }
                f.write_str(")")
            }
            DExpr::CumSum(e) => unary(f, "cumsum", e),
            DExpr::CumProd(e) => unary(f, "cumprod", e),
            DExpr::CumMax(e) => unary(f, "cummax", e),
            DExpr::CumMin(e) => unary(f, "cummin", e),
            DExpr::Lag(e, k) => write!(f, "lag({}, {})", e, k),
            DExpr::Lead(e, k) => write!(f, "lead({}, {})", e, k),
            DExpr::Rank(e) => unary(f, "rank", e),
            DExpr::DenseRank(e) => unary(f, "dense_rank", e),
            DExpr::RowNumber => f.write_str("row_number()"),
            DExpr::RollingSum(col, w) => rolling(f, "rolling_sum", col, *w),
            DExpr::RollingMean(col, w) => rolling(f, "rolling_mean", col, *w),
            DExpr::RollingMin(col, w) => rolling(f, "rolling_min", col, *w),
            DExpr::RollingMax(col, w) => rolling(f, "rolling_max", col, *w),
            DExpr::RollingVar(col, w) => rolling(f, "rolling_var", col, *w),
            DExpr::RollingSd(col, w) => rolling(f, "rolling_sd", col, *w),
        }
    }
}
543
/// A node of the logical query plan built by [`Pipeline`] and run by [`execute`].
///
/// Plans form a tree: `Scan` nodes are leaves and every other node owns its input(s).
#[derive(Debug, Clone)]
pub enum LogicalPlan {
    /// Leaf: reads the in-memory frame `source` (`execute` clones it each time).
    Scan {
        source: DataFrame,
    },
    /// Keeps the rows of `input` for which `predicate` evaluates to `Bool(true)`.
    Filter {
        input: Box<LogicalPlan>,
        predicate: DExpr,
    },
    /// Grouping annotation. `execute` passes `input` through unchanged; the actual
    /// grouping is done by `Aggregate`, which carries its own `keys`.
    GroupBy {
        input: Box<LogicalPlan>,
        keys: Vec<String>,
    },
    /// Groups `input` by `keys` and evaluates each `(output_name, expr)` in `aggs` per
    /// group.
    Aggregate {
        input: Box<LogicalPlan>,
        keys: Vec<String>,
        aggs: Vec<(String, DExpr)>,
    },
    /// Keeps only the listed columns. `execute` preserves the input's column order and
    /// ignores names that do not exist.
    Project {
        input: Box<LogicalPlan>,
        columns: Vec<String>,
    },
    /// Inner equi-join on `left.left_on == right.right_on`; key values are compared by
    /// their string rendering.
    InnerJoin {
        left: Box<LogicalPlan>,
        right: Box<LogicalPlan>,
        left_on: String,
        right_on: String,
    },
    /// Left outer equi-join; left rows without a match are kept.
    LeftJoin {
        left: Box<LogicalPlan>,
        right: Box<LogicalPlan>,
        left_on: String,
        right_on: String,
    },
    /// Cartesian product of `left` and `right`.
    CrossJoin {
        left: Box<LogicalPlan>,
        right: Box<LogicalPlan>,
    },
}
594
595impl LogicalPlan {
596 pub fn referenced_columns(&self) -> Vec<String> {
598 let mut cols = Vec::new();
599 self.collect_columns(&mut cols);
600 cols.sort();
601 cols.dedup();
602 cols
603 }
604
605 fn collect_columns(&self, cols: &mut Vec<String>) {
606 match self {
607 LogicalPlan::Scan { .. } => {}
608 LogicalPlan::Filter { input, predicate } => {
609 input.collect_columns(cols);
610 collect_expr_columns(predicate, cols);
611 }
612 LogicalPlan::GroupBy { input, keys } => {
613 input.collect_columns(cols);
614 cols.extend(keys.clone());
615 }
616 LogicalPlan::Aggregate {
617 input, keys, aggs, ..
618 } => {
619 input.collect_columns(cols);
620 cols.extend(keys.clone());
621 for (_, expr) in aggs {
622 collect_expr_columns(expr, cols);
623 }
624 }
625 LogicalPlan::Project { input, columns } => {
626 input.collect_columns(cols);
627 cols.extend(columns.clone());
628 }
629 LogicalPlan::InnerJoin {
630 left,
631 right,
632 left_on,
633 right_on,
634 }
635 | LogicalPlan::LeftJoin {
636 left,
637 right,
638 left_on,
639 right_on,
640 } => {
641 left.collect_columns(cols);
642 right.collect_columns(cols);
643 cols.push(left_on.clone());
644 cols.push(right_on.clone());
645 }
646 LogicalPlan::CrossJoin { left, right } => {
647 left.collect_columns(cols);
648 right.collect_columns(cols);
649 }
650 }
651 }
652}
653
654fn collect_expr_columns(expr: &DExpr, cols: &mut Vec<String>) {
655 match expr {
656 DExpr::Col(name) => cols.push(name.clone()),
657 DExpr::BinOp { left, right, .. } => {
658 collect_expr_columns(left, cols);
659 collect_expr_columns(right, cols);
660 }
661 DExpr::Agg(_, inner) => collect_expr_columns(inner, cols),
662 DExpr::FnCall(_, args) => {
663 for arg in args {
664 collect_expr_columns(arg, cols);
665 }
666 }
667 DExpr::CumSum(e) | DExpr::CumProd(e) | DExpr::CumMax(e) | DExpr::CumMin(e)
668 | DExpr::Lag(e, _) | DExpr::Lead(e, _) | DExpr::Rank(e) | DExpr::DenseRank(e) => {
669 collect_expr_columns(e, cols);
670 }
671 DExpr::RollingSum(col, _) | DExpr::RollingMean(col, _)
672 | DExpr::RollingMin(col, _) | DExpr::RollingMax(col, _)
673 | DExpr::RollingVar(col, _) | DExpr::RollingSd(col, _) => {
674 cols.push(col.clone());
675 }
676 _ => {}
677 }
678}
679
680pub fn optimize(plan: LogicalPlan) -> LogicalPlan {
684 let plan = push_down_predicates(plan);
685 let plan = prune_columns(plan);
686 plan
687}
688
689fn push_down_predicates(plan: LogicalPlan) -> LogicalPlan {
691 match plan {
692 LogicalPlan::Filter {
693 input,
694 predicate,
695 } => {
696 let optimized_input = push_down_predicates(*input);
697 match optimized_input {
698 LogicalPlan::GroupBy {
700 input: inner,
701 keys,
702 } => {
703 let pred_cols = {
704 let mut c = Vec::new();
705 collect_expr_columns(&predicate, &mut c);
706 c
707 };
708 let can_push = pred_cols.iter().all(|c| !keys.contains(c))
709 || pred_cols.iter().all(|c| {
710 !keys.contains(c) || keys.contains(c)
712 });
713 if can_push && pred_cols.iter().all(|c| !keys.contains(c)) {
715 LogicalPlan::GroupBy {
716 input: Box::new(LogicalPlan::Filter {
717 input: inner,
718 predicate,
719 }),
720 keys,
721 }
722 } else {
723 LogicalPlan::Filter {
724 input: Box::new(LogicalPlan::GroupBy {
725 input: inner,
726 keys,
727 }),
728 predicate,
729 }
730 }
731 }
732 other => LogicalPlan::Filter {
733 input: Box::new(other),
734 predicate,
735 },
736 }
737 }
738 LogicalPlan::GroupBy { input, keys } => LogicalPlan::GroupBy {
739 input: Box::new(push_down_predicates(*input)),
740 keys,
741 },
742 LogicalPlan::Aggregate {
743 input,
744 keys,
745 aggs,
746 } => LogicalPlan::Aggregate {
747 input: Box::new(push_down_predicates(*input)),
748 keys,
749 aggs,
750 },
751 LogicalPlan::Project { input, columns } => LogicalPlan::Project {
752 input: Box::new(push_down_predicates(*input)),
753 columns,
754 },
755 LogicalPlan::InnerJoin {
756 left,
757 right,
758 left_on,
759 right_on,
760 } => LogicalPlan::InnerJoin {
761 left: Box::new(push_down_predicates(*left)),
762 right: Box::new(push_down_predicates(*right)),
763 left_on,
764 right_on,
765 },
766 LogicalPlan::LeftJoin {
767 left,
768 right,
769 left_on,
770 right_on,
771 } => LogicalPlan::LeftJoin {
772 left: Box::new(push_down_predicates(*left)),
773 right: Box::new(push_down_predicates(*right)),
774 left_on,
775 right_on,
776 },
777 LogicalPlan::CrossJoin { left, right } => LogicalPlan::CrossJoin {
778 left: Box::new(push_down_predicates(*left)),
779 right: Box::new(push_down_predicates(*right)),
780 },
781 other => other,
782 }
783}
784
/// Column-pruning optimizer pass.
///
/// Currently a no-op placeholder: it returns `plan` unchanged.
fn prune_columns(plan: LogicalPlan) -> LogicalPlan {
    plan
}
791
792pub fn execute(plan: &LogicalPlan) -> Result<DataFrame, DataError> {
796 match plan {
797 LogicalPlan::Scan { source } => Ok(source.clone()),
798
799 LogicalPlan::Filter { input, predicate } => {
800 let df = execute(input)?;
801 execute_filter(&df, predicate)
802 }
803
804 LogicalPlan::GroupBy { input, keys: _ } => {
805 let df = execute(input)?;
807 Ok(df)
809 }
810
811 LogicalPlan::Aggregate { input, keys, aggs } => {
812 let df = execute(input)?;
813 execute_aggregate(&df, keys, aggs)
814 }
815
816 LogicalPlan::Project { input, columns } => {
817 let df = execute(input)?;
818 let projected = df
819 .columns
820 .into_iter()
821 .filter(|(name, _)| columns.contains(name))
822 .collect();
823 Ok(DataFrame { columns: projected })
824 }
825
826 LogicalPlan::InnerJoin {
827 left,
828 right,
829 left_on,
830 right_on,
831 } => {
832 let left_df = execute(left)?;
833 let right_df = execute(right)?;
834 execute_inner_join(&left_df, &right_df, left_on, right_on)
835 }
836
837 LogicalPlan::LeftJoin {
838 left,
839 right,
840 left_on,
841 right_on,
842 } => {
843 let left_df = execute(left)?;
844 let right_df = execute(right)?;
845 execute_left_join(&left_df, &right_df, left_on, right_on)
846 }
847
848 LogicalPlan::CrossJoin { left, right } => {
849 let left_df = execute(left)?;
850 let right_df = execute(right)?;
851 execute_cross_join(&left_df, &right_df)
852 }
853 }
854}
855
856fn execute_filter(df: &DataFrame, predicate: &DExpr) -> Result<DataFrame, DataError> {
857 let nrows = df.nrows();
858 let mut mask = vec![false; nrows];
859
860 for row in 0..nrows {
861 let val = eval_expr_row(df, predicate, row)?;
862 mask[row] = match val {
863 ExprValue::Bool(b) => b,
864 _ => return Err(DataError::InvalidOperation("filter predicate must be boolean".into())),
865 };
866 }
867
868 let mut new_columns = Vec::new();
869 for (name, col) in &df.columns {
870 let filtered = filter_column(col, &mask);
871 new_columns.push((name.clone(), filtered));
872 }
873
874 Ok(DataFrame {
875 columns: new_columns,
876 })
877}
878
879fn filter_column(col: &Column, mask: &[bool]) -> Column {
880 if matches!(col, Column::CategoricalAdaptive(_)) {
881 return filter_column(&col.to_legacy_categorical(), mask);
882 }
883 match col {
884 Column::Int(v) => Column::Int(
885 v.iter()
886 .zip(mask)
887 .filter(|(_, &m)| m)
888 .map(|(v, _)| *v)
889 .collect(),
890 ),
891 Column::Float(v) => Column::Float(
892 v.iter()
893 .zip(mask)
894 .filter(|(_, &m)| m)
895 .map(|(v, _)| *v)
896 .collect(),
897 ),
898 Column::Str(v) => Column::Str(
899 v.iter()
900 .zip(mask)
901 .filter(|(_, &m)| m)
902 .map(|(v, _)| v.clone())
903 .collect(),
904 ),
905 Column::Bool(v) => Column::Bool(
906 v.iter()
907 .zip(mask)
908 .filter(|(_, &m)| m)
909 .map(|(v, _)| *v)
910 .collect(),
911 ),
912 Column::Categorical { levels, codes } => Column::Categorical {
913 levels: levels.clone(),
914 codes: codes
915 .iter()
916 .zip(mask)
917 .filter(|(_, &m)| m)
918 .map(|(v, _)| *v)
919 .collect(),
920 },
921 Column::DateTime(v) => Column::DateTime(
922 v.iter()
923 .zip(mask)
924 .filter(|(_, &m)| m)
925 .map(|(v, _)| *v)
926 .collect(),
927 ),
928 Column::CategoricalAdaptive(_) => unreachable!("handled by early return"),
929 }
930}
931
932fn execute_aggregate(
933 df: &DataFrame,
934 keys: &[String],
935 aggs: &[(String, DExpr)],
936) -> Result<DataFrame, DataError> {
937 let nrows = df.nrows();
939 let mut groups: BTreeMap<Vec<String>, Vec<usize>> = BTreeMap::new();
940
941 for row in 0..nrows {
942 let key: Vec<String> = keys
943 .iter()
944 .map(|k| {
945 df.get_column(k)
946 .map(|col| col.get_display(row))
947 .ok_or_else(|| DataError::ColumnNotFound(k.to_string()))
948 })
949 .collect::<Result<Vec<String>, DataError>>()?;
950 groups.entry(key).or_default().push(row);
951 }
952
953 let mut sorted_groups: Vec<(Vec<String>, Vec<usize>)> = groups.into_iter().collect();
955 sorted_groups.sort_by(|a, b| a.0.cmp(&b.0));
956
957 let mut result_columns: Vec<(String, Column)> = Vec::new();
959
960 for (key_idx, key_name) in keys.iter().enumerate() {
962 let values: Vec<String> = sorted_groups
963 .iter()
964 .map(|(key, _)| key[key_idx].clone())
965 .collect();
966 let source_col = df.get_column(key_name).ok_or_else(|| {
968 DataError::ColumnNotFound(key_name.clone())
969 })?;
970 match source_col {
971 Column::Int(_) => {
972 let int_vals: Vec<i64> = values.iter().map(|s| s.parse().unwrap_or(0)).collect();
973 result_columns.push((key_name.clone(), Column::Int(int_vals)));
974 }
975 Column::Str(_) => {
976 result_columns.push((key_name.clone(), Column::Str(values)));
977 }
978 _ => {
979 result_columns.push((key_name.clone(), Column::Str(values)));
980 }
981 }
982 }
983
984 for (agg_name, agg_expr) in aggs {
986 let mut values = Vec::new();
987 for (_, row_indices) in &sorted_groups {
988 let val = eval_agg_expr(df, agg_expr, row_indices)?;
989 values.push(val);
990 }
991 result_columns.push((agg_name.clone(), Column::Float(values)));
992 }
993
994 Ok(DataFrame {
995 columns: result_columns,
996 })
997}
998
/// A scalar value produced by row-wise expression evaluation (`eval_expr_row`).
#[derive(Debug, Clone)]
enum ExprValue {
    /// Integer value (also produced for `DateTime` columns).
    Int(i64),
    /// Floating-point value.
    Float(f64),
    /// String value (also produced for categorical columns).
    Str(String),
    /// Boolean value; filter predicates must evaluate to this variant.
    Bool(bool),
}
1008
1009fn eval_expr_row(df: &DataFrame, expr: &DExpr, row: usize) -> Result<ExprValue, DataError> {
1010 match expr {
1011 DExpr::Col(name) => {
1012 let col = df
1013 .get_column(name)
1014 .ok_or_else(|| DataError::ColumnNotFound(name.clone()))?;
1015 match col {
1016 Column::Int(v) => Ok(ExprValue::Int(v[row])),
1017 Column::Float(v) => Ok(ExprValue::Float(v[row])),
1018 Column::Str(v) => Ok(ExprValue::Str(v[row].clone())),
1019 Column::Bool(v) => Ok(ExprValue::Bool(v[row])),
1020 Column::Categorical { levels, codes } => {
1021 Ok(ExprValue::Str(levels[codes[row] as usize].clone()))
1022 }
1023 Column::CategoricalAdaptive(cc) => Ok(ExprValue::Str(match cc.get(row) {
1024 None => String::new(),
1025 Some(b) => String::from_utf8_lossy(b).into_owned(),
1026 })),
1027 Column::DateTime(v) => Ok(ExprValue::Int(v[row])),
1028 }
1029 }
1030 DExpr::LitInt(v) => Ok(ExprValue::Int(*v)),
1031 DExpr::LitFloat(v) => Ok(ExprValue::Float(*v)),
1032 DExpr::LitBool(b) => Ok(ExprValue::Bool(*b)),
1033 DExpr::LitStr(s) => Ok(ExprValue::Str(s.clone())),
1034 DExpr::BinOp { op, left, right } => {
1035 let lv = eval_expr_row(df, left, row)?;
1036 let rv = eval_expr_row(df, right, row)?;
1037 eval_binop(*op, lv, rv)
1038 }
1039 DExpr::Agg(_, _) | DExpr::Count => Err(DataError::InvalidOperation(
1040 "aggregation not allowed in row context".into(),
1041 )),
1042 DExpr::FnCall(name, args) => {
1043 if args.len() != 1 {
1044 return Err(DataError::InvalidOperation(
1045 format!("FnCall '{}' requires exactly 1 argument, got {}", name, args.len()),
1046 ));
1047 }
1048 let val = eval_expr_row(df, &args[0], row)?;
1049 let x = match val {
1050 ExprValue::Float(f) => f,
1051 ExprValue::Int(i) => i as f64,
1052 _ => return Err(DataError::InvalidOperation(
1053 format!("FnCall '{}' requires numeric argument", name),
1054 )),
1055 };
1056 let result = match name.as_str() {
1057 "log" => x.ln(),
1058 "exp" => x.exp(),
1059 "sqrt" => x.sqrt(),
1060 "abs" => x.abs(),
1061 "ceil" => x.ceil(),
1062 "floor" => x.floor(),
1063 "round" => x.round(),
1064 "sin" => x.sin(),
1065 "cos" => x.cos(),
1066 "tan" => x.tan(),
1067 other => return Err(DataError::InvalidOperation(
1068 format!("unknown DExpr function: {}", other),
1069 )),
1070 };
1071 Ok(ExprValue::Float(result))
1072 }
1073 DExpr::CumSum(_) | DExpr::CumProd(_) | DExpr::CumMax(_) | DExpr::CumMin(_)
1074 | DExpr::Lag(_, _) | DExpr::Lead(_, _) | DExpr::Rank(_) | DExpr::DenseRank(_)
1075 | DExpr::RowNumber
1076 | DExpr::RollingSum(..) | DExpr::RollingMean(..) | DExpr::RollingMin(..)
1077 | DExpr::RollingMax(..) | DExpr::RollingVar(..) | DExpr::RollingSd(..) => {
1078 Err(DataError::InvalidOperation(
1079 "window function not allowed in row context; use eval_expr_column".into(),
1080 ))
1081 }
1082 }
1083}
1084
1085fn eval_binop(op: DBinOp, left: ExprValue, right: ExprValue) -> Result<ExprValue, DataError> {
1086 match (left, right) {
1087 (ExprValue::Int(a), ExprValue::Int(b)) => match op {
1088 DBinOp::Add => Ok(ExprValue::Int(a + b)),
1089 DBinOp::Sub => Ok(ExprValue::Int(a - b)),
1090 DBinOp::Mul => Ok(ExprValue::Int(a * b)),
1091 DBinOp::Div => Ok(ExprValue::Int(a / b)),
1092 DBinOp::Gt => Ok(ExprValue::Bool(a > b)),
1093 DBinOp::Lt => Ok(ExprValue::Bool(a < b)),
1094 DBinOp::Ge => Ok(ExprValue::Bool(a >= b)),
1095 DBinOp::Le => Ok(ExprValue::Bool(a <= b)),
1096 DBinOp::Eq => Ok(ExprValue::Bool(a == b)),
1097 DBinOp::Ne => Ok(ExprValue::Bool(a != b)),
1098 _ => Err(DataError::InvalidOperation(format!(
1099 "unsupported op {:?} on Int",
1100 op
1101 ))),
1102 },
1103 (ExprValue::Float(a), ExprValue::Float(b)) => match op {
1104 DBinOp::Add => Ok(ExprValue::Float(a + b)),
1105 DBinOp::Sub => Ok(ExprValue::Float(a - b)),
1106 DBinOp::Mul => Ok(ExprValue::Float(a * b)),
1107 DBinOp::Div => Ok(ExprValue::Float(a / b)),
1108 DBinOp::Gt => Ok(ExprValue::Bool(a > b)),
1109 DBinOp::Lt => Ok(ExprValue::Bool(a < b)),
1110 DBinOp::Ge => Ok(ExprValue::Bool(a >= b)),
1111 DBinOp::Le => Ok(ExprValue::Bool(a <= b)),
1112 DBinOp::Eq => Ok(ExprValue::Bool(a == b)),
1113 DBinOp::Ne => Ok(ExprValue::Bool(a != b)),
1114 _ => Err(DataError::InvalidOperation(format!(
1115 "unsupported op {:?} on Float",
1116 op
1117 ))),
1118 },
1119 (ExprValue::Int(a), ExprValue::Float(b)) => {
1121 eval_binop(op, ExprValue::Float(a as f64), ExprValue::Float(b))
1122 }
1123 (ExprValue::Float(a), ExprValue::Int(b)) => {
1124 eval_binop(op, ExprValue::Float(a), ExprValue::Float(b as f64))
1125 }
1126 (ExprValue::Bool(a), ExprValue::Bool(b)) => match op {
1127 DBinOp::And => Ok(ExprValue::Bool(a && b)),
1128 DBinOp::Or => Ok(ExprValue::Bool(a || b)),
1129 DBinOp::Eq => Ok(ExprValue::Bool(a == b)),
1130 DBinOp::Ne => Ok(ExprValue::Bool(a != b)),
1131 _ => Err(DataError::InvalidOperation(format!(
1132 "unsupported op {:?} on Bool",
1133 op
1134 ))),
1135 },
1136 (ExprValue::Str(a), ExprValue::Str(b)) => match op {
1137 DBinOp::Eq => Ok(ExprValue::Bool(a == b)),
1138 DBinOp::Ne => Ok(ExprValue::Bool(a != b)),
1139 _ => Err(DataError::InvalidOperation(format!(
1140 "unsupported op {:?} on String",
1141 op
1142 ))),
1143 },
1144 _ => Err(DataError::InvalidOperation(
1145 "type mismatch in binary operation".into(),
1146 )),
1147 }
1148}
1149
1150fn eval_agg_expr(
1151 df: &DataFrame,
1152 expr: &DExpr,
1153 rows: &[usize],
1154) -> Result<f64, DataError> {
1155 match expr {
1156 DExpr::Agg(func, inner) => {
1157 let values = extract_float_values(df, inner, rows)?;
1158 match func {
1159 AggFunc::Sum => Ok(kahan_sum_f64(&values)),
1160 AggFunc::Mean => {
1161 if values.is_empty() {
1162 Ok(0.0)
1163 } else {
1164 Ok(kahan_sum_f64(&values) / values.len() as f64)
1165 }
1166 }
1167 AggFunc::Min => Ok(values
1168 .iter()
1169 .cloned()
1170 .fold(f64::INFINITY, f64::min)),
1171 AggFunc::Max => Ok(values
1172 .iter()
1173 .cloned()
1174 .fold(f64::NEG_INFINITY, f64::max)),
1175 AggFunc::Count => Ok(values.len() as f64),
1176 }
1177 }
1178 DExpr::Count => Ok(rows.len() as f64),
1179 _ => Err(DataError::InvalidOperation(
1180 "expected aggregation expression".into(),
1181 )),
1182 }
1183}
1184
1185fn extract_float_values(
1186 df: &DataFrame,
1187 expr: &DExpr,
1188 rows: &[usize],
1189) -> Result<Vec<f64>, DataError> {
1190 match expr {
1191 DExpr::Col(name) => {
1192 let col = df
1193 .get_column(name)
1194 .ok_or_else(|| DataError::ColumnNotFound(name.clone()))?;
1195 let vals: Vec<f64> = match col {
1196 Column::Float(v) => rows.iter().map(|&r| v[r]).collect(),
1197 Column::Int(v) => rows.iter().map(|&r| v[r] as f64).collect(),
1198 _ => {
1199 return Err(DataError::InvalidOperation(format!(
1200 "cannot aggregate non-numeric column `{}`",
1201 name
1202 )))
1203 }
1204 };
1205 Ok(vals)
1206 }
1207 _ => Err(DataError::InvalidOperation(
1208 "expected column reference in aggregation".into(),
1209 )),
1210 }
1211}
1212
/// Fluent builder for a [`LogicalPlan`]; [`Pipeline::collect`] optimizes and executes it.
pub struct Pipeline {
    /// Plan accumulated so far; each builder call wraps it in a new node.
    plan: LogicalPlan,
}
1219
1220impl Pipeline {
1221 pub fn scan(df: DataFrame) -> Self {
1223 Self {
1224 plan: LogicalPlan::Scan { source: df },
1225 }
1226 }
1227
1228 pub fn filter(self, predicate: DExpr) -> Self {
1230 Self {
1231 plan: LogicalPlan::Filter {
1232 input: Box::new(self.plan),
1233 predicate,
1234 },
1235 }
1236 }
1237
1238 pub fn group_by(self, keys: Vec<String>) -> Self {
1240 Self {
1241 plan: LogicalPlan::GroupBy {
1242 input: Box::new(self.plan),
1243 keys,
1244 },
1245 }
1246 }
1247
1248 pub fn summarize(self, keys: Vec<String>, aggs: Vec<(String, DExpr)>) -> Self {
1250 Self {
1251 plan: LogicalPlan::Aggregate {
1252 input: Box::new(self.plan),
1253 keys,
1254 aggs,
1255 },
1256 }
1257 }
1258
1259 pub fn select(self, columns: Vec<String>) -> Self {
1261 Self {
1262 plan: LogicalPlan::Project {
1263 input: Box::new(self.plan),
1264 columns,
1265 },
1266 }
1267 }
1268
1269 pub fn inner_join(self, right: DataFrame, left_on: &str, right_on: &str) -> Self {
1271 Self {
1272 plan: LogicalPlan::InnerJoin {
1273 left: Box::new(self.plan),
1274 right: Box::new(LogicalPlan::Scan { source: right }),
1275 left_on: left_on.to_string(),
1276 right_on: right_on.to_string(),
1277 },
1278 }
1279 }
1280
1281 pub fn left_join(self, right: DataFrame, left_on: &str, right_on: &str) -> Self {
1283 Self {
1284 plan: LogicalPlan::LeftJoin {
1285 left: Box::new(self.plan),
1286 right: Box::new(LogicalPlan::Scan { source: right }),
1287 left_on: left_on.to_string(),
1288 right_on: right_on.to_string(),
1289 },
1290 }
1291 }
1292
1293 pub fn cross_join(self, right: DataFrame) -> Self {
1295 Self {
1296 plan: LogicalPlan::CrossJoin {
1297 left: Box::new(self.plan),
1298 right: Box::new(LogicalPlan::Scan { source: right }),
1299 },
1300 }
1301 }
1302
1303 pub fn collect(self) -> Result<DataFrame, DataError> {
1305 let optimized = optimize(self.plan);
1306 execute(&optimized)
1307 }
1308
1309 pub fn plan(&self) -> &LogicalPlan {
1311 &self.plan
1312 }
1313}
1314
/// Errors raised by [`DataFrame`] construction, plan execution and expression
/// evaluation.
#[derive(Debug, Clone)]
pub enum DataError {
    /// A referenced column does not exist; holds the column name.
    ColumnNotFound(String),
    /// A column's length disagrees with the expected row count.
    ColumnLengthMismatch {
        /// Expected number of rows.
        expected: usize,
        /// Actual number of rows in `column`.
        got: usize,
        /// Name of the offending column.
        column: String,
    },
    /// Any other invalid request (type errors, unsupported operations, ...); holds a
    /// human-readable message.
    InvalidOperation(String),
}

impl fmt::Display for DataError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::ColumnNotFound(name) => write!(f, "column `{}` not found", name),
            Self::ColumnLengthMismatch {
                expected,
                got,
                column,
            } => {
                write!(f, "column `{}` has {} rows, expected {}", column, got, expected)
            }
            Self::InvalidOperation(msg) => write!(f, "invalid operation: {}", msg),
        }
    }
}

impl std::error::Error for DataError {}
1354
1355fn column_value_str(col: &Column, row: usize) -> String {
1359 match col {
1360 Column::Int(v) => v[row].to_string(),
1361 Column::Float(v) => v[row].to_string(),
1362 Column::Str(v) => v[row].clone(),
1363 Column::Bool(v) => v[row].to_string(),
1364 Column::Categorical { levels, codes } => levels[codes[row] as usize].clone(),
1365 Column::CategoricalAdaptive(cc) => match cc.get(row) {
1366 None => String::new(),
1367 Some(b) => String::from_utf8_lossy(b).into_owned(),
1368 },
1369 Column::DateTime(v) => v[row].to_string(),
1370 }
1371}
1372
1373fn execute_inner_join(
1374 left: &DataFrame,
1375 right: &DataFrame,
1376 left_on: &str,
1377 right_on: &str,
1378) -> Result<DataFrame, DataError> {
1379 let left_col = left.get_column(left_on)
1380 .ok_or_else(|| DataError::InvalidOperation(format!("join key `{}` not found in left", left_on)))?;
1381 let right_col = right.get_column(right_on)
1382 .ok_or_else(|| DataError::InvalidOperation(format!("join key `{}` not found in right", right_on)))?;
1383
1384 let right_nrows = right.nrows();
1386 let mut index: std::collections::BTreeMap<String, Vec<usize>> = std::collections::BTreeMap::new();
1387 for i in 0..right_nrows {
1388 let key = column_value_str(right_col, i);
1389 index.entry(key).or_default().push(i);
1390 }
1391
1392 let left_nrows = left.nrows();
1393 let mut left_indices = Vec::new();
1394 let mut right_indices = Vec::new();
1395
1396 for i in 0..left_nrows {
1397 let key = column_value_str(left_col, i);
1398 if let Some(matches) = index.get(&key) {
1399 for &j in matches {
1400 left_indices.push(i);
1401 right_indices.push(j);
1402 }
1403 }
1404 }
1405
1406 build_join_result(left, right, &left_indices, &right_indices, right_on)
1407}
1408
1409fn execute_left_join(
1410 left: &DataFrame,
1411 right: &DataFrame,
1412 left_on: &str,
1413 right_on: &str,
1414) -> Result<DataFrame, DataError> {
1415 let left_col = left.get_column(left_on)
1416 .ok_or_else(|| DataError::InvalidOperation(format!("join key `{}` not found in left", left_on)))?;
1417 let right_col = right.get_column(right_on)
1418 .ok_or_else(|| DataError::InvalidOperation(format!("join key `{}` not found in right", right_on)))?;
1419
1420 let right_nrows = right.nrows();
1421 let mut index: std::collections::BTreeMap<String, Vec<usize>> = std::collections::BTreeMap::new();
1422 for i in 0..right_nrows {
1423 let key = column_value_str(right_col, i);
1424 index.entry(key).or_default().push(i);
1425 }
1426
1427 let left_nrows = left.nrows();
1428 let mut left_indices = Vec::new();
1429 let mut right_indices: Vec<Option<usize>> = Vec::new();
1430
1431 for i in 0..left_nrows {
1432 let key = column_value_str(left_col, i);
1433 if let Some(matches) = index.get(&key) {
1434 for &j in matches {
1435 left_indices.push(i);
1436 right_indices.push(Some(j));
1437 }
1438 } else {
1439 left_indices.push(i);
1440 right_indices.push(None);
1441 }
1442 }
1443
1444 build_left_join_result(left, right, &left_indices, &right_indices, right_on)
1445}
1446
1447fn execute_cross_join(left: &DataFrame, right: &DataFrame) -> Result<DataFrame, DataError> {
1448 let left_nrows = left.nrows();
1449 let right_nrows = right.nrows();
1450 let mut left_indices = Vec::with_capacity(left_nrows * right_nrows);
1451 let mut right_indices = Vec::with_capacity(left_nrows * right_nrows);
1452
1453 for i in 0..left_nrows {
1454 for j in 0..right_nrows {
1455 left_indices.push(i);
1456 right_indices.push(j);
1457 }
1458 }
1459
1460 build_join_result(left, right, &left_indices, &right_indices, "")
1461}
1462
1463fn build_join_result(
1464 left: &DataFrame,
1465 right: &DataFrame,
1466 left_indices: &[usize],
1467 right_indices: &[usize],
1468 right_on: &str,
1469) -> Result<DataFrame, DataError> {
1470 let mut columns = Vec::new();
1471
1472 for (name, col) in &left.columns {
1474 columns.push((name.clone(), gather_column(col, left_indices)));
1475 }
1476
1477 for (name, col) in &right.columns {
1479 if name == right_on {
1480 continue;
1481 }
1482 let out_name = if left.get_column(name).is_some() {
1483 format!("{}_right", name)
1484 } else {
1485 name.clone()
1486 };
1487 columns.push((out_name, gather_column(col, right_indices)));
1488 }
1489
1490 Ok(DataFrame { columns })
1491}
1492
1493fn build_left_join_result(
1494 left: &DataFrame,
1495 right: &DataFrame,
1496 left_indices: &[usize],
1497 right_indices: &[Option<usize>],
1498 right_on: &str,
1499) -> Result<DataFrame, DataError> {
1500 let mut columns = Vec::new();
1501
1502 for (name, col) in &left.columns {
1503 columns.push((name.clone(), gather_column(col, left_indices)));
1504 }
1505
1506 for (name, col) in &right.columns {
1507 if name == right_on {
1508 continue;
1509 }
1510 let out_name = if left.get_column(name).is_some() {
1511 format!("{}_right", name)
1512 } else {
1513 name.clone()
1514 };
1515 columns.push((out_name, gather_column_nullable(col, right_indices)));
1516 }
1517
1518 Ok(DataFrame { columns })
1519}
1520
1521fn gather_column(col: &Column, indices: &[usize]) -> Column {
1522 if matches!(col, Column::CategoricalAdaptive(_)) {
1523 return gather_column(&col.to_legacy_categorical(), indices);
1524 }
1525 match col {
1526 Column::Int(v) => Column::Int(indices.iter().map(|&i| v[i]).collect()),
1527 Column::Float(v) => Column::Float(indices.iter().map(|&i| v[i]).collect()),
1528 Column::Str(v) => Column::Str(indices.iter().map(|&i| v[i].clone()).collect()),
1529 Column::Bool(v) => Column::Bool(indices.iter().map(|&i| v[i]).collect()),
1530 Column::Categorical { levels, codes } => Column::Categorical {
1531 levels: levels.clone(),
1532 codes: indices.iter().map(|&i| codes[i]).collect(),
1533 },
1534 Column::DateTime(v) => Column::DateTime(indices.iter().map(|&i| v[i]).collect()),
1535 Column::CategoricalAdaptive(_) => unreachable!("handled by early return"),
1536 }
1537}
1538
1539fn gather_column_nullable(col: &Column, indices: &[Option<usize>]) -> Column {
1540 if matches!(col, Column::CategoricalAdaptive(_)) {
1541 return gather_column_nullable(&col.to_legacy_categorical(), indices);
1542 }
1543 match col {
1544 Column::Int(v) => Column::Int(indices.iter().map(|opt| opt.map_or(0, |i| v[i])).collect()),
1545 Column::Float(v) => Column::Float(indices.iter().map(|opt| opt.map_or(f64::NAN, |i| v[i])).collect()),
1546 Column::Str(v) => Column::Str(indices.iter().map(|opt| opt.map_or_else(String::new, |i| v[i].clone())).collect()),
1547 Column::Bool(v) => Column::Bool(indices.iter().map(|opt| opt.map_or(false, |i| v[i])).collect()),
1548 Column::Categorical { levels, codes } => Column::Categorical {
1549 levels: levels.clone(),
1550 codes: indices.iter().map(|opt| opt.map_or(0, |i| codes[i])).collect(),
1551 },
1552 Column::DateTime(v) => Column::DateTime(indices.iter().map(|opt| opt.map_or(0, |i| v[i])).collect()),
1553 Column::CategoricalAdaptive(_) => unreachable!("handled by early return"),
1554 }
1555}
1556
#[cfg(test)]
mod tests {
    use super::*;

    /// Reads a numeric result column as `f64` values.
    ///
    /// Accepts Float and Int columns, since an aggregation may produce either.
    /// Any other type panics, so a wrong result type fails the test instead of
    /// silently skipping the value checks.
    fn numeric_values(col: &Column) -> Vec<f64> {
        match col {
            Column::Float(v) => v.clone(),
            Column::Int(v) => v.iter().map(|&x| x as f64).collect(),
            other => panic!("expected a numeric column, got {}", other.type_name()),
        }
    }

    fn sample_df() -> DataFrame {
        DataFrame::from_columns(vec![
            (
                "name".into(),
                Column::Str(vec![
                    "Alice".into(),
                    "Bob".into(),
                    "Carol".into(),
                    "Dave".into(),
                    "Eve".into(),
                    "Frank".into(),
                ]),
            ),
            (
                "dept".into(),
                Column::Str(vec![
                    "eng".into(),
                    "eng".into(),
                    "sales".into(),
                    "eng".into(),
                    "sales".into(),
                    "eng".into(),
                ]),
            ),
            (
                "salary".into(),
                Column::Float(vec![95000.0, 102000.0, 78000.0, 110000.0, 82000.0, 98000.0]),
            ),
            (
                "tenure".into(),
                Column::Int(vec![3, 7, 2, 10, 1, 5]),
            ),
        ])
        .unwrap()
    }

    #[test]
    fn test_dataframe_creation() {
        let df = sample_df();
        assert_eq!(df.nrows(), 6);
        assert_eq!(df.ncols(), 4);
        assert_eq!(
            df.column_names(),
            vec!["name", "dept", "salary", "tenure"]
        );
    }

    #[test]
    fn test_filter() {
        let df = sample_df();

        // tenure > 2 keeps Alice (3), Bob (7), Dave (10) and Frank (5).
        let result = Pipeline::scan(df)
            .filter(DExpr::BinOp {
                op: DBinOp::Gt,
                left: Box::new(DExpr::Col("tenure".into())),
                right: Box::new(DExpr::LitInt(2)),
            })
            .collect()
            .unwrap();

        assert_eq!(result.nrows(), 4);
    }

    #[test]
    fn test_group_by_summarize() {
        let df = sample_df();

        let result = Pipeline::scan(df)
            .summarize(
                vec!["dept".into()],
                vec![
                    (
                        "avg_salary".into(),
                        DExpr::Agg(AggFunc::Mean, Box::new(DExpr::Col("salary".into()))),
                    ),
                    ("headcount".into(), DExpr::Count),
                ],
            )
            .collect()
            .unwrap();

        assert_eq!(result.nrows(), 2);
        let dept_col = result.get_column("dept").unwrap();
        let avg_col = result.get_column("avg_salary").unwrap();
        let count_col = result.get_column("headcount").unwrap();

        if let (Column::Str(depts), Column::Float(avgs), Column::Float(counts)) =
            (dept_col, avg_col, count_col)
        {
            let eng_idx = depts.iter().position(|d| d == "eng").unwrap();
            let sales_idx = depts.iter().position(|d| d == "sales").unwrap();

            assert!((avgs[eng_idx] - 101250.0).abs() < 0.01);
            assert!((counts[eng_idx] - 4.0).abs() < 0.01);

            assert!((avgs[sales_idx] - 80000.0).abs() < 0.01);
            assert!((counts[sales_idx] - 2.0).abs() < 0.01);
        } else {
            panic!("unexpected column types");
        }
    }

    #[test]
    fn test_filter_then_aggregate() {
        let df = sample_df();

        let result = Pipeline::scan(df)
            .filter(DExpr::BinOp {
                op: DBinOp::Gt,
                left: Box::new(DExpr::Col("tenure".into())),
                right: Box::new(DExpr::LitInt(2)),
            })
            .summarize(
                vec!["dept".into()],
                vec![
                    (
                        "avg_salary".into(),
                        DExpr::Agg(AggFunc::Mean, Box::new(DExpr::Col("salary".into()))),
                    ),
                    (
                        "max_tenure".into(),
                        DExpr::Agg(AggFunc::Max, Box::new(DExpr::Col("tenure".into()))),
                    ),
                    ("headcount".into(), DExpr::Count),
                ],
            )
            .collect()
            .unwrap();

        // Every row with tenure > 2 is in "eng", so exactly one group remains.
        assert_eq!(result.nrows(), 1);

        let avgs = numeric_values(result.get_column("avg_salary").unwrap());
        assert!((avgs[0] - 101250.0).abs() < 0.01);
        let maxes = numeric_values(result.get_column("max_tenure").unwrap());
        assert!((maxes[0] - 10.0).abs() < 0.01);
        let counts = numeric_values(result.get_column("headcount").unwrap());
        assert!((counts[0] - 4.0).abs() < 0.01);
    }

    #[test]
    fn test_to_tensor_data() {
        let df = DataFrame::from_columns(vec![
            ("x".into(), Column::Float(vec![1.0, 2.0, 3.0])),
            ("y".into(), Column::Float(vec![4.0, 5.0, 6.0])),
        ])
        .unwrap();

        let (data, shape) = df.to_tensor_data(&["x", "y"]).unwrap();
        assert_eq!(shape, vec![3, 2]);
        assert_eq!(data, vec![1.0, 4.0, 2.0, 5.0, 3.0, 6.0]);
    }

    #[test]
    fn test_display() {
        let df = DataFrame::from_columns(vec![
            ("x".into(), Column::Int(vec![1, 2, 3])),
            ("y".into(), Column::Float(vec![4.5, 5.5, 6.5])),
        ])
        .unwrap();

        let output = format!("{}", df);
        assert!(output.contains("x"));
        assert!(output.contains("y"));
        assert!(output.contains("4.5"));
    }

    #[test]
    fn test_column_not_found() {
        let df = sample_df();
        let result = Pipeline::scan(df)
            .filter(DExpr::BinOp {
                op: DBinOp::Gt,
                left: Box::new(DExpr::Col("nonexistent".into())),
                right: Box::new(DExpr::LitInt(0)),
            })
            .collect();

        assert!(result.is_err());
    }

    #[test]
    fn test_aggregation_functions() {
        let df = DataFrame::from_columns(vec![
            ("group".into(), Column::Str(vec!["a".into(), "a".into(), "a".into()])),
            ("val".into(), Column::Float(vec![10.0, 20.0, 30.0])),
        ])
        .unwrap();

        let result = Pipeline::scan(df)
            .summarize(
                vec!["group".into()],
                vec![
                    ("total".into(), DExpr::Agg(AggFunc::Sum, Box::new(DExpr::Col("val".into())))),
                    ("avg".into(), DExpr::Agg(AggFunc::Mean, Box::new(DExpr::Col("val".into())))),
                    ("lo".into(), DExpr::Agg(AggFunc::Min, Box::new(DExpr::Col("val".into())))),
                    ("hi".into(), DExpr::Agg(AggFunc::Max, Box::new(DExpr::Col("val".into())))),
                    ("n".into(), DExpr::Count),
                ],
            )
            .collect()
            .unwrap();

        // A single group "a".
        assert_eq!(result.nrows(), 1);

        let totals = numeric_values(result.get_column("total").unwrap());
        assert!((totals[0] - 60.0).abs() < 0.01);
        let avgs = numeric_values(result.get_column("avg").unwrap());
        assert!((avgs[0] - 20.0).abs() < 0.01);
        let mins = numeric_values(result.get_column("lo").unwrap());
        assert!((mins[0] - 10.0).abs() < 0.01);
        let maxs = numeric_values(result.get_column("hi").unwrap());
        assert!((maxs[0] - 30.0).abs() < 0.01);
        let counts = numeric_values(result.get_column("n").unwrap());
        assert!((counts[0] - 3.0).abs() < 0.01);
    }

    #[test]
    fn test_empty_dataframe() {
        let df = DataFrame::new();
        assert_eq!(df.nrows(), 0);
        assert_eq!(df.ncols(), 0);
    }

    #[test]
    fn test_expr_display() {
        let expr = DExpr::BinOp {
            op: DBinOp::Gt,
            left: Box::new(DExpr::Col("age".into())),
            right: Box::new(DExpr::LitInt(18)),
        };
        assert_eq!(format!("{}", expr), "(col(\"age\") > 18)");
    }

    #[test]
    fn test_categorical_column_basics() {
        let col = Column::Categorical {
            levels: vec!["bird".into(), "cat".into(), "dog".into()],
            codes: vec![1, 2, 1, 0],
        };
        assert_eq!(col.len(), 4);
        assert_eq!(col.type_name(), "Categorical");
        assert_eq!(col.get_display(0), "cat");
        assert_eq!(col.get_display(1), "dog");
        assert_eq!(col.get_display(2), "cat");
        assert_eq!(col.get_display(3), "bird");
    }

    #[test]
    fn test_datetime_column_basics() {
        let col = Column::DateTime(vec![1000, 2000, 3000]);
        assert_eq!(col.len(), 3);
        assert_eq!(col.type_name(), "DateTime");
        assert_eq!(col.get_display(0), "1000ms");
        assert_eq!(col.get_display(1), "2000ms");
    }

    #[test]
    fn test_label_encode() {
        let data: Vec<String> = vec!["cat".into(), "dog".into(), "cat".into(), "bird".into()];
        let (levels, codes) = label_encode(&data);
        assert_eq!(levels, vec!["bird", "cat", "dog"]);
        assert_eq!(codes, vec![1, 2, 1, 0]);
    }

    #[test]
    fn test_label_encode_empty() {
        let data: Vec<String> = vec![];
        let (levels, codes) = label_encode(&data);
        assert!(levels.is_empty());
        assert!(codes.is_empty());
    }

    #[test]
    fn test_label_encode_single_level() {
        let data: Vec<String> = vec!["x".into(), "x".into(), "x".into()];
        let (levels, codes) = label_encode(&data);
        assert_eq!(levels, vec!["x"]);
        assert_eq!(codes, vec![0, 0, 0]);
    }

    #[test]
    fn test_label_encode_deterministic() {
        let data: Vec<String> = vec!["z".into(), "a".into(), "m".into(), "a".into(), "z".into()];
        let (levels1, codes1) = label_encode(&data);
        let (levels2, codes2) = label_encode(&data);
        assert_eq!(levels1, levels2);
        assert_eq!(codes1, codes2);
        assert_eq!(levels1, vec!["a", "m", "z"]);
    }

    #[test]
    fn test_ordinal_encode() {
        let data: Vec<String> = vec!["low".into(), "high".into(), "medium".into(), "low".into()];
        let order: Vec<String> = vec!["low".into(), "medium".into(), "high".into()];
        let (levels, codes) = ordinal_encode(&data, &order).unwrap();
        assert_eq!(levels, vec!["low", "medium", "high"]);
        assert_eq!(codes, vec![0, 2, 1, 0]);
    }

    #[test]
    fn test_ordinal_encode_missing_value() {
        let data: Vec<String> = vec!["low".into(), "unknown".into()];
        let order: Vec<String> = vec!["low".into(), "medium".into(), "high".into()];
        let result = ordinal_encode(&data, &order);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("unknown"));
    }

    #[test]
    fn test_one_hot_encode() {
        let levels = vec!["bird".to_string(), "cat".to_string(), "dog".to_string()];
        let codes = vec![1u32, 2, 1, 0];
        let (names, cols) = one_hot_encode(&levels, &codes);
        assert_eq!(names, vec!["bird", "cat", "dog"]);
        assert_eq!(cols.len(), 3);
        assert_eq!(cols[0], vec![false, false, false, true]);
        assert_eq!(cols[1], vec![true, false, true, false]);
        assert_eq!(cols[2], vec![false, true, false, false]);

        // Each row is hot in exactly one indicator column.
        for row in 0..4 {
            let count: usize = cols.iter().map(|c| if c[row] { 1 } else { 0 }).sum();
            assert_eq!(count, 1, "row {} should have exactly one true", row);
        }
    }

    #[test]
    fn test_one_hot_encode_empty() {
        let levels = vec!["a".to_string(), "b".to_string()];
        let codes: Vec<u32> = vec![];
        let (names, cols) = one_hot_encode(&levels, &codes);
        assert_eq!(names.len(), 2);
        assert!(cols[0].is_empty());
        assert!(cols[1].is_empty());
    }

    #[test]
    fn test_categorical_column_in_dataframe() {
        let data: Vec<String> = vec!["cat".into(), "dog".into(), "cat".into()];
        let (levels, codes) = label_encode(&data);
        let df = DataFrame::from_columns(vec![
            ("animal".into(), Column::Categorical { levels, codes }),
            ("score".into(), Column::Float(vec![1.0, 2.0, 3.0])),
        ])
        .unwrap();
        assert_eq!(df.nrows(), 3);
        assert_eq!(df.ncols(), 2);
        assert_eq!(df.get_column("animal").unwrap().type_name(), "Categorical");
    }

    #[test]
    fn test_datetime_column_in_dataframe() {
        let df = DataFrame::from_columns(vec![
            ("ts".into(), Column::DateTime(vec![1000, 2000, 3000])),
            ("val".into(), Column::Float(vec![1.0, 2.0, 3.0])),
        ])
        .unwrap();
        assert_eq!(df.nrows(), 3);
        assert_eq!(df.get_column("ts").unwrap().type_name(), "DateTime");
    }

    #[test]
    fn test_label_encode_to_column_roundtrip() {
        let data: Vec<String> = vec!["cat".into(), "dog".into(), "cat".into(), "bird".into()];
        let (levels, codes) = label_encode(&data);
        let col = Column::Categorical { levels: levels.clone(), codes: codes.clone() };
        for (i, original) in data.iter().enumerate() {
            assert_eq!(col.get_display(i), *original);
        }
    }
}
1954
1955impl DataFrame {
1960 pub fn to_tensor(
1967 &self,
1968 col_names: &[&str],
1969 ) -> Result<cjc_runtime::Tensor, DataError> {
1970 let (data, shape) = self.to_tensor_data(col_names)?;
1971 cjc_runtime::Tensor::from_vec(data, &shape)
1972 .map_err(|e| DataError::InvalidOperation(format!("tensor conversion: {}", e)))
1973 }
1974
1975 pub fn push_row(&mut self, values: &[&str]) -> Result<(), DataError> {
1984 if values.len() != self.ncols() {
1985 return Err(DataError::ColumnLengthMismatch {
1986 expected: self.ncols(),
1987 got: values.len(),
1988 column: "row".to_string(),
1989 });
1990 }
1991 for (i, (_, col)) in self.columns.iter_mut().enumerate() {
1992 let s = values[i];
1993 match col {
1994 Column::Float(v) => v.push(s.trim().parse::<f64>().unwrap_or(0.0)),
1995 Column::Int(v) => v.push(s.trim().parse::<i64>().unwrap_or(0)),
1996 Column::Str(v) => v.push(s.to_string()),
1997 Column::Bool(v) => v.push(matches!(s.trim(), "true" | "1")),
1998 Column::Categorical { .. } => {
1999 }
2001 Column::CategoricalAdaptive(_) => {
2002 }
2004 Column::DateTime(v) => v.push(s.trim().parse::<i64>().unwrap_or(0)),
2005 }
2006 }
2007 Ok(())
2008 }
2009}
2010
/// A fixed-length bit set with one bit per row.
///
/// Row `i` is bit `i % 64` of `words[i / 64]`. The constructors in this file
/// keep every bit at a position `>= nrows` in the last word cleared, which
/// `count_ones` relies on because it counts whole words.
/// NOTE: `from_words_for_test` does not enforce this.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BitMask {
    /// Packed row bits; the constructors size this to `nwords_for(nrows)` words.
    words: Vec<u64>,
    /// Number of logical rows the mask covers.
    nrows: usize,
}
2031
2032impl BitMask {
2033 pub fn all_true(nrows: usize) -> Self {
2035 let nwords = nwords_for(nrows);
2036 let mut words = vec![u64::MAX; nwords];
2037 if nrows % 64 != 0 && nwords > 0 {
2039 let tail = nrows % 64;
2040 words[nwords - 1] = (1u64 << tail) - 1;
2041 }
2042 BitMask { words, nrows }
2043 }
2044
2045 pub fn all_false(nrows: usize) -> Self {
2047 let nwords = nwords_for(nrows);
2048 BitMask {
2049 words: vec![0u64; nwords],
2050 nrows,
2051 }
2052 }
2053
2054 pub fn from_bools(bools: &[bool]) -> Self {
2056 let nrows = bools.len();
2057 let nwords = nwords_for(nrows);
2058 let mut words = vec![0u64; nwords];
2059 for (i, &b) in bools.iter().enumerate() {
2060 if b {
2061 words[i / 64] |= 1u64 << (i % 64);
2062 }
2063 }
2064 BitMask { words, nrows }
2065 }
2066
2067 #[inline]
2069 pub fn get(&self, i: usize) -> bool {
2070 debug_assert!(i < self.nrows);
2071 (self.words[i / 64] >> (i % 64)) & 1 == 1
2072 }
2073
2074 pub fn count_ones(&self) -> usize {
2076 self.words.iter().map(|w| w.count_ones() as usize).sum()
2077 }
2078
2079 pub fn and(&self, other: &BitMask) -> BitMask {
2083 assert_eq!(
2084 self.nrows, other.nrows,
2085 "BitMask::and: nrows mismatch ({} vs {})",
2086 self.nrows, other.nrows
2087 );
2088 let words = self
2089 .words
2090 .iter()
2091 .zip(other.words.iter())
2092 .map(|(a, b)| a & b)
2093 .collect();
2094 BitMask {
2095 words,
2096 nrows: self.nrows,
2097 }
2098 }
2099
2100 pub fn iter_set(&self) -> impl Iterator<Item = usize> + '_ {
2102 (0..self.nrows).filter(move |&i| self.get(i))
2103 }
2104
2105 pub fn nrows(&self) -> usize {
2107 self.nrows
2108 }
2109
2110 pub fn nwords(&self) -> usize {
2112 self.words.len()
2113 }
2114
2115 pub fn words_slice(&self) -> &[u64] {
2118 &self.words
2119 }
2120
2121 pub fn from_words_for_test(words: Vec<u64>, nrows: usize) -> Self {
2126 debug_assert_eq!(words.len(), nwords_for(nrows));
2127 BitMask { words, nrows }
2128 }
2129}
2130
/// Number of 64-bit words needed to store `nrows` bits, i.e. `ceil(nrows / 64)`.
///
/// Computed as quotient plus a remainder carry, which cannot overflow. The
/// form `(nrows + 63) / 64` panics in debug builds and wraps to a too-small
/// result in release builds once `nrows > usize::MAX - 63`.
#[inline]
pub(crate) fn nwords_for(nrows: usize) -> usize {
    nrows / 64 + usize::from(nrows % 64 != 0)
}
2135
/// Ordered mapping from a view's column positions to base-frame column positions.
///
/// Entry `k` of `indices` is the position, in the base `DataFrame`'s column
/// list, of the view's `k`-th column.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProjectionMap {
    /// Base column indices, in view order.
    indices: Vec<usize>,
}
2148
2149impl ProjectionMap {
2150 pub fn identity(ncols: usize) -> Self {
2152 ProjectionMap {
2153 indices: (0..ncols).collect(),
2154 }
2155 }
2156
2157 pub fn from_indices(indices: Vec<usize>) -> Self {
2159 ProjectionMap { indices }
2160 }
2161
2162 pub fn len(&self) -> usize {
2164 self.indices.len()
2165 }
2166
2167 pub fn is_empty(&self) -> bool {
2169 self.indices.is_empty()
2170 }
2171
2172 pub fn indices(&self) -> &[usize] {
2174 &self.indices
2175 }
2176}
2177
/// A view over a shared `DataFrame`: a row selection plus a column projection.
///
/// Deriving new views (`filter`, `select`) shares the base frame through its
/// `Rc` instead of copying data. Rows are only copied out by `materialize`
/// (and operations built on it).
#[derive(Debug, Clone)]
pub struct TidyView {
    /// The frame being viewed; shared between derived views.
    base: Rc<DataFrame>,
    /// Which base rows are currently selected.
    mask: AdaptiveSelection,
    /// Which base columns are visible, and in what order.
    proj: ProjectionMap,
}
2194
2195fn try_eval_predicate_columnar(
2209 base: &DataFrame,
2210 predicate: &DExpr,
2211 existing_mask: &BitMask,
2212) -> Option<BitMask> {
2213 match predicate {
2214 DExpr::BinOp {
2216 op: DBinOp::And,
2217 left,
2218 right,
2219 } => {
2220 let lmask = try_eval_predicate_columnar(base, left, existing_mask)?;
2221 let rmask = try_eval_predicate_columnar(base, right, &lmask)?;
2222 Some(rmask)
2223 }
2224 DExpr::BinOp {
2227 op: DBinOp::Or,
2228 left,
2229 right,
2230 } => {
2231 let all_mask = BitMask::all_true(existing_mask.nrows);
2234 let lmask = try_eval_predicate_columnar(base, left, &all_mask)?;
2235 let rmask = try_eval_predicate_columnar(base, right, &all_mask)?;
2236 let nrows = existing_mask.nrows;
2238 let or_words: Vec<u64> = lmask
2239 .words
2240 .iter()
2241 .zip(rmask.words.iter())
2242 .map(|(a, b)| a | b)
2243 .collect();
2244 let final_words: Vec<u64> = or_words
2246 .iter()
2247 .zip(existing_mask.words.iter())
2248 .map(|(a, b)| a & b)
2249 .collect();
2250 Some(BitMask {
2251 words: final_words,
2252 nrows,
2253 })
2254 }
2255 DExpr::BinOp { op, left, right } => {
2257 if !matches!(
2259 op,
2260 DBinOp::Gt | DBinOp::Lt | DBinOp::Ge | DBinOp::Le | DBinOp::Eq | DBinOp::Ne
2261 ) {
2262 return None;
2263 }
2264
2265 enum LitVal {
2268 F(f64),
2269 I(i64),
2270 }
2271
2272 let (col_name, lit, reversed) = match (left.as_ref(), right.as_ref()) {
2273 (DExpr::Col(name), DExpr::LitFloat(v)) => (name.as_str(), LitVal::F(*v), false),
2274 (DExpr::LitFloat(v), DExpr::Col(name)) => (name.as_str(), LitVal::F(*v), true),
2275 (DExpr::Col(name), DExpr::LitInt(v)) => (name.as_str(), LitVal::I(*v), false),
2276 (DExpr::LitInt(v), DExpr::Col(name)) => (name.as_str(), LitVal::I(*v), true),
2277 _ => return None,
2278 };
2279
2280 let column = base.get_column(col_name)?;
2281
2282 let effective_op = if reversed {
2284 match op {
2285 DBinOp::Gt => DBinOp::Lt,
2286 DBinOp::Lt => DBinOp::Gt,
2287 DBinOp::Ge => DBinOp::Le,
2288 DBinOp::Le => DBinOp::Ge,
2289 other => *other, }
2291 } else {
2292 *op
2293 };
2294
2295 let nrows = existing_mask.nrows;
2296 let nwords = nwords_for(nrows);
2297 let mut words = vec![0u64; nwords];
2298
2299 match (column, &lit) {
2300 (Column::Float(data), LitVal::F(v)) => {
2302 columnar_cmp_f64(data, *v, effective_op, &mut words);
2303 }
2304 (Column::Float(data), LitVal::I(v)) => {
2306 columnar_cmp_f64(data, *v as f64, effective_op, &mut words);
2307 }
2308 (Column::Int(data), LitVal::I(v)) => {
2310 columnar_cmp_i64(data, *v, effective_op, &mut words);
2311 }
2312 (Column::Int(data), LitVal::F(v)) => {
2314 let floats: Vec<f64> = data.iter().map(|&x| x as f64).collect();
2316 columnar_cmp_f64(&floats, *v, effective_op, &mut words);
2317 }
2318 _ => return None,
2319 }
2320
2321 for (w, ew) in words.iter_mut().zip(existing_mask.words.iter()) {
2323 *w &= *ew;
2324 }
2325
2326 Some(BitMask { words, nrows })
2327 }
2328 _ => None,
2329 }
2330}
2331
2332#[inline]
2336pub(crate) fn columnar_cmp_f64(data: &[f64], lit: f64, op: DBinOp, out_words: &mut [u64]) {
2337 for (i, &val) in data.iter().enumerate() {
2338 let pass = match op {
2339 DBinOp::Gt => val > lit,
2340 DBinOp::Lt => val < lit,
2341 DBinOp::Ge => val >= lit,
2342 DBinOp::Le => val <= lit,
2343 DBinOp::Eq => val == lit,
2344 DBinOp::Ne => val != lit,
2345 _ => false,
2346 };
2347 if pass {
2348 out_words[i / 64] |= 1u64 << (i % 64);
2349 }
2350 }
2351}
2352
2353#[inline]
2356pub(crate) fn columnar_cmp_i64(data: &[i64], lit: i64, op: DBinOp, out_words: &mut [u64]) {
2357 for (i, &val) in data.iter().enumerate() {
2358 let pass = match op {
2359 DBinOp::Gt => val > lit,
2360 DBinOp::Lt => val < lit,
2361 DBinOp::Ge => val >= lit,
2362 DBinOp::Le => val <= lit,
2363 DBinOp::Eq => val == lit,
2364 DBinOp::Ne => val != lit,
2365 _ => false,
2366 };
2367 if pass {
2368 out_words[i / 64] |= 1u64 << (i % 64);
2369 }
2370 }
2371}
2372
2373impl TidyView {
2374 pub fn from_df(df: DataFrame) -> Self {
2378 let nrows = df.nrows();
2379 let ncols = df.ncols();
2380 TidyView {
2381 base: Rc::new(df),
2382 mask: AdaptiveSelection::all(nrows),
2383 proj: ProjectionMap::identity(ncols),
2384 }
2385 }
2386
2387 pub fn from_rc(df: Rc<DataFrame>) -> Self {
2389 let nrows = df.nrows();
2390 let ncols = df.ncols();
2391 TidyView {
2392 base: df,
2393 mask: AdaptiveSelection::all(nrows),
2394 proj: ProjectionMap::identity(ncols),
2395 }
2396 }
2397
2398 pub fn explain_selection_mode(&self) -> &'static str {
2401 self.mask.explain_selection_mode()
2402 }
2403
2404 pub fn nrows(&self) -> usize {
2408 self.mask.count()
2409 }
2410
2411 pub fn ncols(&self) -> usize {
2413 self.proj.len()
2414 }
2415
2416 pub fn column_names(&self) -> Vec<&str> {
2418 self.proj
2419 .indices()
2420 .iter()
2421 .map(|&ci| self.base.columns[ci].0.as_str())
2422 .collect()
2423 }
2424
2425 pub fn filter(&self, predicate: &DExpr) -> Result<TidyView, TidyError> {
2438 validate_expr_columns_proj(predicate, &self.base, &self.proj)?;
2440
2441 let nrows_base = self.base.nrows();
2442
2443 if let Some(bc) = predicate_bytecode::PredicateBytecode::lower(predicate, &self.base) {
2447 let count = self.mask.count();
2448
2449 if matches!(self.mask, AdaptiveSelection::Hybrid { .. })
2469 && !predicate_bytecode::should_use_sparse_path(count, nrows_base)
2470 {
2471 let fresh = bc.evaluate_to_selection(&self.base, nrows_base);
2472 let intersected = self.mask.intersect(&fresh);
2473 return Ok(TidyView {
2474 base: Rc::clone(&self.base),
2475 mask: intersected,
2476 proj: self.proj.clone(),
2477 });
2478 }
2479
2480 let new_mask = if predicate_bytecode::should_use_sparse_path(count, nrows_base) {
2481 let existing_indices: Vec<usize> = self.mask.iter_indices().collect();
2482 bc.interpret_sparse(&self.base, &existing_indices, nrows_base)
2483 } else {
2484 let current_mask = self.mask.materialize_mask();
2485 bc.interpret(&self.base, ¤t_mask)
2486 };
2487
2488 let words: Vec<u64> = new_mask.words_slice().to_vec();
2489 return Ok(TidyView {
2490 base: Rc::clone(&self.base),
2491 mask: AdaptiveSelection::from_predicate_result(words, nrows_base),
2492 proj: self.proj.clone(),
2493 });
2494 }
2495
2496 let current_mask = self.mask.materialize_mask();
2500
2501 if let Some(new_mask) = try_eval_predicate_columnar(&self.base, predicate, ¤t_mask) {
2506 let words: Vec<u64> = new_mask.words_slice().to_vec();
2507 return Ok(TidyView {
2508 base: Rc::clone(&self.base),
2509 mask: AdaptiveSelection::from_predicate_result(words, nrows_base),
2510 proj: self.proj.clone(),
2511 });
2512 }
2513
2514 let mut new_words: Vec<u64> = current_mask.words_slice().to_vec();
2516
2517 for row in self.mask.iter_indices() {
2520 let b = eval_expr_row_proj(&self.base, predicate, row, &self.proj)?;
2521 let pass = match b {
2522 ExprValue::Bool(v) => v,
2523 _ => {
2524 return Err(TidyError::PredicateNotBool {
2525 got: b.type_name().to_string(),
2526 })
2527 }
2528 };
2529 if !pass {
2530 new_words[row / 64] &= !(1u64 << (row % 64));
2531 }
2532 }
2533
2534 Ok(TidyView {
2535 base: Rc::clone(&self.base),
2536 mask: AdaptiveSelection::from_predicate_result(new_words, nrows_base),
2537 proj: self.proj.clone(),
2538 })
2539 }
2540
2541 pub fn select(&self, cols: &[&str]) -> Result<TidyView, TidyError> {
2554 {
2556 let mut seen = std::collections::BTreeSet::new();
2557 for &name in cols {
2558 if !seen.insert(name) {
2559 return Err(TidyError::DuplicateColumn(name.to_string()));
2560 }
2561 }
2562 }
2563
2564 let mut new_indices = Vec::with_capacity(cols.len());
2566 for &name in cols {
2567 let idx = self
2568 .base
2569 .columns
2570 .iter()
2571 .position(|(n, _)| n == name)
2572 .ok_or_else(|| TidyError::ColumnNotFound(name.to_string()))?;
2573 new_indices.push(idx);
2574 }
2575
2576 Ok(TidyView {
2577 base: Rc::clone(&self.base),
2578 mask: self.mask.clone(),
2579 proj: ProjectionMap::from_indices(new_indices),
2580 })
2581 }
2582
2583 pub fn mutate(&self, assignments: &[(&str, DExpr)]) -> Result<TidyFrame, TidyError> {
2603 {
2605 let mut seen = std::collections::BTreeSet::new();
2606 for &(name, _) in assignments {
2607 if !seen.insert(name) {
2608 return Err(TidyError::DuplicateColumn(name.to_string()));
2609 }
2610 }
2611 }
2612
2613 let mut df = self.materialize()?;
2615
2616 let snapshot_names: Vec<String> = df.columns.iter().map(|(n, _)| n.clone()).collect();
2618
2619 for &(col_name, ref expr) in assignments {
2620 validate_expr_columns_snapshot(expr, &snapshot_names)?;
2622
2623 let nrows = df.nrows();
2624 let new_col = eval_expr_column(&df, expr, nrows)?;
2626
2627 if let Some(pos) = df.columns.iter().position(|(n, _)| n == col_name) {
2629 df.columns[pos].1 = new_col;
2630 } else {
2631 df.columns.push((col_name.to_string(), new_col));
2632 }
2633 }
2634
2635 Ok(TidyFrame {
2636 inner: Rc::new(RefCell::new(df)),
2637 })
2638 }
2639
2640 pub fn materialize(&self) -> Result<DataFrame, TidyError> {
2652 let row_indices: Vec<usize> = self.mask.iter_indices().collect();
2653
2654 let mut columns = Vec::with_capacity(self.proj.len());
2655 for &ci in self.proj.indices() {
2656 let (name, col) = &self.base.columns[ci];
2657 let new_col = gather_column(col, &row_indices);
2658 columns.push((name.clone(), new_col));
2659 }
2660
2661 DataFrame::from_columns(columns)
2662 .map_err(|e| TidyError::Internal(e.to_string()))
2663 }
2664
2665 pub fn to_tensor(&self, col_names: &[&str]) -> Result<cjc_runtime::Tensor, TidyError> {
2669 let df = self.materialize()?;
2670 df.to_tensor(col_names)
2671 .map_err(|e| TidyError::Internal(e.to_string()))
2672 }
2673
2674 pub fn mask(&self) -> BitMask {
2679 self.mask.materialize_mask()
2680 }
2681
2682 pub fn selection(&self) -> &AdaptiveSelection {
2686 &self.mask
2687 }
2688
2689 pub fn proj(&self) -> &ProjectionMap {
2691 &self.proj
2692 }
2693
2694 pub fn base_column(&self, name: &str) -> Option<&Column> {
2699 self.base.columns.iter()
2700 .find(|(n, _)| n == name)
2701 .map(|(_, c)| c)
2702 }
2703}
2704
/// A mutable data frame handle: a `DataFrame` behind `Rc<RefCell<…>>`.
///
/// Cloning a `TidyFrame` shares the same underlying frame. `TidyFrame::mutate`
/// first makes a private copy when the frame is shared, so a mutation is not
/// seen through other clones.
#[derive(Debug, Clone)]
pub struct TidyFrame {
    /// Shared, interior-mutable storage for the frame.
    inner: Rc<RefCell<DataFrame>>,
}
2716
2717impl TidyFrame {
2718 pub fn from_df(df: DataFrame) -> Self {
2720 TidyFrame {
2721 inner: Rc::new(RefCell::new(df)),
2722 }
2723 }
2724
2725 pub fn borrow(&self) -> std::cell::Ref<'_, DataFrame> {
2727 self.inner.borrow()
2728 }
2729
2730 pub fn view(&self) -> TidyView {
2732 let df = self.inner.borrow().clone();
2733 TidyView::from_df(df)
2734 }
2735
2736 pub fn mutate(&mut self, assignments: &[(&str, DExpr)]) -> Result<(), TidyError> {
2738 if Rc::strong_count(&self.inner) > 1 {
2740 let cloned = self.inner.borrow().clone();
2741 self.inner = Rc::new(RefCell::new(cloned));
2742 }
2743
2744 {
2746 let mut seen = std::collections::BTreeSet::new();
2747 for &(name, _) in assignments {
2748 if !seen.insert(name) {
2749 return Err(TidyError::DuplicateColumn(name.to_string()));
2750 }
2751 }
2752 }
2753
2754 let mut df = self.inner.borrow_mut();
2755
2756 let snapshot_names: Vec<String> = df.columns.iter().map(|(n, _)| n.clone()).collect();
2758
2759 for &(col_name, ref expr) in assignments {
2760 validate_expr_columns_snapshot(expr, &snapshot_names)?;
2761
2762 let nrows = df.nrows();
2763 let new_col = eval_expr_column(&df, expr, nrows)?;
2764
2765 if let Some(pos) = df.columns.iter().position(|(n, _)| n == col_name) {
2766 df.columns[pos].1 = new_col;
2767 } else {
2768 df.columns.push((col_name.to_string(), new_col));
2769 }
2770 }
2771
2772 Ok(())
2773 }
2774}
2775
/// Errors produced by the tidy verbs (`mutate`, `summarise`, filtering, ...).
#[derive(Debug, Clone, PartialEq)]
pub enum TidyError {
    /// A referenced column does not exist.
    ColumnNotFound(String),
    /// The same output column name was supplied more than once.
    DuplicateColumn(String),
    /// A filter predicate evaluated to a non-Bool type (`got` names it).
    PredicateNotBool { got: String },
    /// A value or column had a different type than the operation requires.
    TypeMismatch { expected: String, got: String },
    /// A column's length differs from the expected row count.
    LengthMismatch { expected: usize, got: usize },
    /// Catch-all for internal failures, often wrapping a lower-level error's text.
    Internal(String),
    /// An aggregation was asked to reduce an empty group.
    EmptyGroup,
    /// A factor encoding exceeded its limit on distinct levels.
    CapacityExceeded { limit: usize, got: usize },
}

impl fmt::Display for TidyError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::ColumnNotFound(name) => write!(f, "column `{}` not found", name),
            Self::DuplicateColumn(name) => write!(f, "duplicate column `{}`", name),
            Self::PredicateNotBool { got } => {
                write!(f, "filter predicate must be Bool, got {}", got)
            }
            Self::TypeMismatch { expected, got } => {
                write!(f, "type mismatch: expected {}, got {}", expected, got)
            }
            Self::LengthMismatch { expected, got } => {
                write!(f, "length mismatch: expected {} rows, got {}", expected, got)
            }
            Self::Internal(message) => write!(f, "internal error: {}", message),
            Self::EmptyGroup => f.write_str("aggregation on empty group"),
            Self::CapacityExceeded { limit, got } => write!(
                f,
                "factor capacity exceeded: limit {} distinct levels, got {}",
                limit, got
            ),
        }
    }
}

impl std::error::Error for TidyError {}
2827
2828fn extract_f64_column(df: &DataFrame, expr: &DExpr, nrows: usize) -> Result<Vec<f64>, TidyError> {
2840 let col = eval_expr_column(df, expr, nrows)?;
2841 match col {
2842 Column::Float(v) => Ok(v),
2843 Column::Int(v) => Ok(v.into_iter().map(|i| i as f64).collect()),
2844 _ => Err(TidyError::TypeMismatch {
2845 expected: "numeric".into(),
2846 got: "non-numeric".into(),
2847 }),
2848 }
2849}
2850
2851fn eval_window_column(
2854 df: &DataFrame,
2855 expr: &DExpr,
2856 nrows: usize,
2857) -> Result<Option<Column>, TidyError> {
2858 match expr {
2859 DExpr::RowNumber => {
2860 let vals: Vec<i64> = (1..=nrows as i64).collect();
2861 Ok(Some(Column::Int(vals)))
2862 }
2863 DExpr::CumSum(inner) => {
2864 let src = extract_f64_column(df, inner, nrows)?;
2865 let mut result = Vec::with_capacity(nrows);
2866 let mut sum = 0.0_f64;
2867 let mut comp = 0.0_f64; for &v in &src {
2869 let y = v - comp;
2870 let t = sum + y;
2871 comp = (t - sum) - y;
2872 sum = t;
2873 result.push(sum);
2874 }
2875 Ok(Some(Column::Float(result)))
2876 }
2877 DExpr::CumProd(inner) => {
2878 let src = extract_f64_column(df, inner, nrows)?;
2879 let mut result = Vec::with_capacity(nrows);
2880 let mut prod = 1.0_f64;
2881 for &v in &src {
2882 prod *= v;
2883 result.push(prod);
2884 }
2885 Ok(Some(Column::Float(result)))
2886 }
2887 DExpr::CumMax(inner) => {
2888 let src = extract_f64_column(df, inner, nrows)?;
2889 let mut result = Vec::with_capacity(nrows);
2890 let mut max = f64::NEG_INFINITY;
2891 for &v in &src {
2892 if v > max { max = v; }
2893 result.push(max);
2894 }
2895 Ok(Some(Column::Float(result)))
2896 }
2897 DExpr::CumMin(inner) => {
2898 let src = extract_f64_column(df, inner, nrows)?;
2899 let mut result = Vec::with_capacity(nrows);
2900 let mut min = f64::INFINITY;
2901 for &v in &src {
2902 if v < min { min = v; }
2903 result.push(min);
2904 }
2905 Ok(Some(Column::Float(result)))
2906 }
2907 DExpr::Lag(inner, k) => {
2908 let src = extract_f64_column(df, inner, nrows)?;
2909 let mut result = Vec::with_capacity(nrows);
2910 for i in 0..nrows {
2911 if i < *k {
2912 result.push(f64::NAN);
2913 } else {
2914 result.push(src[i - k]);
2915 }
2916 }
2917 Ok(Some(Column::Float(result)))
2918 }
2919 DExpr::Lead(inner, k) => {
2920 let src = extract_f64_column(df, inner, nrows)?;
2921 let mut result = Vec::with_capacity(nrows);
2922 for i in 0..nrows {
2923 if i + k >= nrows {
2924 result.push(f64::NAN);
2925 } else {
2926 result.push(src[i + k]);
2927 }
2928 }
2929 Ok(Some(Column::Float(result)))
2930 }
2931 DExpr::Rank(inner) => {
2932 let src = extract_f64_column(df, inner, nrows)?;
2933 let mut indexed: Vec<(usize, f64)> = src.iter().cloned().enumerate().collect();
2935 indexed.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
2936 let mut ranks = vec![0.0_f64; nrows];
2937 let mut i = 0;
2938 while i < nrows {
2939 let mut j = i;
2940 while j < nrows && indexed[j].1 == indexed[i].1 {
2941 j += 1;
2942 }
2943 let avg_rank = (i + 1 + j) as f64 / 2.0; for idx in i..j {
2945 ranks[indexed[idx].0] = avg_rank;
2946 }
2947 i = j;
2948 }
2949 Ok(Some(Column::Float(ranks)))
2950 }
2951 DExpr::DenseRank(inner) => {
2952 let src = extract_f64_column(df, inner, nrows)?;
2953 let mut indexed: Vec<(usize, f64)> = src.iter().cloned().enumerate().collect();
2954 indexed.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
2955 let mut ranks = vec![0_i64; nrows];
2956 let mut rank = 0_i64;
2957 let mut prev: Option<f64> = None;
2958 for &(orig_idx, val) in &indexed {
2959 if prev.is_none() || prev.unwrap() != val {
2960 rank += 1;
2961 }
2962 ranks[orig_idx] = rank;
2963 prev = Some(val);
2964 }
2965 Ok(Some(Column::Int(ranks)))
2966 }
2967 DExpr::RollingSum(col_name, window) => {
2968 let vals = rolling_get_floats(df, col_name)?;
2969 let n = vals.len();
2970 let w = *window;
2971 let mut result = Vec::with_capacity(n);
2972 let mut sum = 0.0_f64;
2973 let mut comp = 0.0_f64;
2974 for i in 0..n {
2975 let y = vals[i] - comp;
2977 let t = sum + y;
2978 comp = (t - sum) - y;
2979 sum = t;
2980 if i >= w {
2982 let y2 = -vals[i - w] - comp;
2983 let t2 = sum + y2;
2984 comp = (t2 - sum) - y2;
2985 sum = t2;
2986 }
2987 result.push(sum);
2988 }
2989 Ok(Some(Column::Float(result)))
2990 }
2991 DExpr::RollingMean(col_name, window) => {
2992 let vals = rolling_get_floats(df, col_name)?;
2993 let n = vals.len();
2994 let w = *window;
2995 let mut result = Vec::with_capacity(n);
2996 let mut sum = 0.0_f64;
2997 let mut comp = 0.0_f64;
2998 for i in 0..n {
2999 let y = vals[i] - comp;
3000 let t = sum + y;
3001 comp = (t - sum) - y;
3002 sum = t;
3003 if i >= w {
3004 let y2 = -vals[i - w] - comp;
3005 let t2 = sum + y2;
3006 comp = (t2 - sum) - y2;
3007 sum = t2;
3008 }
3009 let count = if i < w { i + 1 } else { w };
3010 result.push(sum / count as f64);
3011 }
3012 Ok(Some(Column::Float(result)))
3013 }
3014 DExpr::RollingMin(col_name, window) => {
3015 let vals = rolling_get_floats(df, col_name)?;
3016 let n = vals.len();
3017 let w = *window;
3018 let mut result = Vec::with_capacity(n);
3019 let mut deque: VecDeque<usize> = VecDeque::new();
3020 for i in 0..n {
3021 while !deque.is_empty() && *deque.front().unwrap() + w <= i {
3023 deque.pop_front();
3024 }
3025 while !deque.is_empty() && vals[*deque.back().unwrap()] >= vals[i] {
3027 deque.pop_back();
3028 }
3029 deque.push_back(i);
3030 result.push(vals[*deque.front().unwrap()]);
3031 }
3032 Ok(Some(Column::Float(result)))
3033 }
3034 DExpr::RollingMax(col_name, window) => {
3035 let vals = rolling_get_floats(df, col_name)?;
3036 let n = vals.len();
3037 let w = *window;
3038 let mut result = Vec::with_capacity(n);
3039 let mut deque: VecDeque<usize> = VecDeque::new();
3040 for i in 0..n {
3041 while !deque.is_empty() && *deque.front().unwrap() + w <= i {
3042 deque.pop_front();
3043 }
3044 while !deque.is_empty() && vals[*deque.back().unwrap()] <= vals[i] {
3046 deque.pop_back();
3047 }
3048 deque.push_back(i);
3049 result.push(vals[*deque.front().unwrap()]);
3050 }
3051 Ok(Some(Column::Float(result)))
3052 }
3053 DExpr::RollingVar(col_name, window) => {
3054 let vals = rolling_get_floats(df, col_name)?;
3055 let n = vals.len();
3056 let w = *window;
3057 let mut result = Vec::with_capacity(n);
3058 let mut count = 0_usize;
3060 let mut mean = 0.0_f64;
3061 let mut m2 = 0.0_f64;
3062 for i in 0..n {
3063 count += 1;
3065 let delta = vals[i] - mean;
3066 mean += delta / count as f64;
3067 let delta2 = vals[i] - mean;
3068 m2 += delta * delta2;
3069 if i >= w {
3071 let old = vals[i - w];
3072 count -= 1;
3073 if count == 0 {
3074 mean = 0.0;
3075 m2 = 0.0;
3076 } else {
3077 let delta_old = old - mean;
3078 mean -= delta_old / count as f64;
3079 let delta_old2 = old - mean;
3080 m2 -= delta_old * delta_old2;
3081 }
3082 }
3083 if count < 2 {
3084 result.push(0.0);
3085 } else {
3086 result.push(m2 / (count - 1) as f64);
3089 }
3090 }
3091 Ok(Some(Column::Float(result)))
3092 }
3093 DExpr::RollingSd(col_name, window) => {
3094 let vals = rolling_get_floats(df, col_name)?;
3095 let n = vals.len();
3096 let w = *window;
3097 let mut result = Vec::with_capacity(n);
3098 let mut count = 0_usize;
3099 let mut mean = 0.0_f64;
3100 let mut m2 = 0.0_f64;
3101 for i in 0..n {
3102 count += 1;
3103 let delta = vals[i] - mean;
3104 mean += delta / count as f64;
3105 let delta2 = vals[i] - mean;
3106 m2 += delta * delta2;
3107 if i >= w {
3108 let old = vals[i - w];
3109 count -= 1;
3110 if count == 0 {
3111 mean = 0.0;
3112 m2 = 0.0;
3113 } else {
3114 let delta_old = old - mean;
3115 mean -= delta_old / count as f64;
3116 let delta_old2 = old - mean;
3117 m2 -= delta_old * delta_old2;
3118 }
3119 }
3120 if count < 2 {
3121 result.push(0.0);
3122 } else {
3123 result.push((m2 / (count - 1) as f64).sqrt());
3124 }
3125 }
3126 Ok(Some(Column::Float(result)))
3127 }
3128 _ => Ok(None),
3129 }
3130}
3131
3132fn rolling_get_floats(df: &DataFrame, col_name: &str) -> Result<Vec<f64>, TidyError> {
3134 let col = df
3135 .get_column(col_name)
3136 .ok_or_else(|| TidyError::ColumnNotFound(col_name.to_string()))?;
3137 match col {
3138 Column::Float(v) => Ok(v.clone()),
3139 Column::Int(v) => Ok(v.iter().map(|&i| i as f64).collect()),
3140 _ => Err(TidyError::TypeMismatch {
3141 expected: "numeric".into(),
3142 got: "non-numeric".into(),
3143 }),
3144 }
3145}
3146
3147fn vectorized_binop(op: DBinOp, left: &Column, right: &Column) -> Result<Column, TidyError> {
3152 match (left, right) {
3153 (Column::Int(a), Column::Int(b)) => {
3154 let n = a.len();
3155 match op {
3156 DBinOp::Add => { let mut r = vec![0i64; n]; for i in 0..n { r[i] = a[i] + b[i]; } Ok(Column::Int(r)) }
3157 DBinOp::Sub => { let mut r = vec![0i64; n]; for i in 0..n { r[i] = a[i] - b[i]; } Ok(Column::Int(r)) }
3158 DBinOp::Mul => { let mut r = vec![0i64; n]; for i in 0..n { r[i] = a[i] * b[i]; } Ok(Column::Int(r)) }
3159 DBinOp::Div => { let mut r = vec![0i64; n]; for i in 0..n { r[i] = a[i] / b[i]; } Ok(Column::Int(r)) }
3160 DBinOp::Gt => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] > b[i]; } Ok(Column::Bool(r)) }
3161 DBinOp::Lt => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] < b[i]; } Ok(Column::Bool(r)) }
3162 DBinOp::Ge => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] >= b[i]; } Ok(Column::Bool(r)) }
3163 DBinOp::Le => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] <= b[i]; } Ok(Column::Bool(r)) }
3164 DBinOp::Eq => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] == b[i]; } Ok(Column::Bool(r)) }
3165 DBinOp::Ne => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] != b[i]; } Ok(Column::Bool(r)) }
3166 _ => Err(TidyError::Internal(format!("unsupported op {:?} on Int", op))),
3167 }
3168 }
3169 (Column::Float(a), Column::Float(b)) => {
3170 let n = a.len();
3171 match op {
3172 DBinOp::Add => { let mut r = vec![0.0f64; n]; for i in 0..n { r[i] = a[i] + b[i]; } Ok(Column::Float(r)) }
3173 DBinOp::Sub => { let mut r = vec![0.0f64; n]; for i in 0..n { r[i] = a[i] - b[i]; } Ok(Column::Float(r)) }
3174 DBinOp::Mul => { let mut r = vec![0.0f64; n]; for i in 0..n { r[i] = a[i] * b[i]; } Ok(Column::Float(r)) }
3175 DBinOp::Div => { let mut r = vec![0.0f64; n]; for i in 0..n { r[i] = a[i] / b[i]; } Ok(Column::Float(r)) }
3176 DBinOp::Gt => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] > b[i]; } Ok(Column::Bool(r)) }
3177 DBinOp::Lt => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] < b[i]; } Ok(Column::Bool(r)) }
3178 DBinOp::Ge => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] >= b[i]; } Ok(Column::Bool(r)) }
3179 DBinOp::Le => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] <= b[i]; } Ok(Column::Bool(r)) }
3180 DBinOp::Eq => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] == b[i]; } Ok(Column::Bool(r)) }
3181 DBinOp::Ne => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] != b[i]; } Ok(Column::Bool(r)) }
3182 _ => Err(TidyError::Internal(format!("unsupported op {:?} on Float", op))),
3183 }
3184 }
3185 (Column::Int(a), Column::Float(_b)) => {
3186 let promoted: Vec<f64> = a.iter().map(|&v| v as f64).collect();
3187 vectorized_binop(op, &Column::Float(promoted), right)
3188 }
3189 (Column::Float(_a), Column::Int(b)) => {
3190 let promoted: Vec<f64> = b.iter().map(|&v| v as f64).collect();
3191 vectorized_binop(op, left, &Column::Float(promoted))
3192 }
3193 (Column::Bool(a), Column::Bool(b)) => {
3194 let n = a.len();
3195 match op {
3196 DBinOp::And => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] && b[i]; } Ok(Column::Bool(r)) }
3197 DBinOp::Or => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] || b[i]; } Ok(Column::Bool(r)) }
3198 DBinOp::Eq => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] == b[i]; } Ok(Column::Bool(r)) }
3199 DBinOp::Ne => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] != b[i]; } Ok(Column::Bool(r)) }
3200 _ => Err(TidyError::Internal(format!("unsupported op {:?} on Bool", op))),
3201 }
3202 }
3203 (Column::Str(a), Column::Str(b)) => {
3204 let n = a.len();
3205 match op {
3206 DBinOp::Eq => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] == b[i]; } Ok(Column::Bool(r)) }
3207 DBinOp::Ne => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] != b[i]; } Ok(Column::Bool(r)) }
3208 _ => Err(TidyError::Internal(format!("unsupported op {:?} on String", op))),
3209 }
3210 }
3211 _ => Err(TidyError::Internal("type mismatch in binary operation".into())),
3212 }
3213}
3214
3215fn vectorized_fn_call(name: &str, arg: &Column) -> Result<Column, TidyError> {
3218 let floats: Vec<f64> = match arg {
3219 Column::Float(v) => v.clone(),
3220 Column::Int(v) => v.iter().map(|&i| i as f64).collect(),
3221 _ => return Err(TidyError::Internal(format!(
3222 "FnCall '{}' requires numeric argument", name
3223 ))),
3224 };
3225 let f: fn(f64) -> f64 = match name {
3226 "log" => f64::ln,
3227 "exp" => f64::exp,
3228 "sqrt" => f64::sqrt,
3229 "abs" => f64::abs,
3230 "ceil" => f64::ceil,
3231 "floor" => f64::floor,
3232 "round" => f64::round,
3233 "sin" => f64::sin,
3234 "cos" => f64::cos,
3235 "tan" => f64::tan,
3236 _ => return Err(TidyError::Internal(format!(
3237 "unknown DExpr function: {}", name
3238 ))),
3239 };
3240 let mut result = vec![0.0f64; floats.len()];
3241 for i in 0..floats.len() {
3242 result[i] = f(floats[i]);
3243 }
3244 Ok(Column::Float(result))
3245}
3246
3247fn try_eval_expr_column_vectorized(
3251 df: &DataFrame,
3252 expr: &DExpr,
3253 nrows: usize,
3254) -> Option<Result<Column, TidyError>> {
3255 match expr {
3256 DExpr::Col(name) => {
3257 let col = df.get_column(name)?;
3258 let result = match col {
3259 Column::Int(v) => Column::Int(v[..nrows].to_vec()),
3260 Column::Float(v) => Column::Float(v[..nrows].to_vec()),
3261 Column::Str(v) => Column::Str(v[..nrows].to_vec()),
3262 Column::Bool(v) => Column::Bool(v[..nrows].to_vec()),
3263 Column::Categorical { levels, codes } => {
3264 let strs: Vec<String> = codes[..nrows]
3265 .iter()
3266 .map(|&c| levels[c as usize].clone())
3267 .collect();
3268 Column::Str(strs)
3269 }
3270 Column::CategoricalAdaptive(cc) => {
3271 let strs: Vec<String> = (0..nrows)
3272 .map(|i| match cc.get(i) {
3273 None => String::new(),
3274 Some(b) => String::from_utf8_lossy(b).into_owned(),
3275 })
3276 .collect();
3277 Column::Str(strs)
3278 }
3279 Column::DateTime(v) => Column::Int(v[..nrows].to_vec()),
3280 };
3281 Some(Ok(result))
3282 }
3283 DExpr::LitFloat(v) => Some(Ok(Column::Float(vec![*v; nrows]))),
3284 DExpr::LitInt(v) => Some(Ok(Column::Int(vec![*v; nrows]))),
3285 DExpr::LitBool(b) => Some(Ok(Column::Bool(vec![*b; nrows]))),
3286 DExpr::LitStr(s) => Some(Ok(Column::Str(vec![s.clone(); nrows]))),
3287 DExpr::BinOp { op, left, right } => {
3288 let left_col = try_eval_expr_column_vectorized(df, left, nrows)?.ok()?;
3289 let right_col = try_eval_expr_column_vectorized(df, right, nrows)?.ok()?;
3290 Some(vectorized_binop(*op, &left_col, &right_col))
3291 }
3292 DExpr::FnCall(name, args) if args.len() == 1 => {
3293 let arg_col = try_eval_expr_column_vectorized(df, &args[0], nrows)?.ok()?;
3294 Some(vectorized_fn_call(name, &arg_col))
3295 }
3296 _ => None,
3297 }
3298}
3299
3300fn eval_expr_column(df: &DataFrame, expr: &DExpr, nrows: usize) -> Result<Column, TidyError> {
3301 if nrows == 0 {
3302 return Ok(Column::Float(vec![]));
3304 }
3305
3306 if let DExpr::Col(name) = expr {
3314 if let Some(src) = df.get_column(name) {
3315 match src {
3316 Column::Categorical { .. } | Column::CategoricalAdaptive(_) => {
3317 return Ok(src.clone());
3318 }
3319 _ => {}
3320 }
3321 }
3322 }
3323
3324 if let Some(col) = eval_window_column(df, expr, nrows)? {
3326 return Ok(col);
3327 }
3328
3329 if let Some(result) = try_eval_expr_column_vectorized(df, expr, nrows) {
3331 return result;
3332 }
3333
3334 let sample = eval_dexpr_row(df, expr, 0)?;
3336 match sample {
3337 ExprValue::Int(_) => {
3338 let vals: Result<Vec<i64>, TidyError> = (0..nrows)
3339 .map(|r| {
3340 eval_dexpr_row(df, expr, r).and_then(|v| match v {
3341 ExprValue::Int(i) => Ok(i),
3342 ExprValue::Float(f) => Ok(f as i64),
3343 other => Err(TidyError::TypeMismatch {
3344 expected: "Int".into(),
3345 got: other.type_name().into(),
3346 }),
3347 })
3348 })
3349 .collect();
3350 Ok(Column::Int(vals?))
3351 }
3352 ExprValue::Float(_) => {
3353 let vals: Result<Vec<f64>, TidyError> = (0..nrows)
3354 .map(|r| {
3355 eval_dexpr_row(df, expr, r).and_then(|v| match v {
3356 ExprValue::Float(f) => Ok(f),
3357 ExprValue::Int(i) => Ok(i as f64),
3358 other => Err(TidyError::TypeMismatch {
3359 expected: "Float".into(),
3360 got: other.type_name().into(),
3361 }),
3362 })
3363 })
3364 .collect();
3365 Ok(Column::Float(vals?))
3366 }
3367 ExprValue::Bool(_) => {
3368 let vals: Result<Vec<bool>, TidyError> = (0..nrows)
3369 .map(|r| {
3370 eval_dexpr_row(df, expr, r).and_then(|v| match v {
3371 ExprValue::Bool(b) => Ok(b),
3372 other => Err(TidyError::TypeMismatch {
3373 expected: "Bool".into(),
3374 got: other.type_name().into(),
3375 }),
3376 })
3377 })
3378 .collect();
3379 Ok(Column::Bool(vals?))
3380 }
3381 ExprValue::Str(_) => {
3382 let vals: Result<Vec<String>, TidyError> = (0..nrows)
3383 .map(|r| {
3384 eval_dexpr_row(df, expr, r).and_then(|v| match v {
3385 ExprValue::Str(s) => Ok(s),
3386 other => Err(TidyError::TypeMismatch {
3387 expected: "Str".into(),
3388 got: other.type_name().into(),
3389 }),
3390 })
3391 })
3392 .collect();
3393 Ok(Column::Str(vals?))
3394 }
3395 }
3396}
3397
3398fn eval_dexpr_row(df: &DataFrame, expr: &DExpr, row: usize) -> Result<ExprValue, TidyError> {
3400 eval_expr_row(df, expr, row).map_err(|e| TidyError::Internal(e.to_string()))
3401}
3402
3403fn eval_expr_row_proj(
3405 base: &DataFrame,
3406 expr: &DExpr,
3407 row: usize,
3408 _proj: &ProjectionMap,
3409) -> Result<ExprValue, TidyError> {
3410 eval_expr_row(base, expr, row).map_err(|e| TidyError::Internal(e.to_string()))
3414}
3415
3416fn validate_expr_columns_proj(
3423 expr: &DExpr,
3424 base: &DataFrame,
3425 _proj: &ProjectionMap,
3426) -> Result<(), TidyError> {
3427 let mut refs = Vec::new();
3428 collect_expr_columns(expr, &mut refs);
3429 for col_name in refs {
3430 if base.get_column(&col_name).is_none() {
3431 return Err(TidyError::ColumnNotFound(col_name));
3432 }
3433 }
3434 Ok(())
3435}
3436
3437fn validate_expr_columns_snapshot(
3439 expr: &DExpr,
3440 snapshot_names: &[String],
3441) -> Result<(), TidyError> {
3442 let mut refs = Vec::new();
3443 collect_expr_columns(expr, &mut refs);
3444 for col_name in refs {
3445 if !snapshot_names.iter().any(|n| n == &col_name) {
3446 return Err(TidyError::ColumnNotFound(col_name));
3447 }
3448 }
3449 Ok(())
3450}
3451
3452impl ExprValue {
3453 fn type_name(&self) -> &'static str {
3454 match self {
3455 ExprValue::Int(_) => "Int",
3456 ExprValue::Float(_) => "Float",
3457 ExprValue::Str(_) => "Str",
3458 ExprValue::Bool(_) => "Bool",
3459 }
3460 }
3461}
3462
3463impl DataFrame {
3466 pub fn tidy(self) -> TidyView {
3470 TidyView::from_df(self)
3471 }
3472
3473 pub fn tidy_mut(self) -> TidyFrame {
3475 TidyFrame::from_df(self)
3476 }
3477}
3478
/// An ordered list of `usize` row indices.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowIndexMap {
    /// Row indices, in the order they were supplied.
    pub(crate) indices: Vec<usize>,
}

impl RowIndexMap {
    /// Wraps `indices` as-is. The indices are neither copied nor validated.
    pub fn new(indices: Vec<usize>) -> Self {
        Self { indices }
    }

    /// Returns the number of stored indices.
    pub fn len(&self) -> usize {
        self.as_slice().len()
    }

    /// Returns `true` when no indices are stored.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Borrows the indices as a slice.
    pub fn as_slice(&self) -> &[usize] {
        self.indices.as_slice()
    }
}
3564
/// One group produced by `GroupIndex::build`.
#[derive(Debug, Clone)]
pub struct GroupMeta {
    /// Display-string form (`Column::get_display`) of each key column's value
    /// for this group, in key-column order.
    pub key_values: Vec<String>,
    /// Base-frame row indices in this group, in the order the rows were visited.
    pub row_indices: Vec<usize>,
}
3575
/// Grouping of a view's visible rows by one or more key columns.
#[derive(Debug, Clone)]
pub struct GroupIndex {
    /// Groups in order of first appearance among the visible rows.
    pub groups: Vec<GroupMeta>,
    /// Names of the key columns. Expected to be parallel to each group's
    /// `key_values`, because `summarise` indexes `key_values` by position in
    /// this list.
    pub key_names: Vec<String>,
}
3593
3594impl GroupIndex {
3595 pub fn build(
3600 base: &DataFrame,
3601 key_col_indices: &[usize],
3602 visible_rows: &[usize],
3603 key_names: Vec<String>,
3604 ) -> Self {
3605 let mut group_order: Vec<Vec<String>> = Vec::new(); let mut group_map: Vec<(Vec<String>, usize)> = Vec::new(); for &row in visible_rows {
3611 let key: Vec<String> = key_col_indices
3612 .iter()
3613 .map(|&ci| base.columns[ci].1.get_display(row))
3614 .collect();
3615
3616 let slot = group_map
3618 .iter()
3619 .position(|(k, _)| k == &key)
3620 .unwrap_or_else(|| {
3621 let s = group_map.len();
3622 group_map.push((key.clone(), s));
3623 group_order.push(key);
3624 s
3625 });
3626
3627 let _ = slot; }
3629
3630 let mut groups: Vec<GroupMeta> = group_order
3632 .iter()
3633 .map(|k| GroupMeta {
3634 key_values: k.clone(),
3635 row_indices: Vec::new(),
3636 })
3637 .collect();
3638
3639 let key_to_slot: Vec<(Vec<String>, usize)> = group_order
3641 .iter()
3642 .enumerate()
3643 .map(|(i, k)| (k.clone(), i))
3644 .collect();
3645
3646 for &row in visible_rows {
3647 let key: Vec<String> = key_col_indices
3648 .iter()
3649 .map(|&ci| base.columns[ci].1.get_display(row))
3650 .collect();
3651 if let Some((_, slot)) = key_to_slot.iter().find(|(k, _)| k == &key) {
3652 groups[*slot].row_indices.push(row);
3653 }
3654 }
3655
3656 GroupIndex { groups, key_names }
3657 }
3658}
3659
/// A [`TidyView`] paired with the [`GroupIndex`] describing its groups.
///
/// NOTE(review): presumably constructed by a group-by operation on `TidyView`.
/// The constructor is not visible here; confirm.
#[derive(Debug, Clone)]
pub struct GroupedTidyView {
    // The underlying view. `summarise` reads key and aggregated columns from its
    // base frame.
    view: TidyView,
    // Groups whose `row_indices` index into `view`'s base frame.
    index: GroupIndex,
}
3679
3680impl GroupedTidyView {
3681 pub fn ngroups(&self) -> usize {
3683 self.index.groups.len()
3684 }
3685
3686 pub fn ungroup(self) -> TidyView {
3688 self.view
3689 }
3690
3691 pub fn group_index(&self) -> &GroupIndex {
3693 &self.index
3694 }
3695
3696 pub fn summarise(
3715 &self,
3716 assignments: &[(&str, TidyAgg)],
3717 ) -> Result<TidyFrame, TidyError> {
3718 {
3720 let mut seen = std::collections::BTreeSet::new();
3721 for &(name, _) in assignments {
3722 if !seen.insert(name) {
3723 return Err(TidyError::DuplicateColumn(name.to_string()));
3724 }
3725 }
3726 }
3727
3728 let base = &self.view.base;
3729 let n_groups = self.index.groups.len();
3730
3731 let mut result_columns: Vec<(String, Column)> = Vec::new();
3733
3734 for key_name in &self.index.key_names {
3735 let base_col = base
3736 .get_column(key_name)
3737 .ok_or_else(|| TidyError::ColumnNotFound(key_name.clone()))?;
3738
3739 let col = match base_col {
3740 Column::Int(_) => {
3741 let vals: Vec<i64> = self
3742 .index
3743 .groups
3744 .iter()
3745 .map(|g| {
3746 g.key_values[self
3747 .index
3748 .key_names
3749 .iter()
3750 .position(|k| k == key_name)
3751 .unwrap()]
3752 .parse::<i64>()
3753 .unwrap_or(0)
3754 })
3755 .collect();
3756 Column::Int(vals)
3757 }
3758 Column::Bool(_) => {
3759 let vals: Vec<bool> = self
3760 .index
3761 .groups
3762 .iter()
3763 .map(|g| {
3764 let s = &g.key_values[self
3765 .index
3766 .key_names
3767 .iter()
3768 .position(|k| k == key_name)
3769 .unwrap()];
3770 matches!(s.as_str(), "true" | "1")
3771 })
3772 .collect();
3773 Column::Bool(vals)
3774 }
3775 _ => {
3776 let vals: Vec<String> = self
3778 .index
3779 .groups
3780 .iter()
3781 .map(|g| {
3782 g.key_values[self
3783 .index
3784 .key_names
3785 .iter()
3786 .position(|k| k == key_name)
3787 .unwrap()]
3788 .clone()
3789 })
3790 .collect();
3791 Column::Str(vals)
3792 }
3793 };
3794 result_columns.push((key_name.clone(), col));
3795 }
3796
3797 for &(out_name, ref agg) in assignments {
3799 let col_vals = self.eval_agg_over_groups_fast(agg, n_groups, base)?;
3800 result_columns.push((out_name.to_string(), col_vals));
3801 }
3802
3803 let df = DataFrame::from_columns(result_columns)
3804 .map_err(|e| TidyError::Internal(e.to_string()))?;
3805 Ok(TidyFrame::from_df(df))
3806 }
3807
3808 #[allow(dead_code)]
3810 fn eval_agg_over_groups(
3811 &self,
3812 agg: &TidyAgg,
3813 n_groups: usize,
3814 base: &DataFrame,
3815 ) -> Result<Column, TidyError> {
3816 match agg {
3817 TidyAgg::Count => {
3818 let counts: Vec<i64> = self
3819 .index
3820 .groups
3821 .iter()
3822 .map(|g| g.row_indices.len() as i64)
3823 .collect();
3824 Ok(Column::Int(counts))
3825 }
3826
3827 TidyAgg::Sum(col_name) | TidyAgg::Mean(col_name)
3828 | TidyAgg::Min(col_name) | TidyAgg::Max(col_name)
3829 | TidyAgg::First(col_name) | TidyAgg::Last(col_name)
3830 | TidyAgg::Median(col_name) | TidyAgg::Sd(col_name)
3831 | TidyAgg::Var(col_name) | TidyAgg::Quantile(col_name, _)
3832 | TidyAgg::NDistinct(col_name) | TidyAgg::Iqr(col_name) => {
3833 let src = base
3834 .get_column(col_name)
3835 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
3836
3837 let mut vals = Vec::with_capacity(n_groups);
3838 for group in &self.index.groups {
3839 let v = agg_reduce(agg, src, &group.row_indices)?;
3840 vals.push(v);
3841 }
3842 Ok(Column::Float(vals))
3843 }
3844 }
3845 }
3846
3847 fn eval_agg_over_groups_fast(
3850 &self,
3851 agg: &TidyAgg,
3852 n_groups: usize,
3853 base: &DataFrame,
3854 ) -> Result<Column, TidyError> {
3855 match agg {
3856 TidyAgg::Count => {
3857 let counts: Vec<i64> = self
3858 .index
3859 .groups
3860 .iter()
3861 .map(|g| g.row_indices.len() as i64)
3862 .collect();
3863 Ok(Column::Int(counts))
3864 }
3865 TidyAgg::Sum(col_name) => {
3866 let src = base.get_column(col_name)
3867 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
3868 Ok(Column::Float(fast_agg_sum(&self.index.groups, src)?))
3869 }
3870 TidyAgg::Mean(col_name) => {
3871 let src = base.get_column(col_name)
3872 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
3873 Ok(Column::Float(fast_agg_mean(&self.index.groups, src)?))
3874 }
3875 TidyAgg::Min(col_name) => {
3876 let src = base.get_column(col_name)
3877 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
3878 Ok(Column::Float(fast_agg_min(&self.index.groups, src)?))
3879 }
3880 TidyAgg::Max(col_name) => {
3881 let src = base.get_column(col_name)
3882 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
3883 Ok(Column::Float(fast_agg_max(&self.index.groups, src)?))
3884 }
3885 TidyAgg::First(col_name) => {
3886 let src = base.get_column(col_name)
3887 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
3888 Ok(Column::Float(fast_agg_first(&self.index.groups, src)?))
3889 }
3890 TidyAgg::Last(col_name) => {
3891 let src = base.get_column(col_name)
3892 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
3893 Ok(Column::Float(fast_agg_last(&self.index.groups, src)?))
3894 }
3895 TidyAgg::Var(col_name)
3896 | TidyAgg::Sd(col_name)
3897 | TidyAgg::Median(col_name)
3898 | TidyAgg::Quantile(col_name, _)
3899 | TidyAgg::NDistinct(col_name)
3900 | TidyAgg::Iqr(col_name) => {
3901 let src = base.get_column(col_name)
3902 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
3903 Ok(Column::Float(fast_agg_arena(
3904 agg, &self.index.groups, src, n_groups,
3905 )?))
3906 }
3907 }
3908 }
3909}
3910
/// Borrowed view of a numeric column's values. It is produced by `col_to_ref`
/// and consumed by the `fast_agg_*` kernels.
enum ColRef<'a> {
    /// Values of a `Column::Float`.
    Float(&'a [f64]),
    /// Values of a `Column::Int`; the kernels widen each element to `f64`.
    Int(&'a [i64]),
}
3917
3918fn col_to_ref(col: &Column) -> Result<ColRef<'_>, TidyError> {
3919 match col {
3920 Column::Float(v) => Ok(ColRef::Float(v)),
3921 Column::Int(v) => Ok(ColRef::Int(v)),
3922 _ => Err(TidyError::TypeMismatch {
3923 expected: "numeric (Int or Float)".into(),
3924 got: col.type_name().into(),
3925 }),
3926 }
3927}
3928
3929fn fast_agg_sum(groups: &[GroupMeta], col: &Column) -> Result<Vec<f64>, TidyError> {
3930 use cjc_repro::kahan::KahanAccumulatorF64;
3931 let cr = col_to_ref(col)?;
3932 Ok(groups.iter().map(|g| {
3933 let mut acc = KahanAccumulatorF64::new();
3934 match cr {
3935 ColRef::Float(d) => { for &i in &g.row_indices { acc.add(d[i]); } }
3936 ColRef::Int(d) => { for &i in &g.row_indices { acc.add(d[i] as f64); } }
3937 }
3938 acc.finalize()
3939 }).collect())
3940}
3941
3942fn fast_agg_mean(groups: &[GroupMeta], col: &Column) -> Result<Vec<f64>, TidyError> {
3943 use cjc_repro::kahan::KahanAccumulatorF64;
3944 let cr = col_to_ref(col)?;
3945 Ok(groups.iter().map(|g| {
3946 if g.row_indices.is_empty() { return f64::NAN; }
3947 let mut acc = KahanAccumulatorF64::new();
3948 match cr {
3949 ColRef::Float(d) => { for &i in &g.row_indices { acc.add(d[i]); } }
3950 ColRef::Int(d) => { for &i in &g.row_indices { acc.add(d[i] as f64); } }
3951 }
3952 acc.finalize() / g.row_indices.len() as f64
3953 }).collect())
3954}
3955
3956fn fast_agg_min(groups: &[GroupMeta], col: &Column) -> Result<Vec<f64>, TidyError> {
3957 let cr = col_to_ref(col)?;
3958 Ok(groups.iter().map(|g| {
3959 if g.row_indices.is_empty() { return f64::NAN; }
3960 match cr {
3961 ColRef::Float(d) => g.row_indices.iter().fold(f64::INFINITY, |a, &i| {
3962 let b = d[i]; if b.is_nan() || b < a { b } else { a }
3963 }),
3964 ColRef::Int(d) => g.row_indices.iter().fold(f64::INFINITY, |a, &i| {
3965 let b = d[i] as f64; if b.is_nan() || b < a { b } else { a }
3966 }),
3967 }
3968 }).collect())
3969}
3970
3971fn fast_agg_max(groups: &[GroupMeta], col: &Column) -> Result<Vec<f64>, TidyError> {
3972 let cr = col_to_ref(col)?;
3973 Ok(groups.iter().map(|g| {
3974 if g.row_indices.is_empty() { return f64::NAN; }
3975 match cr {
3976 ColRef::Float(d) => g.row_indices.iter().fold(f64::NEG_INFINITY, |a, &i| {
3977 let b = d[i]; if b.is_nan() || b > a { b } else { a }
3978 }),
3979 ColRef::Int(d) => g.row_indices.iter().fold(f64::NEG_INFINITY, |a, &i| {
3980 let b = d[i] as f64; if b.is_nan() || b > a { b } else { a }
3981 }),
3982 }
3983 }).collect())
3984}
3985
3986fn fast_agg_first(groups: &[GroupMeta], col: &Column) -> Result<Vec<f64>, TidyError> {
3987 let cr = col_to_ref(col)?;
3988 let mut vals = Vec::with_capacity(groups.len());
3989 for g in groups {
3990 if g.row_indices.is_empty() { return Err(TidyError::EmptyGroup); }
3991 match cr {
3992 ColRef::Float(d) => vals.push(d[g.row_indices[0]]),
3993 ColRef::Int(d) => vals.push(d[g.row_indices[0]] as f64),
3994 }
3995 }
3996 Ok(vals)
3997}
3998
3999fn fast_agg_last(groups: &[GroupMeta], col: &Column) -> Result<Vec<f64>, TidyError> {
4000 let cr = col_to_ref(col)?;
4001 let mut vals = Vec::with_capacity(groups.len());
4002 for g in groups {
4003 if g.row_indices.is_empty() { return Err(TidyError::EmptyGroup); }
4004 let last = *g.row_indices.last().unwrap();
4005 match cr {
4006 ColRef::Float(d) => vals.push(d[last]),
4007 ColRef::Int(d) => vals.push(d[last] as f64),
4008 }
4009 }
4010 Ok(vals)
4011}
4012
4013fn fast_agg_arena(
4016 agg: &TidyAgg,
4017 groups: &[GroupMeta],
4018 col: &Column,
4019 n_groups: usize,
4020) -> Result<Vec<f64>, TidyError> {
4021 let cr = col_to_ref(col)?;
4022 let max_size = groups.iter().map(|g| g.row_indices.len()).max().unwrap_or(0);
4023 let mut arena: Vec<f64> = Vec::with_capacity(max_size);
4024 let mut results = Vec::with_capacity(n_groups);
4025 for group in groups {
4026 arena.clear();
4027 match cr {
4028 ColRef::Float(d) => { for &i in &group.row_indices { arena.push(d[i]); } }
4029 ColRef::Int(d) => { for &i in &group.row_indices { arena.push(d[i] as f64); } }
4030 }
4031 let val = agg_reduce_slice(agg, &mut arena)?;
4032 results.push(val);
4033 }
4034 Ok(results)
4035}
4036
4037fn agg_reduce_slice(agg: &TidyAgg, values: &mut [f64]) -> Result<f64, TidyError> {
4040 match agg {
4041 TidyAgg::Var(_) => {
4042 if values.len() < 2 {
4043 Ok(f64::NAN)
4044 } else {
4045 let n = values.len() as f64;
4046 let mean = kahan_sum_f64(values) / n;
4047 let sq_diffs: Vec<f64> = values.iter().map(|v| (v - mean) * (v - mean)).collect();
4048 Ok(kahan_sum_f64(&sq_diffs) / (n - 1.0))
4049 }
4050 }
4051 TidyAgg::Sd(_) => {
4052 if values.len() < 2 {
4053 Ok(f64::NAN)
4054 } else {
4055 let n = values.len() as f64;
4056 let mean = kahan_sum_f64(values) / n;
4057 let sq_diffs: Vec<f64> = values.iter().map(|v| (v - mean) * (v - mean)).collect();
4058 Ok((kahan_sum_f64(&sq_diffs) / (n - 1.0)).sqrt())
4059 }
4060 }
4061 TidyAgg::Median(_) => {
4062 if values.is_empty() {
4063 Ok(f64::NAN)
4064 } else {
4065 values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
4066 let n = values.len();
4067 if n % 2 == 1 { Ok(values[n / 2]) }
4068 else { Ok((values[n / 2 - 1] + values[n / 2]) / 2.0) }
4069 }
4070 }
4071 TidyAgg::Quantile(_, p) => {
4072 if values.is_empty() {
4073 Ok(f64::NAN)
4074 } else {
4075 values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
4076 let n = values.len();
4077 if n == 1 { return Ok(values[0]); }
4078 let pos = p * (n as f64 - 1.0);
4079 let lo = pos.floor() as usize;
4080 let hi = pos.ceil() as usize;
4081 let frac = pos - lo as f64;
4082 if lo == hi || hi >= n { Ok(values[lo.min(n - 1)]) }
4083 else { Ok(values[lo] + frac * (values[hi] - values[lo])) }
4084 }
4085 }
4086 TidyAgg::NDistinct(_) => {
4087 let distinct: std::collections::BTreeSet<u64> = values.iter().map(|v| v.to_bits()).collect();
4088 Ok(distinct.len() as f64)
4089 }
4090 TidyAgg::Iqr(_) => {
4091 if values.is_empty() {
4092 Ok(f64::NAN)
4093 } else {
4094 values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
4095 let n = values.len();
4096 if n == 1 { return Ok(0.0); }
4097 let q1 = {
4098 let pos = 0.25 * (n as f64 - 1.0);
4099 let lo = pos.floor() as usize;
4100 let hi = pos.ceil() as usize;
4101 let frac = pos - lo as f64;
4102 if lo == hi || hi >= n { values[lo.min(n - 1)] }
4103 else { values[lo] + frac * (values[hi] - values[lo]) }
4104 };
4105 let q3 = {
4106 let pos = 0.75 * (n as f64 - 1.0);
4107 let lo = pos.floor() as usize;
4108 let hi = pos.ceil() as usize;
4109 let frac = pos - lo as f64;
4110 if lo == hi || hi >= n { values[lo.min(n - 1)] }
4111 else { values[lo] + frac * (values[hi] - values[lo]) }
4112 };
4113 Ok(q3 - q1)
4114 }
4115 }
4116 _ => unreachable!("agg_reduce_slice called for non-arena aggregator"),
4117 }
4118}
4119
4120#[allow(dead_code)]
4122fn agg_reduce(
4123 agg: &TidyAgg,
4124 col: &Column,
4125 rows: &[usize],
4126) -> Result<f64, TidyError> {
4127 let values: Vec<f64> = match col {
4129 Column::Int(v) => rows.iter().map(|&r| v[r] as f64).collect(),
4130 Column::Float(v) => rows.iter().map(|&r| v[r]).collect(),
4131 _ => {
4132 return Err(TidyError::TypeMismatch {
4133 expected: "numeric (Int or Float)".into(),
4134 got: col.type_name().into(),
4135 })
4136 }
4137 };
4138
4139 match agg {
4140 TidyAgg::Sum(_) => Ok(kahan_sum_f64(&values)),
4141 TidyAgg::Mean(_) => {
4142 if values.is_empty() {
4143 Ok(f64::NAN)
4144 } else {
4145 Ok(kahan_sum_f64(&values) / values.len() as f64)
4146 }
4147 }
4148 TidyAgg::Min(_) => {
4149 if values.is_empty() {
4150 Ok(f64::NAN)
4151 } else {
4152 Ok(values.iter().cloned().fold(f64::INFINITY, |a, b| {
4153 if b.is_nan() || b < a { b } else { a }
4154 }))
4155 }
4156 }
4157 TidyAgg::Max(_) => {
4158 if values.is_empty() {
4159 Ok(f64::NAN)
4160 } else {
4161 Ok(values.iter().cloned().fold(f64::NEG_INFINITY, |a, b| {
4162 if b.is_nan() || b > a { b } else { a }
4163 }))
4164 }
4165 }
4166 TidyAgg::First(_) => {
4167 if values.is_empty() {
4168 Err(TidyError::EmptyGroup)
4169 } else {
4170 Ok(values[0])
4171 }
4172 }
4173 TidyAgg::Last(_) => {
4174 if values.is_empty() {
4175 Err(TidyError::EmptyGroup)
4176 } else {
4177 Ok(*values.last().unwrap())
4178 }
4179 }
4180 TidyAgg::Count => Ok(values.len() as f64),
4181 TidyAgg::Median(_) => {
4182 if values.is_empty() {
4183 Ok(f64::NAN)
4184 } else {
4185 let mut sorted = values.clone();
4186 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
4187 let n = sorted.len();
4188 if n % 2 == 1 {
4189 Ok(sorted[n / 2])
4190 } else {
4191 Ok((sorted[n / 2 - 1] + sorted[n / 2]) / 2.0)
4192 }
4193 }
4194 }
4195 TidyAgg::Var(_) => {
4196 if values.len() < 2 {
4197 Ok(f64::NAN)
4198 } else {
4199 let n = values.len() as f64;
4200 let mean = kahan_sum_f64(&values) / n;
4201 let sq_diffs: Vec<f64> = values.iter().map(|v| (v - mean) * (v - mean)).collect();
4202 Ok(kahan_sum_f64(&sq_diffs) / (n - 1.0))
4203 }
4204 }
4205 TidyAgg::Sd(_) => {
4206 if values.len() < 2 {
4207 Ok(f64::NAN)
4208 } else {
4209 let n = values.len() as f64;
4210 let mean = kahan_sum_f64(&values) / n;
4211 let sq_diffs: Vec<f64> = values.iter().map(|v| (v - mean) * (v - mean)).collect();
4212 Ok((kahan_sum_f64(&sq_diffs) / (n - 1.0)).sqrt())
4213 }
4214 }
4215 TidyAgg::Quantile(_, p) => {
4216 if values.is_empty() {
4217 Ok(f64::NAN)
4218 } else {
4219 let mut sorted = values.clone();
4220 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
4221 let n = sorted.len();
4222 if n == 1 {
4223 return Ok(sorted[0]);
4224 }
4225 let pos = p * (n as f64 - 1.0);
4226 let lo = pos.floor() as usize;
4227 let hi = pos.ceil() as usize;
4228 let frac = pos - lo as f64;
4229 if lo == hi || hi >= n {
4230 Ok(sorted[lo.min(n - 1)])
4231 } else {
4232 Ok(sorted[lo] + frac * (sorted[hi] - sorted[lo]))
4233 }
4234 }
4235 }
4236 TidyAgg::NDistinct(_) => {
4237 use std::collections::BTreeSet;
4238 let distinct: BTreeSet<u64> = values.iter().map(|v| v.to_bits()).collect();
4239 Ok(distinct.len() as f64)
4240 }
4241 TidyAgg::Iqr(_) => {
4242 if values.is_empty() {
4243 Ok(f64::NAN)
4244 } else {
4245 let mut sorted = values.clone();
4246 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
4247 let n = sorted.len();
4248 if n == 1 {
4249 return Ok(0.0);
4250 }
4251 let q1 = {
4252 let pos = 0.25 * (n as f64 - 1.0);
4253 let lo = pos.floor() as usize;
4254 let hi = pos.ceil() as usize;
4255 let frac = pos - lo as f64;
4256 if lo == hi || hi >= n { sorted[lo.min(n - 1)] }
4257 else { sorted[lo] + frac * (sorted[hi] - sorted[lo]) }
4258 };
4259 let q3 = {
4260 let pos = 0.75 * (n as f64 - 1.0);
4261 let lo = pos.floor() as usize;
4262 let hi = pos.ceil() as usize;
4263 let frac = pos - lo as f64;
4264 if lo == hi || hi >= n { sorted[lo.min(n - 1)] }
4265 else { sorted[lo] + frac * (sorted[hi] - sorted[lo]) }
4266 };
4267 Ok(q3 - q1)
4268 }
4269 }
4270 }
4271}
4272
/// A per-group aggregation, producing one `f64` per group.
///
/// Except for `Count`, the `String` payload presumably names the input column.
/// The reducers in this chunk receive the resolved `&Column` separately; TODO
/// confirm against the summarise caller.
#[derive(Debug, Clone)]
pub enum TidyAgg {
    /// Number of rows in the group.
    Count,
    /// Sum of the column's values.
    Sum(String),
    /// Arithmetic mean; `NaN` for an empty group.
    Mean(String),
    /// Minimum; `NaN` if the group is empty or contains a `NaN`.
    Min(String),
    /// Maximum; `NaN` if the group is empty or contains a `NaN`.
    Max(String),
    /// Value in the group's first row; an empty group is a `TidyError::EmptyGroup`.
    First(String),
    /// Value in the group's last row; an empty group is a `TidyError::EmptyGroup`.
    Last(String),
    /// Median (mean of the two middle values for an even count); `NaN` when empty.
    Median(String),
    /// Sample standard deviation (`n - 1` denominator); `NaN` for fewer than two values.
    Sd(String),
    /// Sample variance (`n - 1` denominator); `NaN` for fewer than two values.
    Var(String),
    /// Quantile at probability `p` (second field), linearly interpolated at
    /// rank position `p * (n - 1)`; `NaN` when empty.
    Quantile(String, f64),
    /// Number of distinct values in the group.
    NDistinct(String),
    /// Interquartile range (Q3 - Q1, same interpolation as `Quantile`);
    /// `0.0` for a single value, `NaN` when empty.
    Iqr(String),
}
4305
/// One sort key for `TidyView::arrange`: a column name plus a direction.
#[derive(Debug, Clone)]
pub struct ArrangeKey {
    /// Name of the column to sort by (looked up in the view's base frame).
    pub col_name: String,
    /// `true` sorts this key descending, `false` ascending.
    pub descending: bool,
}
4316
4317impl ArrangeKey {
4318 pub fn asc(col_name: &str) -> Self {
4320 ArrangeKey { col_name: col_name.to_string(), descending: false }
4321 }
4322 pub fn desc(col_name: &str) -> Self {
4324 ArrangeKey { col_name: col_name.to_string(), descending: true }
4325 }
4326}
4327
4328impl TidyView {
4331
4332 pub fn group_by(&self, keys: &[&str]) -> Result<GroupedTidyView, TidyError> {
4346 let mut key_col_indices = Vec::with_capacity(keys.len());
4348 for &key in keys {
4349 let idx = self
4350 .base
4351 .columns
4352 .iter()
4353 .position(|(n, _)| n == key)
4354 .ok_or_else(|| TidyError::ColumnNotFound(key.to_string()))?;
4355 key_col_indices.push(idx);
4356 }
4357
4358 let key_names: Vec<String> = keys.iter().map(|s| s.to_string()).collect();
4359
4360 let index = GroupIndex::build_fast(&self.base, &key_col_indices, self.mask.iter_indices(), key_names);
4364
4365 Ok(GroupedTidyView {
4366 view: self.clone(),
4367 index,
4368 })
4369 }
4370
4371 pub fn arrange(&self, keys: &[ArrangeKey]) -> Result<TidyView, TidyError> {
4391 for key in keys {
4393 if self.base.get_column(&key.col_name).is_none() {
4394 return Err(TidyError::ColumnNotFound(key.col_name.clone()));
4395 }
4396 }
4397
4398 let mut row_indices: Vec<usize> = self.mask.iter_indices().collect();
4400
4401 enum ArrangeKeyResolved<'a> {
4412 CatCodes { codes: &'a [u32], descending: bool },
4413 Legacy { col: &'a Column, descending: bool },
4414 }
4415
4416 fn levels_are_sorted(levels: &[String]) -> bool {
4417 levels.windows(2).all(|w| w[0] <= w[1])
4418 }
4419
4420 let resolved: Vec<ArrangeKeyResolved> = keys
4421 .iter()
4422 .map(|key| {
4423 let col = self.base.get_column(&key.col_name).unwrap();
4424 match col {
4425 Column::Categorical { levels, codes } if levels_are_sorted(levels) => {
4426 ArrangeKeyResolved::CatCodes {
4427 codes: codes.as_slice(),
4428 descending: key.descending,
4429 }
4430 }
4431 _ => ArrangeKeyResolved::Legacy { col, descending: key.descending },
4432 }
4433 })
4434 .collect();
4435
4436 row_indices.sort_by(|&a, &b| {
4438 for resolved_key in &resolved {
4439 let ord = match resolved_key {
4440 ArrangeKeyResolved::CatCodes { codes, descending } => {
4441 let raw = codes[a].cmp(&codes[b]);
4442 if *descending { raw.reverse() } else { raw }
4443 }
4444 ArrangeKeyResolved::Legacy { col, descending } => {
4445 let raw = compare_column_rows(col, a, b);
4446 if *descending { raw.reverse() } else { raw }
4447 }
4448 };
4449 if ord != std::cmp::Ordering::Equal {
4450 return ord;
4451 }
4452 }
4453 std::cmp::Ordering::Equal
4454 });
4455
4456 let mut new_columns = Vec::with_capacity(self.proj.len());
4458 for &ci in self.proj.indices() {
4459 let (name, col) = &self.base.columns[ci];
4460 let new_col = gather_column(col, &row_indices);
4461 new_columns.push((name.clone(), new_col));
4462 }
4463 let mut sorted_all_cols = Vec::with_capacity(self.base.ncols());
4466 for (name, col) in &self.base.columns {
4467 sorted_all_cols.push((name.clone(), gather_column(col, &row_indices)));
4468 }
4469
4470 let new_base = DataFrame::from_columns(sorted_all_cols)
4471 .map_err(|e| TidyError::Internal(e.to_string()))?;
4472 let nrows = new_base.nrows();
4473 let new_proj = self.proj.clone();
4474
4475 Ok(TidyView {
4476 base: Rc::new(new_base),
4477 mask: AdaptiveSelection::all(nrows),
4478 proj: new_proj,
4479 })
4480 }
4481
4482 pub fn slice(&self, start: usize, end: usize) -> TidyView {
4489 let visible: Vec<usize> = self.mask.iter_indices().collect();
4490 let n = visible.len();
4491 let s = start.min(n);
4492 let e = end.min(n);
4493 let selected = if s >= e { vec![] } else { visible[s..e].to_vec() };
4494 self.view_from_row_indices(selected)
4495 }
4496
4497 pub fn slice_head(&self, n: usize) -> TidyView {
4499 self.slice(0, n)
4500 }
4501
4502 pub fn slice_tail(&self, n: usize) -> TidyView {
4504 let total = self.mask.count();
4505 let start = total.saturating_sub(n);
4506 self.slice(start, total)
4507 }
4508
4509 pub fn slice_sample(&self, n: usize, seed: u64) -> TidyView {
4514 let mut visible: Vec<usize> = self.mask.iter_indices().collect();
4515 let total = visible.len();
4516 if n >= total {
4517 return self.view_from_row_indices(visible);
4518 }
4519 let mut rng = seed;
4521 let selected_count = n;
4522 for i in 0..selected_count {
4523 rng = rng.wrapping_mul(6364136223846793005).wrapping_add(1442695040888963407);
4525 let j = i + (rng as usize % (total - i));
4526 visible.swap(i, j);
4527 }
4528 visible.truncate(selected_count);
4529 visible.sort_unstable();
4531 self.view_from_row_indices(visible)
4532 }
4533
4534 pub fn distinct(&self, cols: &[&str]) -> Result<TidyView, TidyError> {
4546 let mut col_indices = Vec::with_capacity(cols.len());
4548 for &name in cols {
4549 let idx = self
4550 .base
4551 .columns
4552 .iter()
4553 .position(|(n, _)| n == name)
4554 .ok_or_else(|| TidyError::ColumnNotFound(name.to_string()))?;
4555 col_indices.push(idx);
4556 }
4557
4558 if let Some(cat_keys) = collect_categorical_keys(&self.base, &col_indices) {
4564 let mut seen_codes: BTreeSet<Vec<u32>> = BTreeSet::new();
4565 let mut selected_rows: Vec<usize> = Vec::new();
4566 let mut key_buf: Vec<u32> = Vec::with_capacity(cat_keys.codes.len());
4567 for row in self.mask.iter_indices() {
4568 key_buf.clear();
4569 for c in &cat_keys.codes {
4570 key_buf.push(c[row]);
4571 }
4572 if seen_codes.insert(key_buf.clone()) {
4573 selected_rows.push(row);
4574 }
4575 }
4576 return Ok(self.view_from_row_indices(selected_rows));
4577 }
4578
4579 let mut seen_keys: BTreeSet<Vec<String>> = BTreeSet::new();
4581 let mut selected_rows: Vec<usize> = Vec::new();
4582
4583 for row in self.mask.iter_indices() {
4584 let key: Vec<String> = if col_indices.is_empty() {
4585 vec!["__all__".into()]
4586 } else {
4587 col_indices
4588 .iter()
4589 .map(|&ci| self.base.columns[ci].1.get_display(row))
4590 .collect()
4591 };
4592
4593 if seen_keys.insert(key) {
4594 selected_rows.push(row);
4595 }
4596 }
4597
4598 Ok(self.view_from_row_indices(selected_rows))
4599 }
4600
4601 pub fn inner_join(
4614 &self,
4615 right: &TidyView,
4616 on: &[(&str, &str)],
4617 ) -> Result<TidyFrame, TidyError> {
4618 let (left_rows, right_rows) = join_match_rows(self, right, on, JoinKind::Inner)?;
4619 build_join_frame(self, right, &left_rows, &right_rows, on, false)
4620 }
4621
4622 pub fn left_join(
4626 &self,
4627 right: &TidyView,
4628 on: &[(&str, &str)],
4629 ) -> Result<TidyFrame, TidyError> {
4630 let (left_rows, right_rows_opt) =
4631 join_match_rows_optional(self, right, on, JoinKind::Left)?;
4632 build_left_join_frame(self, right, &left_rows, &right_rows_opt, on)
4633 }
4634
4635 pub fn semi_join(
4639 &self,
4640 right: &TidyView,
4641 on: &[(&str, &str)],
4642 ) -> Result<TidyView, TidyError> {
4643 let included = semi_anti_match_rows(self, right, on, true)?;
4644 Ok(self.view_from_row_indices(included))
4645 }
4646
4647 pub fn anti_join(
4651 &self,
4652 right: &TidyView,
4653 on: &[(&str, &str)],
4654 ) -> Result<TidyView, TidyError> {
4655 let included = semi_anti_match_rows(self, right, on, false)?;
4656 Ok(self.view_from_row_indices(included))
4657 }
4658
4659 fn view_from_row_indices(&self, row_indices: Vec<usize>) -> TidyView {
4664 let nrows_base = self.base.nrows();
4665 let mut words = vec![0u64; nwords_for(nrows_base)];
4666 for &r in &row_indices {
4667 words[r / 64] |= 1u64 << (r % 64);
4668 }
4669 TidyView {
4670 base: Rc::clone(&self.base),
4671 mask: AdaptiveSelection::from_predicate_result(words, nrows_base),
4672 proj: self.proj.clone(),
4673 }
4674 }
4675}
4676
/// Join flavour passed to the row-matching helpers.
///
/// NOTE(review): `join_match_rows` and `join_match_rows_optional` accept it as
/// `_kind` and never branch on it. The flavour is effectively selected by
/// which helper the caller invokes.
#[derive(Clone, Copy)]
enum JoinKind { Inner, Left }
4681
4682fn resolve_join_keys(
4684 left: &TidyView,
4685 right: &TidyView,
4686 on: &[(&str, &str)],
4687) -> Result<(Vec<usize>, Vec<usize>), TidyError> {
4688 let mut li = Vec::new();
4689 let mut ri = Vec::new();
4690 for &(lk, rk) in on {
4691 let l = left.base.columns.iter().position(|(n, _)| n == lk)
4692 .ok_or_else(|| TidyError::ColumnNotFound(lk.to_string()))?;
4693 let r = right.base.columns.iter().position(|(n, _)| n == rk)
4694 .ok_or_else(|| TidyError::ColumnNotFound(rk.to_string()))?;
4695 li.push(l);
4696 ri.push(r);
4697 }
4698 Ok((li, ri))
4699}
4700
4701fn row_key(base: &DataFrame, col_indices: &[usize], row: usize) -> Vec<String> {
4703 col_indices
4704 .iter()
4705 .map(|&ci| base.columns[ci].1.get_display(row))
4706 .collect()
4707}
4708
4709fn build_right_lookup(
4712 right: &TidyView,
4713 right_key_cols: &[usize],
4714) -> Vec<(Vec<String>, usize)> {
4715 let mut lookup: Vec<(Vec<String>, usize)> = right
4716 .mask
4717 .iter_indices()
4718 .map(|r| (row_key(&right.base, right_key_cols, r), r))
4719 .collect();
4720 lookup.sort_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
4722 lookup
4723}
4724
/// Return the row indices stored under `key` in `lookup`, in table order.
///
/// `lookup` must be sorted by key, as produced by `build_right_lookup`. The
/// function binary-searches for the first entry whose key is not less than
/// `key`, then collects the contiguous run of equal keys. This is
/// `O(log n + m)` for `m` matches and returns an empty vector when nothing
/// matches. The key is compared as a borrowed slice; the previous
/// `key.to_vec()` deep-cloned every `String` on each probe.
fn find_matches(lookup: &[(Vec<String>, usize)], key: &[String]) -> Vec<usize> {
    let start = lookup.partition_point(|(k, _)| k.as_slice() < key);
    lookup[start..]
        .iter()
        .take_while(|(k, _)| k.as_slice() == key)
        .map(|&(_, row)| row)
        .collect()
}
4740
4741fn build_right_lookup_btree(
4746 right: &TidyView,
4747 right_key_cols: &[usize],
4748) -> BTreeMap<Vec<String>, Vec<usize>> {
4749 let mut lookup: BTreeMap<Vec<String>, Vec<usize>> = BTreeMap::new();
4750 for r in right.mask.iter_indices() {
4751 let key = row_key(&right.base, right_key_cols, r);
4752 lookup.entry(key).or_default().push(r);
4753 }
4754 lookup
4755}
4756
/// Code-level view of the join keys when every key pair is `Categorical` on
/// both sides, as built by `collect_categorical_join_keys`.
///
/// All three vectors hold one entry per key pair, in the order the key columns
/// were supplied.
pub(crate) struct CategoricalJoinKeys<'a> {
    /// Per key: the left column's code array, indexed by base row.
    pub(crate) left_codes: Vec<&'a [u32]>,
    /// Per key: the right column's code array, indexed by base row.
    pub(crate) right_codes: Vec<&'a [u32]>,
    /// Per key: maps a right-side code to the left-side code with the identical
    /// level string, or `None` when that level does not exist on the left
    /// (such right rows can never match).
    pub(crate) right_to_left: Vec<Vec<Option<u32>>>,
}
4784
4785pub(crate) fn collect_categorical_join_keys<'a>(
4790 left_base: &'a DataFrame,
4791 left_cols: &[usize],
4792 right_base: &'a DataFrame,
4793 right_cols: &[usize],
4794) -> Option<CategoricalJoinKeys<'a>> {
4795 if left_cols.is_empty() || left_cols.len() != right_cols.len() {
4796 return None;
4797 }
4798 let mut left_codes = Vec::with_capacity(left_cols.len());
4799 let mut right_codes = Vec::with_capacity(left_cols.len());
4800 let mut right_to_left = Vec::with_capacity(left_cols.len());
4801
4802 for (li, ri) in left_cols.iter().zip(right_cols.iter()) {
4803 match (&left_base.columns[*li].1, &right_base.columns[*ri].1) {
4804 (
4805 Column::Categorical { levels: ll, codes: lc },
4806 Column::Categorical { levels: rl, codes: rc },
4807 ) => {
4808 let mut left_lookup: BTreeMap<&str, u32> = BTreeMap::new();
4812 for (i, lv) in ll.iter().enumerate() {
4813 left_lookup.insert(lv.as_str(), i as u32);
4814 }
4815 let remap: Vec<Option<u32>> = rl
4817 .iter()
4818 .map(|rv| left_lookup.get(rv.as_str()).copied())
4819 .collect();
4820 left_codes.push(lc.as_slice());
4821 right_codes.push(rc.as_slice());
4822 right_to_left.push(remap);
4823 }
4824 _ => return None,
4825 }
4826 }
4827 Some(CategoricalJoinKeys {
4828 left_codes,
4829 right_codes,
4830 right_to_left,
4831 })
4832}
4833
4834fn build_right_lookup_btree_categorical<'a>(
4839 cat: &CategoricalJoinKeys<'a>,
4840 right_visible: impl Iterator<Item = usize>,
4841) -> BTreeMap<Vec<u32>, Vec<usize>> {
4842 let nkeys = cat.right_codes.len();
4843 let mut lookup: BTreeMap<Vec<u32>, Vec<usize>> = BTreeMap::new();
4844 let mut key_buf: Vec<u32> = Vec::with_capacity(nkeys);
4845 for r in right_visible {
4846 key_buf.clear();
4847 let mut all_mappable = true;
4848 for i in 0..nkeys {
4849 let rc = cat.right_codes[i][r];
4850 match cat.right_to_left[i][rc as usize] {
4851 Some(lc) => key_buf.push(lc),
4852 None => {
4853 all_mappable = false;
4854 break;
4855 }
4856 }
4857 }
4858 if all_mappable {
4859 lookup.entry(key_buf.clone()).or_default().push(r);
4860 }
4861 }
4862 lookup
4863}
4864
4865#[inline]
4867fn left_join_key_codes(cat: &CategoricalJoinKeys<'_>, row: usize, buf: &mut Vec<u32>) {
4868 buf.clear();
4869 for codes in &cat.left_codes {
4870 buf.push(codes[row]);
4871 }
4872}
4873
4874fn join_match_rows(
4876 left: &TidyView,
4877 right: &TidyView,
4878 on: &[(&str, &str)],
4879 _kind: JoinKind,
4880) -> Result<(Vec<usize>, Vec<usize>), TidyError> {
4881 let (left_key_cols, right_key_cols) = resolve_join_keys(left, right, on)?;
4882
4883 if let Some(cat) =
4887 collect_categorical_join_keys(&left.base, &left_key_cols, &right.base, &right_key_cols)
4888 {
4889 let lookup = build_right_lookup_btree_categorical(&cat, right.mask.iter_indices());
4890 let mut out_left = Vec::new();
4891 let mut out_right = Vec::new();
4892 let mut key_buf: Vec<u32> = Vec::with_capacity(cat.left_codes.len());
4893 for l_row in left.mask.iter_indices() {
4894 left_join_key_codes(&cat, l_row, &mut key_buf);
4895 if let Some(matches) = lookup.get(&key_buf) {
4896 for &r_row in matches {
4897 out_left.push(l_row);
4898 out_right.push(r_row);
4899 }
4900 }
4901 }
4902 return Ok((out_left, out_right));
4903 }
4904
4905 let lookup = build_right_lookup_btree(right, &right_key_cols);
4907
4908 let mut out_left = Vec::new();
4909 let mut out_right = Vec::new();
4910
4911 for l_row in left.mask.iter_indices() {
4912 let key = row_key(&left.base, &left_key_cols, l_row);
4913 if let Some(matches) = lookup.get(&key) {
4914 for &r_row in matches {
4915 out_left.push(l_row);
4916 out_right.push(r_row);
4917 }
4918 }
4919 }
4920 Ok((out_left, out_right))
4921}
4922
4923fn join_match_rows_optional(
4925 left: &TidyView,
4926 right: &TidyView,
4927 on: &[(&str, &str)],
4928 _kind: JoinKind,
4929) -> Result<(Vec<usize>, Vec<Option<usize>>), TidyError> {
4930 let (left_key_cols, right_key_cols) = resolve_join_keys(left, right, on)?;
4931
4932 if let Some(cat) =
4934 collect_categorical_join_keys(&left.base, &left_key_cols, &right.base, &right_key_cols)
4935 {
4936 let lookup = build_right_lookup_btree_categorical(&cat, right.mask.iter_indices());
4937 let mut out_left = Vec::new();
4938 let mut out_right: Vec<Option<usize>> = Vec::new();
4939 let mut key_buf: Vec<u32> = Vec::with_capacity(cat.left_codes.len());
4940 for l_row in left.mask.iter_indices() {
4941 left_join_key_codes(&cat, l_row, &mut key_buf);
4942 match lookup.get(&key_buf) {
4943 Some(matches) if !matches.is_empty() => {
4944 for &r_row in matches {
4945 out_left.push(l_row);
4946 out_right.push(Some(r_row));
4947 }
4948 }
4949 _ => {
4950 out_left.push(l_row);
4951 out_right.push(None);
4952 }
4953 }
4954 }
4955 return Ok((out_left, out_right));
4956 }
4957
4958 let lookup = build_right_lookup_btree(right, &right_key_cols);
4960
4961 let mut out_left = Vec::new();
4962 let mut out_right: Vec<Option<usize>> = Vec::new();
4963
4964 for l_row in left.mask.iter_indices() {
4965 let key = row_key(&left.base, &left_key_cols, l_row);
4966 match lookup.get(&key) {
4967 Some(matches) if !matches.is_empty() => {
4968 for &r_row in matches {
4969 out_left.push(l_row);
4970 out_right.push(Some(r_row));
4971 }
4972 }
4973 _ => {
4974 out_left.push(l_row);
4975 out_right.push(None);
4976 }
4977 }
4978 }
4979 Ok((out_left, out_right))
4980}
4981
4982fn semi_anti_match_rows(
4984 left: &TidyView,
4985 right: &TidyView,
4986 on: &[(&str, &str)],
4987 semi: bool,
4988) -> Result<Vec<usize>, TidyError> {
4989 let (left_key_cols, right_key_cols) = resolve_join_keys(left, right, on)?;
4990
4991 if let Some(cat) =
4993 collect_categorical_join_keys(&left.base, &left_key_cols, &right.base, &right_key_cols)
4994 {
4995 let lookup = build_right_lookup_btree_categorical(&cat, right.mask.iter_indices());
4996 let mut out = Vec::new();
4997 let mut key_buf: Vec<u32> = Vec::with_capacity(cat.left_codes.len());
4998 for l_row in left.mask.iter_indices() {
4999 left_join_key_codes(&cat, l_row, &mut key_buf);
5000 let has_match = lookup.contains_key(&key_buf);
5001 if has_match == semi {
5002 out.push(l_row);
5003 }
5004 }
5005 return Ok(out);
5006 }
5007
5008 let lookup = build_right_lookup_btree(right, &right_key_cols);
5010
5011 let mut out = Vec::new();
5012 for l_row in left.mask.iter_indices() {
5013 let key = row_key(&left.base, &left_key_cols, l_row);
5014 let has_match = lookup.contains_key(&key);
5015 if has_match == semi {
5016 out.push(l_row);
5017 }
5018 }
5019 Ok(out)
5020}
5021
5022fn build_join_frame(
5025 left: &TidyView,
5026 right: &TidyView,
5027 left_rows: &[usize],
5028 right_rows: &[usize],
5029 on: &[(&str, &str)],
5030 _include_unmatched: bool,
5031) -> Result<TidyFrame, TidyError> {
5032 let right_key_names: std::collections::BTreeSet<&str> =
5033 on.iter().map(|(_, rk)| *rk).collect();
5034
5035 let n = left_rows.len();
5036 let mut columns: Vec<(String, Column)> = Vec::new();
5037
5038 for &ci in left.proj.indices() {
5040 let (name, col) = &left.base.columns[ci];
5041 columns.push((name.clone(), gather_column(col, left_rows)));
5042 }
5043
5044 for &ci in right.proj.indices() {
5046 let (name, col) = &right.base.columns[ci];
5047 if right_key_names.contains(name.as_str()) {
5048 continue;
5049 }
5050 columns.push((name.clone(), gather_column(col, right_rows)));
5051 }
5052
5053 assert_eq!(n, left_rows.len());
5054 let df = DataFrame::from_columns(columns)
5055 .map_err(|e| TidyError::Internal(e.to_string()))?;
5056 Ok(TidyFrame::from_df(df))
5057}
5058
5059fn build_left_join_frame(
5061 left: &TidyView,
5062 right: &TidyView,
5063 left_rows: &[usize],
5064 right_rows_opt: &[Option<usize>],
5065 on: &[(&str, &str)],
5066) -> Result<TidyFrame, TidyError> {
5067 let right_key_names: std::collections::BTreeSet<&str> =
5068 on.iter().map(|(_, rk)| *rk).collect();
5069
5070 let mut columns: Vec<(String, Column)> = Vec::new();
5071
5072 for &ci in left.proj.indices() {
5074 let (name, col) = &left.base.columns[ci];
5075 columns.push((name.clone(), gather_column(col, left_rows)));
5076 }
5077
5078 for &ci in right.proj.indices() {
5080 let (name, col) = &right.base.columns[ci];
5081 if right_key_names.contains(name.as_str()) {
5082 continue;
5083 }
5084 let new_col = gather_column_nullable(col, right_rows_opt);
5085 columns.push((name.clone(), new_col));
5086 }
5087
5088 let df = DataFrame::from_columns(columns)
5089 .map_err(|e| TidyError::Internal(e.to_string()))?;
5090 Ok(TidyFrame::from_df(df))
5091}
5092
5093fn compare_column_rows(col: &Column, a: usize, b: usize) -> std::cmp::Ordering {
5100 match col {
5101 Column::Int(v) => v[a].cmp(&v[b]),
5102 Column::Float(v) => {
5103 let va = v[a];
5104 let vb = v[b];
5105 match (va.is_nan(), vb.is_nan()) {
5106 (true, true) => std::cmp::Ordering::Equal,
5107 (true, false) => std::cmp::Ordering::Greater, (false, true) => std::cmp::Ordering::Less,
5109 (false, false) => va.partial_cmp(&vb).unwrap_or(std::cmp::Ordering::Equal),
5110 }
5111 }
5112 Column::Bool(v) => v[a].cmp(&v[b]),
5113 Column::Str(v) => v[a].cmp(&v[b]),
5114 Column::Categorical { levels, codes } => {
5115 levels[codes[a] as usize].cmp(&levels[codes[b] as usize])
5117 }
5118 Column::CategoricalAdaptive(cc) => {
5119 cc.get(a).cmp(&cc.get(b))
5122 }
5123 Column::DateTime(v) => v[a].cmp(&v[b]),
5124 }
5125}
5126
#[cfg(test)]
/// Unit tests for the Phase 10 primitives: `BitMask` construction and
/// algebra, plus the basic `TidyView` verbs (`filter`, `select`, `mutate`).
mod phase10_unit_tests {
    use super::*;

    /// Five-row fixture: `x` Int 1..=5, `y` Float 1.0..=5.0, and `flag`
    /// alternating, starting with `true`.
    fn make_df() -> DataFrame {
        DataFrame::from_columns(vec![
            ("x".into(), Column::Int(vec![1, 2, 3, 4, 5])),
            ("y".into(), Column::Float(vec![1.0, 2.0, 3.0, 4.0, 5.0])),
            ("flag".into(), Column::Bool(vec![true, false, true, false, true])),
        ])
        .unwrap()
    }

    #[test]
    fn bitmask_all_true_count() {
        let m = BitMask::all_true(5);
        assert_eq!(m.count_ones(), 5);
    }

    #[test]
    fn bitmask_all_false_count() {
        let m = BitMask::all_false(5);
        assert_eq!(m.count_ones(), 0);
    }

    #[test]
    fn bitmask_tail_bits_clean() {
        // 65 rows span two 64-bit words; the unused high bits of the last word
        // must stay clear.
        let m = BitMask::all_true(65);
        assert_eq!(m.count_ones(), 65);
        assert_eq!(m.words.len(), 2);
        // Only bit 64 (bit 0 of word 1) is set.
        assert_eq!(m.words[1], 1u64); }

    #[test]
    fn bitmask_and_semantics() {
        // Element-wise AND of two masks.
        let a = BitMask::from_bools(&[true, true, false, true]);
        let b = BitMask::from_bools(&[true, false, true, true]);
        let c = a.and(&b);
        assert!(c.get(0));
        assert!(!c.get(1));
        assert!(!c.get(2));
        assert!(c.get(3));
    }

    #[test]
    fn tidy_view_nrows_ncols() {
        let df = make_df();
        let v = df.tidy();
        assert_eq!(v.nrows(), 5);
        assert_eq!(v.ncols(), 3);
    }

    #[test]
    fn filter_basic() {
        // x > 2 keeps rows with x in {3, 4, 5}.
        let df = make_df();
        let v = df.tidy();
        let filtered = v
            .filter(&DExpr::BinOp {
                op: DBinOp::Gt,
                left: Box::new(DExpr::Col("x".into())),
                right: Box::new(DExpr::LitInt(2)),
            })
            .unwrap();
        assert_eq!(filtered.nrows(), 3);
    }

    #[test]
    fn filter_empty_df() {
        // Filtering a zero-row frame must succeed and stay empty.
        let df = DataFrame::from_columns(vec![
            ("x".into(), Column::Int(vec![])),
        ])
        .unwrap();
        let v = df.tidy();
        let filtered = v
            .filter(&DExpr::BinOp {
                op: DBinOp::Gt,
                left: Box::new(DExpr::Col("x".into())),
                right: Box::new(DExpr::LitInt(0)),
            })
            .unwrap();
        assert_eq!(filtered.nrows(), 0);
    }

    #[test]
    fn select_reorder() {
        // `select` returns columns in the requested order.
        let df = make_df();
        let v = df.tidy();
        let s = v.select(&["y", "x"]).unwrap();
        assert_eq!(s.column_names(), vec!["y", "x"]);
    }

    #[test]
    fn select_zero_cols() {
        // Selecting no columns keeps the row count.
        let df = make_df();
        let v = df.tidy();
        let s = v.select(&[]).unwrap();
        assert_eq!(s.ncols(), 0);
        assert_eq!(s.nrows(), 5);
    }

    #[test]
    fn select_unknown_col() {
        let df = make_df();
        let v = df.tidy();
        let err = v.select(&["nonexistent"]).unwrap_err();
        assert!(matches!(err, TidyError::ColumnNotFound(_)));
    }

    #[test]
    fn select_duplicate_col() {
        let df = make_df();
        let v = df.tidy();
        let err = v.select(&["x", "x"]).unwrap_err();
        assert!(matches!(err, TidyError::DuplicateColumn(_)));
    }

    #[test]
    fn mutate_new_col() {
        // Int + Int literal stays an Int column.
        let df = make_df();
        let v = df.tidy();
        let frame = v
            .mutate(&[("z", DExpr::BinOp {
                op: DBinOp::Add,
                left: Box::new(DExpr::Col("x".into())),
                right: Box::new(DExpr::LitInt(10)),
            })])
            .unwrap();
        let b = frame.borrow();
        let z = b.get_column("z").unwrap();
        assert_eq!(z.len(), 5);
        if let Column::Int(v) = z {
            assert_eq!(v[0], 11);
            assert_eq!(v[4], 15);
        } else {
            panic!("expected Int column");
        }
    }

    #[test]
    fn mutate_type_promotion() {
        // Int + Float promotes the result column to Float.
        let df = make_df();
        let v = df.tidy();
        let frame = v
            .mutate(&[("promoted", DExpr::BinOp {
                op: DBinOp::Add,
                left: Box::new(DExpr::Col("x".into())),
                right: Box::new(DExpr::Col("y".into())),
            })])
            .unwrap();
        let b = frame.borrow();
        let col = b.get_column("promoted").unwrap();
        assert!(matches!(col, Column::Float(_)));
    }
}
5303
5304impl TidyError {
5356 pub fn schema_mismatch(msg: impl Into<String>) -> Self {
5358 TidyError::Internal(format!("schema mismatch: {}", msg.into()))
5359 }
5360 pub fn join_type_mismatch(col: &str, lt: &str, rt: &str) -> Self {
5362 TidyError::TypeMismatch {
5363 expected: format!("{} (from left key `{}`)", lt, col),
5364 got: rt.to_string(),
5365 }
5366 }
5367 pub fn duplicate_key(key: impl Into<String>) -> Self {
5369 TidyError::DuplicateColumn(format!("duplicate key: {}", key.into()))
5370 }
5371 pub fn empty_selection(msg: impl Into<String>) -> Self {
5373 TidyError::Internal(format!("empty selection: {}", msg.into()))
5374 }
5375}
5376
/// A column of `T` values paired with a validity bitmask.
///
/// A set bit `i` in `validity` means entry `i` is present. A cleared bit marks
/// a null; the value stored at that position is kept but never returned by
/// `get`.
#[derive(Debug, Clone)]
pub struct NullableColumn<T: Clone> {
    /// Raw values, one per row (including placeholder values at null rows).
    pub values: Vec<T>,
    /// Validity mask, one bit per row (`true` = valid).
    pub validity: BitMask,
}
5390
5391impl<T: Clone + Default> NullableColumn<T> {
5392 pub fn from_values(values: Vec<T>) -> Self {
5394 let n = values.len();
5395 Self {
5396 values,
5397 validity: BitMask::all_true(n),
5398 }
5399 }
5400
5401 pub fn new(values: Vec<T>, validity: BitMask) -> Self {
5404 assert_eq!(values.len(), validity.nrows(), "NullableColumn: length mismatch");
5405 Self { values, validity }
5406 }
5407
5408 pub fn len(&self) -> usize {
5410 self.values.len()
5411 }
5412
5413 pub fn is_null(&self, i: usize) -> bool {
5415 !self.validity.get(i)
5416 }
5417
5418 pub fn get(&self, i: usize) -> Option<&T> {
5420 if self.validity.get(i) { Some(&self.values[i]) } else { None }
5421 }
5422
5423 pub fn count_valid(&self) -> usize {
5425 self.validity.count_ones()
5426 }
5427
5428 pub fn gather(&self, indices: &[usize]) -> Self {
5430 let mut vals = Vec::with_capacity(indices.len());
5431 let mut bools = Vec::with_capacity(indices.len());
5432 for &i in indices {
5433 vals.push(self.values[i].clone());
5434 bools.push(self.validity.get(i));
5435 }
5436 let validity = BitMask::from_bools(&bools);
5437 Self { values: vals, validity }
5438 }
5439}
5440
5441#[derive(Debug, Clone)]
5454pub enum NullCol {
5455 Int(NullableColumn<i64>),
5457 Float(NullableColumn<f64>),
5459 Str(NullableColumn<String>),
5461 Bool(NullableColumn<bool>),
5463}
5464
5465impl NullCol {
5466 pub fn len(&self) -> usize {
5468 match self {
5469 NullCol::Int(c) => c.len(),
5470 NullCol::Float(c) => c.len(),
5471 NullCol::Str(c) => c.len(),
5472 NullCol::Bool(c) => c.len(),
5473 }
5474 }
5475
5476 pub fn is_null(&self, i: usize) -> bool {
5478 match self {
5479 NullCol::Int(c) => c.is_null(i),
5480 NullCol::Float(c) => c.is_null(i),
5481 NullCol::Str(c) => c.is_null(i),
5482 NullCol::Bool(c) => c.is_null(i),
5483 }
5484 }
5485
5486 pub fn type_name(&self) -> &'static str {
5488 match self {
5489 NullCol::Int(_) => "Int",
5490 NullCol::Float(_) => "Float",
5491 NullCol::Str(_) => "Str",
5492 NullCol::Bool(_) => "Bool",
5493 }
5494 }
5495
5496 pub fn from_column(col: &Column) -> Self {
5498 match col {
5499 Column::Int(v) => NullCol::Int(NullableColumn::from_values(v.clone())),
5500 Column::Float(v) => NullCol::Float(NullableColumn::from_values(v.clone())),
5501 Column::Str(v) => NullCol::Str(NullableColumn::from_values(v.clone())),
5502 Column::Bool(v) => NullCol::Bool(NullableColumn::from_values(v.clone())),
5503 Column::Categorical { levels, codes } => {
5505 let strings: Vec<String> = codes.iter().map(|&c| levels[c as usize].clone()).collect();
5506 NullCol::Str(NullableColumn::from_values(strings))
5507 }
5508 Column::CategoricalAdaptive(cc) => {
5509 let n = cc.len();
5510 let strings: Vec<String> = (0..n)
5511 .map(|i| match cc.get(i) {
5512 None => String::new(),
5513 Some(b) => String::from_utf8_lossy(b).into_owned(),
5514 })
5515 .collect();
5516 NullCol::Str(NullableColumn::from_values(strings))
5517 }
5518 Column::DateTime(v) => NullCol::Int(NullableColumn::from_values(v.clone())),
5519 }
5520 }
5521
5522 pub fn to_column_strict(&self) -> Result<Column, TidyError> {
5525 match self {
5526 NullCol::Int(nc) => {
5527 if nc.count_valid() == nc.len() {
5528 Ok(Column::Int(nc.values.clone()))
5529 } else {
5530 Err(TidyError::Internal("null values in non-nullable context".into()))
5531 }
5532 }
5533 NullCol::Float(nc) => {
5534 if nc.count_valid() == nc.len() {
5535 Ok(Column::Float(nc.values.clone()))
5536 } else {
5537 Err(TidyError::Internal("null values in non-nullable context".into()))
5538 }
5539 }
5540 NullCol::Str(nc) => {
5541 if nc.count_valid() == nc.len() {
5542 Ok(Column::Str(nc.values.clone()))
5543 } else {
5544 Err(TidyError::Internal("null values in non-nullable context".into()))
5545 }
5546 }
5547 NullCol::Bool(nc) => {
5548 if nc.count_valid() == nc.len() {
5549 Ok(Column::Bool(nc.values.clone()))
5550 } else {
5551 Err(TidyError::Internal("null values in non-nullable context".into()))
5552 }
5553 }
5554 }
5555 }
5556
5557 pub fn to_column_filled(&self) -> Column {
5560 match self {
5561 NullCol::Int(nc) => Column::Int(nc.values.clone()),
5562 NullCol::Float(nc) => {
5563 let v: Vec<f64> = (0..nc.len())
5564 .map(|i| if nc.is_null(i) { f64::NAN } else { nc.values[i] })
5565 .collect();
5566 Column::Float(v)
5567 }
5568 NullCol::Str(nc) => Column::Str(nc.values.clone()),
5569 NullCol::Bool(nc) => Column::Bool(nc.values.clone()),
5570 }
5571 }
5572
5573 pub fn get_display(&self, i: usize) -> String {
5575 if self.is_null(i) {
5576 return "null".to_string();
5577 }
5578 match self {
5579 NullCol::Int(nc) => format!("{}", nc.values[i]),
5580 NullCol::Float(nc) => format!("{}", nc.values[i]),
5581 NullCol::Str(nc) => nc.values[i].clone(),
5582 NullCol::Bool(nc) => format!("{}", nc.values[i]),
5583 }
5584 }
5585
5586 pub fn null_of_type(type_name: &str, len: usize) -> Self {
5588 match type_name {
5589 "Int" => NullCol::Int(NullableColumn {
5590 values: vec![0i64; len],
5591 validity: BitMask::all_false(len),
5592 }),
5593 "Float" => NullCol::Float(NullableColumn {
5594 values: vec![0.0f64; len],
5595 validity: BitMask::all_false(len),
5596 }),
5597 "Bool" => NullCol::Bool(NullableColumn {
5598 values: vec![false; len],
5599 validity: BitMask::all_false(len),
5600 }),
5601 _ => NullCol::Str(NullableColumn {
5602 values: vec![String::new(); len],
5603 validity: BitMask::all_false(len),
5604 }),
5605 }
5606 }
5607
5608 pub fn gather(&self, indices: &[usize]) -> Self {
5610 match self {
5611 NullCol::Int(nc) => NullCol::Int(nc.gather(indices)),
5612 NullCol::Float(nc) => NullCol::Float(nc.gather(indices)),
5613 NullCol::Str(nc) => NullCol::Str(nc.gather(indices)),
5614 NullCol::Bool(nc) => NullCol::Bool(nc.gather(indices)),
5615 }
5616 }
5617}
5618
5619#[derive(Debug, Clone)]
5622pub struct NullableFrame {
5623 pub columns: Vec<(String, NullCol)>,
5624}
5625
5626impl NullableFrame {
5627 pub fn new() -> Self {
5629 Self { columns: Vec::new() }
5630 }
5631
5632 pub fn nrows(&self) -> usize {
5634 self.columns.first().map(|(_, c)| c.len()).unwrap_or(0)
5635 }
5636
5637 pub fn ncols(&self) -> usize {
5639 self.columns.len()
5640 }
5641
5642 pub fn column_names(&self) -> Vec<&str> {
5644 self.columns.iter().map(|(n, _)| n.as_str()).collect()
5645 }
5646
5647 pub fn get_column(&self, name: &str) -> Option<&NullCol> {
5649 self.columns.iter().find(|(n, _)| n == name).map(|(_, c)| c)
5650 }
5651
5652 pub fn to_dataframe_filled(&self) -> DataFrame {
5654 let cols: Vec<(String, Column)> = self.columns.iter()
5655 .map(|(n, c)| (n.clone(), c.to_column_filled()))
5656 .collect();
5657 DataFrame { columns: cols }
5659 }
5660
5661 pub fn to_tidy_frame_filled(&self) -> TidyFrame {
5663 TidyFrame::from_df(self.to_dataframe_filled())
5664 }
5665
5666 pub fn to_tidy_view_filled(&self) -> TidyView {
5668 TidyView::from_df(self.to_dataframe_filled())
5669 }
5670}
5671
5672impl Default for NullableFrame {
5673 fn default() -> Self { Self::new() }
5674}
5675
5676fn gather_column_nullable_null(col: &Column, indices: &[Option<usize>]) -> NullCol {
5681 if matches!(col, Column::CategoricalAdaptive(_)) {
5682 return gather_column_nullable_null(&col.to_legacy_categorical(), indices);
5683 }
5684 match col {
5685 Column::Int(v) => {
5686 let mut vals = Vec::with_capacity(indices.len());
5687 let mut valid = Vec::with_capacity(indices.len());
5688 for &idx in indices {
5689 match idx {
5690 Some(i) => { vals.push(v[i]); valid.push(true); }
5691 None => { vals.push(0); valid.push(false); }
5692 }
5693 }
5694 NullCol::Int(NullableColumn::new(vals, BitMask::from_bools(&valid)))
5695 }
5696 Column::Float(v) => {
5697 let mut vals = Vec::with_capacity(indices.len());
5698 let mut valid = Vec::with_capacity(indices.len());
5699 for &idx in indices {
5700 match idx {
5701 Some(i) => { vals.push(v[i]); valid.push(true); }
5702 None => { vals.push(0.0); valid.push(false); }
5703 }
5704 }
5705 NullCol::Float(NullableColumn::new(vals, BitMask::from_bools(&valid)))
5706 }
5707 Column::Str(v) => {
5708 let mut vals = Vec::with_capacity(indices.len());
5709 let mut valid = Vec::with_capacity(indices.len());
5710 for &idx in indices {
5711 match idx {
5712 Some(i) => { vals.push(v[i].clone()); valid.push(true); }
5713 None => { vals.push(String::new()); valid.push(false); }
5714 }
5715 }
5716 NullCol::Str(NullableColumn::new(vals, BitMask::from_bools(&valid)))
5717 }
5718 Column::Bool(v) => {
5719 let mut vals = Vec::with_capacity(indices.len());
5720 let mut valid = Vec::with_capacity(indices.len());
5721 for &idx in indices {
5722 match idx {
5723 Some(i) => { vals.push(v[i]); valid.push(true); }
5724 None => { vals.push(false); valid.push(false); }
5725 }
5726 }
5727 NullCol::Bool(NullableColumn::new(vals, BitMask::from_bools(&valid)))
5728 }
5729 Column::Categorical { levels, codes } => {
5730 let mut vals = Vec::with_capacity(indices.len());
5731 let mut valid = Vec::with_capacity(indices.len());
5732 for &idx in indices {
5733 match idx {
5734 Some(i) => { vals.push(levels[codes[i] as usize].clone()); valid.push(true); }
5735 None => { vals.push(String::new()); valid.push(false); }
5736 }
5737 }
5738 NullCol::Str(NullableColumn::new(vals, BitMask::from_bools(&valid)))
5739 }
5740 Column::DateTime(v) => {
5741 let mut vals = Vec::with_capacity(indices.len());
5742 let mut valid = Vec::with_capacity(indices.len());
5743 for &idx in indices {
5744 match idx {
5745 Some(i) => { vals.push(v[i]); valid.push(true); }
5746 None => { vals.push(0); valid.push(false); }
5747 }
5748 }
5749 NullCol::Int(NullableColumn::new(vals, BitMask::from_bools(&valid)))
5750 }
5751 Column::CategoricalAdaptive(_) => unreachable!("handled by early return"),
5752 }
5753}
5754
5755pub type AcrossFn = Box<dyn Fn(&str, &Column) -> Result<Column, TidyError>>;
5762
5763pub struct AcrossTransform {
5765 pub fn_name: String,
5767 pub func: AcrossFn,
5769}
5770
5771impl AcrossTransform {
5772 pub fn new(fn_name: impl Into<String>, func: impl Fn(&str, &Column) -> Result<Column, TidyError> + 'static) -> Self {
5774 Self {
5775 fn_name: fn_name.into(),
5776 func: Box::new(func),
5777 }
5778 }
5779}
5780
5781pub struct AcrossSpec {
5783 pub cols: Vec<String>,
5785 pub transform: AcrossTransform,
5787 pub name_template: Option<String>,
5790}
5791
5792impl AcrossSpec {
5793 pub fn new(cols: impl IntoIterator<Item = impl Into<String>>, transform: AcrossTransform) -> Self {
5795 Self {
5796 cols: cols.into_iter().map(|c| c.into()).collect(),
5797 transform,
5798 name_template: None,
5799 }
5800 }
5801
5802 pub fn with_template(mut self, tmpl: impl Into<String>) -> Self {
5804 self.name_template = Some(tmpl.into());
5805 self
5806 }
5807
5808 pub fn output_name(&self, col_name: &str) -> String {
5810 match &self.name_template {
5811 Some(tmpl) => tmpl
5812 .replace("{col}", col_name)
5813 .replace("{fn}", &self.transform.fn_name),
5814 None => format!("{}_{}", col_name, self.transform.fn_name),
5815 }
5816 }
5817}
5818
/// Suffixes appended to clashing non-key column names in the typed joins.
///
/// The default is `.x` for the left side and `.y` for the right side.
#[derive(Debug, Clone)]
pub struct JoinSuffix {
    /// Appended to the left column's name on a clash.
    pub left: String,
    /// Appended to the right column's name on a clash.
    pub right: String,
}

impl JoinSuffix {
    /// Creates a suffix pair.
    pub fn new(left: impl Into<String>, right: impl Into<String>) -> Self {
        JoinSuffix {
            left: left.into(),
            right: right.into(),
        }
    }
}

impl Default for JoinSuffix {
    fn default() -> Self {
        JoinSuffix::new(".x", ".y")
    }
}
5840
5841fn join_types_compatible(left: &Column, right: &Column) -> bool {
5846 match (left, right) {
5847 (Column::Int(_), Column::Int(_)) => true,
5848 (Column::Float(_), Column::Float(_)) => true,
5849 (Column::Int(_), Column::Float(_)) => true,
5850 (Column::Float(_), Column::Int(_)) => true,
5851 (Column::Str(_), Column::Str(_)) => true,
5852 (Column::Bool(_), Column::Bool(_)) => true,
5853 _ => false,
5854 }
5855}
5856
5857impl TidyView {
5860
5861 pub fn pivot_longer(
5879 &self,
5880 value_cols: &[&str],
5881 names_to: &str,
5882 values_to: &str,
5883 ) -> Result<TidyFrame, TidyError> {
5884 if value_cols.is_empty() {
5885 return Err(TidyError::empty_selection("pivot_longer requires at least one value_col"));
5886 }
5887
5888 let mut seen_vc: Vec<&str> = Vec::new();
5890 let mut vc_indices: Vec<usize> = Vec::new();
5891 for &name in value_cols {
5892 if seen_vc.contains(&name) {
5893 return Err(TidyError::DuplicateColumn(name.to_string()));
5894 }
5895 seen_vc.push(name);
5896 let idx = self.base.columns.iter().position(|(n, _)| n == name)
5897 .ok_or_else(|| TidyError::ColumnNotFound(name.to_string()))?;
5898 vc_indices.push(idx);
5899 }
5900
5901 let first_type = self.base.columns[vc_indices[0]].1.type_name();
5903 for &idx in &vc_indices[1..] {
5904 let t = self.base.columns[idx].1.type_name();
5905 if t != first_type {
5906 return Err(TidyError::TypeMismatch {
5907 expected: first_type.to_string(),
5908 got: t.to_string(),
5909 });
5910 }
5911 }
5912
5913 let vc_set: std::collections::BTreeSet<usize> = vc_indices.iter().copied().collect();
5915 let id_col_indices: Vec<usize> = self.proj.indices().iter()
5916 .copied()
5917 .filter(|i| !vc_set.contains(i))
5918 .collect();
5919
5920 let visible_rows: Vec<usize> = self.mask.iter_indices().collect();
5921 let n_out = visible_rows.len() * value_cols.len();
5922
5923 let mut out_cols: Vec<(String, Column)> = Vec::new();
5925 for &id_idx in &id_col_indices {
5926 let (name, col_orig) = &self.base.columns[id_idx];
5927 let legacy_owned;
5928 let col: &Column = if matches!(col_orig, Column::CategoricalAdaptive(_)) {
5929 legacy_owned = col_orig.to_legacy_categorical();
5930 &legacy_owned
5931 } else {
5932 col_orig
5933 };
5934 let new_col = match col {
5935 Column::Int(v) => {
5936 let mut out = Vec::with_capacity(n_out);
5937 for &r in &visible_rows {
5938 for _ in 0..value_cols.len() { out.push(v[r]); }
5939 }
5940 Column::Int(out)
5941 }
5942 Column::Float(v) => {
5943 let mut out = Vec::with_capacity(n_out);
5944 for &r in &visible_rows {
5945 for _ in 0..value_cols.len() { out.push(v[r]); }
5946 }
5947 Column::Float(out)
5948 }
5949 Column::Str(v) => {
5950 let mut out = Vec::with_capacity(n_out);
5951 for &r in &visible_rows {
5952 for _ in 0..value_cols.len() { out.push(v[r].clone()); }
5953 }
5954 Column::Str(out)
5955 }
5956 Column::Bool(v) => {
5957 let mut out = Vec::with_capacity(n_out);
5958 for &r in &visible_rows {
5959 for _ in 0..value_cols.len() { out.push(v[r]); }
5960 }
5961 Column::Bool(out)
5962 }
5963 Column::Categorical { levels, codes } => {
5964 let mut out = Vec::with_capacity(n_out);
5965 for &r in &visible_rows {
5966 for _ in 0..value_cols.len() { out.push(codes[r]); }
5967 }
5968 Column::Categorical { levels: levels.clone(), codes: out }
5969 }
5970 Column::DateTime(v) => {
5971 let mut out = Vec::with_capacity(n_out);
5972 for &r in &visible_rows {
5973 for _ in 0..value_cols.len() { out.push(v[r]); }
5974 }
5975 Column::DateTime(out)
5976 }
5977 Column::CategoricalAdaptive(_) => unreachable!("converted via legacy_owned"),
5978 };
5979 out_cols.push((name.clone(), new_col));
5980 }
5981
5982 let names_col: Vec<String> = visible_rows.iter()
5984 .flat_map(|_| value_cols.iter().map(|s| s.to_string()))
5985 .collect();
5986 out_cols.push((names_to.to_string(), Column::Str(names_col)));
5987
5988 match &self.base.columns[vc_indices[0]].1 {
5990 Column::Int(_) => {
5991 let mut vals: Vec<i64> = Vec::with_capacity(n_out);
5992 for &r in &visible_rows {
5993 for &vci in &vc_indices {
5994 if let Column::Int(v) = &self.base.columns[vci].1 {
5995 vals.push(v[r]);
5996 }
5997 }
5998 }
5999 out_cols.push((values_to.to_string(), Column::Int(vals)));
6000 }
6001 Column::Float(_) => {
6002 let mut vals: Vec<f64> = Vec::with_capacity(n_out);
6003 for &r in &visible_rows {
6004 for &vci in &vc_indices {
6005 if let Column::Float(v) = &self.base.columns[vci].1 {
6006 vals.push(v[r]);
6007 }
6008 }
6009 }
6010 out_cols.push((values_to.to_string(), Column::Float(vals)));
6011 }
6012 Column::Str(_) => {
6013 let mut vals: Vec<String> = Vec::with_capacity(n_out);
6014 for &r in &visible_rows {
6015 for &vci in &vc_indices {
6016 if let Column::Str(v) = &self.base.columns[vci].1 {
6017 vals.push(v[r].clone());
6018 }
6019 }
6020 }
6021 out_cols.push((values_to.to_string(), Column::Str(vals)));
6022 }
6023 Column::Bool(_) => {
6024 let mut vals: Vec<bool> = Vec::with_capacity(n_out);
6025 for &r in &visible_rows {
6026 for &vci in &vc_indices {
6027 if let Column::Bool(v) = &self.base.columns[vci].1 {
6028 vals.push(v[r]);
6029 }
6030 }
6031 }
6032 out_cols.push((values_to.to_string(), Column::Bool(vals)));
6033 }
6034 Column::Categorical { .. } | Column::CategoricalAdaptive(_) | Column::DateTime(_) => {
6035 let mut vals: Vec<String> = Vec::with_capacity(n_out);
6037 for &r in &visible_rows {
6038 for &vci in &vc_indices {
6039 vals.push(self.base.columns[vci].1.get_display(r));
6040 }
6041 }
6042 out_cols.push((values_to.to_string(), Column::Str(vals)));
6043 }
6044 }
6045
6046 let df = DataFrame::from_columns(out_cols)
6047 .map_err(|e| TidyError::Internal(e.to_string()))?;
6048 Ok(TidyFrame::from_df(df))
6049 }
6050
6051 pub fn pivot_wider(
6068 &self,
6069 id_cols: &[&str],
6070 names_from: &str,
6071 values_from: &str,
6072 ) -> Result<NullableFrame, TidyError> {
6073 let _names_col_idx = self.base.columns.iter().position(|(n, _)| n == names_from)
6075 .ok_or_else(|| TidyError::ColumnNotFound(names_from.to_string()))?;
6076 let _values_col_idx = self.base.columns.iter().position(|(n, _)| n == values_from)
6077 .ok_or_else(|| TidyError::ColumnNotFound(values_from.to_string()))?;
6078 for &id in id_cols {
6079 let _ = self.base.columns.iter().position(|(n, _)| n == id)
6080 .ok_or_else(|| TidyError::ColumnNotFound(id.to_string()))?;
6081 }
6082
6083 let visible_rows: Vec<usize> = self.mask.iter_indices().collect();
6084
6085 let mut key_values: Vec<String> = Vec::new();
6087 for &r in &visible_rows {
6088 let kv = self.base.get_column(names_from).unwrap().get_display(r);
6089 if !key_values.contains(&kv) {
6090 key_values.push(kv);
6091 }
6092 }
6093
6094 let id_col_refs: Vec<&Column> = id_cols.iter()
6097 .map(|&name| self.base.get_column(name).unwrap())
6098 .collect();
6099
6100 let mut id_order: Vec<Vec<String>> = Vec::new(); let mut id_to_slot: Vec<(Vec<String>, usize)> = Vec::new(); for &r in &visible_rows {
6104 let id_key: Vec<String> = id_col_refs.iter()
6105 .map(|col| col.get_display(r))
6106 .collect();
6107 if !id_to_slot.iter().any(|(k, _)| k == &id_key) {
6108 let slot = id_order.len();
6109 id_order.push(id_key.clone());
6110 id_to_slot.push((id_key, slot));
6111 }
6112 }
6113
6114 let n_rows = id_order.len();
6115 let n_keys = key_values.len();
6116
6117 let mut cell_map: Vec<Vec<Option<usize>>> = vec![vec![None; n_keys]; n_rows];
6120
6121 for &r in &visible_rows {
6122 let id_key: Vec<String> = id_col_refs.iter()
6123 .map(|col| col.get_display(r))
6124 .collect();
6125 let id_slot = id_to_slot.iter().find(|(k, _)| k == &id_key).unwrap().1;
6126
6127 let kv = self.base.get_column(names_from).unwrap().get_display(r);
6128 let key_slot = key_values.iter().position(|v| v == &kv).unwrap();
6129
6130 if cell_map[id_slot][key_slot].is_some() {
6131 return Err(TidyError::duplicate_key(
6132 format!("({}, {})", id_key.join(", "), kv)
6133 ));
6134 }
6135 cell_map[id_slot][key_slot] = Some(r);
6136 }
6137
6138 let mut out_cols: Vec<(String, NullCol)> = Vec::new();
6140
6141 for (id_idx, &id_name) in id_cols.iter().enumerate() {
6143 let id_col = self.base.get_column(id_name).unwrap();
6144 let id_row_indices: Vec<usize> = id_order.iter()
6145 .map(|id_tup| {
6146 *visible_rows.iter().find(|&&r| {
6148 id_col_refs.iter().enumerate().all(|(i, col)| {
6149 col.get_display(r) == id_tup[i]
6150 })
6151 }).unwrap()
6152 })
6153 .collect();
6154 let gathered = gather_column(id_col, &id_row_indices);
6155 out_cols.push((id_name.to_string(), NullCol::from_column(&gathered)));
6156 let _ = id_idx;
6157 }
6158
6159 let values_col = self.base.get_column(values_from).unwrap();
6161 let val_type = values_col.type_name();
6162 for (key_slot, key_val) in key_values.iter().enumerate() {
6163 let row_opts: Vec<Option<usize>> = (0..n_rows)
6164 .map(|id_slot| cell_map[id_slot][key_slot])
6165 .collect();
6166 let null_col = gather_column_nullable_null(values_col, &row_opts);
6167 out_cols.push((key_val.clone(), null_col));
6168 let _ = val_type;
6169 }
6170
6171 Ok(NullableFrame { columns: out_cols })
6172 }
6173
6174 pub fn rename(&self, renames: &[(&str, &str)]) -> Result<TidyView, TidyError> {
6185 let mut rename_map: Vec<(usize, String)> = Vec::new();
6187 let col_names: Vec<&str> = self.base.columns.iter().map(|(n, _)| n.as_str()).collect();
6188
6189 for &(old, new) in renames {
6190 let idx = col_names.iter().position(|&n| n == old)
6191 .ok_or_else(|| TidyError::ColumnNotFound(old.to_string()))?;
6192 if old != new {
6194 let new_name_exists = col_names.iter().any(|&n| n == new)
6195 || rename_map.iter().any(|(_, n)| n == new);
6196 if new_name_exists {
6197 return Err(TidyError::DuplicateColumn(new.to_string()));
6198 }
6199 }
6200 rename_map.push((idx, new.to_string()));
6201 }
6202
6203 let mut new_cols: Vec<(String, Column)> = Vec::new();
6205 for (i, (name, col)) in self.base.columns.iter().enumerate() {
6206 let new_name = rename_map.iter()
6207 .find(|(idx, _)| *idx == i)
6208 .map(|(_, n)| n.clone())
6209 .unwrap_or_else(|| name.clone());
6210 new_cols.push((new_name, col.clone()));
6211 }
6212
6213 let new_base = DataFrame { columns: new_cols };
6214 Ok(TidyView {
6215 base: Rc::new(new_base),
6216 mask: self.mask.clone(),
6217 proj: self.proj.clone(),
6218 })
6219 }
6220
6221 pub fn relocate(&self, cols: &[&str], position: RelocatePos<'_>) -> Result<TidyView, TidyError> {
6236 let proj_names: Vec<&str> = self.column_names();
6238 for &name in cols {
6239 if !proj_names.contains(&name) {
6240 return Err(TidyError::ColumnNotFound(name.to_string()));
6241 }
6242 }
6243
6244 let moved_set: std::collections::BTreeSet<&str> = cols.iter().copied().collect();
6246 let remaining: Vec<&str> = proj_names.iter()
6247 .copied()
6248 .filter(|n| !moved_set.contains(n))
6249 .collect();
6250
6251 let new_order: Vec<&str> = match &position {
6252 RelocatePos::Front => {
6253 let mut v: Vec<&str> = cols.to_vec();
6254 v.extend_from_slice(&remaining);
6255 v
6256 }
6257 RelocatePos::Back => {
6258 let mut v = remaining.clone();
6259 v.extend_from_slice(cols);
6260 v
6261 }
6262 RelocatePos::Before(anchor) => {
6263 if !proj_names.contains(anchor) {
6264 return Err(TidyError::ColumnNotFound(anchor.to_string()));
6265 }
6266 let mut v = Vec::new();
6267 for &n in &remaining {
6268 if n == *anchor {
6269 v.extend_from_slice(cols);
6270 }
6271 v.push(n);
6272 }
6273 v
6274 }
6275 RelocatePos::After(anchor) => {
6276 if !proj_names.contains(anchor) {
6277 return Err(TidyError::ColumnNotFound(anchor.to_string()));
6278 }
6279 let mut v = Vec::new();
6280 for &n in &remaining {
6281 v.push(n);
6282 if n == *anchor {
6283 v.extend_from_slice(cols);
6284 }
6285 }
6286 v
6287 }
6288 };
6289
6290 let new_indices: Vec<usize> = new_order.iter()
6292 .map(|&name| {
6293 self.base.columns.iter().position(|(n, _)| n == name).unwrap()
6294 })
6295 .collect();
6296
6297 Ok(TidyView {
6298 base: Rc::clone(&self.base),
6299 mask: self.mask.clone(),
6300 proj: ProjectionMap::from_indices(new_indices),
6301 })
6302 }
6303
6304 pub fn drop_cols(&self, cols: &[&str]) -> Result<TidyView, TidyError> {
6314 let proj_names = self.column_names();
6315 for &name in cols {
6316 if !proj_names.contains(&name) {
6317 return Err(TidyError::ColumnNotFound(name.to_string()));
6318 }
6319 }
6320 let drop_set: std::collections::BTreeSet<&str> = cols.iter().copied().collect();
6321 let keep: Vec<&str> = proj_names.iter()
6322 .copied()
6323 .filter(|n| !drop_set.contains(n))
6324 .collect();
6325 self.select(&keep)
6326 }
6327
6328 pub fn bind_rows(&self, other: &TidyView) -> Result<TidyFrame, TidyError> {
6339 let self_names = self.column_names();
6340 let other_names = other.column_names();
6341
6342 if self_names != other_names {
6343 return Err(TidyError::schema_mismatch(format!(
6344 "left has {:?}, right has {:?}",
6345 self_names, other_names
6346 )));
6347 }
6348
6349 let self_rows: Vec<usize> = self.mask.iter_indices().collect();
6350 let other_rows: Vec<usize> = other.mask.iter_indices().collect();
6351
6352 let mut out_cols: Vec<(String, Column)> = Vec::new();
6353 for &ci in self.proj.indices() {
6354 let (name, self_col) = &self.base.columns[ci];
6355 let other_ci = other.proj.indices().iter().copied()
6357 .find(|&i| other.base.columns[i].0 == *name)
6358 .ok_or_else(|| TidyError::ColumnNotFound(name.clone()))?;
6359 let other_col = &other.base.columns[other_ci].1;
6360
6361 let col = concat_columns(self_col, &self_rows, other_col, &other_rows)?;
6362 out_cols.push((name.clone(), col));
6363 }
6364
6365 let df = DataFrame::from_columns(out_cols)
6366 .map_err(|e| TidyError::Internal(e.to_string()))?;
6367 Ok(TidyFrame::from_df(df))
6368 }
6369
6370 pub fn bind_cols(&self, other: &TidyView) -> Result<TidyFrame, TidyError> {
6381 let self_nrows = self.nrows();
6382 let other_nrows = other.nrows();
6383
6384 if self_nrows != other_nrows {
6385 return Err(TidyError::LengthMismatch {
6386 expected: self_nrows,
6387 got: other_nrows,
6388 });
6389 }
6390
6391 let self_names = self.column_names();
6392 let other_names = other.column_names();
6393 for name in &other_names {
6394 if self_names.contains(name) {
6395 return Err(TidyError::DuplicateColumn(name.to_string()));
6396 }
6397 }
6398
6399 let self_rows: Vec<usize> = self.mask.iter_indices().collect();
6400 let other_rows: Vec<usize> = other.mask.iter_indices().collect();
6401
6402 let mut out_cols: Vec<(String, Column)> = Vec::new();
6403
6404 for &ci in self.proj.indices() {
6405 let (name, col) = &self.base.columns[ci];
6406 out_cols.push((name.clone(), gather_column(col, &self_rows)));
6407 }
6408 for &ci in other.proj.indices() {
6409 let (name, col) = &other.base.columns[ci];
6410 out_cols.push((name.clone(), gather_column(col, &other_rows)));
6411 }
6412
6413 let df = DataFrame::from_columns(out_cols)
6414 .map_err(|e| TidyError::Internal(e.to_string()))?;
6415 Ok(TidyFrame::from_df(df))
6416 }
6417
6418 pub fn mutate_across(&self, specs: &[AcrossSpec]) -> Result<TidyFrame, TidyError> {
6428 let base_df = self.materialize()?;
6430
6431 let mut output_names: Vec<String> = base_df.column_names()
6433 .into_iter().map(|s| s.to_string()).collect();
6434 let mut extra_cols: Vec<(String, Column)> = Vec::new();
6435
6436 for spec in specs {
6437 for col_name in &spec.cols {
6438 let out_name = spec.output_name(col_name);
6439 if output_names.contains(&out_name) && !base_df.column_names().contains(&out_name.as_str()) {
6441 return Err(TidyError::DuplicateColumn(out_name));
6442 }
6443 let col = base_df.get_column(col_name)
6444 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
6445 let new_col = (spec.transform.func)(col_name, col)?;
6446 if !base_df.column_names().contains(&out_name.as_str()) {
6448 output_names.push(out_name.clone());
6449 }
6450 extra_cols.push((out_name, new_col));
6451 }
6452 }
6453
6454 let mut col_map: indexmap_simple::IndexMap = indexmap_simple::IndexMap::from_df(&base_df);
6456 for (name, col) in extra_cols {
6457 col_map.insert(name, col);
6458 }
6459 let df = col_map.into_df()
6460 .map_err(|e| TidyError::Internal(e.to_string()))?;
6461 Ok(TidyFrame::from_df(df))
6462 }
6463
6464 pub fn right_join(
6472 &self,
6473 right: &TidyView,
6474 on: &[(&str, &str)],
6475 suffix: &JoinSuffix,
6476 ) -> Result<NullableFrame, TidyError> {
6477 validate_join_key_types(self, right, on)?;
6479 let swapped_on: Vec<(&str, &str)> = on.iter().map(|&(l, r)| (r, l)).collect();
6481 let (right_rows, left_rows_opt) =
6482 join_match_rows_optional(right, self, &swapped_on, JoinKind::Left)?;
6483 build_right_join_frame(self, right, &left_rows_opt, &right_rows, on, suffix)
6484 }
6485
6486 pub fn full_join(
6492 &self,
6493 right: &TidyView,
6494 on: &[(&str, &str)],
6495 suffix: &JoinSuffix,
6496 ) -> Result<NullableFrame, TidyError> {
6497 validate_join_key_types(self, right, on)?;
6498 build_full_join_frame(self, right, on, suffix)
6499 }
6500
6501 pub fn inner_join_typed(
6509 &self,
6510 right: &TidyView,
6511 on: &[(&str, &str)],
6512 suffix: &JoinSuffix,
6513 ) -> Result<TidyFrame, TidyError> {
6514 validate_join_key_types(self, right, on)?;
6515 let (left_rows, right_rows) = join_match_rows(self, right, on, JoinKind::Inner)?;
6516 build_join_frame_with_suffix(self, right, &left_rows, &right_rows, on, suffix, false)
6517 }
6518
6519 pub fn left_join_typed(
6523 &self,
6524 right: &TidyView,
6525 on: &[(&str, &str)],
6526 suffix: &JoinSuffix,
6527 ) -> Result<TidyFrame, TidyError> {
6528 validate_join_key_types(self, right, on)?;
6529 let (left_rows, right_rows_opt) =
6530 join_match_rows_optional(self, right, on, JoinKind::Left)?;
6531 build_left_join_frame_with_suffix(self, right, &left_rows, &right_rows_opt, on, suffix)
6532 }
6533}
6534
/// Target position for `TidyView::relocate`.
///
/// This small, borrow-only value is `Copy` and comparable, so callers can reuse
/// and inspect it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RelocatePos<'a> {
    /// Move the columns to the start of the projection.
    Front,
    /// Move the columns to the end of the projection.
    Back,
    /// Insert the columns immediately before the named column.
    Before(&'a str),
    /// Insert the columns immediately after the named column.
    After(&'a str),
}
6548
6549fn concat_columns(
6552 left: &Column,
6553 left_rows: &[usize],
6554 right: &Column,
6555 right_rows: &[usize],
6556) -> Result<Column, TidyError> {
6557 match (left, right) {
6558 (Column::Int(lv), Column::Int(rv)) => {
6559 let mut out: Vec<i64> = left_rows.iter().map(|&i| lv[i]).collect();
6560 out.extend(right_rows.iter().map(|&i| rv[i]));
6561 Ok(Column::Int(out))
6562 }
6563 (Column::Float(lv), Column::Float(rv)) => {
6564 let mut out: Vec<f64> = left_rows.iter().map(|&i| lv[i]).collect();
6565 out.extend(right_rows.iter().map(|&i| rv[i]));
6566 Ok(Column::Float(out))
6567 }
6568 (Column::Int(lv), Column::Float(rv)) => {
6569 let mut out: Vec<f64> = left_rows.iter().map(|&i| lv[i] as f64).collect();
6570 out.extend(right_rows.iter().map(|&i| rv[i]));
6571 Ok(Column::Float(out))
6572 }
6573 (Column::Float(lv), Column::Int(rv)) => {
6574 let mut out: Vec<f64> = left_rows.iter().map(|&i| lv[i]).collect();
6575 out.extend(right_rows.iter().map(|&i| rv[i] as f64));
6576 Ok(Column::Float(out))
6577 }
6578 (Column::Str(lv), Column::Str(rv)) => {
6579 let mut out: Vec<String> = left_rows.iter().map(|&i| lv[i].clone()).collect();
6580 out.extend(right_rows.iter().map(|&i| rv[i].clone()));
6581 Ok(Column::Str(out))
6582 }
6583 (Column::Bool(lv), Column::Bool(rv)) => {
6584 let mut out: Vec<bool> = left_rows.iter().map(|&i| lv[i]).collect();
6585 out.extend(right_rows.iter().map(|&i| rv[i]));
6586 Ok(Column::Bool(out))
6587 }
6588 _ => Err(TidyError::schema_mismatch(format!(
6589 "type mismatch in bind_rows: {} vs {}",
6590 left.type_name(), right.type_name()
6591 ))),
6592 }
6593}
6594
6595fn validate_join_key_types(
6598 left: &TidyView,
6599 right: &TidyView,
6600 on: &[(&str, &str)],
6601) -> Result<(), TidyError> {
6602 for &(lk, rk) in on {
6603 let l_col = left.base.get_column(lk)
6604 .ok_or_else(|| TidyError::ColumnNotFound(lk.to_string()))?;
6605 let r_col = right.base.get_column(rk)
6606 .ok_or_else(|| TidyError::ColumnNotFound(rk.to_string()))?;
6607 if !join_types_compatible(l_col, r_col) {
6608 return Err(TidyError::join_type_mismatch(lk, l_col.type_name(), r_col.type_name()));
6609 }
6610 }
6611 Ok(())
6612}
6613
6614fn build_join_frame_with_suffix(
6617 left: &TidyView,
6618 right: &TidyView,
6619 left_rows: &[usize],
6620 right_rows: &[usize],
6621 on: &[(&str, &str)],
6622 suffix: &JoinSuffix,
6623 _include_unmatched: bool,
6624) -> Result<TidyFrame, TidyError> {
6625 let right_key_names: std::collections::BTreeSet<&str> =
6626 on.iter().map(|(_, rk)| *rk).collect();
6627
6628 let left_col_names: Vec<String> = left.proj.indices().iter()
6630 .map(|&ci| left.base.columns[ci].0.clone())
6631 .collect();
6632
6633 let mut columns: Vec<(String, Column)> = Vec::new();
6634
6635 for &ci in left.proj.indices() {
6637 let (name, col) = &left.base.columns[ci];
6638 columns.push((name.clone(), gather_column(col, left_rows)));
6639 }
6640
6641 for &ci in right.proj.indices() {
6643 let (name, col) = &right.base.columns[ci];
6644 if right_key_names.contains(name.as_str()) {
6645 continue; }
6647 let out_name = if left_col_names.contains(name) {
6648 format!("{}{}", name, suffix.right)
6649 } else {
6650 name.clone()
6651 };
6652 if left_col_names.contains(name) {
6654 let left_pos = columns.iter().position(|(n, _)| n == name);
6656 if let Some(pos) = left_pos {
6657 let entry = &mut columns[pos];
6658 entry.0 = format!("{}{}", entry.0, suffix.left);
6659 }
6660 }
6661 columns.push((out_name, gather_column(col, right_rows)));
6662 }
6663
6664 let df = DataFrame::from_columns(columns)
6665 .map_err(|e| TidyError::Internal(e.to_string()))?;
6666 Ok(TidyFrame::from_df(df))
6667}
6668
6669fn build_left_join_frame_with_suffix(
6670 left: &TidyView,
6671 right: &TidyView,
6672 left_rows: &[usize],
6673 right_rows_opt: &[Option<usize>],
6674 on: &[(&str, &str)],
6675 suffix: &JoinSuffix,
6676) -> Result<TidyFrame, TidyError> {
6677 let right_key_names: std::collections::BTreeSet<&str> =
6678 on.iter().map(|(_, rk)| *rk).collect();
6679
6680 let left_col_names: Vec<String> = left.proj.indices().iter()
6681 .map(|&ci| left.base.columns[ci].0.clone())
6682 .collect();
6683
6684 let mut columns: Vec<(String, Column)> = Vec::new();
6685
6686 for &ci in left.proj.indices() {
6688 let (name, col) = &left.base.columns[ci];
6689 columns.push((name.clone(), gather_column(col, left_rows)));
6690 }
6691
6692 for &ci in right.proj.indices() {
6694 let (name, col) = &right.base.columns[ci];
6695 if right_key_names.contains(name.as_str()) { continue; }
6696 let out_name = if left_col_names.contains(name) {
6697 let left_pos = columns.iter().position(|(n, _)| n == name);
6699 if let Some(pos) = left_pos {
6700 columns[pos].0 = format!("{}{}", name, suffix.left);
6701 }
6702 format!("{}{}", name, suffix.right)
6703 } else {
6704 name.clone()
6705 };
6706 let new_col = gather_column_nullable(col, right_rows_opt);
6707 columns.push((out_name, new_col));
6708 }
6709
6710 let df = DataFrame::from_columns(columns)
6711 .map_err(|e| TidyError::Internal(e.to_string()))?;
6712 Ok(TidyFrame::from_df(df))
6713}
6714
6715fn build_right_join_frame(
6716 left: &TidyView,
6717 right: &TidyView,
6718 left_rows_opt: &[Option<usize>],
6719 right_rows: &[usize],
6720 on: &[(&str, &str)],
6721 suffix: &JoinSuffix,
6722) -> Result<NullableFrame, TidyError> {
6723 let right_key_names: std::collections::BTreeSet<&str> =
6724 on.iter().map(|(_, rk)| *rk).collect();
6725 let left_key_names: std::collections::BTreeSet<&str> =
6726 on.iter().map(|(lk, _)| *lk).collect();
6727
6728 let right_col_names: Vec<String> = right.proj.indices().iter()
6729 .map(|&ci| right.base.columns[ci].0.clone())
6730 .collect();
6731
6732 let mut columns: Vec<(String, NullCol)> = Vec::new();
6733
6734 for &ci in left.proj.indices() {
6736 let (name, col) = &left.base.columns[ci];
6737 if left_key_names.contains(name.as_str()) { continue; }
6738 let out_name = if right_col_names.contains(name) {
6739 format!("{}{}", name, suffix.left)
6740 } else {
6741 name.clone()
6742 };
6743 let null_col = gather_column_nullable_null(col, left_rows_opt);
6744 columns.push((out_name, null_col));
6745 }
6746
6747 for &ci in right.proj.indices() {
6749 let (name, col) = &right.base.columns[ci];
6750 let out_name = if !right_key_names.contains(name.as_str())
6751 && left.proj.indices().iter().any(|&lci| left.base.columns[lci].0 == *name)
6752 && !left_key_names.contains(name.as_str())
6753 {
6754 format!("{}{}", name, suffix.right)
6755 } else {
6756 name.clone()
6757 };
6758 columns.push((out_name, NullCol::from_column(&gather_column(col, right_rows))));
6759 }
6760
6761 Ok(NullableFrame { columns })
6762}
6763
6764fn build_full_join_frame(
6765 left: &TidyView,
6766 right: &TidyView,
6767 on: &[(&str, &str)],
6768 suffix: &JoinSuffix,
6769) -> Result<NullableFrame, TidyError> {
6770 let (left_key_cols, right_key_cols) = resolve_join_keys(left, right, on)?;
6771 let lookup = build_right_lookup(right, &right_key_cols);
6772
6773 let mut out_left_rows: Vec<usize> = Vec::new();
6775 let mut out_right_rows: Vec<Option<usize>> = Vec::new();
6776 let mut right_matched: Vec<bool> = vec![false; right.base.nrows()];
6777
6778 for l_row in left.mask.iter_indices() {
6779 let key = row_key(&left.base, &left_key_cols, l_row);
6780 let matches = find_matches(&lookup, &key);
6781 if matches.is_empty() {
6782 out_left_rows.push(l_row);
6783 out_right_rows.push(None);
6784 } else {
6785 for r_row in &matches {
6786 out_left_rows.push(l_row);
6787 out_right_rows.push(Some(*r_row));
6788 if *r_row < right_matched.len() {
6789 right_matched[*r_row] = true;
6790 }
6791 }
6792 }
6793 }
6794
6795 let mut unmatched_right: Vec<usize> = Vec::new();
6797 for r_row in right.mask.iter_indices() {
6798 if r_row < right_matched.len() && !right_matched[r_row] {
6799 unmatched_right.push(r_row);
6800 }
6801 }
6802
6803 let right_key_names: std::collections::BTreeSet<&str> =
6804 on.iter().map(|(_, rk)| *rk).collect();
6805 let left_key_names: std::collections::BTreeSet<&str> =
6806 on.iter().map(|(lk, _)| *lk).collect();
6807 let right_col_names: Vec<String> = right.proj.indices().iter()
6808 .map(|&ci| right.base.columns[ci].0.clone())
6809 .collect();
6810
6811 let n_matched = out_left_rows.len();
6812 let n_unmatched_r = unmatched_right.len();
6813 let total = n_matched + n_unmatched_r;
6814
6815 let mut columns: Vec<(String, NullCol)> = Vec::new();
6816
6817 for &ci in left.proj.indices() {
6819 let (name, col) = &left.base.columns[ci];
6820 let out_name = if right_col_names.contains(name) && !left_key_names.contains(name.as_str()) {
6821 format!("{}{}", name, suffix.left)
6822 } else {
6823 name.clone()
6824 };
6825 let mut matched_vals: Vec<Option<usize>> = out_left_rows.iter()
6826 .map(|&r| Some(r))
6827 .collect();
6828 matched_vals.extend(std::iter::repeat(None).take(n_unmatched_r));
6830 assert_eq!(matched_vals.len(), total);
6831 columns.push((out_name, gather_column_nullable_null(col, &matched_vals)));
6832 }
6833
6834 for &ci in right.proj.indices() {
6836 let (name, col) = &right.base.columns[ci];
6837 if right_key_names.contains(name.as_str()) { continue; }
6838 let out_name = if left.proj.indices().iter().any(|&lci| left.base.columns[lci].0 == *name)
6839 && !left_key_names.contains(name.as_str())
6840 {
6841 format!("{}{}", name, suffix.right)
6842 } else {
6843 name.clone()
6844 };
6845
6846 let mut row_opts: Vec<Option<usize>> = out_right_rows.clone();
6847 row_opts.extend(unmatched_right.iter().map(|&r| Some(r)));
6849 assert_eq!(row_opts.len(), total);
6850 columns.push((out_name, gather_column_nullable_null(col, &row_opts)));
6851 }
6852
6853 Ok(NullableFrame { columns })
6859}
6860
6861impl GroupedTidyView {
6864
6865 pub fn mutate_across(&self, specs: &[AcrossSpec]) -> Result<TidyFrame, TidyError> {
6871 self.view.mutate_across(specs)
6874 }
6875
6876 pub fn summarise_across(&self, specs: &[AcrossSpec]) -> Result<TidyFrame, TidyError> {
6883 let n_groups = self.ngroups();
6884
6885 let key_names = &self.index.key_names;
6887 let mut out_cols: Vec<(String, Column)> = Vec::new();
6888
6889 for ki in 0..key_names.len() {
6891 let col_vals: Vec<String> = self.index.groups.iter()
6892 .map(|g| g.key_values[ki].clone())
6893 .collect();
6894 out_cols.push((key_names[ki].clone(), Column::Str(col_vals)));
6895 }
6896
6897 for spec in specs {
6899 for col_name in &spec.cols {
6900 let out_name = spec.output_name(col_name);
6901 if out_cols.iter().any(|(n, _)| n == &out_name) {
6903 return Err(TidyError::DuplicateColumn(out_name));
6904 }
6905
6906 let base_col = self.view.base.get_column(col_name)
6907 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
6908
6909 let mut agg_floats: Vec<f64> = Vec::with_capacity(n_groups);
6911 for group in &self.index.groups {
6912 let group_col = gather_column(base_col, &group.row_indices);
6913 let result_col = (spec.transform.func)(col_name, &group_col)?;
6914 if result_col.len() != 1 {
6915 return Err(TidyError::LengthMismatch {
6916 expected: 1,
6917 got: result_col.len(),
6918 });
6919 }
6920 let v = match &result_col {
6921 Column::Float(v) => v[0],
6922 Column::Int(v) => v[0] as f64,
6923 _ => return Err(TidyError::TypeMismatch {
6924 expected: "Float or Int".into(),
6925 got: result_col.type_name().into(),
6926 }),
6927 };
6928 agg_floats.push(v);
6929 }
6930 out_cols.push((out_name, Column::Float(agg_floats)));
6931 }
6932 }
6933
6934 let df = DataFrame::from_columns(out_cols)
6935 .map_err(|e| TidyError::Internal(e.to_string()))?;
6936 Ok(TidyFrame::from_df(df))
6937 }
6938}
6939
6940mod indexmap_simple {
6945 use super::{Column, DataFrame, DataError};
6946
6947 pub struct IndexMap {
6948 entries: Vec<(String, Column)>,
6949 }
6950
6951 impl IndexMap {
6952 pub fn from_df(df: &DataFrame) -> Self {
6953 Self {
6954 entries: df.columns.iter()
6955 .map(|(n, c)| (n.clone(), c.clone()))
6956 .collect(),
6957 }
6958 }
6959
6960 pub fn insert(&mut self, name: String, col: Column) {
6962 if let Some(pos) = self.entries.iter().position(|(n, _)| n == &name) {
6963 self.entries[pos] = (name, col);
6964 } else {
6965 self.entries.push((name, col));
6966 }
6967 }
6968
6969 pub fn into_df(self) -> Result<DataFrame, DataError> {
6970 DataFrame::from_columns(self.entries)
6971 }
6972 }
6973}
6974
6975impl GroupIndex {
6992 pub fn build_fast<I: IntoIterator<Item = usize>>(
7008 base: &DataFrame,
7009 key_col_indices: &[usize],
7010 visible_rows: I,
7011 key_names: Vec<String>,
7012 ) -> Self {
7013 use std::collections::BTreeMap;
7014
7015 if let Some(cat_keys) = collect_categorical_keys(base, key_col_indices) {
7018 return build_groupindex_categorical(cat_keys, visible_rows, key_names);
7019 }
7020
7021 let mut groups: Vec<GroupMeta> = Vec::new();
7022 let mut key_to_slot: BTreeMap<Vec<String>, usize> = BTreeMap::new();
7023
7024 for row in visible_rows {
7025 let key: Vec<String> = key_col_indices.iter()
7026 .map(|&ci| base.columns[ci].1.get_display(row))
7027 .collect();
7028
7029 if let Some(&slot) = key_to_slot.get(&key) {
7030 groups[slot].row_indices.push(row);
7031 } else {
7032 let slot = groups.len();
7033 let key_values = key.clone();
7034 key_to_slot.insert(key, slot);
7035 groups.push(GroupMeta { key_values, row_indices: vec![row] });
7036 }
7037 }
7038
7039 GroupIndex { groups, key_names }
7040 }
7041}
7042
/// Borrowed level tables and code vectors of a set of `Column::Categorical`
/// key columns, one entry per key column in the requested order
/// (as produced by `collect_categorical_keys`).
pub(crate) struct CategoricalKeys<'a> {
    /// `levels[k]` is the level table of key column `k`; a code indexes into it.
    pub(crate) levels: Vec<&'a [String]>,
    /// `codes[k][row]` is the level code of `row` in key column `k`.
    pub(crate) codes: Vec<&'a [u32]>,
}
7067
7068pub(crate) fn collect_categorical_keys<'a>(
7072 base: &'a DataFrame,
7073 key_col_indices: &[usize],
7074) -> Option<CategoricalKeys<'a>> {
7075 if key_col_indices.is_empty() {
7076 return None;
7077 }
7078 let mut levels: Vec<&[String]> = Vec::with_capacity(key_col_indices.len());
7079 let mut codes: Vec<&[u32]> = Vec::with_capacity(key_col_indices.len());
7080 for &ci in key_col_indices {
7081 match &base.columns[ci].1 {
7082 Column::Categorical { levels: l, codes: c } => {
7083 levels.push(l.as_slice());
7084 codes.push(c.as_slice());
7085 }
7086 _ => return None,
7087 }
7088 }
7089 Some(CategoricalKeys { levels, codes })
7090}
7091
7092fn build_groupindex_categorical<I: IntoIterator<Item = usize>>(
7095 cat: CategoricalKeys<'_>,
7096 visible_rows: I,
7097 key_names: Vec<String>,
7098) -> GroupIndex {
7099 use std::collections::BTreeMap;
7100 let nkeys = cat.codes.len();
7101 let mut groups: Vec<GroupMeta> = Vec::new();
7102 let mut key_to_slot: BTreeMap<Vec<u32>, usize> = BTreeMap::new();
7103 let mut key_buf: Vec<u32> = Vec::with_capacity(nkeys);
7104
7105 for row in visible_rows {
7106 key_buf.clear();
7107 for c in &cat.codes {
7108 key_buf.push(c[row]);
7109 }
7110 if let Some(&slot) = key_to_slot.get(&key_buf) {
7111 groups[slot].row_indices.push(row);
7112 } else {
7113 let key_values: Vec<String> = (0..nkeys)
7115 .map(|i| cat.levels[i][key_buf[i] as usize].clone())
7116 .collect();
7117 let slot = groups.len();
7118 key_to_slot.insert(key_buf.clone(), slot);
7119 groups.push(GroupMeta { key_values, row_indices: vec![row] });
7120 }
7121 }
7122
7123 GroupIndex { groups, key_names }
7124}
7125
7126impl TidyView {
7129 pub fn group_by_fast(&self, keys: &[&str]) -> Result<GroupedTidyView, TidyError> {
7134 let mut key_col_indices = Vec::with_capacity(keys.len());
7135 for &key in keys {
7136 let idx = self.base.columns.iter().position(|(n, _)| n == key)
7137 .ok_or_else(|| TidyError::ColumnNotFound(key.to_string()))?;
7138 key_col_indices.push(idx);
7139 }
7140 let key_names: Vec<String> = keys.iter().map(|s| s.to_string()).collect();
7141 let index = GroupIndex::build_fast(&self.base, &key_col_indices, self.mask.iter_indices(), key_names);
7142 Ok(GroupedTidyView { view: self.clone(), index })
7143 }
7144}
7145
/// One aggregation requested from `TidyView::summarise_streaming`.
///
/// Every variant except `Count` carries the name of the input column in the base
/// frame. Its values are read as `f64` (via `row_as_f64`): `Int` is widened and
/// any other column type reads as NaN.
#[derive(Debug, Clone)]
pub enum StreamingAgg {
    /// Number of rows in the group (emitted as an `Int` column).
    Count,
    /// Kahan-compensated sum of the column.
    Sum(String),
    /// Compensated sum divided by the row count; NaN for an empty group.
    Mean(String),
    /// Smallest non-NaN value; NaN if the group has none.
    Min(String),
    /// Largest non-NaN value; NaN if the group has none.
    Max(String),
    /// Sample variance (divisor `n - 1`) via Welford; NaN for fewer than 2 rows.
    Var(String),
    /// Square root of the sample variance; NaN for fewer than 2 rows.
    Sd(String),
}
7194
/// Running per-group state for one `StreamingAgg`.
#[derive(Debug, Clone)]
enum AccState {
    /// Row counter for `Count`.
    Count {
        n: u64,
    },
    /// Kahan summation state for `Sum`.
    Sum {
        /// Running total.
        sum: f64,
        /// Compensation term carrying the low-order bits lost by `sum`.
        c: f64,
    },
    /// Kahan summation plus a row count, for `Mean`.
    Mean {
        /// Running total.
        sum: f64,
        /// Compensation term carrying the low-order bits lost by `sum`.
        c: f64,
        /// Number of values folded in.
        n: u64,
    },
    /// Smallest non-NaN value seen so far; `any` stays false until one is seen.
    Min {
        cur: f64,
        any: bool,
    },
    /// Largest non-NaN value seen so far; `any` stays false until one is seen.
    Max {
        cur: f64,
        any: bool,
    },
    /// Welford online statistics shared by `Var` and `Sd`.
    Welford {
        /// Number of values folded in.
        n: u64,
        /// Running mean.
        mean: f64,
        /// Running sum of squared deviations from the mean.
        m2: f64,
    },
}
7227
7228impl AccState {
7229 fn from_agg(agg: &StreamingAgg) -> Self {
7230 match agg {
7231 StreamingAgg::Count => AccState::Count { n: 0 },
7232 StreamingAgg::Sum(_) => AccState::Sum { sum: 0.0, c: 0.0 },
7233 StreamingAgg::Mean(_) => AccState::Mean { sum: 0.0, c: 0.0, n: 0 },
7234 StreamingAgg::Min(_) => AccState::Min {
7235 cur: f64::INFINITY,
7236 any: false,
7237 },
7238 StreamingAgg::Max(_) => AccState::Max {
7239 cur: f64::NEG_INFINITY,
7240 any: false,
7241 },
7242 StreamingAgg::Var(_) | StreamingAgg::Sd(_) => AccState::Welford {
7243 n: 0,
7244 mean: 0.0,
7245 m2: 0.0,
7246 },
7247 }
7248 }
7249
7250 fn update(&mut self, x: f64) {
7251 match self {
7252 AccState::Count { n } => *n += 1,
7253 AccState::Sum { sum, c } => {
7254 let y = x - *c;
7256 let t = *sum + y;
7257 *c = (t - *sum) - y;
7258 *sum = t;
7259 }
7260 AccState::Mean { sum, c, n } => {
7261 let y = x - *c;
7262 let t = *sum + y;
7263 *c = (t - *sum) - y;
7264 *sum = t;
7265 *n += 1;
7266 }
7267 AccState::Min { cur, any } => {
7268 if !x.is_nan() {
7269 if !*any || x < *cur {
7270 *cur = x;
7271 *any = true;
7272 }
7273 }
7274 }
7275 AccState::Max { cur, any } => {
7276 if !x.is_nan() {
7277 if !*any || x > *cur {
7278 *cur = x;
7279 *any = true;
7280 }
7281 }
7282 }
7283 AccState::Welford { n, mean, m2 } => {
7284 *n += 1;
7286 let delta = x - *mean;
7287 *mean += delta / (*n as f64);
7288 let delta2 = x - *mean;
7289 *m2 += delta * delta2;
7290 }
7291 }
7292 }
7293
7294 fn finalize(&self, agg: &StreamingAgg) -> f64 {
7295 match (self, agg) {
7296 (AccState::Count { n }, StreamingAgg::Count) => *n as f64,
7297 (AccState::Sum { sum, .. }, StreamingAgg::Sum(_)) => *sum,
7298 (AccState::Mean { sum, n, .. }, StreamingAgg::Mean(_)) => {
7299 if *n == 0 {
7300 f64::NAN
7301 } else {
7302 *sum / (*n as f64)
7303 }
7304 }
7305 (AccState::Min { cur, any }, StreamingAgg::Min(_)) => {
7306 if *any {
7307 *cur
7308 } else {
7309 f64::NAN
7310 }
7311 }
7312 (AccState::Max { cur, any }, StreamingAgg::Max(_)) => {
7313 if *any {
7314 *cur
7315 } else {
7316 f64::NAN
7317 }
7318 }
7319 (AccState::Welford { n, m2, .. }, StreamingAgg::Var(_)) => {
7320 if *n < 2 {
7321 f64::NAN
7322 } else {
7323 *m2 / ((*n - 1) as f64)
7324 }
7325 }
7326 (AccState::Welford { n, m2, .. }, StreamingAgg::Sd(_)) => {
7327 if *n < 2 {
7328 f64::NAN
7329 } else {
7330 (*m2 / ((*n - 1) as f64)).sqrt()
7331 }
7332 }
7333 _ => f64::NAN,
7334 }
7335 }
7336}
7337
7338fn row_as_f64(col: &Column, row: usize) -> f64 {
7340 match col {
7341 Column::Float(v) => v[row],
7342 Column::Int(v) => v[row] as f64,
7343 _ => f64::NAN,
7344 }
7345}
7346
7347impl TidyView {
7348 pub fn summarise_streaming(
7356 &self,
7357 keys: &[&str],
7358 aggs: &[(&str, StreamingAgg)],
7359 ) -> Result<TidyFrame, TidyError> {
7360 {
7362 let mut seen = std::collections::BTreeSet::new();
7363 for &(name, _) in aggs {
7364 if !seen.insert(name) {
7365 return Err(TidyError::DuplicateColumn(name.to_string()));
7366 }
7367 }
7368 for &k in keys {
7369 if seen.contains(k) {
7370 return Err(TidyError::DuplicateColumn(k.to_string()));
7371 }
7372 }
7373 }
7374
7375 let mut key_col_indices = Vec::with_capacity(keys.len());
7377 for &key in keys {
7378 let idx = self
7379 .base
7380 .columns
7381 .iter()
7382 .position(|(n, _)| n == key)
7383 .ok_or_else(|| TidyError::ColumnNotFound(key.to_string()))?;
7384 key_col_indices.push(idx);
7385 }
7386
7387 let agg_col_indices: Vec<Option<usize>> = aggs
7389 .iter()
7390 .map(|(_, agg)| match agg {
7391 StreamingAgg::Count => None,
7392 StreamingAgg::Sum(c)
7393 | StreamingAgg::Mean(c)
7394 | StreamingAgg::Min(c)
7395 | StreamingAgg::Max(c)
7396 | StreamingAgg::Var(c)
7397 | StreamingAgg::Sd(c) => self.base.columns.iter().position(|(n, _)| n == c),
7398 })
7399 .collect();
7400 for (i, (_, agg)) in aggs.iter().enumerate() {
7401 if matches!(agg, StreamingAgg::Count) {
7402 continue;
7403 }
7404 if agg_col_indices[i].is_none() {
7405 let col_name = match agg {
7406 StreamingAgg::Sum(c)
7407 | StreamingAgg::Mean(c)
7408 | StreamingAgg::Min(c)
7409 | StreamingAgg::Max(c)
7410 | StreamingAgg::Var(c)
7411 | StreamingAgg::Sd(c) => c.clone(),
7412 _ => String::new(),
7413 };
7414 return Err(TidyError::ColumnNotFound(col_name));
7415 }
7416 }
7417
7418 let cat = collect_categorical_keys(&self.base, &key_col_indices);
7420
7421 use std::collections::BTreeMap;
7424
7425 let n_aggs = aggs.len();
7426 let init_accs = || -> Vec<AccState> {
7427 aggs.iter().map(|(_, a)| AccState::from_state(a)).collect()
7428 };
7429
7430 fn _unused() {}
7433
7434 let (cat_state, str_state) = if let Some(cat) = cat.as_ref() {
7436 let mut state: BTreeMap<Vec<u32>, Vec<AccState>> = BTreeMap::new();
7437 let mut key_buf: Vec<u32> = Vec::with_capacity(cat.codes.len());
7438 for row in self.mask.iter_indices() {
7439 key_buf.clear();
7440 for c in &cat.codes {
7441 key_buf.push(c[row]);
7442 }
7443 let entry = state
7444 .entry(key_buf.clone())
7445 .or_insert_with(&init_accs);
7446 for (i, (_, agg)) in aggs.iter().enumerate() {
7447 if let Some(col_idx) = agg_col_indices[i] {
7448 entry[i].update(row_as_f64(&self.base.columns[col_idx].1, row));
7449 } else {
7450 entry[i].update(0.0); }
7452 }
7453 }
7454 (Some(state), None)
7455 } else {
7456 let mut state: BTreeMap<Vec<String>, Vec<AccState>> = BTreeMap::new();
7457 for row in self.mask.iter_indices() {
7458 let key: Vec<String> = key_col_indices
7459 .iter()
7460 .map(|&ci| self.base.columns[ci].1.get_display(row))
7461 .collect();
7462 let entry = state.entry(key).or_insert_with(&init_accs);
7463 for (i, (_, agg)) in aggs.iter().enumerate() {
7464 if let Some(col_idx) = agg_col_indices[i] {
7465 entry[i].update(row_as_f64(&self.base.columns[col_idx].1, row));
7466 } else {
7467 entry[i].update(0.0);
7468 }
7469 }
7470 }
7471 (None, Some(state))
7472 };
7473
7474 let n_groups = cat_state
7476 .as_ref()
7477 .map(|s| s.len())
7478 .unwrap_or_else(|| str_state.as_ref().unwrap().len());
7479
7480 let mut result_columns: Vec<(String, Column)> = Vec::with_capacity(keys.len() + n_aggs);
7481
7482 if let Some(state) = &cat_state {
7484 let cat = cat.as_ref().unwrap();
7485 for (ki, &key_col_idx) in key_col_indices.iter().enumerate() {
7486 let mut vals: Vec<String> = Vec::with_capacity(n_groups);
7487 for key_codes in state.keys() {
7488 let code = key_codes[ki] as usize;
7489 vals.push(cat.levels[ki][code].clone());
7490 }
7491 let key_name = self.base.columns[key_col_idx].0.clone();
7492 let levels: Vec<String> = cat.levels[ki].to_vec();
7494 let codes: Vec<u32> = state.keys().map(|k| k[ki]).collect();
7495 result_columns.push((key_name, Column::Categorical { levels, codes }));
7496 let _ = vals;
7497 }
7498 } else {
7499 let state = str_state.as_ref().unwrap();
7500 for (ki, &key_col_idx) in key_col_indices.iter().enumerate() {
7501 let key_name = self.base.columns[key_col_idx].0.clone();
7502 let vals: Vec<String> = state.keys().map(|k| k[ki].clone()).collect();
7503 result_columns.push((key_name, Column::Str(vals)));
7504 }
7505 }
7506
7507 for (i, (out_name, agg)) in aggs.iter().enumerate() {
7509 let vals: Vec<f64> = if let Some(state) = &cat_state {
7510 state.values().map(|accs| accs[i].finalize(agg)).collect()
7511 } else {
7512 str_state
7513 .as_ref()
7514 .unwrap()
7515 .values()
7516 .map(|accs| accs[i].finalize(agg))
7517 .collect()
7518 };
7519 let col = if matches!(agg, StreamingAgg::Count) {
7520 Column::Int(vals.into_iter().map(|x| x as i64).collect())
7521 } else {
7522 Column::Float(vals)
7523 };
7524 result_columns.push((out_name.to_string(), col));
7525 }
7526
7527 let df = DataFrame::from_columns(result_columns)
7528 .map_err(|e| TidyError::Internal(e.to_string()))?;
7529 Ok(TidyFrame::from_df(df))
7530 }
7531}
7532
7533impl AccState {
7534 fn from_state(agg: &StreamingAgg) -> Self {
7535 Self::from_agg(agg)
7536 }
7537}
7538
/// Compact factor (categorical) column: a level table plus one `u16` code per
/// row, where code `k` denotes `levels[k]`.
///
/// `FctColumn::encode` rejects more than 65 535 distinct levels with
/// `TidyError::CapacityExceeded`.
#[derive(Clone, Debug)]
pub struct FctColumn {
    /// Distinct level labels; a label's position is its code.
    pub levels: Vec<String>,
    /// Per-row level codes, each an index into `levels`.
    pub data: Vec<u16>,
}
7612
7613impl FctColumn {
7614 pub fn encode(strings: &[String]) -> Result<Self, TidyError> {
7621 use std::collections::BTreeMap;
7622 let mut levels: Vec<String> = Vec::new();
7623 let mut level_map: BTreeMap<String, u16> = BTreeMap::new();
7627 let mut data: Vec<u16> = Vec::with_capacity(strings.len());
7628
7629 for s in strings {
7630 let idx = if let Some(&existing) = level_map.get(s.as_str()) {
7631 existing
7632 } else {
7633 let next = levels.len();
7634 if next >= 65_535 {
7635 return Err(TidyError::CapacityExceeded {
7636 limit: 65_535,
7637 got: next + 1,
7638 });
7639 }
7640 let idx = next as u16;
7641 levels.push(s.clone());
7642 level_map.insert(s.clone(), idx);
7643 idx
7644 };
7645 data.push(idx);
7646 }
7647 Ok(FctColumn { levels, data })
7648 }
7649
7650 pub fn encode_from_view(view: &TidyView, col: &str) -> Result<Self, TidyError> {
7652 let base_idx = view.base.columns.iter()
7653 .position(|(n, _)| n == col)
7654 .ok_or_else(|| TidyError::ColumnNotFound(col.to_string()))?;
7655 if !view.proj.indices().contains(&base_idx) {
7657 return Err(TidyError::ColumnNotFound(col.to_string()));
7658 }
7659 let col_data = &view.base.columns[base_idx].1;
7660 let visible: Vec<usize> = view.mask.iter_indices().collect();
7661 let strings: Vec<String> = visible.iter()
7662 .map(|&r| col_data.get_display(r))
7663 .collect();
7664 Self::encode(&strings)
7665 }
7666
7667 pub fn nrows(&self) -> usize { self.data.len() }
7671 pub fn nlevels(&self) -> usize { self.levels.len() }
7673
7674 pub fn decode(&self, i: usize) -> &str {
7676 &self.levels[self.data[i] as usize]
7677 }
7678
7679 pub fn fct_lump(&self, n: usize) -> Result<Self, TidyError> {
7691 if n >= self.levels.len() {
7692 return Ok(self.clone()); }
7694
7695 let mut freq = vec![0usize; self.levels.len()];
7697 for &idx in &self.data {
7698 freq[idx as usize] += 1;
7699 }
7700
7701 let mut ranked: Vec<(usize, usize)> = freq.iter().copied().enumerate().collect();
7704 ranked.sort_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
7705
7706 let mut keep_set: Vec<usize> = ranked[..n].iter().map(|(i, _)| *i).collect();
7708 keep_set.sort_unstable(); let mut other_name = "Other".to_string();
7712 while keep_set.iter().any(|&ki| self.levels[ki] == other_name) {
7713 other_name.push('_');
7714 }
7715
7716 let mut new_levels: Vec<String> = keep_set.iter().map(|&ki| self.levels[ki].clone()).collect();
7718 let other_idx = new_levels.len() as u16;
7719 new_levels.push(other_name);
7720
7721 let mut remap = vec![other_idx; self.levels.len()];
7723 for (new_i, &old_i) in keep_set.iter().enumerate() {
7724 remap[old_i] = new_i as u16;
7725 }
7726
7727 let new_data: Vec<u16> = self.data.iter().map(|&d| remap[d as usize]).collect();
7728 Ok(FctColumn { levels: new_levels, data: new_data })
7729 }
7730
7731 pub fn fct_reorder(&self, summary_vals: &[f64], descending: bool) -> Result<Self, TidyError> {
7740 if summary_vals.len() != self.levels.len() {
7741 return Err(TidyError::LengthMismatch {
7742 expected: self.levels.len(),
7743 got: summary_vals.len(),
7744 });
7745 }
7746 let mut order: Vec<usize> = (0..self.levels.len()).collect();
7750 order.sort_by(|&a, &b| {
7751 let va = summary_vals[a];
7752 let vb = summary_vals[b];
7753 match (va.is_nan(), vb.is_nan()) {
7754 (true, true) => std::cmp::Ordering::Equal,
7755 (true, false) => std::cmp::Ordering::Greater, (false, true) => std::cmp::Ordering::Less, (false, false) => {
7758 let cmp = va.partial_cmp(&vb).unwrap_or(std::cmp::Ordering::Equal);
7759 if descending { cmp.reverse() } else { cmp }
7760 }
7761 }
7762 });
7763
7764 let new_levels: Vec<String> = order.iter().map(|&i| self.levels[i].clone()).collect();
7766
7767 let mut remap = vec![0u16; self.levels.len()];
7769 for (new_i, &old_i) in order.iter().enumerate() {
7770 remap[old_i] = new_i as u16;
7771 }
7772
7773 let new_data: Vec<u16> = self.data.iter().map(|&d| remap[d as usize]).collect();
7774 Ok(FctColumn { levels: new_levels, data: new_data })
7775 }
7776
7777 pub fn fct_reorder_by_col(&self, numeric_col: &Column, descending: bool) -> Result<Self, TidyError> {
7783 if numeric_col.len() != self.data.len() {
7784 return Err(TidyError::LengthMismatch {
7785 expected: self.data.len(),
7786 got: numeric_col.len(),
7787 });
7788 }
7789 let mut sums = vec![0.0f64; self.levels.len()];
7790 let mut counts = vec![0usize; self.levels.len()];
7791 match numeric_col {
7792 Column::Float(v) => {
7793 for (i, &d) in self.data.iter().enumerate() {
7794 let val = v[i];
7795 if !val.is_nan() {
7796 sums[d as usize] += val;
7797 counts[d as usize] += 1;
7798 }
7799 }
7800 }
7801 Column::Int(v) => {
7802 for (i, &d) in self.data.iter().enumerate() {
7803 sums[d as usize] += v[i] as f64;
7804 counts[d as usize] += 1;
7805 }
7806 }
7807 _ => return Err(TidyError::TypeMismatch {
7808 expected: "Float or Int".to_string(),
7809 got: numeric_col.type_name().to_string(),
7810 }),
7811 }
7812 let means: Vec<f64> = sums.iter().zip(counts.iter())
7813 .map(|(&s, &c)| if c == 0 { f64::NAN } else { s / c as f64 })
7814 .collect();
7815 self.fct_reorder(&means, descending)
7816 }
7817
7818 pub fn fct_collapse(&self, mapping: &[(&str, &str)]) -> Result<Self, TidyError> {
7835 if mapping.is_empty() {
7836 return Ok(self.clone());
7837 }
7838 let new_name_for: Vec<String> = self.levels.iter().map(|old| {
7840 if let Some((_, new)) = mapping.iter().find(|(o, _)| *o == old.as_str()) {
7841 new.to_string()
7842 } else {
7843 old.clone()
7844 }
7845 }).collect();
7846
7847 use std::collections::BTreeMap;
7850 let mut new_levels: Vec<String> = Vec::new();
7851 let mut new_name_to_idx: BTreeMap<String, u16> = BTreeMap::new();
7852
7853 let mut old_to_new: Vec<u16> = Vec::with_capacity(self.levels.len());
7854 for name in &new_name_for {
7855 let idx = if let Some(&existing) = new_name_to_idx.get(name.as_str()) {
7856 existing
7857 } else {
7858 let idx = new_levels.len() as u16;
7859 new_levels.push(name.clone());
7860 new_name_to_idx.insert(name.clone(), idx);
7861 idx
7862 };
7863 old_to_new.push(idx);
7864 }
7865
7866 let changed = old_to_new.iter().enumerate().any(|(i, &new)| new != i as u16);
7868 let new_data = if changed {
7869 self.data.iter().map(|&d| old_to_new[d as usize]).collect()
7870 } else {
7871 self.data.clone()
7872 };
7873 Ok(FctColumn { levels: new_levels, data: new_data })
7874 }
7875
7876 pub fn to_str_column(&self) -> Column {
7880 Column::Str(self.data.iter().map(|&d| self.levels[d as usize].clone()).collect())
7881 }
7882
7883 pub fn gather(&self, indices: &[usize]) -> FctColumn {
7885 FctColumn {
7886 levels: self.levels.clone(),
7887 data: indices.iter().map(|&i| self.data[i]).collect(),
7888 }
7889 }
7890}
7891
7892impl TidyError {
7895 pub fn capacity_exceeded(limit: usize, got: usize) -> Self {
7897 TidyError::CapacityExceeded { limit, got }
7898 }
7899}
7900
/// A [`FctColumn`] paired with a validity mask: row `i` is null when
/// `validity.get(i)` is `false`.
///
/// The code stored in `fct.data` for a null row is only a placeholder
/// (`NullableFactor::encode_nullable` writes `0`) and carries no meaning.
#[derive(Clone, Debug)]
pub struct NullableFactor {
    /// Underlying levels and per-row codes (null rows hold placeholder codes).
    pub fct: FctColumn,
    /// One bit per row; `true` means the row holds a value.
    pub validity: BitMask,
}
7910
7911impl NullableFactor {
7912 pub fn from_fct(fct: FctColumn) -> Self {
7914 let n = fct.nrows();
7915 NullableFactor { fct, validity: BitMask::all_true(n) }
7916 }
7917
7918 pub fn new(fct: FctColumn, validity: BitMask) -> Self {
7920 NullableFactor { fct, validity }
7921 }
7922
7923 pub fn encode_nullable(strings: &[Option<String>]) -> Result<Self, TidyError> {
7927 use std::collections::BTreeMap;
7928 let mut levels: Vec<String> = Vec::new();
7929 let mut level_map: BTreeMap<String, u16> = BTreeMap::new();
7930 let mut data: Vec<u16> = Vec::with_capacity(strings.len());
7931 let mut valid_flags: Vec<bool> = Vec::with_capacity(strings.len());
7932
7933 for opt in strings {
7934 match opt {
7935 None => {
7936 data.push(0); valid_flags.push(false);
7938 }
7939 Some(s) => {
7940 let idx = if let Some(&existing) = level_map.get(s.as_str()) {
7941 existing
7942 } else {
7943 let next = levels.len();
7944 if next >= 65_535 {
7945 return Err(TidyError::CapacityExceeded { limit: 65_535, got: next + 1 });
7946 }
7947 let idx = next as u16;
7948 levels.push(s.clone());
7949 level_map.insert(s.clone(), idx);
7950 idx
7951 };
7952 data.push(idx);
7953 valid_flags.push(true);
7954 }
7955 }
7956 }
7957 let fct = FctColumn { levels, data };
7958 let validity = BitMask::from_bools(&valid_flags);
7959 Ok(NullableFactor { fct, validity })
7960 }
7961
7962 pub fn nrows(&self) -> usize { self.fct.nrows() }
7964 pub fn nlevels(&self) -> usize { self.fct.nlevels() }
7966 pub fn is_null(&self, i: usize) -> bool { !self.validity.get(i) }
7968 pub fn count_valid(&self) -> usize { self.validity.count_ones() }
7970
7971 pub fn decode(&self, i: usize) -> Option<&str> {
7973 if self.is_null(i) { None } else { Some(self.fct.decode(i)) }
7974 }
7975
7976 pub fn fct_lump(&self, n: usize) -> Result<Self, TidyError> {
7978 let lumped = self.fct.fct_lump(n)?;
7979 Ok(NullableFactor { fct: lumped, validity: self.validity.clone() })
7980 }
7981
7982 pub fn fct_reorder(&self, summary_vals: &[f64], descending: bool) -> Result<Self, TidyError> {
7984 let reordered = self.fct.fct_reorder(summary_vals, descending)?;
7985 Ok(NullableFactor { fct: reordered, validity: self.validity.clone() })
7986 }
7987
7988 pub fn fct_collapse(&self, mapping: &[(&str, &str)]) -> Result<Self, TidyError> {
7990 let collapsed = self.fct.fct_collapse(mapping)?;
7991 Ok(NullableFactor { fct: collapsed, validity: self.validity.clone() })
7992 }
7993}
7994
7995impl TidyView {
7998 pub fn fct_encode(&self, col: &str) -> Result<FctColumn, TidyError> {
8003 FctColumn::encode_from_view(self, col)
8004 }
8005
8006 pub fn fct_summary_means(
8011 &self,
8012 fct: &FctColumn,
8013 numeric_col: &str,
8014 ) -> Result<Vec<f64>, TidyError> {
8015 let base_idx = self.base.columns.iter()
8016 .position(|(n, _)| n == numeric_col)
8017 .ok_or_else(|| TidyError::ColumnNotFound(numeric_col.to_string()))?;
8018 let nc = &self.base.columns[base_idx].1;
8019 if nc.len() != fct.nrows() {
8020 return Err(TidyError::LengthMismatch { expected: fct.nrows(), got: nc.len() });
8021 }
8022 match nc {
8024 Column::Float(_) | Column::Int(_) => {}
8025 _ => return Err(TidyError::TypeMismatch {
8026 expected: "Float or Int".to_string(),
8027 got: nc.type_name().to_string(),
8028 }),
8029 }
8030 let mut sums = vec![0.0f64; fct.levels.len()];
8031 let mut counts = vec![0usize; fct.levels.len()];
8032 match nc {
8033 Column::Float(v) => {
8034 for (i, &d) in fct.data.iter().enumerate() {
8035 if !v[i].is_nan() {
8036 sums[d as usize] += v[i];
8037 counts[d as usize] += 1;
8038 }
8039 }
8040 }
8041 Column::Int(v) => {
8042 for (i, &d) in fct.data.iter().enumerate() {
8043 sums[d as usize] += v[i] as f64;
8044 counts[d as usize] += 1;
8045 }
8046 }
8047 _ => unreachable!(),
8048 }
8049 Ok(sums.iter().zip(counts.iter())
8050 .map(|(&s, &c)| if c == 0 { f64::NAN } else { s / c as f64 })
8051 .collect())
8052 }
8053}
8054
/// Label-encodes `col`: the levels are its distinct values in ascending
/// (byte-lexicographic) order, and each row's code is the index of its value
/// among those levels.
pub fn label_encode(col: &[String]) -> (Vec<String>, Vec<u32>) {
    let mut distinct: Vec<&str> = col.iter().map(String::as_str).collect();
    distinct.sort_unstable();
    distinct.dedup();

    let codes: Vec<u32> = col
        .iter()
        .map(|s| {
            distinct
                .binary_search(&s.as_str())
                .expect("every value of col is one of its distinct values") as u32
        })
        .collect();
    let levels: Vec<String> = distinct.into_iter().map(str::to_owned).collect();
    (levels, codes)
}
8074
/// Ordinal-encodes `col` against an explicit level `order`: a value's code is
/// its position in `order`, and the returned levels are a copy of `order`.
///
/// If `order` lists a value more than once, its last position is used.
///
/// # Errors
/// A message naming the first value of `col` that does not appear in `order`.
pub fn ordinal_encode(col: &[String], order: &[String]) -> Result<(Vec<String>, Vec<u32>), String> {
    let rank: BTreeMap<&str, u32> = order
        .iter()
        .enumerate()
        .map(|(pos, level)| (level.as_str(), pos as u32))
        .collect();

    let codes = col
        .iter()
        .map(|s| {
            rank.get(s.as_str())
                .copied()
                .ok_or_else(|| format!("value {:?} not found in specified order", s))
        })
        .collect::<Result<Vec<u32>, String>>()?;

    Ok((order.to_vec(), codes))
}
8095
/// One-hot encodes categorical codes.
///
/// Returns `(names, indicators)`:
/// * `names` is a copy of `levels`.
/// * `indicators[l][row]` is `true` exactly when `codes[row] == l`.
///
/// The indicators are stored column-major: one `Vec<bool>` of length
/// `codes.len()` per level.
///
/// # Panics
///
/// Panics if any code is `>= levels.len()`.
pub fn one_hot_encode(levels: &[String], codes: &[u32]) -> (Vec<String>, Vec<Vec<bool>>) {
    let mut indicators: Vec<Vec<bool>> = vec![vec![false; codes.len()]; levels.len()];

    // Scatter: each row sets exactly one flag, in the column of its level.
    for (row, code) in codes.iter().copied().enumerate() {
        let level_col = &mut indicators[code as usize];
        level_col[row] = true;
    }

    (levels.to_vec(), indicators)
}
8112
#[cfg(test)]
mod rolling_window_tests {
    use super::*;

    /// Builds a one-column frame whose only column, `col_name`, holds `vals` as `Float`.
    fn make_df(col_name: &str, vals: Vec<f64>) -> DataFrame {
        let column = Column::Float(vals);
        DataFrame {
            columns: vec![(col_name.to_string(), column)],
        }
    }

    /// Unwraps a `Float` column, panicking for any other variant.
    fn float_values(col: Column) -> Vec<f64> {
        match col {
            Column::Float(v) => v,
            _ => panic!("expected Float column"),
        }
    }

    /// Asserts that `actual` has `expected`'s length and that every element is within `tol`.
    fn assert_all_close(actual: &[f64], expected: &[f64], tol: f64) {
        assert_eq!(actual.len(), expected.len());
        for (&got, &want) in actual.iter().zip(expected) {
            assert!((got - want).abs() < tol);
        }
    }

    #[test]
    fn rolling_sum_basic() {
        let df = make_df("x", vec![1.0, 2.0, 3.0, 4.0, 5.0]);
        let expr = DExpr::RollingSum("x".into(), 3);
        let sums = float_values(eval_expr_column(&df, &expr, 5).unwrap());
        // Leading windows are partial: they cover only the rows seen so far.
        assert_all_close(&sums, &[1.0, 3.0, 6.0, 9.0, 12.0], 1e-12);
    }

    #[test]
    fn rolling_mean_basic() {
        let df = make_df("x", vec![1.0, 2.0, 3.0, 4.0, 5.0]);
        let expr = DExpr::RollingMean("x".into(), 3);
        let means = float_values(eval_expr_column(&df, &expr, 5).unwrap());
        // Partial leading windows average over the rows they actually contain.
        assert_all_close(&means, &[1.0, 1.5, 2.0, 3.0, 4.0], 1e-12);
    }

    #[test]
    fn rolling_min_basic() {
        let df = make_df("x", vec![5.0, 3.0, 4.0, 1.0, 2.0]);
        let expr = DExpr::RollingMin("x".into(), 3);
        let mins = float_values(eval_expr_column(&df, &expr, 5).unwrap());
        assert_all_close(&mins, &[5.0, 3.0, 3.0, 1.0, 1.0], 1e-12);
    }

    #[test]
    fn rolling_max_basic() {
        let df = make_df("x", vec![1.0, 5.0, 3.0, 2.0, 4.0]);
        let expr = DExpr::RollingMax("x".into(), 3);
        let maxs = float_values(eval_expr_column(&df, &expr, 5).unwrap());
        assert_all_close(&maxs, &[1.0, 5.0, 5.0, 5.0, 4.0], 1e-12);
    }

    #[test]
    fn rolling_var_basic() {
        let df = make_df("x", vec![2.0, 4.0, 6.0, 8.0]);
        let expr = DExpr::RollingVar("x".into(), 3);
        let vars = float_values(eval_expr_column(&df, &expr, 4).unwrap());
        assert_eq!(vars.len(), 4);
        // A single-element window reports 0. The others match the n-1 (sample)
        // variance, e.g. var(2, 4) = 2 and var(2, 4, 6) = 4.
        assert_all_close(&vars[..1], &[0.0], 1e-12);
        assert_all_close(&vars[1..], &[2.0, 4.0, 4.0], 1e-10);
    }

    #[test]
    fn rolling_sd_basic() {
        let df = make_df("x", vec![2.0, 4.0, 6.0, 8.0]);
        let expr = DExpr::RollingSd("x".into(), 3);
        let sds = float_values(eval_expr_column(&df, &expr, 4).unwrap());
        assert_eq!(sds.len(), 4);
        // Square roots of the variances pinned in `rolling_var_basic`.
        assert_all_close(&sds[..1], &[0.0], 1e-12);
        assert_all_close(&sds[1..], &[2.0_f64.sqrt(), 2.0, 2.0], 1e-10);
    }

    #[test]
    fn rolling_window_larger_than_data() {
        let df = make_df("x", vec![1.0, 2.0, 3.0]);
        let expr = DExpr::RollingSum("x".into(), 10);
        let sums = float_values(eval_expr_column(&df, &expr, 3).unwrap());
        // The window never fills, so the output is a plain cumulative sum.
        assert_all_close(&sums, &[1.0, 3.0, 6.0], 1e-12);
    }

    #[test]
    fn rolling_window_of_one() {
        let df = make_df("x", vec![3.0, 1.0, 4.0, 1.0, 5.0]);
        let col_min = eval_expr_column(&df, &DExpr::RollingMin("x".into(), 1), 5).unwrap();
        let col_max = eval_expr_column(&df, &DExpr::RollingMax("x".into(), 1), 5).unwrap();
        let (mins, maxs) = match (col_min, col_max) {
            (Column::Float(mins), Column::Float(maxs)) => (mins, maxs),
            _ => panic!("expected Float columns"),
        };
        // A width-1 window is the identity for both min and max.
        let expected = [3.0, 1.0, 4.0, 1.0, 5.0];
        for (i, &want) in expected.iter().enumerate() {
            assert!((mins[i] - want).abs() < 1e-12, "min[{}]", i);
            assert!((maxs[i] - want).abs() < 1e-12, "max[{}]", i);
        }
    }

    #[test]
    fn rolling_sum_with_nan() {
        let df = make_df("x", vec![1.0, f64::NAN, 3.0, 4.0]);
        let expr = DExpr::RollingSum("x".into(), 2);
        let sums = float_values(eval_expr_column(&df, &expr, 4).unwrap());
        assert_eq!(sums.len(), 4);
        assert!((sums[0] - 1.0).abs() < 1e-12);
        // These expectations pin NaN as "sticky": every output after the NaN
        // row is NaN, including index 3, whose width-2 window [3.0, 4.0] no
        // longer contains it.
        // NOTE(review): confirm that this poisoning is the intended contract.
        assert!(sums[1..].iter().all(|s| s.is_nan()));
    }

    #[test]
    fn rolling_determinism() {
        let df = make_df("x", vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]);
        let expr = DExpr::RollingSum("x".into(), 4);
        let runs: Vec<Vec<f64>> = (0..3)
            .map(|_| float_values(eval_expr_column(&df, &expr, 10).unwrap()))
            .collect();
        // Repeated evaluation must be bit-for-bit identical.
        assert_eq!(runs[0], runs[1]);
        assert_eq!(runs[1], runs[2]);
    }

    #[test]
    fn rolling_display() {
        let sum_expr = DExpr::RollingSum("val".into(), 5);
        assert_eq!(format!("{}", sum_expr), "rolling_sum(\"val\", 5)");
        let mean_expr = DExpr::RollingMean("col".into(), 3);
        assert_eq!(format!("{}", mean_expr), "rolling_mean(\"col\", 3)");
    }

    #[test]
    fn rolling_collect_columns() {
        let expr = DExpr::RollingSum("revenue".into(), 7);
        let mut referenced = Vec::new();
        collect_expr_columns(&expr, &mut referenced);
        assert_eq!(referenced, vec!["revenue".to_string()]);
    }

    #[test]
    fn rolling_not_allowed_in_row_context() {
        let df = make_df("x", vec![1.0, 2.0, 3.0]);
        let expr = DExpr::RollingSum("x".into(), 2);
        // Rolling expressions need the whole column, so row-wise evaluation must fail.
        assert!(eval_expr_row(&df, &expr, 0).is_err());
    }

    /// Builds a `Column::Categorical` from borrowed level names and raw codes.
    fn cat_col(levels: &[&str], codes: &[u32]) -> Column {
        let owned_levels: Vec<String> = levels.iter().map(|s| s.to_string()).collect();
        Column::Categorical {
            levels: owned_levels,
            codes: codes.to_vec(),
        }
    }

    #[test]
    fn phase4_collect_cat_keys_returns_some_when_all_categorical() {
        let left =
            DataFrame::from_columns(vec![("k".into(), cat_col(&["a", "b", "c"], &[0, 1, 2, 0]))])
                .unwrap();
        let right =
            DataFrame::from_columns(vec![("k".into(), cat_col(&["b", "a"], &[0, 1, 1]))]).unwrap();
        let cat = collect_categorical_join_keys(&left, &[0], &right, &[0]).unwrap();
        // Right level "b" is left level 1; right level "a" is left level 0.
        assert_eq!(cat.right_to_left[0], vec![Some(1u32), Some(0u32)]);
    }

    #[test]
    fn phase4_collect_cat_keys_returns_none_on_mixed_types() {
        let left = DataFrame::from_columns(vec![
            ("k".into(), cat_col(&["a"], &[0])),
            ("n".into(), Column::Int(vec![1])),
        ])
        .unwrap();
        let right = DataFrame::from_columns(vec![
            ("k".into(), cat_col(&["a"], &[0])),
            ("n".into(), Column::Int(vec![1])),
        ])
        .unwrap();
        // Key column "n" is Int rather than Categorical, so no categorical key set is built.
        assert!(collect_categorical_join_keys(&left, &[0, 1], &right, &[0, 1]).is_none());
    }

    #[test]
    fn phase4_collect_cat_keys_unknown_right_level_yields_none_in_remap() {
        let left =
            DataFrame::from_columns(vec![("k".into(), cat_col(&["a", "b"], &[0, 1]))]).unwrap();
        let right =
            DataFrame::from_columns(vec![("k".into(), cat_col(&["a", "z"], &[0, 1]))]).unwrap();
        let cat = collect_categorical_join_keys(&left, &[0], &right, &[0]).unwrap();
        // "z" has no counterpart on the left, so its remap entry is None.
        assert_eq!(cat.right_to_left[0], vec![Some(0u32), None]);
    }

    #[test]
    fn phase4_column_to_categorical_column_roundtrip() {
        let original = cat_col(&["red", "green", "blue"], &[0, 1, 2, 1, 0]);
        let converted = original.to_categorical_column().unwrap();
        let restored = Column::from_categorical_column(&converted).unwrap();
        match (&original, &restored) {
            (
                Column::Categorical { levels: before_levels, codes: before_codes },
                Column::Categorical { levels: after_levels, codes: after_codes },
            ) => {
                assert_eq!(before_levels, after_levels);
                assert_eq!(before_codes, after_codes);
            }
            _ => panic!("expected Categorical"),
        }
    }

    #[test]
    fn phase4_column_to_categorical_column_none_for_non_categorical() {
        let non_categorical = [
            Column::Int(vec![1, 2, 3]),
            Column::Str(vec!["a".into()]),
            Column::Float(vec![1.0]),
        ];
        for col in non_categorical.iter() {
            assert!(col.to_categorical_column().is_none());
        }
    }

    #[test]
    fn phase4_column_from_categorical_column_rejects_nulls() {
        use crate::byte_dict::CategoricalColumn;
        // A column containing a null entry cannot be converted back to `Column`.
        let mut cc = CategoricalColumn::new();
        cc.push(b"a").unwrap();
        cc.push_null();
        cc.push(b"b").unwrap();
        assert!(Column::from_categorical_column(&cc).is_none());
    }

    #[test]
    fn phase4_column_from_categorical_column_rejects_non_utf8() {
        use crate::byte_dict::CategoricalColumn;
        // The byte 0xFF never appears in valid UTF-8, so it cannot become a `String` level.
        let mut cc = CategoricalColumn::new();
        cc.push(&[0xFFu8]).unwrap();
        assert!(Column::from_categorical_column(&cc).is_none());
    }
}
8435
8436