1use cjc_repro::kahan_sum_f64;
10use std::cell::RefCell;
11use std::collections::{BTreeMap, BTreeSet, VecDeque};
12use std::fmt;
13use std::rc::Rc;
14
15mod csv;
16pub use csv::{CsvConfig, CsvReader, StreamingCsvProcessor};
17
18pub mod adaptive_selection;
19pub mod agg_kernels;
20pub mod byte_dict;
21pub mod column_meta;
22pub mod detcoll;
23pub mod dict_encoding;
24pub mod lazy;
25pub mod predicate_bytecode;
26pub mod tidy_dispatch;
27
28pub use adaptive_selection::{AdaptiveSelection, SelectionIndices};
29
/// A single typed column of a [`DataFrame`].
///
/// Every variant stores one value per row; [`Column::len`] reports the row count.
#[derive(Debug, Clone)]
pub enum Column {
    /// 64-bit signed integers.
    Int(Vec<i64>),
    /// 64-bit floating-point values.
    Float(Vec<f64>),
    /// Owned UTF-8 strings.
    Str(Vec<String>),
    /// Booleans.
    Bool(Vec<bool>),
    /// Dictionary-encoded strings: row `i` holds `levels[codes[i] as usize]`.
    ///
    /// Code in this module indexes `levels` directly with each code, so every code
    /// is expected to be a valid index into `levels`.
    Categorical {
        /// Distinct level strings, indexed by code.
        levels: Vec<String>,
        /// Per-row index into `levels`.
        codes: Vec<u32>,
    },
    /// Byte-dictionary categorical column from [`crate::byte_dict`].
    ///
    /// Rows may be null (its `get` returns `None`); this module renders such rows
    /// as the empty string.
    CategoricalAdaptive(Box<crate::byte_dict::CategoricalColumn>),
    /// Timestamps stored as `i64`. `get_display` renders them with an `ms` suffix,
    /// so the unit is presumably milliseconds — TODO confirm the epoch.
    DateTime(Vec<i64>),
}
62
63impl Column {
64 pub fn len(&self) -> usize {
66 match self {
67 Column::Int(v) => v.len(),
68 Column::Float(v) => v.len(),
69 Column::Str(v) => v.len(),
70 Column::Bool(v) => v.len(),
71 Column::Categorical { codes, .. } => codes.len(),
72 Column::CategoricalAdaptive(cc) => cc.len(),
73 Column::DateTime(v) => v.len(),
74 }
75 }
76
77 pub fn is_empty(&self) -> bool {
79 self.len() == 0
80 }
81
82 pub fn type_name(&self) -> &'static str {
84 match self {
85 Column::Int(_) => "Int",
86 Column::Float(_) => "Float",
87 Column::Str(_) => "Str",
88 Column::Bool(_) => "Bool",
89 Column::Categorical { .. } => "Categorical",
90 Column::CategoricalAdaptive(_) => "CategoricalAdaptive",
91 Column::DateTime(_) => "DateTime",
92 }
93 }
94
95 pub fn get_display(&self, idx: usize) -> String {
97 match self {
98 Column::Int(v) => format!("{}", v[idx]),
99 Column::Float(v) => format!("{}", v[idx]),
100 Column::Str(v) => v[idx].clone(),
101 Column::Bool(v) => format!("{}", v[idx]),
102 Column::Categorical { levels, codes } => levels[codes[idx] as usize].clone(),
103 Column::CategoricalAdaptive(cc) => match cc.get(idx) {
104 None => String::new(),
105 Some(bytes) => String::from_utf8_lossy(bytes).into_owned(),
106 },
107 Column::DateTime(v) => format!("{}ms", v[idx]),
108 }
109 }
110
111 pub fn categorical_adaptive(cc: crate::byte_dict::CategoricalColumn) -> Self {
114 Column::CategoricalAdaptive(Box::new(cc))
115 }
116
117 pub fn to_legacy_categorical(&self) -> Column {
128 match self {
129 Column::CategoricalAdaptive(cc) => {
130 if let Some(legacy) = Column::from_categorical_column(cc) {
132 return legacy;
133 }
134 let n = cc.len();
137 let mut out: Vec<String> = Vec::with_capacity(n);
138 for i in 0..n {
139 out.push(match cc.get(i) {
140 None => String::new(),
141 Some(b) => String::from_utf8_lossy(b).into_owned(),
142 });
143 }
144 Column::Str(out)
145 }
146 _ => self.clone(),
147 }
148 }
149
150 pub fn to_categorical_column(&self) -> Option<crate::byte_dict::CategoricalColumn> {
170 use crate::byte_dict::{ByteDictionary, CategoricalColumn};
171 match self {
172 Column::Categorical { levels, codes } => {
173 let explicit: Vec<Vec<u8>> =
174 levels.iter().map(|s| s.as_bytes().to_vec()).collect();
175 let dict = ByteDictionary::from_explicit(explicit).ok()?;
179 let mut col = CategoricalColumn::with_dictionary(dict);
180 for &c in codes {
185 let bytes = levels[c as usize].as_bytes();
186 col.push(bytes).ok()?;
189 }
190 Some(col)
191 }
192 _ => None,
193 }
194 }
195
196 pub fn from_categorical_column(
206 cat: &crate::byte_dict::CategoricalColumn,
207 ) -> Option<Self> {
208 if cat.nulls().is_some() {
209 return None;
210 }
211 let dict = cat.dictionary();
212 let mut levels: Vec<String> = Vec::with_capacity(dict.len());
213 for (_, bytes) in dict.iter() {
214 match std::str::from_utf8(bytes) {
215 Ok(s) => levels.push(s.to_string()),
216 Err(_) => return None,
217 }
218 }
219 let mut codes: Vec<u32> = Vec::with_capacity(cat.len());
220 for c in cat.codes().iter() {
221 if c > u32::MAX as u64 {
222 return None;
223 }
224 codes.push(c as u32);
225 }
226 Some(Column::Categorical { levels, codes })
227 }
228}
229
/// A column-oriented table: an ordered list of named [`Column`]s.
///
/// [`DataFrame::from_columns`] verifies that all columns have the same length.
/// The field is public, so code that builds or mutates the struct directly must
/// keep the lengths equal itself. [`DataFrame::nrows`] only looks at the first
/// column.
#[derive(Debug, Clone)]
pub struct DataFrame {
    /// `(name, column)` pairs, in the order used for display. Name lookups
    /// ([`DataFrame::get_column`]) return the first match.
    pub columns: Vec<(String, Column)>,
}
237
238impl DataFrame {
239 pub fn new() -> Self {
241 Self {
242 columns: Vec::new(),
243 }
244 }
245
246 pub fn from_columns(columns: Vec<(String, Column)>) -> Result<Self, DataError> {
250 if columns.is_empty() {
251 return Ok(Self { columns });
252 }
253 let len = columns[0].1.len();
254 for (name, col) in &columns {
255 if col.len() != len {
256 return Err(DataError::ColumnLengthMismatch {
257 expected: len,
258 got: col.len(),
259 column: name.clone(),
260 });
261 }
262 }
263 Ok(Self { columns })
264 }
265
266 pub fn nrows(&self) -> usize {
268 self.columns.first().map(|(_, c)| c.len()).unwrap_or(0)
269 }
270
271 pub fn ncols(&self) -> usize {
273 self.columns.len()
274 }
275
276 pub fn column_names(&self) -> Vec<&str> {
278 self.columns.iter().map(|(n, _)| n.as_str()).collect()
279 }
280
281 pub fn get_column(&self, name: &str) -> Option<&Column> {
283 self.columns
284 .iter()
285 .find(|(n, _)| n == name)
286 .map(|(_, c)| c)
287 }
288
289 pub fn to_tensor_data(&self, col_names: &[&str]) -> Result<(Vec<f64>, Vec<usize>), DataError> {
291 let nrows = self.nrows();
292 let ncols = col_names.len();
293 let mut data = Vec::with_capacity(nrows * ncols);
294
295 for row in 0..nrows {
296 for &col_name in col_names {
297 let col = self
298 .get_column(col_name)
299 .ok_or_else(|| DataError::ColumnNotFound(col_name.to_string()))?;
300 let val = match col {
301 Column::Float(v) => v[row],
302 Column::Int(v) => v[row] as f64,
303 _ => {
304 return Err(DataError::InvalidOperation(format!(
305 "column `{}` is not numeric",
306 col_name
307 )))
308 }
309 };
310 data.push(val);
311 }
312 }
313
314 Ok((data, vec![nrows, ncols]))
315 }
316}
317
318impl Default for DataFrame {
319 fn default() -> Self {
320 Self::new()
321 }
322}
323
324impl fmt::Display for DataFrame {
325 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
326 if self.columns.is_empty() {
327 return write!(f, "(empty DataFrame)");
328 }
329
330 let names: Vec<&str> = self.columns.iter().map(|(n, _)| n.as_str()).collect();
332 let mut col_widths: Vec<usize> = names.iter().map(|n| n.len()).collect();
333
334 let nrows = self.nrows();
336 for (col_idx, (_, col)) in self.columns.iter().enumerate() {
337 for row in 0..nrows {
338 let s = col.get_display(row);
339 col_widths[col_idx] = col_widths[col_idx].max(s.len());
340 }
341 }
342
343 for (i, name) in names.iter().enumerate() {
345 if i > 0 {
346 write!(f, " | ")?;
347 }
348 write!(f, "{:>width$}", name, width = col_widths[i])?;
349 }
350 writeln!(f)?;
351
352 for (i, &w) in col_widths.iter().enumerate() {
354 if i > 0 {
355 write!(f, "-+-")?;
356 }
357 write!(f, "{}", "-".repeat(w))?;
358 }
359 writeln!(f)?;
360
361 for row in 0..nrows {
363 for (col_idx, (_, col)) in self.columns.iter().enumerate() {
364 if col_idx > 0 {
365 write!(f, " | ")?;
366 }
367 let s = col.get_display(row);
368 write!(f, "{:>width$}", s, width = col_widths[col_idx])?;
369 }
370 writeln!(f)?;
371 }
372
373 Ok(())
374 }
375}
376
/// Column expression used in filters, aggregations, and window computations.
#[derive(Debug, Clone)]
pub enum DExpr {
    /// Reference to a column by name.
    Col(String),
    /// Integer literal.
    LitInt(i64),
    /// Floating-point literal.
    LitFloat(f64),
    /// Boolean literal.
    LitBool(bool),
    /// String literal.
    LitStr(String),
    /// Binary operation `left op right`, evaluated per row by `eval_binop`.
    BinOp {
        /// Operator to apply.
        op: DBinOp,
        /// Left operand.
        left: Box<DExpr>,
        /// Right operand.
        right: Box<DExpr>,
    },
    /// Aggregation over a group of rows. The aggregate executor in this module
    /// only accepts a [`DExpr::Col`] as the inner expression.
    Agg(AggFunc, Box<DExpr>),
    /// Number of rows in the group.
    Count,
    /// Named scalar function call. The row evaluator in this module accepts
    /// exactly one numeric argument and the names `log`, `exp`, `sqrt`, `abs`,
    /// `ceil`, `floor`, `round`, `sin`, `cos`, `tan`.
    FnCall(String, Vec<DExpr>),
    /// Cumulative sum window function (evaluated outside the row evaluator;
    /// presumably in row order — confirm in `eval_expr_column`).
    CumSum(Box<DExpr>),
    /// Cumulative product window function.
    CumProd(Box<DExpr>),
    /// Cumulative maximum window function.
    CumMax(Box<DExpr>),
    /// Cumulative minimum window function.
    CumMin(Box<DExpr>),
    /// Value of the inner expression offset by the given number of rows
    /// (presumably looking backward — confirm in the window evaluator).
    Lag(Box<DExpr>, usize),
    /// Value of the inner expression offset by the given number of rows
    /// (presumably looking forward — confirm in the window evaluator).
    Lead(Box<DExpr>, usize),
    /// Rank of the inner expression's value.
    Rank(Box<DExpr>),
    /// Dense rank of the inner expression's value.
    DenseRank(Box<DExpr>),
    /// Row number window function.
    RowNumber,
    /// Rolling sum over the named column; the `usize` is presumably the window
    /// length.
    RollingSum(String, usize),
    /// Rolling mean over the named column.
    RollingMean(String, usize),
    /// Rolling minimum over the named column.
    RollingMin(String, usize),
    /// Rolling maximum over the named column.
    RollingMax(String, usize),
    /// Rolling variance over the named column.
    RollingVar(String, usize),
    /// Rolling standard deviation over the named column.
    RollingSd(String, usize),
}
435
/// Binary operator of a [`DExpr::BinOp`].
///
/// The `Display` impl for `DExpr` renders each variant with the symbol noted
/// below.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DBinOp {
    /// Addition (`+`).
    Add,
    /// Subtraction (`-`).
    Sub,
    /// Multiplication (`*`).
    Mul,
    /// Division (`/`); two `Int` operands use integer division.
    Div,
    /// Greater than (`>`).
    Gt,
    /// Less than (`<`).
    Lt,
    /// Greater than or equal (`>=`).
    Ge,
    /// Less than or equal (`<=`).
    Le,
    /// Equality (`==`).
    Eq,
    /// Inequality (`!=`).
    Ne,
    /// Logical AND (`&&`); `eval_binop` accepts it only for boolean operands.
    And,
    /// Logical OR (`||`); `eval_binop` accepts it only for boolean operands.
    Or,
}
464
/// Aggregation function applied by [`DExpr::Agg`] to a group of rows.
///
/// The aggregate executor in this module returns every result as `f64`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AggFunc {
    /// Sum of the values, computed with `kahan_sum_f64` (presumably Kahan
    /// compensated summation).
    Sum,
    /// Arithmetic mean; `0.0` when there are no values.
    Mean,
    /// Minimum; `f64::INFINITY` when there are no values.
    Min,
    /// Maximum; `f64::NEG_INFINITY` when there are no values.
    Max,
    /// Number of values extracted from the inner column.
    Count,
}
479
480impl fmt::Display for DExpr {
481 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
482 match self {
483 DExpr::Col(name) => write!(f, "col(\"{}\")", name),
484 DExpr::LitInt(v) => write!(f, "{}", v),
485 DExpr::LitFloat(v) => write!(f, "{}", v),
486 DExpr::LitBool(b) => write!(f, "{}", b),
487 DExpr::LitStr(s) => write!(f, "\"{}\"", s),
488 DExpr::BinOp { op, left, right } => {
489 let op_str = match op {
490 DBinOp::Add => "+",
491 DBinOp::Sub => "-",
492 DBinOp::Mul => "*",
493 DBinOp::Div => "/",
494 DBinOp::Gt => ">",
495 DBinOp::Lt => "<",
496 DBinOp::Ge => ">=",
497 DBinOp::Le => "<=",
498 DBinOp::Eq => "==",
499 DBinOp::Ne => "!=",
500 DBinOp::And => "&&",
501 DBinOp::Or => "||",
502 };
503 write!(f, "({} {} {})", left, op_str, right)
504 }
505 DExpr::Agg(func, expr) => {
506 let name = match func {
507 AggFunc::Sum => "sum",
508 AggFunc::Mean => "mean",
509 AggFunc::Min => "min",
510 AggFunc::Max => "max",
511 AggFunc::Count => "count",
512 };
513 write!(f, "{}({})", name, expr)
514 }
515 DExpr::Count => write!(f, "count()"),
516 DExpr::FnCall(name, args) => {
517 let args_str: Vec<String> = args.iter().map(|a| format!("{}", a)).collect();
518 write!(f, "{}({})", name, args_str.join(", "))
519 }
520 DExpr::CumSum(e) => write!(f, "cumsum({})", e),
521 DExpr::CumProd(e) => write!(f, "cumprod({})", e),
522 DExpr::CumMax(e) => write!(f, "cummax({})", e),
523 DExpr::CumMin(e) => write!(f, "cummin({})", e),
524 DExpr::Lag(e, k) => write!(f, "lag({}, {})", e, k),
525 DExpr::Lead(e, k) => write!(f, "lead({}, {})", e, k),
526 DExpr::Rank(e) => write!(f, "rank({})", e),
527 DExpr::DenseRank(e) => write!(f, "dense_rank({})", e),
528 DExpr::RowNumber => write!(f, "row_number()"),
529 DExpr::RollingSum(col, w) => write!(f, "rolling_sum(\"{}\", {})", col, w),
530 DExpr::RollingMean(col, w) => write!(f, "rolling_mean(\"{}\", {})", col, w),
531 DExpr::RollingMin(col, w) => write!(f, "rolling_min(\"{}\", {})", col, w),
532 DExpr::RollingMax(col, w) => write!(f, "rolling_max(\"{}\", {})", col, w),
533 DExpr::RollingVar(col, w) => write!(f, "rolling_var(\"{}\", {})", col, w),
534 DExpr::RollingSd(col, w) => write!(f, "rolling_sd(\"{}\", {})", col, w),
535 }
536 }
537}
538
/// Node of a logical query plan, built by [`Pipeline`] and run by [`execute`].
#[derive(Debug, Clone)]
pub enum LogicalPlan {
    /// Leaf node: `execute` returns a clone of `source`.
    Scan {
        /// In-memory frame to read.
        source: DataFrame,
    },
    /// Keeps the rows of `input` for which `predicate` evaluates to `true`.
    Filter {
        /// Plan producing the rows to filter.
        input: Box<LogicalPlan>,
        /// Row-level expression; must evaluate to a boolean for every row.
        predicate: DExpr,
    },
    /// Grouping marker. `execute` passes its input through unchanged; grouping
    /// takes effect in `Aggregate`, which carries its own keys.
    GroupBy {
        /// Plan producing the rows.
        input: Box<LogicalPlan>,
        /// Grouping column names.
        keys: Vec<String>,
    },
    /// Groups `input` by `keys` and evaluates each aggregation once per group.
    Aggregate {
        /// Plan producing the rows to aggregate.
        input: Box<LogicalPlan>,
        /// Grouping column names; they become the leading output columns.
        keys: Vec<String>,
        /// `(output_name, aggregation_expr)` pairs; each becomes a `Float` column.
        aggs: Vec<(String, DExpr)>,
    },
    /// Keeps the input columns whose names appear in `columns`, in input order.
    /// Names that do not exist are skipped.
    Project {
        /// Plan producing the frame to project.
        input: Box<LogicalPlan>,
        /// Column names to keep.
        columns: Vec<String>,
    },
    /// Inner equi-join on `left.left_on == right.right_on`. Keys are compared by
    /// their string rendering (`column_value_str`).
    InnerJoin {
        left: Box<LogicalPlan>,
        right: Box<LogicalPlan>,
        /// Key column in the left input.
        left_on: String,
        /// Key column in the right input.
        right_on: String,
    },
    /// Left equi-join: like `InnerJoin`, but left rows without a match are kept.
    LeftJoin {
        left: Box<LogicalPlan>,
        right: Box<LogicalPlan>,
        /// Key column in the left input.
        left_on: String,
        /// Key column in the right input.
        right_on: String,
    },
    /// Cartesian product of `left` and `right`.
    CrossJoin {
        left: Box<LogicalPlan>,
        right: Box<LogicalPlan>,
    },
}
589
590impl LogicalPlan {
591 pub fn referenced_columns(&self) -> Vec<String> {
593 let mut cols = Vec::new();
594 self.collect_columns(&mut cols);
595 cols.sort();
596 cols.dedup();
597 cols
598 }
599
600 fn collect_columns(&self, cols: &mut Vec<String>) {
601 match self {
602 LogicalPlan::Scan { .. } => {}
603 LogicalPlan::Filter { input, predicate } => {
604 input.collect_columns(cols);
605 collect_expr_columns(predicate, cols);
606 }
607 LogicalPlan::GroupBy { input, keys } => {
608 input.collect_columns(cols);
609 cols.extend(keys.clone());
610 }
611 LogicalPlan::Aggregate {
612 input, keys, aggs, ..
613 } => {
614 input.collect_columns(cols);
615 cols.extend(keys.clone());
616 for (_, expr) in aggs {
617 collect_expr_columns(expr, cols);
618 }
619 }
620 LogicalPlan::Project { input, columns } => {
621 input.collect_columns(cols);
622 cols.extend(columns.clone());
623 }
624 LogicalPlan::InnerJoin {
625 left,
626 right,
627 left_on,
628 right_on,
629 }
630 | LogicalPlan::LeftJoin {
631 left,
632 right,
633 left_on,
634 right_on,
635 } => {
636 left.collect_columns(cols);
637 right.collect_columns(cols);
638 cols.push(left_on.clone());
639 cols.push(right_on.clone());
640 }
641 LogicalPlan::CrossJoin { left, right } => {
642 left.collect_columns(cols);
643 right.collect_columns(cols);
644 }
645 }
646 }
647}
648
649fn collect_expr_columns(expr: &DExpr, cols: &mut Vec<String>) {
650 match expr {
651 DExpr::Col(name) => cols.push(name.clone()),
652 DExpr::BinOp { left, right, .. } => {
653 collect_expr_columns(left, cols);
654 collect_expr_columns(right, cols);
655 }
656 DExpr::Agg(_, inner) => collect_expr_columns(inner, cols),
657 DExpr::FnCall(_, args) => {
658 for arg in args {
659 collect_expr_columns(arg, cols);
660 }
661 }
662 DExpr::CumSum(e) | DExpr::CumProd(e) | DExpr::CumMax(e) | DExpr::CumMin(e)
663 | DExpr::Lag(e, _) | DExpr::Lead(e, _) | DExpr::Rank(e) | DExpr::DenseRank(e) => {
664 collect_expr_columns(e, cols);
665 }
666 DExpr::RollingSum(col, _) | DExpr::RollingMean(col, _)
667 | DExpr::RollingMin(col, _) | DExpr::RollingMax(col, _)
668 | DExpr::RollingVar(col, _) | DExpr::RollingSd(col, _) => {
669 cols.push(col.clone());
670 }
671 _ => {}
672 }
673}
674
675pub fn optimize(plan: LogicalPlan) -> LogicalPlan {
679 let plan = push_down_predicates(plan);
680 let plan = prune_columns(plan);
681 plan
682}
683
684fn push_down_predicates(plan: LogicalPlan) -> LogicalPlan {
686 match plan {
687 LogicalPlan::Filter {
688 input,
689 predicate,
690 } => {
691 let optimized_input = push_down_predicates(*input);
692 match optimized_input {
693 LogicalPlan::GroupBy {
695 input: inner,
696 keys,
697 } => {
698 let pred_cols = {
699 let mut c = Vec::new();
700 collect_expr_columns(&predicate, &mut c);
701 c
702 };
703 let can_push = pred_cols.iter().all(|c| !keys.contains(c))
704 || pred_cols.iter().all(|c| {
705 !keys.contains(c) || keys.contains(c)
707 });
708 if can_push && pred_cols.iter().all(|c| !keys.contains(c)) {
710 LogicalPlan::GroupBy {
711 input: Box::new(LogicalPlan::Filter {
712 input: inner,
713 predicate,
714 }),
715 keys,
716 }
717 } else {
718 LogicalPlan::Filter {
719 input: Box::new(LogicalPlan::GroupBy {
720 input: inner,
721 keys,
722 }),
723 predicate,
724 }
725 }
726 }
727 other => LogicalPlan::Filter {
728 input: Box::new(other),
729 predicate,
730 },
731 }
732 }
733 LogicalPlan::GroupBy { input, keys } => LogicalPlan::GroupBy {
734 input: Box::new(push_down_predicates(*input)),
735 keys,
736 },
737 LogicalPlan::Aggregate {
738 input,
739 keys,
740 aggs,
741 } => LogicalPlan::Aggregate {
742 input: Box::new(push_down_predicates(*input)),
743 keys,
744 aggs,
745 },
746 LogicalPlan::Project { input, columns } => LogicalPlan::Project {
747 input: Box::new(push_down_predicates(*input)),
748 columns,
749 },
750 LogicalPlan::InnerJoin {
751 left,
752 right,
753 left_on,
754 right_on,
755 } => LogicalPlan::InnerJoin {
756 left: Box::new(push_down_predicates(*left)),
757 right: Box::new(push_down_predicates(*right)),
758 left_on,
759 right_on,
760 },
761 LogicalPlan::LeftJoin {
762 left,
763 right,
764 left_on,
765 right_on,
766 } => LogicalPlan::LeftJoin {
767 left: Box::new(push_down_predicates(*left)),
768 right: Box::new(push_down_predicates(*right)),
769 left_on,
770 right_on,
771 },
772 LogicalPlan::CrossJoin { left, right } => LogicalPlan::CrossJoin {
773 left: Box::new(push_down_predicates(*left)),
774 right: Box::new(push_down_predicates(*right)),
775 },
776 other => other,
777 }
778}
779
780fn prune_columns(plan: LogicalPlan) -> LogicalPlan {
782 plan
785}
786
787pub fn execute(plan: &LogicalPlan) -> Result<DataFrame, DataError> {
791 match plan {
792 LogicalPlan::Scan { source } => Ok(source.clone()),
793
794 LogicalPlan::Filter { input, predicate } => {
795 let df = execute(input)?;
796 execute_filter(&df, predicate)
797 }
798
799 LogicalPlan::GroupBy { input, keys: _ } => {
800 let df = execute(input)?;
802 Ok(df)
804 }
805
806 LogicalPlan::Aggregate { input, keys, aggs } => {
807 let df = execute(input)?;
808 execute_aggregate(&df, keys, aggs)
809 }
810
811 LogicalPlan::Project { input, columns } => {
812 let df = execute(input)?;
813 let projected = df
814 .columns
815 .into_iter()
816 .filter(|(name, _)| columns.contains(name))
817 .collect();
818 Ok(DataFrame { columns: projected })
819 }
820
821 LogicalPlan::InnerJoin {
822 left,
823 right,
824 left_on,
825 right_on,
826 } => {
827 let left_df = execute(left)?;
828 let right_df = execute(right)?;
829 execute_inner_join(&left_df, &right_df, left_on, right_on)
830 }
831
832 LogicalPlan::LeftJoin {
833 left,
834 right,
835 left_on,
836 right_on,
837 } => {
838 let left_df = execute(left)?;
839 let right_df = execute(right)?;
840 execute_left_join(&left_df, &right_df, left_on, right_on)
841 }
842
843 LogicalPlan::CrossJoin { left, right } => {
844 let left_df = execute(left)?;
845 let right_df = execute(right)?;
846 execute_cross_join(&left_df, &right_df)
847 }
848 }
849}
850
851fn execute_filter(df: &DataFrame, predicate: &DExpr) -> Result<DataFrame, DataError> {
852 let nrows = df.nrows();
853 let mut mask = vec![false; nrows];
854
855 for row in 0..nrows {
856 let val = eval_expr_row(df, predicate, row)?;
857 mask[row] = match val {
858 ExprValue::Bool(b) => b,
859 _ => return Err(DataError::InvalidOperation("filter predicate must be boolean".into())),
860 };
861 }
862
863 let mut new_columns = Vec::new();
864 for (name, col) in &df.columns {
865 let filtered = filter_column(col, &mask);
866 new_columns.push((name.clone(), filtered));
867 }
868
869 Ok(DataFrame {
870 columns: new_columns,
871 })
872}
873
874fn filter_column(col: &Column, mask: &[bool]) -> Column {
875 if matches!(col, Column::CategoricalAdaptive(_)) {
876 return filter_column(&col.to_legacy_categorical(), mask);
877 }
878 match col {
879 Column::Int(v) => Column::Int(
880 v.iter()
881 .zip(mask)
882 .filter(|(_, &m)| m)
883 .map(|(v, _)| *v)
884 .collect(),
885 ),
886 Column::Float(v) => Column::Float(
887 v.iter()
888 .zip(mask)
889 .filter(|(_, &m)| m)
890 .map(|(v, _)| *v)
891 .collect(),
892 ),
893 Column::Str(v) => Column::Str(
894 v.iter()
895 .zip(mask)
896 .filter(|(_, &m)| m)
897 .map(|(v, _)| v.clone())
898 .collect(),
899 ),
900 Column::Bool(v) => Column::Bool(
901 v.iter()
902 .zip(mask)
903 .filter(|(_, &m)| m)
904 .map(|(v, _)| *v)
905 .collect(),
906 ),
907 Column::Categorical { levels, codes } => Column::Categorical {
908 levels: levels.clone(),
909 codes: codes
910 .iter()
911 .zip(mask)
912 .filter(|(_, &m)| m)
913 .map(|(v, _)| *v)
914 .collect(),
915 },
916 Column::DateTime(v) => Column::DateTime(
917 v.iter()
918 .zip(mask)
919 .filter(|(_, &m)| m)
920 .map(|(v, _)| *v)
921 .collect(),
922 ),
923 Column::CategoricalAdaptive(_) => unreachable!("handled by early return"),
924 }
925}
926
927fn execute_aggregate(
928 df: &DataFrame,
929 keys: &[String],
930 aggs: &[(String, DExpr)],
931) -> Result<DataFrame, DataError> {
932 let nrows = df.nrows();
934 let mut groups: BTreeMap<Vec<String>, Vec<usize>> = BTreeMap::new();
935
936 for row in 0..nrows {
937 let key: Vec<String> = keys
938 .iter()
939 .map(|k| {
940 df.get_column(k)
941 .map(|col| col.get_display(row))
942 .ok_or_else(|| DataError::ColumnNotFound(k.to_string()))
943 })
944 .collect::<Result<Vec<String>, DataError>>()?;
945 groups.entry(key).or_default().push(row);
946 }
947
948 let mut sorted_groups: Vec<(Vec<String>, Vec<usize>)> = groups.into_iter().collect();
950 sorted_groups.sort_by(|a, b| a.0.cmp(&b.0));
951
952 let mut result_columns: Vec<(String, Column)> = Vec::new();
954
955 for (key_idx, key_name) in keys.iter().enumerate() {
957 let values: Vec<String> = sorted_groups
958 .iter()
959 .map(|(key, _)| key[key_idx].clone())
960 .collect();
961 let source_col = df.get_column(key_name).ok_or_else(|| {
963 DataError::ColumnNotFound(key_name.clone())
964 })?;
965 match source_col {
966 Column::Int(_) => {
967 let int_vals: Vec<i64> = values.iter().map(|s| s.parse().unwrap_or(0)).collect();
968 result_columns.push((key_name.clone(), Column::Int(int_vals)));
969 }
970 Column::Str(_) => {
971 result_columns.push((key_name.clone(), Column::Str(values)));
972 }
973 _ => {
974 result_columns.push((key_name.clone(), Column::Str(values)));
975 }
976 }
977 }
978
979 for (agg_name, agg_expr) in aggs {
981 let mut values = Vec::new();
982 for (_, row_indices) in &sorted_groups {
983 let val = eval_agg_expr(df, agg_expr, row_indices)?;
984 values.push(val);
985 }
986 result_columns.push((agg_name.clone(), Column::Float(values)));
987 }
988
989 Ok(DataFrame {
990 columns: result_columns,
991 })
992}
993
/// Scalar produced when evaluating a [`DExpr`] for a single row.
///
/// `DateTime` columns are read as `Int`, and both categorical column kinds are
/// read as `Str`.
#[derive(Debug, Clone)]
enum ExprValue {
    /// Integer value (also used for `DateTime` cells).
    Int(i64),
    /// Floating-point value; `FnCall` results are always this variant.
    Float(f64),
    /// String value (also used for categorical cells).
    Str(String),
    /// Boolean value; filter predicates must evaluate to this variant.
    Bool(bool),
}
1003
1004fn eval_expr_row(df: &DataFrame, expr: &DExpr, row: usize) -> Result<ExprValue, DataError> {
1005 match expr {
1006 DExpr::Col(name) => {
1007 let col = df
1008 .get_column(name)
1009 .ok_or_else(|| DataError::ColumnNotFound(name.clone()))?;
1010 match col {
1011 Column::Int(v) => Ok(ExprValue::Int(v[row])),
1012 Column::Float(v) => Ok(ExprValue::Float(v[row])),
1013 Column::Str(v) => Ok(ExprValue::Str(v[row].clone())),
1014 Column::Bool(v) => Ok(ExprValue::Bool(v[row])),
1015 Column::Categorical { levels, codes } => {
1016 Ok(ExprValue::Str(levels[codes[row] as usize].clone()))
1017 }
1018 Column::CategoricalAdaptive(cc) => Ok(ExprValue::Str(match cc.get(row) {
1019 None => String::new(),
1020 Some(b) => String::from_utf8_lossy(b).into_owned(),
1021 })),
1022 Column::DateTime(v) => Ok(ExprValue::Int(v[row])),
1023 }
1024 }
1025 DExpr::LitInt(v) => Ok(ExprValue::Int(*v)),
1026 DExpr::LitFloat(v) => Ok(ExprValue::Float(*v)),
1027 DExpr::LitBool(b) => Ok(ExprValue::Bool(*b)),
1028 DExpr::LitStr(s) => Ok(ExprValue::Str(s.clone())),
1029 DExpr::BinOp { op, left, right } => {
1030 let lv = eval_expr_row(df, left, row)?;
1031 let rv = eval_expr_row(df, right, row)?;
1032 eval_binop(*op, lv, rv)
1033 }
1034 DExpr::Agg(_, _) | DExpr::Count => Err(DataError::InvalidOperation(
1035 "aggregation not allowed in row context".into(),
1036 )),
1037 DExpr::FnCall(name, args) => {
1038 if args.len() != 1 {
1039 return Err(DataError::InvalidOperation(
1040 format!("FnCall '{}' requires exactly 1 argument, got {}", name, args.len()),
1041 ));
1042 }
1043 let val = eval_expr_row(df, &args[0], row)?;
1044 let x = match val {
1045 ExprValue::Float(f) => f,
1046 ExprValue::Int(i) => i as f64,
1047 _ => return Err(DataError::InvalidOperation(
1048 format!("FnCall '{}' requires numeric argument", name),
1049 )),
1050 };
1051 let result = match name.as_str() {
1052 "log" => x.ln(),
1053 "exp" => x.exp(),
1054 "sqrt" => x.sqrt(),
1055 "abs" => x.abs(),
1056 "ceil" => x.ceil(),
1057 "floor" => x.floor(),
1058 "round" => x.round(),
1059 "sin" => x.sin(),
1060 "cos" => x.cos(),
1061 "tan" => x.tan(),
1062 other => return Err(DataError::InvalidOperation(
1063 format!("unknown DExpr function: {}", other),
1064 )),
1065 };
1066 Ok(ExprValue::Float(result))
1067 }
1068 DExpr::CumSum(_) | DExpr::CumProd(_) | DExpr::CumMax(_) | DExpr::CumMin(_)
1069 | DExpr::Lag(_, _) | DExpr::Lead(_, _) | DExpr::Rank(_) | DExpr::DenseRank(_)
1070 | DExpr::RowNumber
1071 | DExpr::RollingSum(..) | DExpr::RollingMean(..) | DExpr::RollingMin(..)
1072 | DExpr::RollingMax(..) | DExpr::RollingVar(..) | DExpr::RollingSd(..) => {
1073 Err(DataError::InvalidOperation(
1074 "window function not allowed in row context; use eval_expr_column".into(),
1075 ))
1076 }
1077 }
1078}
1079
1080fn eval_binop(op: DBinOp, left: ExprValue, right: ExprValue) -> Result<ExprValue, DataError> {
1081 match (left, right) {
1082 (ExprValue::Int(a), ExprValue::Int(b)) => match op {
1083 DBinOp::Add => Ok(ExprValue::Int(a + b)),
1084 DBinOp::Sub => Ok(ExprValue::Int(a - b)),
1085 DBinOp::Mul => Ok(ExprValue::Int(a * b)),
1086 DBinOp::Div => Ok(ExprValue::Int(a / b)),
1087 DBinOp::Gt => Ok(ExprValue::Bool(a > b)),
1088 DBinOp::Lt => Ok(ExprValue::Bool(a < b)),
1089 DBinOp::Ge => Ok(ExprValue::Bool(a >= b)),
1090 DBinOp::Le => Ok(ExprValue::Bool(a <= b)),
1091 DBinOp::Eq => Ok(ExprValue::Bool(a == b)),
1092 DBinOp::Ne => Ok(ExprValue::Bool(a != b)),
1093 _ => Err(DataError::InvalidOperation(format!(
1094 "unsupported op {:?} on Int",
1095 op
1096 ))),
1097 },
1098 (ExprValue::Float(a), ExprValue::Float(b)) => match op {
1099 DBinOp::Add => Ok(ExprValue::Float(a + b)),
1100 DBinOp::Sub => Ok(ExprValue::Float(a - b)),
1101 DBinOp::Mul => Ok(ExprValue::Float(a * b)),
1102 DBinOp::Div => Ok(ExprValue::Float(a / b)),
1103 DBinOp::Gt => Ok(ExprValue::Bool(a > b)),
1104 DBinOp::Lt => Ok(ExprValue::Bool(a < b)),
1105 DBinOp::Ge => Ok(ExprValue::Bool(a >= b)),
1106 DBinOp::Le => Ok(ExprValue::Bool(a <= b)),
1107 DBinOp::Eq => Ok(ExprValue::Bool(a == b)),
1108 DBinOp::Ne => Ok(ExprValue::Bool(a != b)),
1109 _ => Err(DataError::InvalidOperation(format!(
1110 "unsupported op {:?} on Float",
1111 op
1112 ))),
1113 },
1114 (ExprValue::Int(a), ExprValue::Float(b)) => {
1116 eval_binop(op, ExprValue::Float(a as f64), ExprValue::Float(b))
1117 }
1118 (ExprValue::Float(a), ExprValue::Int(b)) => {
1119 eval_binop(op, ExprValue::Float(a), ExprValue::Float(b as f64))
1120 }
1121 (ExprValue::Bool(a), ExprValue::Bool(b)) => match op {
1122 DBinOp::And => Ok(ExprValue::Bool(a && b)),
1123 DBinOp::Or => Ok(ExprValue::Bool(a || b)),
1124 DBinOp::Eq => Ok(ExprValue::Bool(a == b)),
1125 DBinOp::Ne => Ok(ExprValue::Bool(a != b)),
1126 _ => Err(DataError::InvalidOperation(format!(
1127 "unsupported op {:?} on Bool",
1128 op
1129 ))),
1130 },
1131 (ExprValue::Str(a), ExprValue::Str(b)) => match op {
1132 DBinOp::Eq => Ok(ExprValue::Bool(a == b)),
1133 DBinOp::Ne => Ok(ExprValue::Bool(a != b)),
1134 _ => Err(DataError::InvalidOperation(format!(
1135 "unsupported op {:?} on String",
1136 op
1137 ))),
1138 },
1139 _ => Err(DataError::InvalidOperation(
1140 "type mismatch in binary operation".into(),
1141 )),
1142 }
1143}
1144
1145fn eval_agg_expr(
1146 df: &DataFrame,
1147 expr: &DExpr,
1148 rows: &[usize],
1149) -> Result<f64, DataError> {
1150 match expr {
1151 DExpr::Agg(func, inner) => {
1152 let values = extract_float_values(df, inner, rows)?;
1153 match func {
1154 AggFunc::Sum => Ok(kahan_sum_f64(&values)),
1155 AggFunc::Mean => {
1156 if values.is_empty() {
1157 Ok(0.0)
1158 } else {
1159 Ok(kahan_sum_f64(&values) / values.len() as f64)
1160 }
1161 }
1162 AggFunc::Min => Ok(values
1163 .iter()
1164 .cloned()
1165 .fold(f64::INFINITY, f64::min)),
1166 AggFunc::Max => Ok(values
1167 .iter()
1168 .cloned()
1169 .fold(f64::NEG_INFINITY, f64::max)),
1170 AggFunc::Count => Ok(values.len() as f64),
1171 }
1172 }
1173 DExpr::Count => Ok(rows.len() as f64),
1174 _ => Err(DataError::InvalidOperation(
1175 "expected aggregation expression".into(),
1176 )),
1177 }
1178}
1179
1180fn extract_float_values(
1181 df: &DataFrame,
1182 expr: &DExpr,
1183 rows: &[usize],
1184) -> Result<Vec<f64>, DataError> {
1185 match expr {
1186 DExpr::Col(name) => {
1187 let col = df
1188 .get_column(name)
1189 .ok_or_else(|| DataError::ColumnNotFound(name.clone()))?;
1190 let vals: Vec<f64> = match col {
1191 Column::Float(v) => rows.iter().map(|&r| v[r]).collect(),
1192 Column::Int(v) => rows.iter().map(|&r| v[r] as f64).collect(),
1193 _ => {
1194 return Err(DataError::InvalidOperation(format!(
1195 "cannot aggregate non-numeric column `{}`",
1196 name
1197 )))
1198 }
1199 };
1200 Ok(vals)
1201 }
1202 _ => Err(DataError::InvalidOperation(
1203 "expected column reference in aggregation".into(),
1204 )),
1205 }
1206}
1207
/// Lazy query builder.
///
/// Each builder method wraps the current plan in a new [`LogicalPlan`] node.
/// Nothing executes until [`Pipeline::collect`], which runs `optimize` and then
/// `execute`.
pub struct Pipeline {
    /// Root of the plan built so far.
    plan: LogicalPlan,
}
1214
1215impl Pipeline {
1216 pub fn scan(df: DataFrame) -> Self {
1218 Self {
1219 plan: LogicalPlan::Scan { source: df },
1220 }
1221 }
1222
1223 pub fn filter(self, predicate: DExpr) -> Self {
1225 Self {
1226 plan: LogicalPlan::Filter {
1227 input: Box::new(self.plan),
1228 predicate,
1229 },
1230 }
1231 }
1232
1233 pub fn group_by(self, keys: Vec<String>) -> Self {
1235 Self {
1236 plan: LogicalPlan::GroupBy {
1237 input: Box::new(self.plan),
1238 keys,
1239 },
1240 }
1241 }
1242
1243 pub fn summarize(self, keys: Vec<String>, aggs: Vec<(String, DExpr)>) -> Self {
1245 Self {
1246 plan: LogicalPlan::Aggregate {
1247 input: Box::new(self.plan),
1248 keys,
1249 aggs,
1250 },
1251 }
1252 }
1253
1254 pub fn select(self, columns: Vec<String>) -> Self {
1256 Self {
1257 plan: LogicalPlan::Project {
1258 input: Box::new(self.plan),
1259 columns,
1260 },
1261 }
1262 }
1263
1264 pub fn inner_join(self, right: DataFrame, left_on: &str, right_on: &str) -> Self {
1266 Self {
1267 plan: LogicalPlan::InnerJoin {
1268 left: Box::new(self.plan),
1269 right: Box::new(LogicalPlan::Scan { source: right }),
1270 left_on: left_on.to_string(),
1271 right_on: right_on.to_string(),
1272 },
1273 }
1274 }
1275
1276 pub fn left_join(self, right: DataFrame, left_on: &str, right_on: &str) -> Self {
1278 Self {
1279 plan: LogicalPlan::LeftJoin {
1280 left: Box::new(self.plan),
1281 right: Box::new(LogicalPlan::Scan { source: right }),
1282 left_on: left_on.to_string(),
1283 right_on: right_on.to_string(),
1284 },
1285 }
1286 }
1287
1288 pub fn cross_join(self, right: DataFrame) -> Self {
1290 Self {
1291 plan: LogicalPlan::CrossJoin {
1292 left: Box::new(self.plan),
1293 right: Box::new(LogicalPlan::Scan { source: right }),
1294 },
1295 }
1296 }
1297
1298 pub fn collect(self) -> Result<DataFrame, DataError> {
1300 let optimized = optimize(self.plan);
1301 execute(&optimized)
1302 }
1303
1304 pub fn plan(&self) -> &LogicalPlan {
1306 &self.plan
1307 }
1308}
1309
/// Error type for frame construction, planning, and execution in this module.
#[derive(Debug, Clone)]
pub enum DataError {
    /// A referenced column does not exist; holds the column name.
    ColumnNotFound(String),
    /// `DataFrame::from_columns` received columns of differing lengths.
    ColumnLengthMismatch {
        /// Row count of the first column.
        expected: usize,
        /// Row count of the offending column.
        got: usize,
        /// Name of the offending column.
        column: String,
    },
    /// Any other failed operation, with a human-readable description. Missing
    /// join keys are reported through this variant as well.
    InvalidOperation(String),
}
1329
1330impl fmt::Display for DataError {
1331 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1332 match self {
1333 DataError::ColumnNotFound(name) => write!(f, "column `{}` not found", name),
1334 DataError::ColumnLengthMismatch {
1335 expected,
1336 got,
1337 column,
1338 } => write!(
1339 f,
1340 "column `{}` has {} rows, expected {}",
1341 column, got, expected
1342 ),
1343 DataError::InvalidOperation(msg) => write!(f, "invalid operation: {}", msg),
1344 }
1345 }
1346}
1347
1348impl std::error::Error for DataError {}
1349
1350fn column_value_str(col: &Column, row: usize) -> String {
1354 match col {
1355 Column::Int(v) => v[row].to_string(),
1356 Column::Float(v) => v[row].to_string(),
1357 Column::Str(v) => v[row].clone(),
1358 Column::Bool(v) => v[row].to_string(),
1359 Column::Categorical { levels, codes } => levels[codes[row] as usize].clone(),
1360 Column::CategoricalAdaptive(cc) => match cc.get(row) {
1361 None => String::new(),
1362 Some(b) => String::from_utf8_lossy(b).into_owned(),
1363 },
1364 Column::DateTime(v) => v[row].to_string(),
1365 }
1366}
1367
1368fn execute_inner_join(
1369 left: &DataFrame,
1370 right: &DataFrame,
1371 left_on: &str,
1372 right_on: &str,
1373) -> Result<DataFrame, DataError> {
1374 let left_col = left.get_column(left_on)
1375 .ok_or_else(|| DataError::InvalidOperation(format!("join key `{}` not found in left", left_on)))?;
1376 let right_col = right.get_column(right_on)
1377 .ok_or_else(|| DataError::InvalidOperation(format!("join key `{}` not found in right", right_on)))?;
1378
1379 let right_nrows = right.nrows();
1381 let mut index: std::collections::BTreeMap<String, Vec<usize>> = std::collections::BTreeMap::new();
1382 for i in 0..right_nrows {
1383 let key = column_value_str(right_col, i);
1384 index.entry(key).or_default().push(i);
1385 }
1386
1387 let left_nrows = left.nrows();
1388 let mut left_indices = Vec::new();
1389 let mut right_indices = Vec::new();
1390
1391 for i in 0..left_nrows {
1392 let key = column_value_str(left_col, i);
1393 if let Some(matches) = index.get(&key) {
1394 for &j in matches {
1395 left_indices.push(i);
1396 right_indices.push(j);
1397 }
1398 }
1399 }
1400
1401 build_join_result(left, right, &left_indices, &right_indices, right_on)
1402}
1403
1404fn execute_left_join(
1405 left: &DataFrame,
1406 right: &DataFrame,
1407 left_on: &str,
1408 right_on: &str,
1409) -> Result<DataFrame, DataError> {
1410 let left_col = left.get_column(left_on)
1411 .ok_or_else(|| DataError::InvalidOperation(format!("join key `{}` not found in left", left_on)))?;
1412 let right_col = right.get_column(right_on)
1413 .ok_or_else(|| DataError::InvalidOperation(format!("join key `{}` not found in right", right_on)))?;
1414
1415 let right_nrows = right.nrows();
1416 let mut index: std::collections::BTreeMap<String, Vec<usize>> = std::collections::BTreeMap::new();
1417 for i in 0..right_nrows {
1418 let key = column_value_str(right_col, i);
1419 index.entry(key).or_default().push(i);
1420 }
1421
1422 let left_nrows = left.nrows();
1423 let mut left_indices = Vec::new();
1424 let mut right_indices: Vec<Option<usize>> = Vec::new();
1425
1426 for i in 0..left_nrows {
1427 let key = column_value_str(left_col, i);
1428 if let Some(matches) = index.get(&key) {
1429 for &j in matches {
1430 left_indices.push(i);
1431 right_indices.push(Some(j));
1432 }
1433 } else {
1434 left_indices.push(i);
1435 right_indices.push(None);
1436 }
1437 }
1438
1439 build_left_join_result(left, right, &left_indices, &right_indices, right_on)
1440}
1441
1442fn execute_cross_join(left: &DataFrame, right: &DataFrame) -> Result<DataFrame, DataError> {
1443 let left_nrows = left.nrows();
1444 let right_nrows = right.nrows();
1445 let mut left_indices = Vec::with_capacity(left_nrows * right_nrows);
1446 let mut right_indices = Vec::with_capacity(left_nrows * right_nrows);
1447
1448 for i in 0..left_nrows {
1449 for j in 0..right_nrows {
1450 left_indices.push(i);
1451 right_indices.push(j);
1452 }
1453 }
1454
1455 build_join_result(left, right, &left_indices, &right_indices, "")
1456}
1457
1458fn build_join_result(
1459 left: &DataFrame,
1460 right: &DataFrame,
1461 left_indices: &[usize],
1462 right_indices: &[usize],
1463 right_on: &str,
1464) -> Result<DataFrame, DataError> {
1465 let mut columns = Vec::new();
1466
1467 for (name, col) in &left.columns {
1469 columns.push((name.clone(), gather_column(col, left_indices)));
1470 }
1471
1472 for (name, col) in &right.columns {
1474 if name == right_on {
1475 continue;
1476 }
1477 let out_name = if left.get_column(name).is_some() {
1478 format!("{}_right", name)
1479 } else {
1480 name.clone()
1481 };
1482 columns.push((out_name, gather_column(col, right_indices)));
1483 }
1484
1485 Ok(DataFrame { columns })
1486}
1487
1488fn build_left_join_result(
1489 left: &DataFrame,
1490 right: &DataFrame,
1491 left_indices: &[usize],
1492 right_indices: &[Option<usize>],
1493 right_on: &str,
1494) -> Result<DataFrame, DataError> {
1495 let mut columns = Vec::new();
1496
1497 for (name, col) in &left.columns {
1498 columns.push((name.clone(), gather_column(col, left_indices)));
1499 }
1500
1501 for (name, col) in &right.columns {
1502 if name == right_on {
1503 continue;
1504 }
1505 let out_name = if left.get_column(name).is_some() {
1506 format!("{}_right", name)
1507 } else {
1508 name.clone()
1509 };
1510 columns.push((out_name, gather_column_nullable(col, right_indices)));
1511 }
1512
1513 Ok(DataFrame { columns })
1514}
1515
1516fn gather_column(col: &Column, indices: &[usize]) -> Column {
1517 if matches!(col, Column::CategoricalAdaptive(_)) {
1518 return gather_column(&col.to_legacy_categorical(), indices);
1519 }
1520 match col {
1521 Column::Int(v) => Column::Int(indices.iter().map(|&i| v[i]).collect()),
1522 Column::Float(v) => Column::Float(indices.iter().map(|&i| v[i]).collect()),
1523 Column::Str(v) => Column::Str(indices.iter().map(|&i| v[i].clone()).collect()),
1524 Column::Bool(v) => Column::Bool(indices.iter().map(|&i| v[i]).collect()),
1525 Column::Categorical { levels, codes } => Column::Categorical {
1526 levels: levels.clone(),
1527 codes: indices.iter().map(|&i| codes[i]).collect(),
1528 },
1529 Column::DateTime(v) => Column::DateTime(indices.iter().map(|&i| v[i]).collect()),
1530 Column::CategoricalAdaptive(_) => unreachable!("handled by early return"),
1531 }
1532}
1533
1534fn gather_column_nullable(col: &Column, indices: &[Option<usize>]) -> Column {
1535 if matches!(col, Column::CategoricalAdaptive(_)) {
1536 return gather_column_nullable(&col.to_legacy_categorical(), indices);
1537 }
1538 match col {
1539 Column::Int(v) => Column::Int(indices.iter().map(|opt| opt.map_or(0, |i| v[i])).collect()),
1540 Column::Float(v) => Column::Float(indices.iter().map(|opt| opt.map_or(f64::NAN, |i| v[i])).collect()),
1541 Column::Str(v) => Column::Str(indices.iter().map(|opt| opt.map_or_else(String::new, |i| v[i].clone())).collect()),
1542 Column::Bool(v) => Column::Bool(indices.iter().map(|opt| opt.map_or(false, |i| v[i])).collect()),
1543 Column::Categorical { levels, codes } => Column::Categorical {
1544 levels: levels.clone(),
1545 codes: indices.iter().map(|opt| opt.map_or(0, |i| codes[i])).collect(),
1546 },
1547 Column::DateTime(v) => Column::DateTime(indices.iter().map(|opt| opt.map_or(0, |i| v[i])).collect()),
1548 Column::CategoricalAdaptive(_) => unreachable!("handled by early return"),
1549 }
1550}
1551
#[cfg(test)]
mod tests {
    use super::*;

    /// Six employees in two departments ("eng" x4, "sales" x2), shared by several tests.
    fn sample_df() -> DataFrame {
        DataFrame::from_columns(vec![
            (
                "name".into(),
                Column::Str(vec![
                    "Alice".into(),
                    "Bob".into(),
                    "Carol".into(),
                    "Dave".into(),
                    "Eve".into(),
                    "Frank".into(),
                ]),
            ),
            (
                "dept".into(),
                Column::Str(vec![
                    "eng".into(),
                    "eng".into(),
                    "sales".into(),
                    "eng".into(),
                    "sales".into(),
                    "eng".into(),
                ]),
            ),
            (
                "salary".into(),
                Column::Float(vec![95000.0, 102000.0, 78000.0, 110000.0, 82000.0, 98000.0]),
            ),
            ("tenure".into(), Column::Int(vec![3, 7, 2, 10, 1, 5])),
        ])
        .unwrap()
    }

    /// Reads a numeric result column as `f64`s, accepting either `Float` or `Int` storage.
    ///
    /// Any other column type panics, so an unexpected result type fails the test instead of
    /// silently skipping its assertions.
    fn numeric_values(col: &Column) -> Vec<f64> {
        match col {
            Column::Float(v) => v.clone(),
            Column::Int(v) => v.iter().map(|&x| x as f64).collect(),
            other => panic!("expected a numeric column, got {}", other.type_name()),
        }
    }

    /// Asserts that two floats agree to within 0.01.
    fn assert_close(actual: f64, expected: f64) {
        assert!(
            (actual - expected).abs() < 0.01,
            "expected {}, got {}",
            expected,
            actual
        );
    }

    #[test]
    fn test_dataframe_creation() {
        let df = sample_df();
        assert_eq!(df.nrows(), 6);
        assert_eq!(df.ncols(), 4);
        assert_eq!(
            df.column_names(),
            vec!["name", "dept", "salary", "tenure"]
        );
    }

    #[test]
    fn test_filter() {
        let df = sample_df();

        let result = Pipeline::scan(df)
            .filter(DExpr::BinOp {
                op: DBinOp::Gt,
                left: Box::new(DExpr::Col("tenure".into())),
                right: Box::new(DExpr::LitInt(2)),
            })
            .collect()
            .unwrap();

        // tenure > 2: Alice (3), Bob (7), Dave (10), Frank (5)
        assert_eq!(result.nrows(), 4);
    }

    #[test]
    fn test_group_by_summarize() {
        let df = sample_df();

        let result = Pipeline::scan(df)
            .summarize(
                vec!["dept".into()],
                vec![
                    (
                        "avg_salary".into(),
                        DExpr::Agg(AggFunc::Mean, Box::new(DExpr::Col("salary".into()))),
                    ),
                    ("headcount".into(), DExpr::Count),
                ],
            )
            .collect()
            .unwrap();

        assert_eq!(result.nrows(), 2);

        let dept_col = result.get_column("dept").unwrap();
        let avg_col = result.get_column("avg_salary").unwrap();
        let count_col = result.get_column("headcount").unwrap();

        if let (Column::Str(depts), Column::Float(avgs), Column::Float(counts)) =
            (dept_col, avg_col, count_col)
        {
            let eng_idx = depts.iter().position(|d| d == "eng").unwrap();
            let sales_idx = depts.iter().position(|d| d == "sales").unwrap();

            // eng: (95000 + 102000 + 110000 + 98000) / 4
            assert_close(avgs[eng_idx], 101250.0);
            assert_close(counts[eng_idx], 4.0);

            // sales: (78000 + 82000) / 2
            assert_close(avgs[sales_idx], 80000.0);
            assert_close(counts[sales_idx], 2.0);
        } else {
            panic!("unexpected column types");
        }
    }

    #[test]
    fn test_filter_then_aggregate() {
        let df = sample_df();

        let result = Pipeline::scan(df)
            .filter(DExpr::BinOp {
                op: DBinOp::Gt,
                left: Box::new(DExpr::Col("tenure".into())),
                right: Box::new(DExpr::LitInt(2)),
            })
            .summarize(
                vec!["dept".into()],
                vec![
                    (
                        "avg_salary".into(),
                        DExpr::Agg(AggFunc::Mean, Box::new(DExpr::Col("salary".into()))),
                    ),
                    (
                        "max_tenure".into(),
                        DExpr::Agg(AggFunc::Max, Box::new(DExpr::Col("tenure".into()))),
                    ),
                    ("headcount".into(), DExpr::Count),
                ],
            )
            .collect()
            .unwrap();

        // Only eng employees have tenure > 2, so exactly one group survives.
        assert_eq!(result.nrows(), 1);
        assert_eq!(result.get_column("dept").unwrap().get_display(0), "eng");
        assert_close(numeric_values(result.get_column("avg_salary").unwrap())[0], 101250.0);
        assert_close(numeric_values(result.get_column("max_tenure").unwrap())[0], 10.0);
        assert_close(numeric_values(result.get_column("headcount").unwrap())[0], 4.0);
    }

    #[test]
    fn test_to_tensor_data() {
        let df = DataFrame::from_columns(vec![
            ("x".into(), Column::Float(vec![1.0, 2.0, 3.0])),
            ("y".into(), Column::Float(vec![4.0, 5.0, 6.0])),
        ])
        .unwrap();

        let (data, shape) = df.to_tensor_data(&["x", "y"]).unwrap();
        assert_eq!(shape, vec![3, 2]);
        // Row-major: each row contributes (x, y).
        assert_eq!(data, vec![1.0, 4.0, 2.0, 5.0, 3.0, 6.0]);
    }

    #[test]
    fn test_display() {
        let df = DataFrame::from_columns(vec![
            ("x".into(), Column::Int(vec![1, 2, 3])),
            ("y".into(), Column::Float(vec![4.5, 5.5, 6.5])),
        ])
        .unwrap();

        let output = format!("{}", df);
        assert!(output.contains("x"));
        assert!(output.contains("y"));
        assert!(output.contains("4.5"));
    }

    #[test]
    fn test_column_not_found() {
        let df = sample_df();
        let result = Pipeline::scan(df)
            .filter(DExpr::BinOp {
                op: DBinOp::Gt,
                left: Box::new(DExpr::Col("nonexistent".into())),
                right: Box::new(DExpr::LitInt(0)),
            })
            .collect();

        assert!(result.is_err());
    }

    #[test]
    fn test_aggregation_functions() {
        let df = DataFrame::from_columns(vec![
            ("group".into(), Column::Str(vec!["a".into(), "a".into(), "a".into()])),
            ("val".into(), Column::Float(vec![10.0, 20.0, 30.0])),
        ])
        .unwrap();

        let result = Pipeline::scan(df)
            .summarize(
                vec!["group".into()],
                vec![
                    ("total".into(), DExpr::Agg(AggFunc::Sum, Box::new(DExpr::Col("val".into())))),
                    ("avg".into(), DExpr::Agg(AggFunc::Mean, Box::new(DExpr::Col("val".into())))),
                    ("lo".into(), DExpr::Agg(AggFunc::Min, Box::new(DExpr::Col("val".into())))),
                    ("hi".into(), DExpr::Agg(AggFunc::Max, Box::new(DExpr::Col("val".into())))),
                    ("n".into(), DExpr::Count),
                ],
            )
            .collect()
            .unwrap();

        assert_eq!(result.nrows(), 1);
        assert_close(numeric_values(result.get_column("total").unwrap())[0], 60.0);
        assert_close(numeric_values(result.get_column("avg").unwrap())[0], 20.0);
        assert_close(numeric_values(result.get_column("lo").unwrap())[0], 10.0);
        assert_close(numeric_values(result.get_column("hi").unwrap())[0], 30.0);
        assert_close(numeric_values(result.get_column("n").unwrap())[0], 3.0);
    }

    #[test]
    fn test_empty_dataframe() {
        let df = DataFrame::new();
        assert_eq!(df.nrows(), 0);
        assert_eq!(df.ncols(), 0);
    }

    #[test]
    fn test_expr_display() {
        let expr = DExpr::BinOp {
            op: DBinOp::Gt,
            left: Box::new(DExpr::Col("age".into())),
            right: Box::new(DExpr::LitInt(18)),
        };
        assert_eq!(format!("{}", expr), "(col(\"age\") > 18)");
    }

    #[test]
    fn test_categorical_column_basics() {
        let col = Column::Categorical {
            levels: vec!["bird".into(), "cat".into(), "dog".into()],
            codes: vec![1, 2, 1, 0],
        };
        assert_eq!(col.len(), 4);
        assert_eq!(col.type_name(), "Categorical");
        assert_eq!(col.get_display(0), "cat");
        assert_eq!(col.get_display(1), "dog");
        assert_eq!(col.get_display(2), "cat");
        assert_eq!(col.get_display(3), "bird");
    }

    #[test]
    fn test_datetime_column_basics() {
        let col = Column::DateTime(vec![1000, 2000, 3000]);
        assert_eq!(col.len(), 3);
        assert_eq!(col.type_name(), "DateTime");
        assert_eq!(col.get_display(0), "1000ms");
        assert_eq!(col.get_display(1), "2000ms");
    }

    #[test]
    fn test_label_encode() {
        let data: Vec<String> = vec!["cat".into(), "dog".into(), "cat".into(), "bird".into()];
        let (levels, codes) = label_encode(&data);
        assert_eq!(levels, vec!["bird", "cat", "dog"]);
        assert_eq!(codes, vec![1, 2, 1, 0]);
    }

    #[test]
    fn test_label_encode_empty() {
        let data: Vec<String> = vec![];
        let (levels, codes) = label_encode(&data);
        assert!(levels.is_empty());
        assert!(codes.is_empty());
    }

    #[test]
    fn test_label_encode_single_level() {
        let data: Vec<String> = vec!["x".into(), "x".into(), "x".into()];
        let (levels, codes) = label_encode(&data);
        assert_eq!(levels, vec!["x"]);
        assert_eq!(codes, vec![0, 0, 0]);
    }

    #[test]
    fn test_label_encode_deterministic() {
        let data: Vec<String> = vec!["z".into(), "a".into(), "m".into(), "a".into(), "z".into()];
        let (levels1, codes1) = label_encode(&data);
        let (levels2, codes2) = label_encode(&data);
        assert_eq!(levels1, levels2);
        assert_eq!(codes1, codes2);
        // Levels are sorted regardless of first-appearance order.
        assert_eq!(levels1, vec!["a", "m", "z"]);
    }

    #[test]
    fn test_ordinal_encode() {
        let data: Vec<String> = vec!["low".into(), "high".into(), "medium".into(), "low".into()];
        let order: Vec<String> = vec!["low".into(), "medium".into(), "high".into()];
        let (levels, codes) = ordinal_encode(&data, &order).unwrap();
        assert_eq!(levels, vec!["low", "medium", "high"]);
        assert_eq!(codes, vec![0, 2, 1, 0]);
    }

    #[test]
    fn test_ordinal_encode_missing_value() {
        let data: Vec<String> = vec!["low".into(), "unknown".into()];
        let order: Vec<String> = vec!["low".into(), "medium".into(), "high".into()];
        let result = ordinal_encode(&data, &order);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("unknown"));
    }

    #[test]
    fn test_one_hot_encode() {
        let levels = vec!["bird".to_string(), "cat".to_string(), "dog".to_string()];
        let codes = vec![1u32, 2, 1, 0];
        let (names, cols) = one_hot_encode(&levels, &codes);
        assert_eq!(names, vec!["bird", "cat", "dog"]);
        assert_eq!(cols.len(), 3);
        // bird
        assert_eq!(cols[0], vec![false, false, false, true]);
        // cat
        assert_eq!(cols[1], vec![true, false, true, false]);
        // dog
        assert_eq!(cols[2], vec![false, true, false, false]);

        // Every row is hot in exactly one column.
        for row in 0..4 {
            let count: usize = cols.iter().map(|c| if c[row] { 1 } else { 0 }).sum();
            assert_eq!(count, 1, "row {} should have exactly one true", row);
        }
    }

    #[test]
    fn test_one_hot_encode_empty() {
        let levels = vec!["a".to_string(), "b".to_string()];
        let codes: Vec<u32> = vec![];
        let (names, cols) = one_hot_encode(&levels, &codes);
        assert_eq!(names.len(), 2);
        assert!(cols[0].is_empty());
        assert!(cols[1].is_empty());
    }

    #[test]
    fn test_categorical_column_in_dataframe() {
        let data: Vec<String> = vec!["cat".into(), "dog".into(), "cat".into()];
        let (levels, codes) = label_encode(&data);
        let df = DataFrame::from_columns(vec![
            ("animal".into(), Column::Categorical { levels, codes }),
            ("score".into(), Column::Float(vec![1.0, 2.0, 3.0])),
        ])
        .unwrap();
        assert_eq!(df.nrows(), 3);
        assert_eq!(df.ncols(), 2);
        assert_eq!(df.get_column("animal").unwrap().type_name(), "Categorical");
    }

    #[test]
    fn test_datetime_column_in_dataframe() {
        let df = DataFrame::from_columns(vec![
            ("ts".into(), Column::DateTime(vec![1000, 2000, 3000])),
            ("val".into(), Column::Float(vec![1.0, 2.0, 3.0])),
        ])
        .unwrap();
        assert_eq!(df.nrows(), 3);
        assert_eq!(df.get_column("ts").unwrap().type_name(), "DateTime");
    }

    #[test]
    fn test_label_encode_to_column_roundtrip() {
        let data: Vec<String> = vec!["cat".into(), "dog".into(), "cat".into(), "bird".into()];
        let (levels, codes) = label_encode(&data);
        let col = Column::Categorical { levels: levels.clone(), codes: codes.clone() };
        for (i, original) in data.iter().enumerate() {
            assert_eq!(col.get_display(i), *original);
        }
    }
}
1949
1950impl DataFrame {
1955 pub fn to_tensor(
1962 &self,
1963 col_names: &[&str],
1964 ) -> Result<cjc_runtime::Tensor, DataError> {
1965 let (data, shape) = self.to_tensor_data(col_names)?;
1966 cjc_runtime::Tensor::from_vec(data, &shape)
1967 .map_err(|e| DataError::InvalidOperation(format!("tensor conversion: {}", e)))
1968 }
1969
1970 pub fn push_row(&mut self, values: &[&str]) -> Result<(), DataError> {
1979 if values.len() != self.ncols() {
1980 return Err(DataError::ColumnLengthMismatch {
1981 expected: self.ncols(),
1982 got: values.len(),
1983 column: "row".to_string(),
1984 });
1985 }
1986 for (i, (_, col)) in self.columns.iter_mut().enumerate() {
1987 let s = values[i];
1988 match col {
1989 Column::Float(v) => v.push(s.trim().parse::<f64>().unwrap_or(0.0)),
1990 Column::Int(v) => v.push(s.trim().parse::<i64>().unwrap_or(0)),
1991 Column::Str(v) => v.push(s.to_string()),
1992 Column::Bool(v) => v.push(matches!(s.trim(), "true" | "1")),
1993 Column::Categorical { .. } => {
1994 }
1996 Column::CategoricalAdaptive(_) => {
1997 }
1999 Column::DateTime(v) => v.push(s.trim().parse::<i64>().unwrap_or(0)),
2000 }
2001 }
2002 Ok(())
2003 }
2004}
2005
/// A fixed-length bitset with one bit per row.
///
/// Row `i` is stored at bit `i % 64` of `words[i / 64]`. `all_true`, `all_false` and
/// `from_bools` leave bits at positions `>= nrows` clear, and `count_ones` relies on that.
/// `from_words_for_test` only debug-checks the word count, not the tail bits.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BitMask {
    /// Packed selection bits, 64 rows per word.
    words: Vec<u64>,
    /// Number of logical rows covered by the mask.
    nrows: usize,
}
2026
2027impl BitMask {
2028 pub fn all_true(nrows: usize) -> Self {
2030 let nwords = nwords_for(nrows);
2031 let mut words = vec![u64::MAX; nwords];
2032 if nrows % 64 != 0 && nwords > 0 {
2034 let tail = nrows % 64;
2035 words[nwords - 1] = (1u64 << tail) - 1;
2036 }
2037 BitMask { words, nrows }
2038 }
2039
2040 pub fn all_false(nrows: usize) -> Self {
2042 let nwords = nwords_for(nrows);
2043 BitMask {
2044 words: vec![0u64; nwords],
2045 nrows,
2046 }
2047 }
2048
2049 pub fn from_bools(bools: &[bool]) -> Self {
2051 let nrows = bools.len();
2052 let nwords = nwords_for(nrows);
2053 let mut words = vec![0u64; nwords];
2054 for (i, &b) in bools.iter().enumerate() {
2055 if b {
2056 words[i / 64] |= 1u64 << (i % 64);
2057 }
2058 }
2059 BitMask { words, nrows }
2060 }
2061
2062 #[inline]
2064 pub fn get(&self, i: usize) -> bool {
2065 debug_assert!(i < self.nrows);
2066 (self.words[i / 64] >> (i % 64)) & 1 == 1
2067 }
2068
2069 pub fn count_ones(&self) -> usize {
2071 self.words.iter().map(|w| w.count_ones() as usize).sum()
2072 }
2073
2074 pub fn and(&self, other: &BitMask) -> BitMask {
2078 assert_eq!(
2079 self.nrows, other.nrows,
2080 "BitMask::and: nrows mismatch ({} vs {})",
2081 self.nrows, other.nrows
2082 );
2083 let words = self
2084 .words
2085 .iter()
2086 .zip(other.words.iter())
2087 .map(|(a, b)| a & b)
2088 .collect();
2089 BitMask {
2090 words,
2091 nrows: self.nrows,
2092 }
2093 }
2094
2095 pub fn iter_set(&self) -> impl Iterator<Item = usize> + '_ {
2097 (0..self.nrows).filter(move |&i| self.get(i))
2098 }
2099
2100 pub fn nrows(&self) -> usize {
2102 self.nrows
2103 }
2104
2105 pub fn nwords(&self) -> usize {
2107 self.words.len()
2108 }
2109
2110 pub fn words_slice(&self) -> &[u64] {
2113 &self.words
2114 }
2115
2116 pub fn from_words_for_test(words: Vec<u64>, nrows: usize) -> Self {
2121 debug_assert_eq!(words.len(), nwords_for(nrows));
2122 BitMask { words, nrows }
2123 }
2124}
2125
/// Number of 64-bit words needed to hold `nrows` bits, i.e. `ceil(nrows / 64)`.
///
/// Computed as the quotient plus a remainder flag. The previous `(nrows + 63) / 64` form
/// overflows for `nrows` within 63 of `usize::MAX`.
#[inline]
pub(crate) fn nwords_for(nrows: usize) -> usize {
    nrows / 64 + usize::from(nrows % 64 != 0)
}
2130
/// The ordered list of base-frame columns visible through a view.
///
/// Each entry is a position in the base `DataFrame`'s `columns`; the order of `indices` is the
/// order in which the view presents its columns.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProjectionMap {
    /// Base column positions, in output order.
    indices: Vec<usize>,
}
2143
2144impl ProjectionMap {
2145 pub fn identity(ncols: usize) -> Self {
2147 ProjectionMap {
2148 indices: (0..ncols).collect(),
2149 }
2150 }
2151
2152 pub fn from_indices(indices: Vec<usize>) -> Self {
2154 ProjectionMap { indices }
2155 }
2156
2157 pub fn len(&self) -> usize {
2159 self.indices.len()
2160 }
2161
2162 pub fn is_empty(&self) -> bool {
2164 self.indices.is_empty()
2165 }
2166
2167 pub fn indices(&self) -> &[usize] {
2169 &self.indices
2170 }
2171}
2172
/// A lightweight view over a shared base `DataFrame`.
///
/// Row filtering and column projection are recorded instead of copied. `mask` selects base
/// rows, `proj` lists the visible base columns, and `base` is shared through an `Rc`. Column
/// data is copied only when the view is materialized.
#[derive(Debug, Clone)]
pub struct TidyView {
    /// Shared base frame (never mutated through the view).
    base: Rc<DataFrame>,
    /// Which base rows are currently selected.
    mask: AdaptiveSelection,
    /// Which base columns are visible, and in what order.
    proj: ProjectionMap,
}
2189
/// Tries to evaluate `predicate` over whole columns instead of row by row.
///
/// On success, returns the rows satisfying `predicate`, intersected with `existing_mask`.
///
/// Supported shapes:
/// - `a And b`: `a` is evaluated first; `b` is then evaluated against the narrowed mask.
/// - `a Or b`: both sides are evaluated over all rows, OR-ed, then ANDed with `existing_mask`.
/// - `col <op> literal` or `literal <op> col`, where `<op>` is one of `Gt Lt Ge Le Eq Ne`, the
///   column is `Int` or `Float`, and the literal is `LitInt` or `LitFloat`.
///
/// Returns `None` for anything else: other operators or expression shapes, an unknown column,
/// or a non-numeric column. `None` is also returned if any sub-expression is unsupported, so
/// the caller can fall back to another strategy.
fn try_eval_predicate_columnar(
    base: &DataFrame,
    predicate: &DExpr,
    existing_mask: &BitMask,
) -> Option<BitMask> {
    match predicate {
        DExpr::BinOp {
            op: DBinOp::And,
            left,
            right,
        } => {
            // Evaluating `right` against `lmask` yields the intersection directly.
            let lmask = try_eval_predicate_columnar(base, left, existing_mask)?;
            let rmask = try_eval_predicate_columnar(base, right, &lmask)?;
            Some(rmask)
        }
        DExpr::BinOp {
            op: DBinOp::Or,
            left,
            right,
        } => {
            // Each side is evaluated over all rows so the union is not restricted by the
            // existing selection; that selection is re-applied after the OR.
            let all_mask = BitMask::all_true(existing_mask.nrows);
            let lmask = try_eval_predicate_columnar(base, left, &all_mask)?;
            let rmask = try_eval_predicate_columnar(base, right, &all_mask)?;
            let nrows = existing_mask.nrows;
            let or_words: Vec<u64> = lmask
                .words
                .iter()
                .zip(rmask.words.iter())
                .map(|(a, b)| a | b)
                .collect();
            let final_words: Vec<u64> = or_words
                .iter()
                .zip(existing_mask.words.iter())
                .map(|(a, b)| a & b)
                .collect();
            Some(BitMask {
                words: final_words,
                nrows,
            })
        }
        DExpr::BinOp { op, left, right } => {
            // Only the six comparison operators are vectorised here.
            if !matches!(
                op,
                DBinOp::Gt | DBinOp::Lt | DBinOp::Ge | DBinOp::Le | DBinOp::Eq | DBinOp::Ne
            ) {
                return None;
            }

            // The literal operand, kept in its own numeric type until the column type is known.
            enum LitVal {
                F(f64),
                I(i64),
            }

            // `reversed` records that the literal was on the left (`lit op col`).
            let (col_name, lit, reversed) = match (left.as_ref(), right.as_ref()) {
                (DExpr::Col(name), DExpr::LitFloat(v)) => (name.as_str(), LitVal::F(*v), false),
                (DExpr::LitFloat(v), DExpr::Col(name)) => (name.as_str(), LitVal::F(*v), true),
                (DExpr::Col(name), DExpr::LitInt(v)) => (name.as_str(), LitVal::I(*v), false),
                (DExpr::LitInt(v), DExpr::Col(name)) => (name.as_str(), LitVal::I(*v), true),
                _ => return None,
            };

            let column = base.get_column(col_name)?;

            // Rewrite `lit op col` as `col op' lit` by mirroring the ordering operators.
            let effective_op = if reversed {
                match op {
                    DBinOp::Gt => DBinOp::Lt,
                    DBinOp::Lt => DBinOp::Gt,
                    DBinOp::Ge => DBinOp::Le,
                    DBinOp::Le => DBinOp::Ge,
                    other => *other, // Eq and Ne are symmetric
                }
            } else {
                *op
            };

            let nrows = existing_mask.nrows;
            let nwords = nwords_for(nrows);
            let mut words = vec![0u64; nwords];

            match (column, &lit) {
                (Column::Float(data), LitVal::F(v)) => {
                    columnar_cmp_f64(data, *v, effective_op, &mut words);
                }
                (Column::Float(data), LitVal::I(v)) => {
                    // Float column vs Int literal: compare in f64.
                    columnar_cmp_f64(data, *v as f64, effective_op, &mut words);
                }
                (Column::Int(data), LitVal::I(v)) => {
                    columnar_cmp_i64(data, *v, effective_op, &mut words);
                }
                (Column::Int(data), LitVal::F(v)) => {
                    // Int column vs Float literal: widen the column to f64 (very large
                    // integers may round in the conversion).
                    let floats: Vec<f64> = data.iter().map(|&x| x as f64).collect();
                    columnar_cmp_f64(&floats, *v, effective_op, &mut words);
                }
                _ => return None,
            }

            // Keep only rows that were already selected.
            for (w, ew) in words.iter_mut().zip(existing_mask.words.iter()) {
                *w &= *ew;
            }

            Some(BitMask { words, nrows })
        }
        _ => None,
    }
}
2326
2327#[inline]
2331pub(crate) fn columnar_cmp_f64(data: &[f64], lit: f64, op: DBinOp, out_words: &mut [u64]) {
2332 for (i, &val) in data.iter().enumerate() {
2333 let pass = match op {
2334 DBinOp::Gt => val > lit,
2335 DBinOp::Lt => val < lit,
2336 DBinOp::Ge => val >= lit,
2337 DBinOp::Le => val <= lit,
2338 DBinOp::Eq => val == lit,
2339 DBinOp::Ne => val != lit,
2340 _ => false,
2341 };
2342 if pass {
2343 out_words[i / 64] |= 1u64 << (i % 64);
2344 }
2345 }
2346}
2347
2348#[inline]
2351pub(crate) fn columnar_cmp_i64(data: &[i64], lit: i64, op: DBinOp, out_words: &mut [u64]) {
2352 for (i, &val) in data.iter().enumerate() {
2353 let pass = match op {
2354 DBinOp::Gt => val > lit,
2355 DBinOp::Lt => val < lit,
2356 DBinOp::Ge => val >= lit,
2357 DBinOp::Le => val <= lit,
2358 DBinOp::Eq => val == lit,
2359 DBinOp::Ne => val != lit,
2360 _ => false,
2361 };
2362 if pass {
2363 out_words[i / 64] |= 1u64 << (i % 64);
2364 }
2365 }
2366}
2367
2368impl TidyView {
2369 pub fn from_df(df: DataFrame) -> Self {
2373 let nrows = df.nrows();
2374 let ncols = df.ncols();
2375 TidyView {
2376 base: Rc::new(df),
2377 mask: AdaptiveSelection::all(nrows),
2378 proj: ProjectionMap::identity(ncols),
2379 }
2380 }
2381
2382 pub fn from_rc(df: Rc<DataFrame>) -> Self {
2384 let nrows = df.nrows();
2385 let ncols = df.ncols();
2386 TidyView {
2387 base: df,
2388 mask: AdaptiveSelection::all(nrows),
2389 proj: ProjectionMap::identity(ncols),
2390 }
2391 }
2392
2393 pub fn explain_selection_mode(&self) -> &'static str {
2396 self.mask.explain_selection_mode()
2397 }
2398
2399 pub fn nrows(&self) -> usize {
2403 self.mask.count()
2404 }
2405
2406 pub fn ncols(&self) -> usize {
2408 self.proj.len()
2409 }
2410
2411 pub fn column_names(&self) -> Vec<&str> {
2413 self.proj
2414 .indices()
2415 .iter()
2416 .map(|&ci| self.base.columns[ci].0.as_str())
2417 .collect()
2418 }
2419
2420 pub fn filter(&self, predicate: &DExpr) -> Result<TidyView, TidyError> {
2433 validate_expr_columns_proj(predicate, &self.base, &self.proj)?;
2435
2436 let nrows_base = self.base.nrows();
2437
2438 if let Some(bc) = predicate_bytecode::PredicateBytecode::lower(predicate, &self.base) {
2442 let count = self.mask.count();
2443
2444 if matches!(self.mask, AdaptiveSelection::Hybrid { .. })
2464 && !predicate_bytecode::should_use_sparse_path(count, nrows_base)
2465 {
2466 let fresh = bc.evaluate_to_selection(&self.base, nrows_base);
2467 let intersected = self.mask.intersect(&fresh);
2468 return Ok(TidyView {
2469 base: Rc::clone(&self.base),
2470 mask: intersected,
2471 proj: self.proj.clone(),
2472 });
2473 }
2474
2475 let new_mask = if predicate_bytecode::should_use_sparse_path(count, nrows_base) {
2476 let existing_indices: Vec<usize> = self.mask.iter_indices().collect();
2477 bc.interpret_sparse(&self.base, &existing_indices, nrows_base)
2478 } else {
2479 let current_mask = self.mask.materialize_mask();
2480 bc.interpret(&self.base, ¤t_mask)
2481 };
2482
2483 let words: Vec<u64> = new_mask.words_slice().to_vec();
2484 return Ok(TidyView {
2485 base: Rc::clone(&self.base),
2486 mask: AdaptiveSelection::from_predicate_result(words, nrows_base),
2487 proj: self.proj.clone(),
2488 });
2489 }
2490
2491 let current_mask = self.mask.materialize_mask();
2495
2496 if let Some(new_mask) = try_eval_predicate_columnar(&self.base, predicate, ¤t_mask) {
2501 let words: Vec<u64> = new_mask.words_slice().to_vec();
2502 return Ok(TidyView {
2503 base: Rc::clone(&self.base),
2504 mask: AdaptiveSelection::from_predicate_result(words, nrows_base),
2505 proj: self.proj.clone(),
2506 });
2507 }
2508
2509 let mut new_words: Vec<u64> = current_mask.words_slice().to_vec();
2511
2512 for row in self.mask.iter_indices() {
2515 let b = eval_expr_row_proj(&self.base, predicate, row, &self.proj)?;
2516 let pass = match b {
2517 ExprValue::Bool(v) => v,
2518 _ => {
2519 return Err(TidyError::PredicateNotBool {
2520 got: b.type_name().to_string(),
2521 })
2522 }
2523 };
2524 if !pass {
2525 new_words[row / 64] &= !(1u64 << (row % 64));
2526 }
2527 }
2528
2529 Ok(TidyView {
2530 base: Rc::clone(&self.base),
2531 mask: AdaptiveSelection::from_predicate_result(new_words, nrows_base),
2532 proj: self.proj.clone(),
2533 })
2534 }
2535
2536 pub fn select(&self, cols: &[&str]) -> Result<TidyView, TidyError> {
2549 {
2551 let mut seen = std::collections::BTreeSet::new();
2552 for &name in cols {
2553 if !seen.insert(name) {
2554 return Err(TidyError::DuplicateColumn(name.to_string()));
2555 }
2556 }
2557 }
2558
2559 let mut new_indices = Vec::with_capacity(cols.len());
2561 for &name in cols {
2562 let idx = self
2563 .base
2564 .columns
2565 .iter()
2566 .position(|(n, _)| n == name)
2567 .ok_or_else(|| TidyError::ColumnNotFound(name.to_string()))?;
2568 new_indices.push(idx);
2569 }
2570
2571 Ok(TidyView {
2572 base: Rc::clone(&self.base),
2573 mask: self.mask.clone(),
2574 proj: ProjectionMap::from_indices(new_indices),
2575 })
2576 }
2577
2578 pub fn mutate(&self, assignments: &[(&str, DExpr)]) -> Result<TidyFrame, TidyError> {
2598 {
2600 let mut seen = std::collections::BTreeSet::new();
2601 for &(name, _) in assignments {
2602 if !seen.insert(name) {
2603 return Err(TidyError::DuplicateColumn(name.to_string()));
2604 }
2605 }
2606 }
2607
2608 let mut df = self.materialize()?;
2610
2611 let snapshot_names: Vec<String> = df.columns.iter().map(|(n, _)| n.clone()).collect();
2613
2614 for &(col_name, ref expr) in assignments {
2615 validate_expr_columns_snapshot(expr, &snapshot_names)?;
2617
2618 let nrows = df.nrows();
2619 let new_col = eval_expr_column(&df, expr, nrows)?;
2621
2622 if let Some(pos) = df.columns.iter().position(|(n, _)| n == col_name) {
2624 df.columns[pos].1 = new_col;
2625 } else {
2626 df.columns.push((col_name.to_string(), new_col));
2627 }
2628 }
2629
2630 Ok(TidyFrame {
2631 inner: Rc::new(RefCell::new(df)),
2632 })
2633 }
2634
2635 pub fn materialize(&self) -> Result<DataFrame, TidyError> {
2647 let row_indices: Vec<usize> = self.mask.iter_indices().collect();
2648
2649 let mut columns = Vec::with_capacity(self.proj.len());
2650 for &ci in self.proj.indices() {
2651 let (name, col) = &self.base.columns[ci];
2652 let new_col = gather_column(col, &row_indices);
2653 columns.push((name.clone(), new_col));
2654 }
2655
2656 DataFrame::from_columns(columns)
2657 .map_err(|e| TidyError::Internal(e.to_string()))
2658 }
2659
2660 pub fn to_tensor(&self, col_names: &[&str]) -> Result<cjc_runtime::Tensor, TidyError> {
2664 let df = self.materialize()?;
2665 df.to_tensor(col_names)
2666 .map_err(|e| TidyError::Internal(e.to_string()))
2667 }
2668
2669 pub fn mask(&self) -> BitMask {
2674 self.mask.materialize_mask()
2675 }
2676
2677 pub fn selection(&self) -> &AdaptiveSelection {
2681 &self.mask
2682 }
2683
2684 pub fn proj(&self) -> &ProjectionMap {
2686 &self.proj
2687 }
2688
2689 pub fn base_column(&self, name: &str) -> Option<&Column> {
2694 self.base.columns.iter()
2695 .find(|(n, _)| n == name)
2696 .map(|(_, c)| c)
2697 }
2698}
2699
/// A mutable data frame handle with shared ownership.
///
/// Cloning a `TidyFrame` shares the same `Rc<RefCell<DataFrame>>`. `TidyFrame::mutate` first
/// copies the data when that `Rc` is shared, so mutating one handle does not change the others.
#[derive(Debug, Clone)]
pub struct TidyFrame {
    /// The frame, shared between clones of this handle.
    inner: Rc<RefCell<DataFrame>>,
}
2711
2712impl TidyFrame {
2713 pub fn from_df(df: DataFrame) -> Self {
2715 TidyFrame {
2716 inner: Rc::new(RefCell::new(df)),
2717 }
2718 }
2719
2720 pub fn borrow(&self) -> std::cell::Ref<'_, DataFrame> {
2722 self.inner.borrow()
2723 }
2724
2725 pub fn view(&self) -> TidyView {
2727 let df = self.inner.borrow().clone();
2728 TidyView::from_df(df)
2729 }
2730
2731 pub fn mutate(&mut self, assignments: &[(&str, DExpr)]) -> Result<(), TidyError> {
2733 if Rc::strong_count(&self.inner) > 1 {
2735 let cloned = self.inner.borrow().clone();
2736 self.inner = Rc::new(RefCell::new(cloned));
2737 }
2738
2739 {
2741 let mut seen = std::collections::BTreeSet::new();
2742 for &(name, _) in assignments {
2743 if !seen.insert(name) {
2744 return Err(TidyError::DuplicateColumn(name.to_string()));
2745 }
2746 }
2747 }
2748
2749 let mut df = self.inner.borrow_mut();
2750
2751 let snapshot_names: Vec<String> = df.columns.iter().map(|(n, _)| n.clone()).collect();
2753
2754 for &(col_name, ref expr) in assignments {
2755 validate_expr_columns_snapshot(expr, &snapshot_names)?;
2756
2757 let nrows = df.nrows();
2758 let new_col = eval_expr_column(&df, expr, nrows)?;
2759
2760 if let Some(pos) = df.columns.iter().position(|(n, _)| n == col_name) {
2761 df.columns[pos].1 = new_col;
2762 } else {
2763 df.columns.push((col_name.to_string(), new_col));
2764 }
2765 }
2766
2767 Ok(())
2768 }
2769}
2770
/// Errors produced by the tidy operations on `TidyView` and `TidyFrame`.
#[derive(Debug, Clone, PartialEq)]
pub enum TidyError {
    /// A referenced column does not exist; the payload is the column name.
    ColumnNotFound(String),
    /// A column name was supplied more than once (e.g. to `select` or `mutate`).
    DuplicateColumn(String),
    /// A filter predicate produced a non-Bool value; `got` is that value's type name.
    PredicateNotBool { got: String },
    /// A value had the wrong type; both fields are human-readable type descriptions.
    TypeMismatch { expected: String, got: String },
    /// A row count did not match the expected length.
    LengthMismatch { expected: usize, got: usize },
    /// An unexpected internal failure, e.g. a wrapped `DataError` message.
    Internal(String),
    /// An aggregation was applied to an empty group.
    EmptyGroup,
    /// More distinct factor levels than allowed: `limit` is the maximum, `got` the count seen.
    CapacityExceeded { limit: usize, got: usize },
}
2793
2794impl fmt::Display for TidyError {
2795 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2796 match self {
2797 TidyError::ColumnNotFound(n) => write!(f, "column `{}` not found", n),
2798 TidyError::DuplicateColumn(n) => write!(f, "duplicate column `{}`", n),
2799 TidyError::PredicateNotBool { got } => {
2800 write!(f, "filter predicate must be Bool, got {}", got)
2801 }
2802 TidyError::TypeMismatch { expected, got } => {
2803 write!(f, "type mismatch: expected {}, got {}", expected, got)
2804 }
2805 TidyError::LengthMismatch { expected, got } => {
2806 write!(
2807 f,
2808 "length mismatch: expected {} rows, got {}",
2809 expected, got
2810 )
2811 }
2812 TidyError::Internal(msg) => write!(f, "internal error: {}", msg),
2813 TidyError::EmptyGroup => write!(f, "aggregation on empty group"),
2814 TidyError::CapacityExceeded { limit, got } => {
2815 write!(f, "factor capacity exceeded: limit {} distinct levels, got {}", limit, got)
2816 }
2817 }
2818 }
2819}
2820
2821impl std::error::Error for TidyError {}
2822
2823fn extract_f64_column(df: &DataFrame, expr: &DExpr, nrows: usize) -> Result<Vec<f64>, TidyError> {
2835 let col = eval_expr_column(df, expr, nrows)?;
2836 match col {
2837 Column::Float(v) => Ok(v),
2838 Column::Int(v) => Ok(v.into_iter().map(|i| i as f64).collect()),
2839 _ => Err(TidyError::TypeMismatch {
2840 expected: "numeric".into(),
2841 got: "non-numeric".into(),
2842 }),
2843 }
2844}
2845
2846fn eval_window_column(
2849 df: &DataFrame,
2850 expr: &DExpr,
2851 nrows: usize,
2852) -> Result<Option<Column>, TidyError> {
2853 match expr {
2854 DExpr::RowNumber => {
2855 let vals: Vec<i64> = (1..=nrows as i64).collect();
2856 Ok(Some(Column::Int(vals)))
2857 }
2858 DExpr::CumSum(inner) => {
2859 let src = extract_f64_column(df, inner, nrows)?;
2860 let mut result = Vec::with_capacity(nrows);
2861 let mut sum = 0.0_f64;
2862 let mut comp = 0.0_f64; for &v in &src {
2864 let y = v - comp;
2865 let t = sum + y;
2866 comp = (t - sum) - y;
2867 sum = t;
2868 result.push(sum);
2869 }
2870 Ok(Some(Column::Float(result)))
2871 }
2872 DExpr::CumProd(inner) => {
2873 let src = extract_f64_column(df, inner, nrows)?;
2874 let mut result = Vec::with_capacity(nrows);
2875 let mut prod = 1.0_f64;
2876 for &v in &src {
2877 prod *= v;
2878 result.push(prod);
2879 }
2880 Ok(Some(Column::Float(result)))
2881 }
2882 DExpr::CumMax(inner) => {
2883 let src = extract_f64_column(df, inner, nrows)?;
2884 let mut result = Vec::with_capacity(nrows);
2885 let mut max = f64::NEG_INFINITY;
2886 for &v in &src {
2887 if v > max { max = v; }
2888 result.push(max);
2889 }
2890 Ok(Some(Column::Float(result)))
2891 }
2892 DExpr::CumMin(inner) => {
2893 let src = extract_f64_column(df, inner, nrows)?;
2894 let mut result = Vec::with_capacity(nrows);
2895 let mut min = f64::INFINITY;
2896 for &v in &src {
2897 if v < min { min = v; }
2898 result.push(min);
2899 }
2900 Ok(Some(Column::Float(result)))
2901 }
2902 DExpr::Lag(inner, k) => {
2903 let src = extract_f64_column(df, inner, nrows)?;
2904 let mut result = Vec::with_capacity(nrows);
2905 for i in 0..nrows {
2906 if i < *k {
2907 result.push(f64::NAN);
2908 } else {
2909 result.push(src[i - k]);
2910 }
2911 }
2912 Ok(Some(Column::Float(result)))
2913 }
2914 DExpr::Lead(inner, k) => {
2915 let src = extract_f64_column(df, inner, nrows)?;
2916 let mut result = Vec::with_capacity(nrows);
2917 for i in 0..nrows {
2918 if i + k >= nrows {
2919 result.push(f64::NAN);
2920 } else {
2921 result.push(src[i + k]);
2922 }
2923 }
2924 Ok(Some(Column::Float(result)))
2925 }
2926 DExpr::Rank(inner) => {
2927 let src = extract_f64_column(df, inner, nrows)?;
2928 let mut indexed: Vec<(usize, f64)> = src.iter().cloned().enumerate().collect();
2930 indexed.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
2931 let mut ranks = vec![0.0_f64; nrows];
2932 let mut i = 0;
2933 while i < nrows {
2934 let mut j = i;
2935 while j < nrows && indexed[j].1 == indexed[i].1 {
2936 j += 1;
2937 }
2938 let avg_rank = (i + 1 + j) as f64 / 2.0; for idx in i..j {
2940 ranks[indexed[idx].0] = avg_rank;
2941 }
2942 i = j;
2943 }
2944 Ok(Some(Column::Float(ranks)))
2945 }
2946 DExpr::DenseRank(inner) => {
2947 let src = extract_f64_column(df, inner, nrows)?;
2948 let mut indexed: Vec<(usize, f64)> = src.iter().cloned().enumerate().collect();
2949 indexed.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
2950 let mut ranks = vec![0_i64; nrows];
2951 let mut rank = 0_i64;
2952 let mut prev: Option<f64> = None;
2953 for &(orig_idx, val) in &indexed {
2954 if prev.is_none() || prev.unwrap() != val {
2955 rank += 1;
2956 }
2957 ranks[orig_idx] = rank;
2958 prev = Some(val);
2959 }
2960 Ok(Some(Column::Int(ranks)))
2961 }
2962 DExpr::RollingSum(col_name, window) => {
2963 let vals = rolling_get_floats(df, col_name)?;
2964 let n = vals.len();
2965 let w = *window;
2966 let mut result = Vec::with_capacity(n);
2967 let mut sum = 0.0_f64;
2968 let mut comp = 0.0_f64;
2969 for i in 0..n {
2970 let y = vals[i] - comp;
2972 let t = sum + y;
2973 comp = (t - sum) - y;
2974 sum = t;
2975 if i >= w {
2977 let y2 = -vals[i - w] - comp;
2978 let t2 = sum + y2;
2979 comp = (t2 - sum) - y2;
2980 sum = t2;
2981 }
2982 result.push(sum);
2983 }
2984 Ok(Some(Column::Float(result)))
2985 }
2986 DExpr::RollingMean(col_name, window) => {
2987 let vals = rolling_get_floats(df, col_name)?;
2988 let n = vals.len();
2989 let w = *window;
2990 let mut result = Vec::with_capacity(n);
2991 let mut sum = 0.0_f64;
2992 let mut comp = 0.0_f64;
2993 for i in 0..n {
2994 let y = vals[i] - comp;
2995 let t = sum + y;
2996 comp = (t - sum) - y;
2997 sum = t;
2998 if i >= w {
2999 let y2 = -vals[i - w] - comp;
3000 let t2 = sum + y2;
3001 comp = (t2 - sum) - y2;
3002 sum = t2;
3003 }
3004 let count = if i < w { i + 1 } else { w };
3005 result.push(sum / count as f64);
3006 }
3007 Ok(Some(Column::Float(result)))
3008 }
3009 DExpr::RollingMin(col_name, window) => {
3010 let vals = rolling_get_floats(df, col_name)?;
3011 let n = vals.len();
3012 let w = *window;
3013 let mut result = Vec::with_capacity(n);
3014 let mut deque: VecDeque<usize> = VecDeque::new();
3015 for i in 0..n {
3016 while !deque.is_empty() && *deque.front().unwrap() + w <= i {
3018 deque.pop_front();
3019 }
3020 while !deque.is_empty() && vals[*deque.back().unwrap()] >= vals[i] {
3022 deque.pop_back();
3023 }
3024 deque.push_back(i);
3025 result.push(vals[*deque.front().unwrap()]);
3026 }
3027 Ok(Some(Column::Float(result)))
3028 }
3029 DExpr::RollingMax(col_name, window) => {
3030 let vals = rolling_get_floats(df, col_name)?;
3031 let n = vals.len();
3032 let w = *window;
3033 let mut result = Vec::with_capacity(n);
3034 let mut deque: VecDeque<usize> = VecDeque::new();
3035 for i in 0..n {
3036 while !deque.is_empty() && *deque.front().unwrap() + w <= i {
3037 deque.pop_front();
3038 }
3039 while !deque.is_empty() && vals[*deque.back().unwrap()] <= vals[i] {
3041 deque.pop_back();
3042 }
3043 deque.push_back(i);
3044 result.push(vals[*deque.front().unwrap()]);
3045 }
3046 Ok(Some(Column::Float(result)))
3047 }
3048 DExpr::RollingVar(col_name, window) => {
3049 let vals = rolling_get_floats(df, col_name)?;
3050 let n = vals.len();
3051 let w = *window;
3052 let mut result = Vec::with_capacity(n);
3053 let mut count = 0_usize;
3055 let mut mean = 0.0_f64;
3056 let mut m2 = 0.0_f64;
3057 for i in 0..n {
3058 count += 1;
3060 let delta = vals[i] - mean;
3061 mean += delta / count as f64;
3062 let delta2 = vals[i] - mean;
3063 m2 += delta * delta2;
3064 if i >= w {
3066 let old = vals[i - w];
3067 count -= 1;
3068 if count == 0 {
3069 mean = 0.0;
3070 m2 = 0.0;
3071 } else {
3072 let delta_old = old - mean;
3073 mean -= delta_old / count as f64;
3074 let delta_old2 = old - mean;
3075 m2 -= delta_old * delta_old2;
3076 }
3077 }
3078 if count < 2 {
3079 result.push(0.0);
3080 } else {
3081 result.push(m2 / (count - 1) as f64);
3084 }
3085 }
3086 Ok(Some(Column::Float(result)))
3087 }
3088 DExpr::RollingSd(col_name, window) => {
3089 let vals = rolling_get_floats(df, col_name)?;
3090 let n = vals.len();
3091 let w = *window;
3092 let mut result = Vec::with_capacity(n);
3093 let mut count = 0_usize;
3094 let mut mean = 0.0_f64;
3095 let mut m2 = 0.0_f64;
3096 for i in 0..n {
3097 count += 1;
3098 let delta = vals[i] - mean;
3099 mean += delta / count as f64;
3100 let delta2 = vals[i] - mean;
3101 m2 += delta * delta2;
3102 if i >= w {
3103 let old = vals[i - w];
3104 count -= 1;
3105 if count == 0 {
3106 mean = 0.0;
3107 m2 = 0.0;
3108 } else {
3109 let delta_old = old - mean;
3110 mean -= delta_old / count as f64;
3111 let delta_old2 = old - mean;
3112 m2 -= delta_old * delta_old2;
3113 }
3114 }
3115 if count < 2 {
3116 result.push(0.0);
3117 } else {
3118 result.push((m2 / (count - 1) as f64).sqrt());
3119 }
3120 }
3121 Ok(Some(Column::Float(result)))
3122 }
3123 _ => Ok(None),
3124 }
3125}
3126
3127fn rolling_get_floats(df: &DataFrame, col_name: &str) -> Result<Vec<f64>, TidyError> {
3129 let col = df
3130 .get_column(col_name)
3131 .ok_or_else(|| TidyError::ColumnNotFound(col_name.to_string()))?;
3132 match col {
3133 Column::Float(v) => Ok(v.clone()),
3134 Column::Int(v) => Ok(v.iter().map(|&i| i as f64).collect()),
3135 _ => Err(TidyError::TypeMismatch {
3136 expected: "numeric".into(),
3137 got: "non-numeric".into(),
3138 }),
3139 }
3140}
3141
3142fn vectorized_binop(op: DBinOp, left: &Column, right: &Column) -> Result<Column, TidyError> {
3147 match (left, right) {
3148 (Column::Int(a), Column::Int(b)) => {
3149 let n = a.len();
3150 match op {
3151 DBinOp::Add => { let mut r = vec![0i64; n]; for i in 0..n { r[i] = a[i] + b[i]; } Ok(Column::Int(r)) }
3152 DBinOp::Sub => { let mut r = vec![0i64; n]; for i in 0..n { r[i] = a[i] - b[i]; } Ok(Column::Int(r)) }
3153 DBinOp::Mul => { let mut r = vec![0i64; n]; for i in 0..n { r[i] = a[i] * b[i]; } Ok(Column::Int(r)) }
3154 DBinOp::Div => { let mut r = vec![0i64; n]; for i in 0..n { r[i] = a[i] / b[i]; } Ok(Column::Int(r)) }
3155 DBinOp::Gt => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] > b[i]; } Ok(Column::Bool(r)) }
3156 DBinOp::Lt => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] < b[i]; } Ok(Column::Bool(r)) }
3157 DBinOp::Ge => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] >= b[i]; } Ok(Column::Bool(r)) }
3158 DBinOp::Le => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] <= b[i]; } Ok(Column::Bool(r)) }
3159 DBinOp::Eq => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] == b[i]; } Ok(Column::Bool(r)) }
3160 DBinOp::Ne => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] != b[i]; } Ok(Column::Bool(r)) }
3161 _ => Err(TidyError::Internal(format!("unsupported op {:?} on Int", op))),
3162 }
3163 }
3164 (Column::Float(a), Column::Float(b)) => {
3165 let n = a.len();
3166 match op {
3167 DBinOp::Add => { let mut r = vec![0.0f64; n]; for i in 0..n { r[i] = a[i] + b[i]; } Ok(Column::Float(r)) }
3168 DBinOp::Sub => { let mut r = vec![0.0f64; n]; for i in 0..n { r[i] = a[i] - b[i]; } Ok(Column::Float(r)) }
3169 DBinOp::Mul => { let mut r = vec![0.0f64; n]; for i in 0..n { r[i] = a[i] * b[i]; } Ok(Column::Float(r)) }
3170 DBinOp::Div => { let mut r = vec![0.0f64; n]; for i in 0..n { r[i] = a[i] / b[i]; } Ok(Column::Float(r)) }
3171 DBinOp::Gt => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] > b[i]; } Ok(Column::Bool(r)) }
3172 DBinOp::Lt => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] < b[i]; } Ok(Column::Bool(r)) }
3173 DBinOp::Ge => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] >= b[i]; } Ok(Column::Bool(r)) }
3174 DBinOp::Le => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] <= b[i]; } Ok(Column::Bool(r)) }
3175 DBinOp::Eq => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] == b[i]; } Ok(Column::Bool(r)) }
3176 DBinOp::Ne => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] != b[i]; } Ok(Column::Bool(r)) }
3177 _ => Err(TidyError::Internal(format!("unsupported op {:?} on Float", op))),
3178 }
3179 }
3180 (Column::Int(a), Column::Float(_b)) => {
3181 let promoted: Vec<f64> = a.iter().map(|&v| v as f64).collect();
3182 vectorized_binop(op, &Column::Float(promoted), right)
3183 }
3184 (Column::Float(_a), Column::Int(b)) => {
3185 let promoted: Vec<f64> = b.iter().map(|&v| v as f64).collect();
3186 vectorized_binop(op, left, &Column::Float(promoted))
3187 }
3188 (Column::Bool(a), Column::Bool(b)) => {
3189 let n = a.len();
3190 match op {
3191 DBinOp::And => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] && b[i]; } Ok(Column::Bool(r)) }
3192 DBinOp::Or => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] || b[i]; } Ok(Column::Bool(r)) }
3193 DBinOp::Eq => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] == b[i]; } Ok(Column::Bool(r)) }
3194 DBinOp::Ne => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] != b[i]; } Ok(Column::Bool(r)) }
3195 _ => Err(TidyError::Internal(format!("unsupported op {:?} on Bool", op))),
3196 }
3197 }
3198 (Column::Str(a), Column::Str(b)) => {
3199 let n = a.len();
3200 match op {
3201 DBinOp::Eq => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] == b[i]; } Ok(Column::Bool(r)) }
3202 DBinOp::Ne => { let mut r = vec![false; n]; for i in 0..n { r[i] = a[i] != b[i]; } Ok(Column::Bool(r)) }
3203 _ => Err(TidyError::Internal(format!("unsupported op {:?} on String", op))),
3204 }
3205 }
3206 _ => Err(TidyError::Internal("type mismatch in binary operation".into())),
3207 }
3208}
3209
3210fn vectorized_fn_call(name: &str, arg: &Column) -> Result<Column, TidyError> {
3213 let floats: Vec<f64> = match arg {
3214 Column::Float(v) => v.clone(),
3215 Column::Int(v) => v.iter().map(|&i| i as f64).collect(),
3216 _ => return Err(TidyError::Internal(format!(
3217 "FnCall '{}' requires numeric argument", name
3218 ))),
3219 };
3220 let f: fn(f64) -> f64 = match name {
3221 "log" => f64::ln,
3222 "exp" => f64::exp,
3223 "sqrt" => f64::sqrt,
3224 "abs" => f64::abs,
3225 "ceil" => f64::ceil,
3226 "floor" => f64::floor,
3227 "round" => f64::round,
3228 "sin" => f64::sin,
3229 "cos" => f64::cos,
3230 "tan" => f64::tan,
3231 _ => return Err(TidyError::Internal(format!(
3232 "unknown DExpr function: {}", name
3233 ))),
3234 };
3235 let mut result = vec![0.0f64; floats.len()];
3236 for i in 0..floats.len() {
3237 result[i] = f(floats[i]);
3238 }
3239 Ok(Column::Float(result))
3240}
3241
3242fn try_eval_expr_column_vectorized(
3246 df: &DataFrame,
3247 expr: &DExpr,
3248 nrows: usize,
3249) -> Option<Result<Column, TidyError>> {
3250 match expr {
3251 DExpr::Col(name) => {
3252 let col = df.get_column(name)?;
3253 let result = match col {
3254 Column::Int(v) => Column::Int(v[..nrows].to_vec()),
3255 Column::Float(v) => Column::Float(v[..nrows].to_vec()),
3256 Column::Str(v) => Column::Str(v[..nrows].to_vec()),
3257 Column::Bool(v) => Column::Bool(v[..nrows].to_vec()),
3258 Column::Categorical { levels, codes } => {
3259 let strs: Vec<String> = codes[..nrows]
3260 .iter()
3261 .map(|&c| levels[c as usize].clone())
3262 .collect();
3263 Column::Str(strs)
3264 }
3265 Column::CategoricalAdaptive(cc) => {
3266 let strs: Vec<String> = (0..nrows)
3267 .map(|i| match cc.get(i) {
3268 None => String::new(),
3269 Some(b) => String::from_utf8_lossy(b).into_owned(),
3270 })
3271 .collect();
3272 Column::Str(strs)
3273 }
3274 Column::DateTime(v) => Column::Int(v[..nrows].to_vec()),
3275 };
3276 Some(Ok(result))
3277 }
3278 DExpr::LitFloat(v) => Some(Ok(Column::Float(vec![*v; nrows]))),
3279 DExpr::LitInt(v) => Some(Ok(Column::Int(vec![*v; nrows]))),
3280 DExpr::LitBool(b) => Some(Ok(Column::Bool(vec![*b; nrows]))),
3281 DExpr::LitStr(s) => Some(Ok(Column::Str(vec![s.clone(); nrows]))),
3282 DExpr::BinOp { op, left, right } => {
3283 let left_col = try_eval_expr_column_vectorized(df, left, nrows)?.ok()?;
3284 let right_col = try_eval_expr_column_vectorized(df, right, nrows)?.ok()?;
3285 Some(vectorized_binop(*op, &left_col, &right_col))
3286 }
3287 DExpr::FnCall(name, args) if args.len() == 1 => {
3288 let arg_col = try_eval_expr_column_vectorized(df, &args[0], nrows)?.ok()?;
3289 Some(vectorized_fn_call(name, &arg_col))
3290 }
3291 _ => None,
3292 }
3293}
3294
3295fn eval_expr_column(df: &DataFrame, expr: &DExpr, nrows: usize) -> Result<Column, TidyError> {
3296 if nrows == 0 {
3297 return Ok(Column::Float(vec![]));
3299 }
3300
3301 if let DExpr::Col(name) = expr {
3309 if let Some(src) = df.get_column(name) {
3310 match src {
3311 Column::Categorical { .. } | Column::CategoricalAdaptive(_) => {
3312 return Ok(src.clone());
3313 }
3314 _ => {}
3315 }
3316 }
3317 }
3318
3319 if let Some(col) = eval_window_column(df, expr, nrows)? {
3321 return Ok(col);
3322 }
3323
3324 if let Some(result) = try_eval_expr_column_vectorized(df, expr, nrows) {
3326 return result;
3327 }
3328
3329 let sample = eval_dexpr_row(df, expr, 0)?;
3331 match sample {
3332 ExprValue::Int(_) => {
3333 let vals: Result<Vec<i64>, TidyError> = (0..nrows)
3334 .map(|r| {
3335 eval_dexpr_row(df, expr, r).and_then(|v| match v {
3336 ExprValue::Int(i) => Ok(i),
3337 ExprValue::Float(f) => Ok(f as i64),
3338 other => Err(TidyError::TypeMismatch {
3339 expected: "Int".into(),
3340 got: other.type_name().into(),
3341 }),
3342 })
3343 })
3344 .collect();
3345 Ok(Column::Int(vals?))
3346 }
3347 ExprValue::Float(_) => {
3348 let vals: Result<Vec<f64>, TidyError> = (0..nrows)
3349 .map(|r| {
3350 eval_dexpr_row(df, expr, r).and_then(|v| match v {
3351 ExprValue::Float(f) => Ok(f),
3352 ExprValue::Int(i) => Ok(i as f64),
3353 other => Err(TidyError::TypeMismatch {
3354 expected: "Float".into(),
3355 got: other.type_name().into(),
3356 }),
3357 })
3358 })
3359 .collect();
3360 Ok(Column::Float(vals?))
3361 }
3362 ExprValue::Bool(_) => {
3363 let vals: Result<Vec<bool>, TidyError> = (0..nrows)
3364 .map(|r| {
3365 eval_dexpr_row(df, expr, r).and_then(|v| match v {
3366 ExprValue::Bool(b) => Ok(b),
3367 other => Err(TidyError::TypeMismatch {
3368 expected: "Bool".into(),
3369 got: other.type_name().into(),
3370 }),
3371 })
3372 })
3373 .collect();
3374 Ok(Column::Bool(vals?))
3375 }
3376 ExprValue::Str(_) => {
3377 let vals: Result<Vec<String>, TidyError> = (0..nrows)
3378 .map(|r| {
3379 eval_dexpr_row(df, expr, r).and_then(|v| match v {
3380 ExprValue::Str(s) => Ok(s),
3381 other => Err(TidyError::TypeMismatch {
3382 expected: "Str".into(),
3383 got: other.type_name().into(),
3384 }),
3385 })
3386 })
3387 .collect();
3388 Ok(Column::Str(vals?))
3389 }
3390 }
3391}
3392
3393fn eval_dexpr_row(df: &DataFrame, expr: &DExpr, row: usize) -> Result<ExprValue, TidyError> {
3395 eval_expr_row(df, expr, row).map_err(|e| TidyError::Internal(e.to_string()))
3396}
3397
3398fn eval_expr_row_proj(
3400 base: &DataFrame,
3401 expr: &DExpr,
3402 row: usize,
3403 _proj: &ProjectionMap,
3404) -> Result<ExprValue, TidyError> {
3405 eval_expr_row(base, expr, row).map_err(|e| TidyError::Internal(e.to_string()))
3409}
3410
3411fn validate_expr_columns_proj(
3418 expr: &DExpr,
3419 base: &DataFrame,
3420 _proj: &ProjectionMap,
3421) -> Result<(), TidyError> {
3422 let mut refs = Vec::new();
3423 collect_expr_columns(expr, &mut refs);
3424 for col_name in refs {
3425 if base.get_column(&col_name).is_none() {
3426 return Err(TidyError::ColumnNotFound(col_name));
3427 }
3428 }
3429 Ok(())
3430}
3431
3432fn validate_expr_columns_snapshot(
3434 expr: &DExpr,
3435 snapshot_names: &[String],
3436) -> Result<(), TidyError> {
3437 let mut refs = Vec::new();
3438 collect_expr_columns(expr, &mut refs);
3439 for col_name in refs {
3440 if !snapshot_names.iter().any(|n| n == &col_name) {
3441 return Err(TidyError::ColumnNotFound(col_name));
3442 }
3443 }
3444 Ok(())
3445}
3446
3447impl ExprValue {
3448 fn type_name(&self) -> &'static str {
3449 match self {
3450 ExprValue::Int(_) => "Int",
3451 ExprValue::Float(_) => "Float",
3452 ExprValue::Str(_) => "Str",
3453 ExprValue::Bool(_) => "Bool",
3454 }
3455 }
3456}
3457
3458impl DataFrame {
3461 pub fn tidy(self) -> TidyView {
3465 TidyView::from_df(self)
3466 }
3467
3468 pub fn tidy_mut(self) -> TidyFrame {
3470 TidyFrame::from_df(self)
3471 }
3472}
3473
/// An ordered list of row indices (e.g. a row selection or reordering).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowIndexMap {
    /// Row indices, in output order.
    pub(crate) indices: Vec<usize>,
}
3537
3538impl RowIndexMap {
3539 pub fn new(indices: Vec<usize>) -> Self {
3541 RowIndexMap { indices }
3542 }
3543
3544 pub fn len(&self) -> usize {
3546 self.indices.len()
3547 }
3548
3549 pub fn is_empty(&self) -> bool {
3551 self.indices.is_empty()
3552 }
3553
3554 pub fn as_slice(&self) -> &[usize] {
3556 &self.indices
3557 }
3558}
3559
/// One group produced by `GroupIndex::build`.
#[derive(Debug, Clone)]
pub struct GroupMeta {
    /// The group's key, one display string per key column.
    pub key_values: Vec<String>,
    /// Base-frame row indices belonging to this group, in scan order.
    pub row_indices: Vec<usize>,
}
3570
3571#[derive(Debug, Clone)]
3582pub struct GroupIndex {
3583 pub groups: Vec<GroupMeta>,
3585 pub key_names: Vec<String>,
3587}
3588
3589impl GroupIndex {
3590 pub fn build(
3595 base: &DataFrame,
3596 key_col_indices: &[usize],
3597 visible_rows: &[usize],
3598 key_names: Vec<String>,
3599 ) -> Self {
3600 let mut group_order: Vec<Vec<String>> = Vec::new(); let mut group_map: Vec<(Vec<String>, usize)> = Vec::new(); for &row in visible_rows {
3606 let key: Vec<String> = key_col_indices
3607 .iter()
3608 .map(|&ci| base.columns[ci].1.get_display(row))
3609 .collect();
3610
3611 let slot = group_map
3613 .iter()
3614 .position(|(k, _)| k == &key)
3615 .unwrap_or_else(|| {
3616 let s = group_map.len();
3617 group_map.push((key.clone(), s));
3618 group_order.push(key);
3619 s
3620 });
3621
3622 let _ = slot; }
3624
3625 let mut groups: Vec<GroupMeta> = group_order
3627 .iter()
3628 .map(|k| GroupMeta {
3629 key_values: k.clone(),
3630 row_indices: Vec::new(),
3631 })
3632 .collect();
3633
3634 let key_to_slot: Vec<(Vec<String>, usize)> = group_order
3636 .iter()
3637 .enumerate()
3638 .map(|(i, k)| (k.clone(), i))
3639 .collect();
3640
3641 for &row in visible_rows {
3642 let key: Vec<String> = key_col_indices
3643 .iter()
3644 .map(|&ci| base.columns[ci].1.get_display(row))
3645 .collect();
3646 if let Some((_, slot)) = key_to_slot.iter().find(|(k, _)| k == &key) {
3647 groups[*slot].row_indices.push(row);
3648 }
3649 }
3650
3651 GroupIndex { groups, key_names }
3652 }
3653}
3654
3655#[derive(Debug, Clone)]
3665pub struct GroupedTidyView {
3671 view: TidyView,
3672 index: GroupIndex,
3673}
3674
3675impl GroupedTidyView {
3676 pub fn ngroups(&self) -> usize {
3678 self.index.groups.len()
3679 }
3680
3681 pub fn ungroup(self) -> TidyView {
3683 self.view
3684 }
3685
3686 pub fn group_index(&self) -> &GroupIndex {
3688 &self.index
3689 }
3690
3691 pub fn summarise(
3710 &self,
3711 assignments: &[(&str, TidyAgg)],
3712 ) -> Result<TidyFrame, TidyError> {
3713 {
3715 let mut seen = std::collections::BTreeSet::new();
3716 for &(name, _) in assignments {
3717 if !seen.insert(name) {
3718 return Err(TidyError::DuplicateColumn(name.to_string()));
3719 }
3720 }
3721 }
3722
3723 let base = &self.view.base;
3724 let n_groups = self.index.groups.len();
3725
3726 let mut result_columns: Vec<(String, Column)> = Vec::new();
3728
3729 for key_name in &self.index.key_names {
3730 let base_col = base
3731 .get_column(key_name)
3732 .ok_or_else(|| TidyError::ColumnNotFound(key_name.clone()))?;
3733
3734 let col = match base_col {
3735 Column::Int(_) => {
3736 let vals: Vec<i64> = self
3737 .index
3738 .groups
3739 .iter()
3740 .map(|g| {
3741 g.key_values[self
3742 .index
3743 .key_names
3744 .iter()
3745 .position(|k| k == key_name)
3746 .unwrap()]
3747 .parse::<i64>()
3748 .unwrap_or(0)
3749 })
3750 .collect();
3751 Column::Int(vals)
3752 }
3753 Column::Bool(_) => {
3754 let vals: Vec<bool> = self
3755 .index
3756 .groups
3757 .iter()
3758 .map(|g| {
3759 let s = &g.key_values[self
3760 .index
3761 .key_names
3762 .iter()
3763 .position(|k| k == key_name)
3764 .unwrap()];
3765 matches!(s.as_str(), "true" | "1")
3766 })
3767 .collect();
3768 Column::Bool(vals)
3769 }
3770 _ => {
3771 let vals: Vec<String> = self
3773 .index
3774 .groups
3775 .iter()
3776 .map(|g| {
3777 g.key_values[self
3778 .index
3779 .key_names
3780 .iter()
3781 .position(|k| k == key_name)
3782 .unwrap()]
3783 .clone()
3784 })
3785 .collect();
3786 Column::Str(vals)
3787 }
3788 };
3789 result_columns.push((key_name.clone(), col));
3790 }
3791
3792 for &(out_name, ref agg) in assignments {
3794 let col_vals = self.eval_agg_over_groups_fast(agg, n_groups, base)?;
3795 result_columns.push((out_name.to_string(), col_vals));
3796 }
3797
3798 let df = DataFrame::from_columns(result_columns)
3799 .map_err(|e| TidyError::Internal(e.to_string()))?;
3800 Ok(TidyFrame::from_df(df))
3801 }
3802
3803 #[allow(dead_code)]
3805 fn eval_agg_over_groups(
3806 &self,
3807 agg: &TidyAgg,
3808 n_groups: usize,
3809 base: &DataFrame,
3810 ) -> Result<Column, TidyError> {
3811 match agg {
3812 TidyAgg::Count => {
3813 let counts: Vec<i64> = self
3814 .index
3815 .groups
3816 .iter()
3817 .map(|g| g.row_indices.len() as i64)
3818 .collect();
3819 Ok(Column::Int(counts))
3820 }
3821
3822 TidyAgg::Sum(col_name) | TidyAgg::Mean(col_name)
3823 | TidyAgg::Min(col_name) | TidyAgg::Max(col_name)
3824 | TidyAgg::First(col_name) | TidyAgg::Last(col_name)
3825 | TidyAgg::Median(col_name) | TidyAgg::Sd(col_name)
3826 | TidyAgg::Var(col_name) | TidyAgg::Quantile(col_name, _)
3827 | TidyAgg::NDistinct(col_name) | TidyAgg::Iqr(col_name) => {
3828 let src = base
3829 .get_column(col_name)
3830 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
3831
3832 let mut vals = Vec::with_capacity(n_groups);
3833 for group in &self.index.groups {
3834 let v = agg_reduce(agg, src, &group.row_indices)?;
3835 vals.push(v);
3836 }
3837 Ok(Column::Float(vals))
3838 }
3839 }
3840 }
3841
3842 fn eval_agg_over_groups_fast(
3845 &self,
3846 agg: &TidyAgg,
3847 n_groups: usize,
3848 base: &DataFrame,
3849 ) -> Result<Column, TidyError> {
3850 match agg {
3851 TidyAgg::Count => {
3852 let counts: Vec<i64> = self
3853 .index
3854 .groups
3855 .iter()
3856 .map(|g| g.row_indices.len() as i64)
3857 .collect();
3858 Ok(Column::Int(counts))
3859 }
3860 TidyAgg::Sum(col_name) => {
3861 let src = base.get_column(col_name)
3862 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
3863 Ok(Column::Float(fast_agg_sum(&self.index.groups, src)?))
3864 }
3865 TidyAgg::Mean(col_name) => {
3866 let src = base.get_column(col_name)
3867 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
3868 Ok(Column::Float(fast_agg_mean(&self.index.groups, src)?))
3869 }
3870 TidyAgg::Min(col_name) => {
3871 let src = base.get_column(col_name)
3872 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
3873 Ok(Column::Float(fast_agg_min(&self.index.groups, src)?))
3874 }
3875 TidyAgg::Max(col_name) => {
3876 let src = base.get_column(col_name)
3877 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
3878 Ok(Column::Float(fast_agg_max(&self.index.groups, src)?))
3879 }
3880 TidyAgg::First(col_name) => {
3881 let src = base.get_column(col_name)
3882 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
3883 Ok(Column::Float(fast_agg_first(&self.index.groups, src)?))
3884 }
3885 TidyAgg::Last(col_name) => {
3886 let src = base.get_column(col_name)
3887 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
3888 Ok(Column::Float(fast_agg_last(&self.index.groups, src)?))
3889 }
3890 TidyAgg::Var(col_name)
3891 | TidyAgg::Sd(col_name)
3892 | TidyAgg::Median(col_name)
3893 | TidyAgg::Quantile(col_name, _)
3894 | TidyAgg::NDistinct(col_name)
3895 | TidyAgg::Iqr(col_name) => {
3896 let src = base.get_column(col_name)
3897 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
3898 Ok(Column::Float(fast_agg_arena(
3899 agg, &self.index.groups, src, n_groups,
3900 )?))
3901 }
3902 }
3903 }
3904}
3905
/// Borrowed view of a numeric column's values, used by the fast aggregation kernels.
enum ColRef<'a> {
    /// Values of a `Float` column.
    Float(&'a [f64]),
    /// Values of an `Int` column.
    Int(&'a [i64]),
}
3912
3913fn col_to_ref(col: &Column) -> Result<ColRef<'_>, TidyError> {
3914 match col {
3915 Column::Float(v) => Ok(ColRef::Float(v)),
3916 Column::Int(v) => Ok(ColRef::Int(v)),
3917 _ => Err(TidyError::TypeMismatch {
3918 expected: "numeric (Int or Float)".into(),
3919 got: col.type_name().into(),
3920 }),
3921 }
3922}
3923
3924fn fast_agg_sum(groups: &[GroupMeta], col: &Column) -> Result<Vec<f64>, TidyError> {
3925 use cjc_repro::kahan::KahanAccumulatorF64;
3926 let cr = col_to_ref(col)?;
3927 Ok(groups.iter().map(|g| {
3928 let mut acc = KahanAccumulatorF64::new();
3929 match cr {
3930 ColRef::Float(d) => { for &i in &g.row_indices { acc.add(d[i]); } }
3931 ColRef::Int(d) => { for &i in &g.row_indices { acc.add(d[i] as f64); } }
3932 }
3933 acc.finalize()
3934 }).collect())
3935}
3936
3937fn fast_agg_mean(groups: &[GroupMeta], col: &Column) -> Result<Vec<f64>, TidyError> {
3938 use cjc_repro::kahan::KahanAccumulatorF64;
3939 let cr = col_to_ref(col)?;
3940 Ok(groups.iter().map(|g| {
3941 if g.row_indices.is_empty() { return f64::NAN; }
3942 let mut acc = KahanAccumulatorF64::new();
3943 match cr {
3944 ColRef::Float(d) => { for &i in &g.row_indices { acc.add(d[i]); } }
3945 ColRef::Int(d) => { for &i in &g.row_indices { acc.add(d[i] as f64); } }
3946 }
3947 acc.finalize() / g.row_indices.len() as f64
3948 }).collect())
3949}
3950
3951fn fast_agg_min(groups: &[GroupMeta], col: &Column) -> Result<Vec<f64>, TidyError> {
3952 let cr = col_to_ref(col)?;
3953 Ok(groups.iter().map(|g| {
3954 if g.row_indices.is_empty() { return f64::NAN; }
3955 match cr {
3956 ColRef::Float(d) => g.row_indices.iter().fold(f64::INFINITY, |a, &i| {
3957 let b = d[i]; if b.is_nan() || b < a { b } else { a }
3958 }),
3959 ColRef::Int(d) => g.row_indices.iter().fold(f64::INFINITY, |a, &i| {
3960 let b = d[i] as f64; if b.is_nan() || b < a { b } else { a }
3961 }),
3962 }
3963 }).collect())
3964}
3965
3966fn fast_agg_max(groups: &[GroupMeta], col: &Column) -> Result<Vec<f64>, TidyError> {
3967 let cr = col_to_ref(col)?;
3968 Ok(groups.iter().map(|g| {
3969 if g.row_indices.is_empty() { return f64::NAN; }
3970 match cr {
3971 ColRef::Float(d) => g.row_indices.iter().fold(f64::NEG_INFINITY, |a, &i| {
3972 let b = d[i]; if b.is_nan() || b > a { b } else { a }
3973 }),
3974 ColRef::Int(d) => g.row_indices.iter().fold(f64::NEG_INFINITY, |a, &i| {
3975 let b = d[i] as f64; if b.is_nan() || b > a { b } else { a }
3976 }),
3977 }
3978 }).collect())
3979}
3980
3981fn fast_agg_first(groups: &[GroupMeta], col: &Column) -> Result<Vec<f64>, TidyError> {
3982 let cr = col_to_ref(col)?;
3983 let mut vals = Vec::with_capacity(groups.len());
3984 for g in groups {
3985 if g.row_indices.is_empty() { return Err(TidyError::EmptyGroup); }
3986 match cr {
3987 ColRef::Float(d) => vals.push(d[g.row_indices[0]]),
3988 ColRef::Int(d) => vals.push(d[g.row_indices[0]] as f64),
3989 }
3990 }
3991 Ok(vals)
3992}
3993
3994fn fast_agg_last(groups: &[GroupMeta], col: &Column) -> Result<Vec<f64>, TidyError> {
3995 let cr = col_to_ref(col)?;
3996 let mut vals = Vec::with_capacity(groups.len());
3997 for g in groups {
3998 if g.row_indices.is_empty() { return Err(TidyError::EmptyGroup); }
3999 let last = *g.row_indices.last().unwrap();
4000 match cr {
4001 ColRef::Float(d) => vals.push(d[last]),
4002 ColRef::Int(d) => vals.push(d[last] as f64),
4003 }
4004 }
4005 Ok(vals)
4006}
4007
4008fn fast_agg_arena(
4011 agg: &TidyAgg,
4012 groups: &[GroupMeta],
4013 col: &Column,
4014 n_groups: usize,
4015) -> Result<Vec<f64>, TidyError> {
4016 let cr = col_to_ref(col)?;
4017 let max_size = groups.iter().map(|g| g.row_indices.len()).max().unwrap_or(0);
4018 let mut arena: Vec<f64> = Vec::with_capacity(max_size);
4019 let mut results = Vec::with_capacity(n_groups);
4020 for group in groups {
4021 arena.clear();
4022 match cr {
4023 ColRef::Float(d) => { for &i in &group.row_indices { arena.push(d[i]); } }
4024 ColRef::Int(d) => { for &i in &group.row_indices { arena.push(d[i] as f64); } }
4025 }
4026 let val = agg_reduce_slice(agg, &mut arena)?;
4027 results.push(val);
4028 }
4029 Ok(results)
4030}
4031
4032fn agg_reduce_slice(agg: &TidyAgg, values: &mut [f64]) -> Result<f64, TidyError> {
4035 match agg {
4036 TidyAgg::Var(_) => {
4037 if values.len() < 2 {
4038 Ok(f64::NAN)
4039 } else {
4040 let n = values.len() as f64;
4041 let mean = kahan_sum_f64(values) / n;
4042 let sq_diffs: Vec<f64> = values.iter().map(|v| (v - mean) * (v - mean)).collect();
4043 Ok(kahan_sum_f64(&sq_diffs) / (n - 1.0))
4044 }
4045 }
4046 TidyAgg::Sd(_) => {
4047 if values.len() < 2 {
4048 Ok(f64::NAN)
4049 } else {
4050 let n = values.len() as f64;
4051 let mean = kahan_sum_f64(values) / n;
4052 let sq_diffs: Vec<f64> = values.iter().map(|v| (v - mean) * (v - mean)).collect();
4053 Ok((kahan_sum_f64(&sq_diffs) / (n - 1.0)).sqrt())
4054 }
4055 }
4056 TidyAgg::Median(_) => {
4057 if values.is_empty() {
4058 Ok(f64::NAN)
4059 } else {
4060 values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
4061 let n = values.len();
4062 if n % 2 == 1 { Ok(values[n / 2]) }
4063 else { Ok((values[n / 2 - 1] + values[n / 2]) / 2.0) }
4064 }
4065 }
4066 TidyAgg::Quantile(_, p) => {
4067 if values.is_empty() {
4068 Ok(f64::NAN)
4069 } else {
4070 values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
4071 let n = values.len();
4072 if n == 1 { return Ok(values[0]); }
4073 let pos = p * (n as f64 - 1.0);
4074 let lo = pos.floor() as usize;
4075 let hi = pos.ceil() as usize;
4076 let frac = pos - lo as f64;
4077 if lo == hi || hi >= n { Ok(values[lo.min(n - 1)]) }
4078 else { Ok(values[lo] + frac * (values[hi] - values[lo])) }
4079 }
4080 }
4081 TidyAgg::NDistinct(_) => {
4082 let distinct: std::collections::BTreeSet<u64> = values.iter().map(|v| v.to_bits()).collect();
4083 Ok(distinct.len() as f64)
4084 }
4085 TidyAgg::Iqr(_) => {
4086 if values.is_empty() {
4087 Ok(f64::NAN)
4088 } else {
4089 values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
4090 let n = values.len();
4091 if n == 1 { return Ok(0.0); }
4092 let q1 = {
4093 let pos = 0.25 * (n as f64 - 1.0);
4094 let lo = pos.floor() as usize;
4095 let hi = pos.ceil() as usize;
4096 let frac = pos - lo as f64;
4097 if lo == hi || hi >= n { values[lo.min(n - 1)] }
4098 else { values[lo] + frac * (values[hi] - values[lo]) }
4099 };
4100 let q3 = {
4101 let pos = 0.75 * (n as f64 - 1.0);
4102 let lo = pos.floor() as usize;
4103 let hi = pos.ceil() as usize;
4104 let frac = pos - lo as f64;
4105 if lo == hi || hi >= n { values[lo.min(n - 1)] }
4106 else { values[lo] + frac * (values[hi] - values[lo]) }
4107 };
4108 Ok(q3 - q1)
4109 }
4110 }
4111 _ => unreachable!("agg_reduce_slice called for non-arena aggregator"),
4112 }
4113}
4114
4115#[allow(dead_code)]
4117fn agg_reduce(
4118 agg: &TidyAgg,
4119 col: &Column,
4120 rows: &[usize],
4121) -> Result<f64, TidyError> {
4122 let values: Vec<f64> = match col {
4124 Column::Int(v) => rows.iter().map(|&r| v[r] as f64).collect(),
4125 Column::Float(v) => rows.iter().map(|&r| v[r]).collect(),
4126 _ => {
4127 return Err(TidyError::TypeMismatch {
4128 expected: "numeric (Int or Float)".into(),
4129 got: col.type_name().into(),
4130 })
4131 }
4132 };
4133
4134 match agg {
4135 TidyAgg::Sum(_) => Ok(kahan_sum_f64(&values)),
4136 TidyAgg::Mean(_) => {
4137 if values.is_empty() {
4138 Ok(f64::NAN)
4139 } else {
4140 Ok(kahan_sum_f64(&values) / values.len() as f64)
4141 }
4142 }
4143 TidyAgg::Min(_) => {
4144 if values.is_empty() {
4145 Ok(f64::NAN)
4146 } else {
4147 Ok(values.iter().cloned().fold(f64::INFINITY, |a, b| {
4148 if b.is_nan() || b < a { b } else { a }
4149 }))
4150 }
4151 }
4152 TidyAgg::Max(_) => {
4153 if values.is_empty() {
4154 Ok(f64::NAN)
4155 } else {
4156 Ok(values.iter().cloned().fold(f64::NEG_INFINITY, |a, b| {
4157 if b.is_nan() || b > a { b } else { a }
4158 }))
4159 }
4160 }
4161 TidyAgg::First(_) => {
4162 if values.is_empty() {
4163 Err(TidyError::EmptyGroup)
4164 } else {
4165 Ok(values[0])
4166 }
4167 }
4168 TidyAgg::Last(_) => {
4169 if values.is_empty() {
4170 Err(TidyError::EmptyGroup)
4171 } else {
4172 Ok(*values.last().unwrap())
4173 }
4174 }
4175 TidyAgg::Count => Ok(values.len() as f64),
4176 TidyAgg::Median(_) => {
4177 if values.is_empty() {
4178 Ok(f64::NAN)
4179 } else {
4180 let mut sorted = values.clone();
4181 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
4182 let n = sorted.len();
4183 if n % 2 == 1 {
4184 Ok(sorted[n / 2])
4185 } else {
4186 Ok((sorted[n / 2 - 1] + sorted[n / 2]) / 2.0)
4187 }
4188 }
4189 }
4190 TidyAgg::Var(_) => {
4191 if values.len() < 2 {
4192 Ok(f64::NAN)
4193 } else {
4194 let n = values.len() as f64;
4195 let mean = kahan_sum_f64(&values) / n;
4196 let sq_diffs: Vec<f64> = values.iter().map(|v| (v - mean) * (v - mean)).collect();
4197 Ok(kahan_sum_f64(&sq_diffs) / (n - 1.0))
4198 }
4199 }
4200 TidyAgg::Sd(_) => {
4201 if values.len() < 2 {
4202 Ok(f64::NAN)
4203 } else {
4204 let n = values.len() as f64;
4205 let mean = kahan_sum_f64(&values) / n;
4206 let sq_diffs: Vec<f64> = values.iter().map(|v| (v - mean) * (v - mean)).collect();
4207 Ok((kahan_sum_f64(&sq_diffs) / (n - 1.0)).sqrt())
4208 }
4209 }
4210 TidyAgg::Quantile(_, p) => {
4211 if values.is_empty() {
4212 Ok(f64::NAN)
4213 } else {
4214 let mut sorted = values.clone();
4215 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
4216 let n = sorted.len();
4217 if n == 1 {
4218 return Ok(sorted[0]);
4219 }
4220 let pos = p * (n as f64 - 1.0);
4221 let lo = pos.floor() as usize;
4222 let hi = pos.ceil() as usize;
4223 let frac = pos - lo as f64;
4224 if lo == hi || hi >= n {
4225 Ok(sorted[lo.min(n - 1)])
4226 } else {
4227 Ok(sorted[lo] + frac * (sorted[hi] - sorted[lo]))
4228 }
4229 }
4230 }
4231 TidyAgg::NDistinct(_) => {
4232 use std::collections::BTreeSet;
4233 let distinct: BTreeSet<u64> = values.iter().map(|v| v.to_bits()).collect();
4234 Ok(distinct.len() as f64)
4235 }
4236 TidyAgg::Iqr(_) => {
4237 if values.is_empty() {
4238 Ok(f64::NAN)
4239 } else {
4240 let mut sorted = values.clone();
4241 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
4242 let n = sorted.len();
4243 if n == 1 {
4244 return Ok(0.0);
4245 }
4246 let q1 = {
4247 let pos = 0.25 * (n as f64 - 1.0);
4248 let lo = pos.floor() as usize;
4249 let hi = pos.ceil() as usize;
4250 let frac = pos - lo as f64;
4251 if lo == hi || hi >= n { sorted[lo.min(n - 1)] }
4252 else { sorted[lo] + frac * (sorted[hi] - sorted[lo]) }
4253 };
4254 let q3 = {
4255 let pos = 0.75 * (n as f64 - 1.0);
4256 let lo = pos.floor() as usize;
4257 let hi = pos.ceil() as usize;
4258 let frac = pos - lo as f64;
4259 if lo == hi || hi >= n { sorted[lo.min(n - 1)] }
4260 else { sorted[lo] + frac * (sorted[hi] - sorted[lo]) }
4261 };
4262 Ok(q3 - q1)
4263 }
4264 }
4265 }
4266}
4267
/// A summary aggregation applied per group.
///
/// Every variant except `Count` names the input column. The reducers in this
/// module widen Int columns to `f64`.
#[derive(Debug, Clone)]
pub enum TidyAgg {
    /// Number of rows in the group.
    Count,
    /// Sum of the column (the reference reducer uses `kahan_sum_f64`).
    Sum(String),
    /// Arithmetic mean; NaN for an empty group.
    Mean(String),
    /// Minimum; a NaN in the group makes the result NaN.
    Min(String),
    /// Maximum; a NaN in the group makes the result NaN.
    Max(String),
    /// Value of the group's first row; `TidyError::EmptyGroup` if empty.
    First(String),
    /// Value of the group's last row; `TidyError::EmptyGroup` if empty.
    Last(String),
    /// Median; the mean of the two middle values for an even count.
    Median(String),
    /// Sample standard deviation (n - 1 denominator); NaN for fewer than 2 rows.
    Sd(String),
    /// Sample variance (n - 1 denominator); NaN for fewer than 2 rows.
    Var(String),
    /// Quantile at probability `p`, linearly interpolated at position `p * (n - 1)`.
    Quantile(String, f64),
    /// Number of distinct values, compared by `f64` bit pattern.
    NDistinct(String),
    /// Interquartile range (0.75 quantile minus 0.25 quantile); 0.0 for one row.
    Iqr(String),
}
4300
/// One sort key for `TidyView::arrange`: a column name plus a direction.
#[derive(Debug, Clone)]
pub struct ArrangeKey {
    /// Name of the column to sort by.
    pub col_name: String,
    /// `true` puts larger values first; `false` puts smaller values first.
    pub descending: bool,
}

impl ArrangeKey {
    /// Ascending (smallest-first) key on `col_name`.
    pub fn asc(col_name: &str) -> Self {
        Self::with_direction(col_name, false)
    }

    /// Descending (largest-first) key on `col_name`.
    pub fn desc(col_name: &str) -> Self {
        Self::with_direction(col_name, true)
    }

    /// Shared constructor behind `asc` / `desc`.
    fn with_direction(col_name: &str, descending: bool) -> Self {
        Self {
            col_name: col_name.to_owned(),
            descending,
        }
    }
}
4322
4323impl TidyView {
4326
4327 pub fn group_by(&self, keys: &[&str]) -> Result<GroupedTidyView, TidyError> {
4341 let mut key_col_indices = Vec::with_capacity(keys.len());
4343 for &key in keys {
4344 let idx = self
4345 .base
4346 .columns
4347 .iter()
4348 .position(|(n, _)| n == key)
4349 .ok_or_else(|| TidyError::ColumnNotFound(key.to_string()))?;
4350 key_col_indices.push(idx);
4351 }
4352
4353 let key_names: Vec<String> = keys.iter().map(|s| s.to_string()).collect();
4354
4355 let index = GroupIndex::build_fast(&self.base, &key_col_indices, self.mask.iter_indices(), key_names);
4359
4360 Ok(GroupedTidyView {
4361 view: self.clone(),
4362 index,
4363 })
4364 }
4365
4366 pub fn arrange(&self, keys: &[ArrangeKey]) -> Result<TidyView, TidyError> {
4386 for key in keys {
4388 if self.base.get_column(&key.col_name).is_none() {
4389 return Err(TidyError::ColumnNotFound(key.col_name.clone()));
4390 }
4391 }
4392
4393 let mut row_indices: Vec<usize> = self.mask.iter_indices().collect();
4395
4396 enum ArrangeKeyResolved<'a> {
4407 CatCodes { codes: &'a [u32], descending: bool },
4408 Legacy { col: &'a Column, descending: bool },
4409 }
4410
4411 fn levels_are_sorted(levels: &[String]) -> bool {
4412 levels.windows(2).all(|w| w[0] <= w[1])
4413 }
4414
4415 let resolved: Vec<ArrangeKeyResolved> = keys
4416 .iter()
4417 .map(|key| {
4418 let col = self.base.get_column(&key.col_name).unwrap();
4419 match col {
4420 Column::Categorical { levels, codes } if levels_are_sorted(levels) => {
4421 ArrangeKeyResolved::CatCodes {
4422 codes: codes.as_slice(),
4423 descending: key.descending,
4424 }
4425 }
4426 _ => ArrangeKeyResolved::Legacy { col, descending: key.descending },
4427 }
4428 })
4429 .collect();
4430
4431 row_indices.sort_by(|&a, &b| {
4433 for resolved_key in &resolved {
4434 let ord = match resolved_key {
4435 ArrangeKeyResolved::CatCodes { codes, descending } => {
4436 let raw = codes[a].cmp(&codes[b]);
4437 if *descending { raw.reverse() } else { raw }
4438 }
4439 ArrangeKeyResolved::Legacy { col, descending } => {
4440 let raw = compare_column_rows(col, a, b);
4441 if *descending { raw.reverse() } else { raw }
4442 }
4443 };
4444 if ord != std::cmp::Ordering::Equal {
4445 return ord;
4446 }
4447 }
4448 std::cmp::Ordering::Equal
4449 });
4450
4451 let mut new_columns = Vec::with_capacity(self.proj.len());
4453 for &ci in self.proj.indices() {
4454 let (name, col) = &self.base.columns[ci];
4455 let new_col = gather_column(col, &row_indices);
4456 new_columns.push((name.clone(), new_col));
4457 }
4458 let mut sorted_all_cols = Vec::with_capacity(self.base.ncols());
4461 for (name, col) in &self.base.columns {
4462 sorted_all_cols.push((name.clone(), gather_column(col, &row_indices)));
4463 }
4464
4465 let new_base = DataFrame::from_columns(sorted_all_cols)
4466 .map_err(|e| TidyError::Internal(e.to_string()))?;
4467 let nrows = new_base.nrows();
4468 let new_proj = self.proj.clone();
4469
4470 Ok(TidyView {
4471 base: Rc::new(new_base),
4472 mask: AdaptiveSelection::all(nrows),
4473 proj: new_proj,
4474 })
4475 }
4476
4477 pub fn slice(&self, start: usize, end: usize) -> TidyView {
4484 let visible: Vec<usize> = self.mask.iter_indices().collect();
4485 let n = visible.len();
4486 let s = start.min(n);
4487 let e = end.min(n);
4488 let selected = if s >= e { vec![] } else { visible[s..e].to_vec() };
4489 self.view_from_row_indices(selected)
4490 }
4491
4492 pub fn slice_head(&self, n: usize) -> TidyView {
4494 self.slice(0, n)
4495 }
4496
4497 pub fn slice_tail(&self, n: usize) -> TidyView {
4499 let total = self.mask.count();
4500 let start = total.saturating_sub(n);
4501 self.slice(start, total)
4502 }
4503
4504 pub fn slice_sample(&self, n: usize, seed: u64) -> TidyView {
4509 let mut visible: Vec<usize> = self.mask.iter_indices().collect();
4510 let total = visible.len();
4511 if n >= total {
4512 return self.view_from_row_indices(visible);
4513 }
4514 let mut rng = seed;
4516 let selected_count = n;
4517 for i in 0..selected_count {
4518 rng = rng.wrapping_mul(6364136223846793005).wrapping_add(1442695040888963407);
4520 let j = i + (rng as usize % (total - i));
4521 visible.swap(i, j);
4522 }
4523 visible.truncate(selected_count);
4524 visible.sort_unstable();
4526 self.view_from_row_indices(visible)
4527 }
4528
4529 pub fn distinct(&self, cols: &[&str]) -> Result<TidyView, TidyError> {
4541 let mut col_indices = Vec::with_capacity(cols.len());
4543 for &name in cols {
4544 let idx = self
4545 .base
4546 .columns
4547 .iter()
4548 .position(|(n, _)| n == name)
4549 .ok_or_else(|| TidyError::ColumnNotFound(name.to_string()))?;
4550 col_indices.push(idx);
4551 }
4552
4553 if let Some(cat_keys) = collect_categorical_keys(&self.base, &col_indices) {
4559 let mut seen_codes: BTreeSet<Vec<u32>> = BTreeSet::new();
4560 let mut selected_rows: Vec<usize> = Vec::new();
4561 let mut key_buf: Vec<u32> = Vec::with_capacity(cat_keys.codes.len());
4562 for row in self.mask.iter_indices() {
4563 key_buf.clear();
4564 for c in &cat_keys.codes {
4565 key_buf.push(c[row]);
4566 }
4567 if seen_codes.insert(key_buf.clone()) {
4568 selected_rows.push(row);
4569 }
4570 }
4571 return Ok(self.view_from_row_indices(selected_rows));
4572 }
4573
4574 let mut seen_keys: BTreeSet<Vec<String>> = BTreeSet::new();
4576 let mut selected_rows: Vec<usize> = Vec::new();
4577
4578 for row in self.mask.iter_indices() {
4579 let key: Vec<String> = if col_indices.is_empty() {
4580 vec!["__all__".into()]
4581 } else {
4582 col_indices
4583 .iter()
4584 .map(|&ci| self.base.columns[ci].1.get_display(row))
4585 .collect()
4586 };
4587
4588 if seen_keys.insert(key) {
4589 selected_rows.push(row);
4590 }
4591 }
4592
4593 Ok(self.view_from_row_indices(selected_rows))
4594 }
4595
4596 pub fn inner_join(
4609 &self,
4610 right: &TidyView,
4611 on: &[(&str, &str)],
4612 ) -> Result<TidyFrame, TidyError> {
4613 let (left_rows, right_rows) = join_match_rows(self, right, on, JoinKind::Inner)?;
4614 build_join_frame(self, right, &left_rows, &right_rows, on, false)
4615 }
4616
4617 pub fn left_join(
4621 &self,
4622 right: &TidyView,
4623 on: &[(&str, &str)],
4624 ) -> Result<TidyFrame, TidyError> {
4625 let (left_rows, right_rows_opt) =
4626 join_match_rows_optional(self, right, on, JoinKind::Left)?;
4627 build_left_join_frame(self, right, &left_rows, &right_rows_opt, on)
4628 }
4629
4630 pub fn semi_join(
4634 &self,
4635 right: &TidyView,
4636 on: &[(&str, &str)],
4637 ) -> Result<TidyView, TidyError> {
4638 let included = semi_anti_match_rows(self, right, on, true)?;
4639 Ok(self.view_from_row_indices(included))
4640 }
4641
4642 pub fn anti_join(
4646 &self,
4647 right: &TidyView,
4648 on: &[(&str, &str)],
4649 ) -> Result<TidyView, TidyError> {
4650 let included = semi_anti_match_rows(self, right, on, false)?;
4651 Ok(self.view_from_row_indices(included))
4652 }
4653
4654 fn view_from_row_indices(&self, row_indices: Vec<usize>) -> TidyView {
4659 let nrows_base = self.base.nrows();
4660 let mut words = vec![0u64; nwords_for(nrows_base)];
4661 for &r in &row_indices {
4662 words[r / 64] |= 1u64 << (r % 64);
4663 }
4664 TidyView {
4665 base: Rc::clone(&self.base),
4666 mask: AdaptiveSelection::from_predicate_result(words, nrows_base),
4667 proj: self.proj.clone(),
4668 }
4669 }
4670}
4671
/// Which join asked for row matching.
///
/// The row-matching functions in this module accept it as `_kind` and do not
/// read it.
#[derive(Clone, Copy)]
enum JoinKind {
    Inner,
    Left,
}
4676
4677fn resolve_join_keys(
4679 left: &TidyView,
4680 right: &TidyView,
4681 on: &[(&str, &str)],
4682) -> Result<(Vec<usize>, Vec<usize>), TidyError> {
4683 let mut li = Vec::new();
4684 let mut ri = Vec::new();
4685 for &(lk, rk) in on {
4686 let l = left.base.columns.iter().position(|(n, _)| n == lk)
4687 .ok_or_else(|| TidyError::ColumnNotFound(lk.to_string()))?;
4688 let r = right.base.columns.iter().position(|(n, _)| n == rk)
4689 .ok_or_else(|| TidyError::ColumnNotFound(rk.to_string()))?;
4690 li.push(l);
4691 ri.push(r);
4692 }
4693 Ok((li, ri))
4694}
4695
4696fn row_key(base: &DataFrame, col_indices: &[usize], row: usize) -> Vec<String> {
4698 col_indices
4699 .iter()
4700 .map(|&ci| base.columns[ci].1.get_display(row))
4701 .collect()
4702}
4703
4704fn build_right_lookup(
4707 right: &TidyView,
4708 right_key_cols: &[usize],
4709) -> Vec<(Vec<String>, usize)> {
4710 let mut lookup: Vec<(Vec<String>, usize)> = right
4711 .mask
4712 .iter_indices()
4713 .map(|r| (row_key(&right.base, right_key_cols, r), r))
4714 .collect();
4715 lookup.sort_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
4717 lookup
4718}
4719
/// All rows whose key equals `key` in a table sorted by key (as produced by
/// `build_right_lookup`), in table order.
///
/// Binary-searches to the first candidate with `partition_point`, then walks
/// the run of equal keys. The key is compared as a borrowed slice; no copy is
/// made.
fn find_matches(lookup: &[(Vec<String>, usize)], key: &[String]) -> Vec<usize> {
    let start = lookup.partition_point(|(k, _)| k.as_slice() < key);
    lookup[start..]
        .iter()
        .take_while(|(k, _)| k.as_slice() == key)
        .map(|&(_, r)| r)
        .collect()
}
4735
4736fn build_right_lookup_btree(
4741 right: &TidyView,
4742 right_key_cols: &[usize],
4743) -> BTreeMap<Vec<String>, Vec<usize>> {
4744 let mut lookup: BTreeMap<Vec<String>, Vec<usize>> = BTreeMap::new();
4745 for r in right.mask.iter_indices() {
4746 let key = row_key(&right.base, right_key_cols, r);
4747 lookup.entry(key).or_default().push(r);
4748 }
4749 lookup
4750}
4751
/// Data for joining on all-`Categorical` key columns by integer code rather
/// than by string. Built by `collect_categorical_join_keys`; each vector has
/// one entry per key column, in join-key order.
pub(crate) struct CategoricalJoinKeys<'a> {
    /// Codes of each left key column, indexed by left base row.
    pub(crate) left_codes: Vec<&'a [u32]>,
    /// Codes of each right key column, indexed by right base row.
    pub(crate) right_codes: Vec<&'a [u32]>,
    /// Per key column: maps a right-side code to the left-side code whose level
    /// string is equal, or `None` if the left column has no such level.
    pub(crate) right_to_left: Vec<Vec<Option<u32>>>,
}
4779
4780pub(crate) fn collect_categorical_join_keys<'a>(
4785 left_base: &'a DataFrame,
4786 left_cols: &[usize],
4787 right_base: &'a DataFrame,
4788 right_cols: &[usize],
4789) -> Option<CategoricalJoinKeys<'a>> {
4790 if left_cols.is_empty() || left_cols.len() != right_cols.len() {
4791 return None;
4792 }
4793 let mut left_codes = Vec::with_capacity(left_cols.len());
4794 let mut right_codes = Vec::with_capacity(left_cols.len());
4795 let mut right_to_left = Vec::with_capacity(left_cols.len());
4796
4797 for (li, ri) in left_cols.iter().zip(right_cols.iter()) {
4798 match (&left_base.columns[*li].1, &right_base.columns[*ri].1) {
4799 (
4800 Column::Categorical { levels: ll, codes: lc },
4801 Column::Categorical { levels: rl, codes: rc },
4802 ) => {
4803 let mut left_lookup: BTreeMap<&str, u32> = BTreeMap::new();
4807 for (i, lv) in ll.iter().enumerate() {
4808 left_lookup.insert(lv.as_str(), i as u32);
4809 }
4810 let remap: Vec<Option<u32>> = rl
4812 .iter()
4813 .map(|rv| left_lookup.get(rv.as_str()).copied())
4814 .collect();
4815 left_codes.push(lc.as_slice());
4816 right_codes.push(rc.as_slice());
4817 right_to_left.push(remap);
4818 }
4819 _ => return None,
4820 }
4821 }
4822 Some(CategoricalJoinKeys {
4823 left_codes,
4824 right_codes,
4825 right_to_left,
4826 })
4827}
4828
4829fn build_right_lookup_btree_categorical<'a>(
4834 cat: &CategoricalJoinKeys<'a>,
4835 right_visible: impl Iterator<Item = usize>,
4836) -> BTreeMap<Vec<u32>, Vec<usize>> {
4837 let nkeys = cat.right_codes.len();
4838 let mut lookup: BTreeMap<Vec<u32>, Vec<usize>> = BTreeMap::new();
4839 let mut key_buf: Vec<u32> = Vec::with_capacity(nkeys);
4840 for r in right_visible {
4841 key_buf.clear();
4842 let mut all_mappable = true;
4843 for i in 0..nkeys {
4844 let rc = cat.right_codes[i][r];
4845 match cat.right_to_left[i][rc as usize] {
4846 Some(lc) => key_buf.push(lc),
4847 None => {
4848 all_mappable = false;
4849 break;
4850 }
4851 }
4852 }
4853 if all_mappable {
4854 lookup.entry(key_buf.clone()).or_default().push(r);
4855 }
4856 }
4857 lookup
4858}
4859
4860#[inline]
4862fn left_join_key_codes(cat: &CategoricalJoinKeys<'_>, row: usize, buf: &mut Vec<u32>) {
4863 buf.clear();
4864 for codes in &cat.left_codes {
4865 buf.push(codes[row]);
4866 }
4867}
4868
4869fn join_match_rows(
4871 left: &TidyView,
4872 right: &TidyView,
4873 on: &[(&str, &str)],
4874 _kind: JoinKind,
4875) -> Result<(Vec<usize>, Vec<usize>), TidyError> {
4876 let (left_key_cols, right_key_cols) = resolve_join_keys(left, right, on)?;
4877
4878 if let Some(cat) =
4882 collect_categorical_join_keys(&left.base, &left_key_cols, &right.base, &right_key_cols)
4883 {
4884 let lookup = build_right_lookup_btree_categorical(&cat, right.mask.iter_indices());
4885 let mut out_left = Vec::new();
4886 let mut out_right = Vec::new();
4887 let mut key_buf: Vec<u32> = Vec::with_capacity(cat.left_codes.len());
4888 for l_row in left.mask.iter_indices() {
4889 left_join_key_codes(&cat, l_row, &mut key_buf);
4890 if let Some(matches) = lookup.get(&key_buf) {
4891 for &r_row in matches {
4892 out_left.push(l_row);
4893 out_right.push(r_row);
4894 }
4895 }
4896 }
4897 return Ok((out_left, out_right));
4898 }
4899
4900 let lookup = build_right_lookup_btree(right, &right_key_cols);
4902
4903 let mut out_left = Vec::new();
4904 let mut out_right = Vec::new();
4905
4906 for l_row in left.mask.iter_indices() {
4907 let key = row_key(&left.base, &left_key_cols, l_row);
4908 if let Some(matches) = lookup.get(&key) {
4909 for &r_row in matches {
4910 out_left.push(l_row);
4911 out_right.push(r_row);
4912 }
4913 }
4914 }
4915 Ok((out_left, out_right))
4916}
4917
4918fn join_match_rows_optional(
4920 left: &TidyView,
4921 right: &TidyView,
4922 on: &[(&str, &str)],
4923 _kind: JoinKind,
4924) -> Result<(Vec<usize>, Vec<Option<usize>>), TidyError> {
4925 let (left_key_cols, right_key_cols) = resolve_join_keys(left, right, on)?;
4926
4927 if let Some(cat) =
4929 collect_categorical_join_keys(&left.base, &left_key_cols, &right.base, &right_key_cols)
4930 {
4931 let lookup = build_right_lookup_btree_categorical(&cat, right.mask.iter_indices());
4932 let mut out_left = Vec::new();
4933 let mut out_right: Vec<Option<usize>> = Vec::new();
4934 let mut key_buf: Vec<u32> = Vec::with_capacity(cat.left_codes.len());
4935 for l_row in left.mask.iter_indices() {
4936 left_join_key_codes(&cat, l_row, &mut key_buf);
4937 match lookup.get(&key_buf) {
4938 Some(matches) if !matches.is_empty() => {
4939 for &r_row in matches {
4940 out_left.push(l_row);
4941 out_right.push(Some(r_row));
4942 }
4943 }
4944 _ => {
4945 out_left.push(l_row);
4946 out_right.push(None);
4947 }
4948 }
4949 }
4950 return Ok((out_left, out_right));
4951 }
4952
4953 let lookup = build_right_lookup_btree(right, &right_key_cols);
4955
4956 let mut out_left = Vec::new();
4957 let mut out_right: Vec<Option<usize>> = Vec::new();
4958
4959 for l_row in left.mask.iter_indices() {
4960 let key = row_key(&left.base, &left_key_cols, l_row);
4961 match lookup.get(&key) {
4962 Some(matches) if !matches.is_empty() => {
4963 for &r_row in matches {
4964 out_left.push(l_row);
4965 out_right.push(Some(r_row));
4966 }
4967 }
4968 _ => {
4969 out_left.push(l_row);
4970 out_right.push(None);
4971 }
4972 }
4973 }
4974 Ok((out_left, out_right))
4975}
4976
4977fn semi_anti_match_rows(
4979 left: &TidyView,
4980 right: &TidyView,
4981 on: &[(&str, &str)],
4982 semi: bool,
4983) -> Result<Vec<usize>, TidyError> {
4984 let (left_key_cols, right_key_cols) = resolve_join_keys(left, right, on)?;
4985
4986 if let Some(cat) =
4988 collect_categorical_join_keys(&left.base, &left_key_cols, &right.base, &right_key_cols)
4989 {
4990 let lookup = build_right_lookup_btree_categorical(&cat, right.mask.iter_indices());
4991 let mut out = Vec::new();
4992 let mut key_buf: Vec<u32> = Vec::with_capacity(cat.left_codes.len());
4993 for l_row in left.mask.iter_indices() {
4994 left_join_key_codes(&cat, l_row, &mut key_buf);
4995 let has_match = lookup.contains_key(&key_buf);
4996 if has_match == semi {
4997 out.push(l_row);
4998 }
4999 }
5000 return Ok(out);
5001 }
5002
5003 let lookup = build_right_lookup_btree(right, &right_key_cols);
5005
5006 let mut out = Vec::new();
5007 for l_row in left.mask.iter_indices() {
5008 let key = row_key(&left.base, &left_key_cols, l_row);
5009 let has_match = lookup.contains_key(&key);
5010 if has_match == semi {
5011 out.push(l_row);
5012 }
5013 }
5014 Ok(out)
5015}
5016
5017fn build_join_frame(
5020 left: &TidyView,
5021 right: &TidyView,
5022 left_rows: &[usize],
5023 right_rows: &[usize],
5024 on: &[(&str, &str)],
5025 _include_unmatched: bool,
5026) -> Result<TidyFrame, TidyError> {
5027 let right_key_names: std::collections::BTreeSet<&str> =
5028 on.iter().map(|(_, rk)| *rk).collect();
5029
5030 let n = left_rows.len();
5031 let mut columns: Vec<(String, Column)> = Vec::new();
5032
5033 for &ci in left.proj.indices() {
5035 let (name, col) = &left.base.columns[ci];
5036 columns.push((name.clone(), gather_column(col, left_rows)));
5037 }
5038
5039 for &ci in right.proj.indices() {
5041 let (name, col) = &right.base.columns[ci];
5042 if right_key_names.contains(name.as_str()) {
5043 continue;
5044 }
5045 columns.push((name.clone(), gather_column(col, right_rows)));
5046 }
5047
5048 assert_eq!(n, left_rows.len());
5049 let df = DataFrame::from_columns(columns)
5050 .map_err(|e| TidyError::Internal(e.to_string()))?;
5051 Ok(TidyFrame::from_df(df))
5052}
5053
5054fn build_left_join_frame(
5056 left: &TidyView,
5057 right: &TidyView,
5058 left_rows: &[usize],
5059 right_rows_opt: &[Option<usize>],
5060 on: &[(&str, &str)],
5061) -> Result<TidyFrame, TidyError> {
5062 let right_key_names: std::collections::BTreeSet<&str> =
5063 on.iter().map(|(_, rk)| *rk).collect();
5064
5065 let mut columns: Vec<(String, Column)> = Vec::new();
5066
5067 for &ci in left.proj.indices() {
5069 let (name, col) = &left.base.columns[ci];
5070 columns.push((name.clone(), gather_column(col, left_rows)));
5071 }
5072
5073 for &ci in right.proj.indices() {
5075 let (name, col) = &right.base.columns[ci];
5076 if right_key_names.contains(name.as_str()) {
5077 continue;
5078 }
5079 let new_col = gather_column_nullable(col, right_rows_opt);
5080 columns.push((name.clone(), new_col));
5081 }
5082
5083 let df = DataFrame::from_columns(columns)
5084 .map_err(|e| TidyError::Internal(e.to_string()))?;
5085 Ok(TidyFrame::from_df(df))
5086}
5087
5088fn compare_column_rows(col: &Column, a: usize, b: usize) -> std::cmp::Ordering {
5095 match col {
5096 Column::Int(v) => v[a].cmp(&v[b]),
5097 Column::Float(v) => {
5098 let va = v[a];
5099 let vb = v[b];
5100 match (va.is_nan(), vb.is_nan()) {
5101 (true, true) => std::cmp::Ordering::Equal,
5102 (true, false) => std::cmp::Ordering::Greater, (false, true) => std::cmp::Ordering::Less,
5104 (false, false) => va.partial_cmp(&vb).unwrap_or(std::cmp::Ordering::Equal),
5105 }
5106 }
5107 Column::Bool(v) => v[a].cmp(&v[b]),
5108 Column::Str(v) => v[a].cmp(&v[b]),
5109 Column::Categorical { levels, codes } => {
5110 levels[codes[a] as usize].cmp(&levels[codes[b] as usize])
5112 }
5113 Column::CategoricalAdaptive(cc) => {
5114 cc.get(a).cmp(&cc.get(b))
5117 }
5118 Column::DateTime(v) => v[a].cmp(&v[b]),
5119 }
5120}
5121
// Unit tests for the Phase 10 tidy primitives: `BitMask` bit accounting,
// `TidyView` shape, and the `filter`, `select` and `mutate` verbs.
#[cfg(test)]
mod phase10_unit_tests {
    use super::*;

    // 5-row fixture: Int `x` = 1..=5, Float `y` = 1.0..=5.0, Bool `flag`
    // alternating true/false starting with true.
    fn make_df() -> DataFrame {
        DataFrame::from_columns(vec![
            ("x".into(), Column::Int(vec![1, 2, 3, 4, 5])),
            ("y".into(), Column::Float(vec![1.0, 2.0, 3.0, 4.0, 5.0])),
            ("flag".into(), Column::Bool(vec![true, false, true, false, true])),
        ])
        .unwrap()
    }

    #[test]
    fn bitmask_all_true_count() {
        let m = BitMask::all_true(5);
        assert_eq!(m.count_ones(), 5);
    }

    #[test]
    fn bitmask_all_false_count() {
        let m = BitMask::all_false(5);
        assert_eq!(m.count_ones(), 0);
    }

    #[test]
    fn bitmask_tail_bits_clean() {
        // 65 rows span two 64-bit words. Only bit 0 of the second word may be
        // set; the unused high bits of the tail word must stay clear.
        let m = BitMask::all_true(65);
        assert_eq!(m.count_ones(), 65);
        assert_eq!(m.words.len(), 2);
        assert_eq!(m.words[1], 1u64);
    }

    #[test]
    fn bitmask_and_semantics() {
        // Bitwise AND: a row survives only if set in both masks.
        let a = BitMask::from_bools(&[true, true, false, true]);
        let b = BitMask::from_bools(&[true, false, true, true]);
        let c = a.and(&b);
        assert!(c.get(0));
        assert!(!c.get(1));
        assert!(!c.get(2));
        assert!(c.get(3));
    }

    #[test]
    fn tidy_view_nrows_ncols() {
        let df = make_df();
        let v = df.tidy();
        assert_eq!(v.nrows(), 5);
        assert_eq!(v.ncols(), 3);
    }

    #[test]
    fn filter_basic() {
        // x > 2 keeps x = 3, 4, 5.
        let df = make_df();
        let v = df.tidy();
        let filtered = v
            .filter(&DExpr::BinOp {
                op: DBinOp::Gt,
                left: Box::new(DExpr::Col("x".into())),
                right: Box::new(DExpr::LitInt(2)),
            })
            .unwrap();
        assert_eq!(filtered.nrows(), 3);
    }

    #[test]
    fn filter_empty_df() {
        // Filtering a zero-row frame succeeds and stays empty.
        let df = DataFrame::from_columns(vec![
            ("x".into(), Column::Int(vec![])),
        ])
        .unwrap();
        let v = df.tidy();
        let filtered = v
            .filter(&DExpr::BinOp {
                op: DBinOp::Gt,
                left: Box::new(DExpr::Col("x".into())),
                right: Box::new(DExpr::LitInt(0)),
            })
            .unwrap();
        assert_eq!(filtered.nrows(), 0);
    }

    #[test]
    fn select_reorder() {
        // `select` follows the requested order, not the frame's order.
        let df = make_df();
        let v = df.tidy();
        let s = v.select(&["y", "x"]).unwrap();
        assert_eq!(s.column_names(), vec!["y", "x"]);
    }

    #[test]
    fn select_zero_cols() {
        // Selecting no columns keeps every row.
        let df = make_df();
        let v = df.tidy();
        let s = v.select(&[]).unwrap();
        assert_eq!(s.ncols(), 0);
        assert_eq!(s.nrows(), 5);
    }

    #[test]
    fn select_unknown_col() {
        let df = make_df();
        let v = df.tidy();
        let err = v.select(&["nonexistent"]).unwrap_err();
        assert!(matches!(err, TidyError::ColumnNotFound(_)));
    }

    #[test]
    fn select_duplicate_col() {
        let df = make_df();
        let v = df.tidy();
        let err = v.select(&["x", "x"]).unwrap_err();
        assert!(matches!(err, TidyError::DuplicateColumn(_)));
    }

    #[test]
    fn mutate_new_col() {
        // z = x + 10 stays Int: 11..=15.
        let df = make_df();
        let v = df.tidy();
        let frame = v
            .mutate(&[("z", DExpr::BinOp {
                op: DBinOp::Add,
                left: Box::new(DExpr::Col("x".into())),
                right: Box::new(DExpr::LitInt(10)),
            })])
            .unwrap();
        let b = frame.borrow();
        let z = b.get_column("z").unwrap();
        assert_eq!(z.len(), 5);
        if let Column::Int(v) = z {
            assert_eq!(v[0], 11);
            assert_eq!(v[4], 15);
        } else {
            panic!("expected Int column");
        }
    }

    #[test]
    fn mutate_type_promotion() {
        // Int + Float promotes the result to a Float column.
        let df = make_df();
        let v = df.tidy();
        let frame = v
            .mutate(&[("promoted", DExpr::BinOp {
                op: DBinOp::Add,
                left: Box::new(DExpr::Col("x".into())),
                right: Box::new(DExpr::Col("y".into())),
            })])
            .unwrap();
        let b = frame.borrow();
        let col = b.get_column("promoted").unwrap();
        assert!(matches!(col, Column::Float(_)));
    }
}
5298
5299impl TidyError {
5351 pub fn schema_mismatch(msg: impl Into<String>) -> Self {
5353 TidyError::Internal(format!("schema mismatch: {}", msg.into()))
5354 }
5355 pub fn join_type_mismatch(col: &str, lt: &str, rt: &str) -> Self {
5357 TidyError::TypeMismatch {
5358 expected: format!("{} (from left key `{}`)", lt, col),
5359 got: rt.to_string(),
5360 }
5361 }
5362 pub fn duplicate_key(key: impl Into<String>) -> Self {
5364 TidyError::DuplicateColumn(format!("duplicate key: {}", key.into()))
5365 }
5366 pub fn empty_selection(msg: impl Into<String>) -> Self {
5368 TidyError::Internal(format!("empty selection: {}", msg.into()))
5369 }
5370}
5371
/// A column of `T` values paired with a validity bitmap.
///
/// Bit `i` of `validity` set means row `i` is valid; clear means null.
#[derive(Debug, Clone)]
pub struct NullableColumn<T: Clone> {
    /// One slot per row, null rows included. `get` returns `None` for a null
    /// slot instead of its stored value.
    pub values: Vec<T>,
    /// One bit per row; set = valid. `new` asserts its row count equals
    /// `values.len()`.
    pub validity: BitMask,
}
5385
5386impl<T: Clone + Default> NullableColumn<T> {
5387 pub fn from_values(values: Vec<T>) -> Self {
5389 let n = values.len();
5390 Self {
5391 values,
5392 validity: BitMask::all_true(n),
5393 }
5394 }
5395
5396 pub fn new(values: Vec<T>, validity: BitMask) -> Self {
5399 assert_eq!(values.len(), validity.nrows(), "NullableColumn: length mismatch");
5400 Self { values, validity }
5401 }
5402
5403 pub fn len(&self) -> usize {
5405 self.values.len()
5406 }
5407
5408 pub fn is_null(&self, i: usize) -> bool {
5410 !self.validity.get(i)
5411 }
5412
5413 pub fn get(&self, i: usize) -> Option<&T> {
5415 if self.validity.get(i) { Some(&self.values[i]) } else { None }
5416 }
5417
5418 pub fn count_valid(&self) -> usize {
5420 self.validity.count_ones()
5421 }
5422
5423 pub fn gather(&self, indices: &[usize]) -> Self {
5425 let mut vals = Vec::with_capacity(indices.len());
5426 let mut bools = Vec::with_capacity(indices.len());
5427 for &i in indices {
5428 vals.push(self.values[i].clone());
5429 bools.push(self.validity.get(i));
5430 }
5431 let validity = BitMask::from_bools(&bools);
5432 Self { values: vals, validity }
5433 }
5434}
5435
5436#[derive(Debug, Clone)]
5449pub enum NullCol {
5450 Int(NullableColumn<i64>),
5452 Float(NullableColumn<f64>),
5454 Str(NullableColumn<String>),
5456 Bool(NullableColumn<bool>),
5458}
5459
5460impl NullCol {
5461 pub fn len(&self) -> usize {
5463 match self {
5464 NullCol::Int(c) => c.len(),
5465 NullCol::Float(c) => c.len(),
5466 NullCol::Str(c) => c.len(),
5467 NullCol::Bool(c) => c.len(),
5468 }
5469 }
5470
5471 pub fn is_null(&self, i: usize) -> bool {
5473 match self {
5474 NullCol::Int(c) => c.is_null(i),
5475 NullCol::Float(c) => c.is_null(i),
5476 NullCol::Str(c) => c.is_null(i),
5477 NullCol::Bool(c) => c.is_null(i),
5478 }
5479 }
5480
5481 pub fn type_name(&self) -> &'static str {
5483 match self {
5484 NullCol::Int(_) => "Int",
5485 NullCol::Float(_) => "Float",
5486 NullCol::Str(_) => "Str",
5487 NullCol::Bool(_) => "Bool",
5488 }
5489 }
5490
5491 pub fn from_column(col: &Column) -> Self {
5493 match col {
5494 Column::Int(v) => NullCol::Int(NullableColumn::from_values(v.clone())),
5495 Column::Float(v) => NullCol::Float(NullableColumn::from_values(v.clone())),
5496 Column::Str(v) => NullCol::Str(NullableColumn::from_values(v.clone())),
5497 Column::Bool(v) => NullCol::Bool(NullableColumn::from_values(v.clone())),
5498 Column::Categorical { levels, codes } => {
5500 let strings: Vec<String> = codes.iter().map(|&c| levels[c as usize].clone()).collect();
5501 NullCol::Str(NullableColumn::from_values(strings))
5502 }
5503 Column::CategoricalAdaptive(cc) => {
5504 let n = cc.len();
5505 let strings: Vec<String> = (0..n)
5506 .map(|i| match cc.get(i) {
5507 None => String::new(),
5508 Some(b) => String::from_utf8_lossy(b).into_owned(),
5509 })
5510 .collect();
5511 NullCol::Str(NullableColumn::from_values(strings))
5512 }
5513 Column::DateTime(v) => NullCol::Int(NullableColumn::from_values(v.clone())),
5514 }
5515 }
5516
5517 pub fn to_column_strict(&self) -> Result<Column, TidyError> {
5520 match self {
5521 NullCol::Int(nc) => {
5522 if nc.count_valid() == nc.len() {
5523 Ok(Column::Int(nc.values.clone()))
5524 } else {
5525 Err(TidyError::Internal("null values in non-nullable context".into()))
5526 }
5527 }
5528 NullCol::Float(nc) => {
5529 if nc.count_valid() == nc.len() {
5530 Ok(Column::Float(nc.values.clone()))
5531 } else {
5532 Err(TidyError::Internal("null values in non-nullable context".into()))
5533 }
5534 }
5535 NullCol::Str(nc) => {
5536 if nc.count_valid() == nc.len() {
5537 Ok(Column::Str(nc.values.clone()))
5538 } else {
5539 Err(TidyError::Internal("null values in non-nullable context".into()))
5540 }
5541 }
5542 NullCol::Bool(nc) => {
5543 if nc.count_valid() == nc.len() {
5544 Ok(Column::Bool(nc.values.clone()))
5545 } else {
5546 Err(TidyError::Internal("null values in non-nullable context".into()))
5547 }
5548 }
5549 }
5550 }
5551
5552 pub fn to_column_filled(&self) -> Column {
5555 match self {
5556 NullCol::Int(nc) => Column::Int(nc.values.clone()),
5557 NullCol::Float(nc) => {
5558 let v: Vec<f64> = (0..nc.len())
5559 .map(|i| if nc.is_null(i) { f64::NAN } else { nc.values[i] })
5560 .collect();
5561 Column::Float(v)
5562 }
5563 NullCol::Str(nc) => Column::Str(nc.values.clone()),
5564 NullCol::Bool(nc) => Column::Bool(nc.values.clone()),
5565 }
5566 }
5567
5568 pub fn get_display(&self, i: usize) -> String {
5570 if self.is_null(i) {
5571 return "null".to_string();
5572 }
5573 match self {
5574 NullCol::Int(nc) => format!("{}", nc.values[i]),
5575 NullCol::Float(nc) => format!("{}", nc.values[i]),
5576 NullCol::Str(nc) => nc.values[i].clone(),
5577 NullCol::Bool(nc) => format!("{}", nc.values[i]),
5578 }
5579 }
5580
5581 pub fn null_of_type(type_name: &str, len: usize) -> Self {
5583 match type_name {
5584 "Int" => NullCol::Int(NullableColumn {
5585 values: vec![0i64; len],
5586 validity: BitMask::all_false(len),
5587 }),
5588 "Float" => NullCol::Float(NullableColumn {
5589 values: vec![0.0f64; len],
5590 validity: BitMask::all_false(len),
5591 }),
5592 "Bool" => NullCol::Bool(NullableColumn {
5593 values: vec![false; len],
5594 validity: BitMask::all_false(len),
5595 }),
5596 _ => NullCol::Str(NullableColumn {
5597 values: vec![String::new(); len],
5598 validity: BitMask::all_false(len),
5599 }),
5600 }
5601 }
5602
5603 pub fn gather(&self, indices: &[usize]) -> Self {
5605 match self {
5606 NullCol::Int(nc) => NullCol::Int(nc.gather(indices)),
5607 NullCol::Float(nc) => NullCol::Float(nc.gather(indices)),
5608 NullCol::Str(nc) => NullCol::Str(nc.gather(indices)),
5609 NullCol::Bool(nc) => NullCol::Bool(nc.gather(indices)),
5610 }
5611 }
5612}
5613
5614#[derive(Debug, Clone)]
5617pub struct NullableFrame {
5618 pub columns: Vec<(String, NullCol)>,
5619}
5620
5621impl NullableFrame {
5622 pub fn new() -> Self {
5624 Self { columns: Vec::new() }
5625 }
5626
5627 pub fn nrows(&self) -> usize {
5629 self.columns.first().map(|(_, c)| c.len()).unwrap_or(0)
5630 }
5631
5632 pub fn ncols(&self) -> usize {
5634 self.columns.len()
5635 }
5636
5637 pub fn column_names(&self) -> Vec<&str> {
5639 self.columns.iter().map(|(n, _)| n.as_str()).collect()
5640 }
5641
5642 pub fn get_column(&self, name: &str) -> Option<&NullCol> {
5644 self.columns.iter().find(|(n, _)| n == name).map(|(_, c)| c)
5645 }
5646
5647 pub fn to_dataframe_filled(&self) -> DataFrame {
5649 let cols: Vec<(String, Column)> = self.columns.iter()
5650 .map(|(n, c)| (n.clone(), c.to_column_filled()))
5651 .collect();
5652 DataFrame { columns: cols }
5654 }
5655
5656 pub fn to_tidy_frame_filled(&self) -> TidyFrame {
5658 TidyFrame::from_df(self.to_dataframe_filled())
5659 }
5660
5661 pub fn to_tidy_view_filled(&self) -> TidyView {
5663 TidyView::from_df(self.to_dataframe_filled())
5664 }
5665}
5666
5667impl Default for NullableFrame {
5668 fn default() -> Self { Self::new() }
5669}
5670
5671fn gather_column_nullable_null(col: &Column, indices: &[Option<usize>]) -> NullCol {
5676 if matches!(col, Column::CategoricalAdaptive(_)) {
5677 return gather_column_nullable_null(&col.to_legacy_categorical(), indices);
5678 }
5679 match col {
5680 Column::Int(v) => {
5681 let mut vals = Vec::with_capacity(indices.len());
5682 let mut valid = Vec::with_capacity(indices.len());
5683 for &idx in indices {
5684 match idx {
5685 Some(i) => { vals.push(v[i]); valid.push(true); }
5686 None => { vals.push(0); valid.push(false); }
5687 }
5688 }
5689 NullCol::Int(NullableColumn::new(vals, BitMask::from_bools(&valid)))
5690 }
5691 Column::Float(v) => {
5692 let mut vals = Vec::with_capacity(indices.len());
5693 let mut valid = Vec::with_capacity(indices.len());
5694 for &idx in indices {
5695 match idx {
5696 Some(i) => { vals.push(v[i]); valid.push(true); }
5697 None => { vals.push(0.0); valid.push(false); }
5698 }
5699 }
5700 NullCol::Float(NullableColumn::new(vals, BitMask::from_bools(&valid)))
5701 }
5702 Column::Str(v) => {
5703 let mut vals = Vec::with_capacity(indices.len());
5704 let mut valid = Vec::with_capacity(indices.len());
5705 for &idx in indices {
5706 match idx {
5707 Some(i) => { vals.push(v[i].clone()); valid.push(true); }
5708 None => { vals.push(String::new()); valid.push(false); }
5709 }
5710 }
5711 NullCol::Str(NullableColumn::new(vals, BitMask::from_bools(&valid)))
5712 }
5713 Column::Bool(v) => {
5714 let mut vals = Vec::with_capacity(indices.len());
5715 let mut valid = Vec::with_capacity(indices.len());
5716 for &idx in indices {
5717 match idx {
5718 Some(i) => { vals.push(v[i]); valid.push(true); }
5719 None => { vals.push(false); valid.push(false); }
5720 }
5721 }
5722 NullCol::Bool(NullableColumn::new(vals, BitMask::from_bools(&valid)))
5723 }
5724 Column::Categorical { levels, codes } => {
5725 let mut vals = Vec::with_capacity(indices.len());
5726 let mut valid = Vec::with_capacity(indices.len());
5727 for &idx in indices {
5728 match idx {
5729 Some(i) => { vals.push(levels[codes[i] as usize].clone()); valid.push(true); }
5730 None => { vals.push(String::new()); valid.push(false); }
5731 }
5732 }
5733 NullCol::Str(NullableColumn::new(vals, BitMask::from_bools(&valid)))
5734 }
5735 Column::DateTime(v) => {
5736 let mut vals = Vec::with_capacity(indices.len());
5737 let mut valid = Vec::with_capacity(indices.len());
5738 for &idx in indices {
5739 match idx {
5740 Some(i) => { vals.push(v[i]); valid.push(true); }
5741 None => { vals.push(0); valid.push(false); }
5742 }
5743 }
5744 NullCol::Int(NullableColumn::new(vals, BitMask::from_bools(&valid)))
5745 }
5746 Column::CategoricalAdaptive(_) => unreachable!("handled by early return"),
5747 }
5748}
5749
5750pub type AcrossFn = Box<dyn Fn(&str, &Column) -> Result<Column, TidyError>>;
5757
5758pub struct AcrossTransform {
5760 pub fn_name: String,
5762 pub func: AcrossFn,
5764}
5765
5766impl AcrossTransform {
5767 pub fn new(fn_name: impl Into<String>, func: impl Fn(&str, &Column) -> Result<Column, TidyError> + 'static) -> Self {
5769 Self {
5770 fn_name: fn_name.into(),
5771 func: Box::new(func),
5772 }
5773 }
5774}
5775
5776pub struct AcrossSpec {
5778 pub cols: Vec<String>,
5780 pub transform: AcrossTransform,
5782 pub name_template: Option<String>,
5785}
5786
5787impl AcrossSpec {
5788 pub fn new(cols: impl IntoIterator<Item = impl Into<String>>, transform: AcrossTransform) -> Self {
5790 Self {
5791 cols: cols.into_iter().map(|c| c.into()).collect(),
5792 transform,
5793 name_template: None,
5794 }
5795 }
5796
5797 pub fn with_template(mut self, tmpl: impl Into<String>) -> Self {
5799 self.name_template = Some(tmpl.into());
5800 self
5801 }
5802
5803 pub fn output_name(&self, col_name: &str) -> String {
5805 match &self.name_template {
5806 Some(tmpl) => tmpl
5807 .replace("{col}", col_name)
5808 .replace("{fn}", &self.transform.fn_name),
5809 None => format!("{}_{}", col_name, self.transform.fn_name),
5810 }
5811 }
5812}
5813
/// Suffixes appended to clashing non-key column names in join output.
#[derive(Debug, Clone)]
pub struct JoinSuffix {
    /// Suffix for the left-hand column (default `".x"`).
    pub left: String,
    /// Suffix for the right-hand column (default `".y"`).
    pub right: String,
}

impl JoinSuffix {
    /// Creates custom left/right suffixes.
    pub fn new(left: impl Into<String>, right: impl Into<String>) -> Self {
        let (left, right) = (left.into(), right.into());
        JoinSuffix { left, right }
    }
}

impl Default for JoinSuffix {
    /// dplyr-style defaults: `".x"` for the left side, `".y"` for the right.
    fn default() -> Self {
        JoinSuffix::new(".x", ".y")
    }
}
5835
5836fn join_types_compatible(left: &Column, right: &Column) -> bool {
5841 match (left, right) {
5842 (Column::Int(_), Column::Int(_)) => true,
5843 (Column::Float(_), Column::Float(_)) => true,
5844 (Column::Int(_), Column::Float(_)) => true,
5845 (Column::Float(_), Column::Int(_)) => true,
5846 (Column::Str(_), Column::Str(_)) => true,
5847 (Column::Bool(_), Column::Bool(_)) => true,
5848 _ => false,
5849 }
5850}
5851
5852impl TidyView {
5855
5856 pub fn pivot_longer(
5874 &self,
5875 value_cols: &[&str],
5876 names_to: &str,
5877 values_to: &str,
5878 ) -> Result<TidyFrame, TidyError> {
5879 if value_cols.is_empty() {
5880 return Err(TidyError::empty_selection("pivot_longer requires at least one value_col"));
5881 }
5882
5883 let mut seen_vc: Vec<&str> = Vec::new();
5885 let mut vc_indices: Vec<usize> = Vec::new();
5886 for &name in value_cols {
5887 if seen_vc.contains(&name) {
5888 return Err(TidyError::DuplicateColumn(name.to_string()));
5889 }
5890 seen_vc.push(name);
5891 let idx = self.base.columns.iter().position(|(n, _)| n == name)
5892 .ok_or_else(|| TidyError::ColumnNotFound(name.to_string()))?;
5893 vc_indices.push(idx);
5894 }
5895
5896 let first_type = self.base.columns[vc_indices[0]].1.type_name();
5898 for &idx in &vc_indices[1..] {
5899 let t = self.base.columns[idx].1.type_name();
5900 if t != first_type {
5901 return Err(TidyError::TypeMismatch {
5902 expected: first_type.to_string(),
5903 got: t.to_string(),
5904 });
5905 }
5906 }
5907
5908 let vc_set: std::collections::BTreeSet<usize> = vc_indices.iter().copied().collect();
5910 let id_col_indices: Vec<usize> = self.proj.indices().iter()
5911 .copied()
5912 .filter(|i| !vc_set.contains(i))
5913 .collect();
5914
5915 let visible_rows: Vec<usize> = self.mask.iter_indices().collect();
5916 let n_out = visible_rows.len() * value_cols.len();
5917
5918 let mut out_cols: Vec<(String, Column)> = Vec::new();
5920 for &id_idx in &id_col_indices {
5921 let (name, col_orig) = &self.base.columns[id_idx];
5922 let legacy_owned;
5923 let col: &Column = if matches!(col_orig, Column::CategoricalAdaptive(_)) {
5924 legacy_owned = col_orig.to_legacy_categorical();
5925 &legacy_owned
5926 } else {
5927 col_orig
5928 };
5929 let new_col = match col {
5930 Column::Int(v) => {
5931 let mut out = Vec::with_capacity(n_out);
5932 for &r in &visible_rows {
5933 for _ in 0..value_cols.len() { out.push(v[r]); }
5934 }
5935 Column::Int(out)
5936 }
5937 Column::Float(v) => {
5938 let mut out = Vec::with_capacity(n_out);
5939 for &r in &visible_rows {
5940 for _ in 0..value_cols.len() { out.push(v[r]); }
5941 }
5942 Column::Float(out)
5943 }
5944 Column::Str(v) => {
5945 let mut out = Vec::with_capacity(n_out);
5946 for &r in &visible_rows {
5947 for _ in 0..value_cols.len() { out.push(v[r].clone()); }
5948 }
5949 Column::Str(out)
5950 }
5951 Column::Bool(v) => {
5952 let mut out = Vec::with_capacity(n_out);
5953 for &r in &visible_rows {
5954 for _ in 0..value_cols.len() { out.push(v[r]); }
5955 }
5956 Column::Bool(out)
5957 }
5958 Column::Categorical { levels, codes } => {
5959 let mut out = Vec::with_capacity(n_out);
5960 for &r in &visible_rows {
5961 for _ in 0..value_cols.len() { out.push(codes[r]); }
5962 }
5963 Column::Categorical { levels: levels.clone(), codes: out }
5964 }
5965 Column::DateTime(v) => {
5966 let mut out = Vec::with_capacity(n_out);
5967 for &r in &visible_rows {
5968 for _ in 0..value_cols.len() { out.push(v[r]); }
5969 }
5970 Column::DateTime(out)
5971 }
5972 Column::CategoricalAdaptive(_) => unreachable!("converted via legacy_owned"),
5973 };
5974 out_cols.push((name.clone(), new_col));
5975 }
5976
5977 let names_col: Vec<String> = visible_rows.iter()
5979 .flat_map(|_| value_cols.iter().map(|s| s.to_string()))
5980 .collect();
5981 out_cols.push((names_to.to_string(), Column::Str(names_col)));
5982
5983 match &self.base.columns[vc_indices[0]].1 {
5985 Column::Int(_) => {
5986 let mut vals: Vec<i64> = Vec::with_capacity(n_out);
5987 for &r in &visible_rows {
5988 for &vci in &vc_indices {
5989 if let Column::Int(v) = &self.base.columns[vci].1 {
5990 vals.push(v[r]);
5991 }
5992 }
5993 }
5994 out_cols.push((values_to.to_string(), Column::Int(vals)));
5995 }
5996 Column::Float(_) => {
5997 let mut vals: Vec<f64> = Vec::with_capacity(n_out);
5998 for &r in &visible_rows {
5999 for &vci in &vc_indices {
6000 if let Column::Float(v) = &self.base.columns[vci].1 {
6001 vals.push(v[r]);
6002 }
6003 }
6004 }
6005 out_cols.push((values_to.to_string(), Column::Float(vals)));
6006 }
6007 Column::Str(_) => {
6008 let mut vals: Vec<String> = Vec::with_capacity(n_out);
6009 for &r in &visible_rows {
6010 for &vci in &vc_indices {
6011 if let Column::Str(v) = &self.base.columns[vci].1 {
6012 vals.push(v[r].clone());
6013 }
6014 }
6015 }
6016 out_cols.push((values_to.to_string(), Column::Str(vals)));
6017 }
6018 Column::Bool(_) => {
6019 let mut vals: Vec<bool> = Vec::with_capacity(n_out);
6020 for &r in &visible_rows {
6021 for &vci in &vc_indices {
6022 if let Column::Bool(v) = &self.base.columns[vci].1 {
6023 vals.push(v[r]);
6024 }
6025 }
6026 }
6027 out_cols.push((values_to.to_string(), Column::Bool(vals)));
6028 }
6029 Column::Categorical { .. } | Column::CategoricalAdaptive(_) | Column::DateTime(_) => {
6030 let mut vals: Vec<String> = Vec::with_capacity(n_out);
6032 for &r in &visible_rows {
6033 for &vci in &vc_indices {
6034 vals.push(self.base.columns[vci].1.get_display(r));
6035 }
6036 }
6037 out_cols.push((values_to.to_string(), Column::Str(vals)));
6038 }
6039 }
6040
6041 let df = DataFrame::from_columns(out_cols)
6042 .map_err(|e| TidyError::Internal(e.to_string()))?;
6043 Ok(TidyFrame::from_df(df))
6044 }
6045
6046 pub fn pivot_wider(
6063 &self,
6064 id_cols: &[&str],
6065 names_from: &str,
6066 values_from: &str,
6067 ) -> Result<NullableFrame, TidyError> {
6068 let _names_col_idx = self.base.columns.iter().position(|(n, _)| n == names_from)
6070 .ok_or_else(|| TidyError::ColumnNotFound(names_from.to_string()))?;
6071 let _values_col_idx = self.base.columns.iter().position(|(n, _)| n == values_from)
6072 .ok_or_else(|| TidyError::ColumnNotFound(values_from.to_string()))?;
6073 for &id in id_cols {
6074 let _ = self.base.columns.iter().position(|(n, _)| n == id)
6075 .ok_or_else(|| TidyError::ColumnNotFound(id.to_string()))?;
6076 }
6077
6078 let visible_rows: Vec<usize> = self.mask.iter_indices().collect();
6079
6080 let mut key_values: Vec<String> = Vec::new();
6082 for &r in &visible_rows {
6083 let kv = self.base.get_column(names_from).unwrap().get_display(r);
6084 if !key_values.contains(&kv) {
6085 key_values.push(kv);
6086 }
6087 }
6088
6089 let id_col_refs: Vec<&Column> = id_cols.iter()
6092 .map(|&name| self.base.get_column(name).unwrap())
6093 .collect();
6094
6095 let mut id_order: Vec<Vec<String>> = Vec::new(); let mut id_to_slot: Vec<(Vec<String>, usize)> = Vec::new(); for &r in &visible_rows {
6099 let id_key: Vec<String> = id_col_refs.iter()
6100 .map(|col| col.get_display(r))
6101 .collect();
6102 if !id_to_slot.iter().any(|(k, _)| k == &id_key) {
6103 let slot = id_order.len();
6104 id_order.push(id_key.clone());
6105 id_to_slot.push((id_key, slot));
6106 }
6107 }
6108
6109 let n_rows = id_order.len();
6110 let n_keys = key_values.len();
6111
6112 let mut cell_map: Vec<Vec<Option<usize>>> = vec![vec![None; n_keys]; n_rows];
6115
6116 for &r in &visible_rows {
6117 let id_key: Vec<String> = id_col_refs.iter()
6118 .map(|col| col.get_display(r))
6119 .collect();
6120 let id_slot = id_to_slot.iter().find(|(k, _)| k == &id_key).unwrap().1;
6121
6122 let kv = self.base.get_column(names_from).unwrap().get_display(r);
6123 let key_slot = key_values.iter().position(|v| v == &kv).unwrap();
6124
6125 if cell_map[id_slot][key_slot].is_some() {
6126 return Err(TidyError::duplicate_key(
6127 format!("({}, {})", id_key.join(", "), kv)
6128 ));
6129 }
6130 cell_map[id_slot][key_slot] = Some(r);
6131 }
6132
6133 let mut out_cols: Vec<(String, NullCol)> = Vec::new();
6135
6136 for (id_idx, &id_name) in id_cols.iter().enumerate() {
6138 let id_col = self.base.get_column(id_name).unwrap();
6139 let id_row_indices: Vec<usize> = id_order.iter()
6140 .map(|id_tup| {
6141 *visible_rows.iter().find(|&&r| {
6143 id_col_refs.iter().enumerate().all(|(i, col)| {
6144 col.get_display(r) == id_tup[i]
6145 })
6146 }).unwrap()
6147 })
6148 .collect();
6149 let gathered = gather_column(id_col, &id_row_indices);
6150 out_cols.push((id_name.to_string(), NullCol::from_column(&gathered)));
6151 let _ = id_idx;
6152 }
6153
6154 let values_col = self.base.get_column(values_from).unwrap();
6156 let val_type = values_col.type_name();
6157 for (key_slot, key_val) in key_values.iter().enumerate() {
6158 let row_opts: Vec<Option<usize>> = (0..n_rows)
6159 .map(|id_slot| cell_map[id_slot][key_slot])
6160 .collect();
6161 let null_col = gather_column_nullable_null(values_col, &row_opts);
6162 out_cols.push((key_val.clone(), null_col));
6163 let _ = val_type;
6164 }
6165
6166 Ok(NullableFrame { columns: out_cols })
6167 }
6168
6169 pub fn rename(&self, renames: &[(&str, &str)]) -> Result<TidyView, TidyError> {
6180 let mut rename_map: Vec<(usize, String)> = Vec::new();
6182 let col_names: Vec<&str> = self.base.columns.iter().map(|(n, _)| n.as_str()).collect();
6183
6184 for &(old, new) in renames {
6185 let idx = col_names.iter().position(|&n| n == old)
6186 .ok_or_else(|| TidyError::ColumnNotFound(old.to_string()))?;
6187 if old != new {
6189 let new_name_exists = col_names.iter().any(|&n| n == new)
6190 || rename_map.iter().any(|(_, n)| n == new);
6191 if new_name_exists {
6192 return Err(TidyError::DuplicateColumn(new.to_string()));
6193 }
6194 }
6195 rename_map.push((idx, new.to_string()));
6196 }
6197
6198 let mut new_cols: Vec<(String, Column)> = Vec::new();
6200 for (i, (name, col)) in self.base.columns.iter().enumerate() {
6201 let new_name = rename_map.iter()
6202 .find(|(idx, _)| *idx == i)
6203 .map(|(_, n)| n.clone())
6204 .unwrap_or_else(|| name.clone());
6205 new_cols.push((new_name, col.clone()));
6206 }
6207
6208 let new_base = DataFrame { columns: new_cols };
6209 Ok(TidyView {
6210 base: Rc::new(new_base),
6211 mask: self.mask.clone(),
6212 proj: self.proj.clone(),
6213 })
6214 }
6215
6216 pub fn relocate(&self, cols: &[&str], position: RelocatePos<'_>) -> Result<TidyView, TidyError> {
6231 let proj_names: Vec<&str> = self.column_names();
6233 for &name in cols {
6234 if !proj_names.contains(&name) {
6235 return Err(TidyError::ColumnNotFound(name.to_string()));
6236 }
6237 }
6238
6239 let moved_set: std::collections::BTreeSet<&str> = cols.iter().copied().collect();
6241 let remaining: Vec<&str> = proj_names.iter()
6242 .copied()
6243 .filter(|n| !moved_set.contains(n))
6244 .collect();
6245
6246 let new_order: Vec<&str> = match &position {
6247 RelocatePos::Front => {
6248 let mut v: Vec<&str> = cols.to_vec();
6249 v.extend_from_slice(&remaining);
6250 v
6251 }
6252 RelocatePos::Back => {
6253 let mut v = remaining.clone();
6254 v.extend_from_slice(cols);
6255 v
6256 }
6257 RelocatePos::Before(anchor) => {
6258 if !proj_names.contains(anchor) {
6259 return Err(TidyError::ColumnNotFound(anchor.to_string()));
6260 }
6261 let mut v = Vec::new();
6262 for &n in &remaining {
6263 if n == *anchor {
6264 v.extend_from_slice(cols);
6265 }
6266 v.push(n);
6267 }
6268 v
6269 }
6270 RelocatePos::After(anchor) => {
6271 if !proj_names.contains(anchor) {
6272 return Err(TidyError::ColumnNotFound(anchor.to_string()));
6273 }
6274 let mut v = Vec::new();
6275 for &n in &remaining {
6276 v.push(n);
6277 if n == *anchor {
6278 v.extend_from_slice(cols);
6279 }
6280 }
6281 v
6282 }
6283 };
6284
6285 let new_indices: Vec<usize> = new_order.iter()
6287 .map(|&name| {
6288 self.base.columns.iter().position(|(n, _)| n == name).unwrap()
6289 })
6290 .collect();
6291
6292 Ok(TidyView {
6293 base: Rc::clone(&self.base),
6294 mask: self.mask.clone(),
6295 proj: ProjectionMap::from_indices(new_indices),
6296 })
6297 }
6298
6299 pub fn drop_cols(&self, cols: &[&str]) -> Result<TidyView, TidyError> {
6309 let proj_names = self.column_names();
6310 for &name in cols {
6311 if !proj_names.contains(&name) {
6312 return Err(TidyError::ColumnNotFound(name.to_string()));
6313 }
6314 }
6315 let drop_set: std::collections::BTreeSet<&str> = cols.iter().copied().collect();
6316 let keep: Vec<&str> = proj_names.iter()
6317 .copied()
6318 .filter(|n| !drop_set.contains(n))
6319 .collect();
6320 self.select(&keep)
6321 }
6322
6323 pub fn bind_rows(&self, other: &TidyView) -> Result<TidyFrame, TidyError> {
6334 let self_names = self.column_names();
6335 let other_names = other.column_names();
6336
6337 if self_names != other_names {
6338 return Err(TidyError::schema_mismatch(format!(
6339 "left has {:?}, right has {:?}",
6340 self_names, other_names
6341 )));
6342 }
6343
6344 let self_rows: Vec<usize> = self.mask.iter_indices().collect();
6345 let other_rows: Vec<usize> = other.mask.iter_indices().collect();
6346
6347 let mut out_cols: Vec<(String, Column)> = Vec::new();
6348 for &ci in self.proj.indices() {
6349 let (name, self_col) = &self.base.columns[ci];
6350 let other_ci = other.proj.indices().iter().copied()
6352 .find(|&i| other.base.columns[i].0 == *name)
6353 .ok_or_else(|| TidyError::ColumnNotFound(name.clone()))?;
6354 let other_col = &other.base.columns[other_ci].1;
6355
6356 let col = concat_columns(self_col, &self_rows, other_col, &other_rows)?;
6357 out_cols.push((name.clone(), col));
6358 }
6359
6360 let df = DataFrame::from_columns(out_cols)
6361 .map_err(|e| TidyError::Internal(e.to_string()))?;
6362 Ok(TidyFrame::from_df(df))
6363 }
6364
6365 pub fn bind_cols(&self, other: &TidyView) -> Result<TidyFrame, TidyError> {
6376 let self_nrows = self.nrows();
6377 let other_nrows = other.nrows();
6378
6379 if self_nrows != other_nrows {
6380 return Err(TidyError::LengthMismatch {
6381 expected: self_nrows,
6382 got: other_nrows,
6383 });
6384 }
6385
6386 let self_names = self.column_names();
6387 let other_names = other.column_names();
6388 for name in &other_names {
6389 if self_names.contains(name) {
6390 return Err(TidyError::DuplicateColumn(name.to_string()));
6391 }
6392 }
6393
6394 let self_rows: Vec<usize> = self.mask.iter_indices().collect();
6395 let other_rows: Vec<usize> = other.mask.iter_indices().collect();
6396
6397 let mut out_cols: Vec<(String, Column)> = Vec::new();
6398
6399 for &ci in self.proj.indices() {
6400 let (name, col) = &self.base.columns[ci];
6401 out_cols.push((name.clone(), gather_column(col, &self_rows)));
6402 }
6403 for &ci in other.proj.indices() {
6404 let (name, col) = &other.base.columns[ci];
6405 out_cols.push((name.clone(), gather_column(col, &other_rows)));
6406 }
6407
6408 let df = DataFrame::from_columns(out_cols)
6409 .map_err(|e| TidyError::Internal(e.to_string()))?;
6410 Ok(TidyFrame::from_df(df))
6411 }
6412
6413 pub fn mutate_across(&self, specs: &[AcrossSpec]) -> Result<TidyFrame, TidyError> {
6423 let base_df = self.materialize()?;
6425
6426 let mut output_names: Vec<String> = base_df.column_names()
6428 .into_iter().map(|s| s.to_string()).collect();
6429 let mut extra_cols: Vec<(String, Column)> = Vec::new();
6430
6431 for spec in specs {
6432 for col_name in &spec.cols {
6433 let out_name = spec.output_name(col_name);
6434 if output_names.contains(&out_name) && !base_df.column_names().contains(&out_name.as_str()) {
6436 return Err(TidyError::DuplicateColumn(out_name));
6437 }
6438 let col = base_df.get_column(col_name)
6439 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
6440 let new_col = (spec.transform.func)(col_name, col)?;
6441 if !base_df.column_names().contains(&out_name.as_str()) {
6443 output_names.push(out_name.clone());
6444 }
6445 extra_cols.push((out_name, new_col));
6446 }
6447 }
6448
6449 let mut col_map: indexmap_simple::IndexMap = indexmap_simple::IndexMap::from_df(&base_df);
6451 for (name, col) in extra_cols {
6452 col_map.insert(name, col);
6453 }
6454 let df = col_map.into_df()
6455 .map_err(|e| TidyError::Internal(e.to_string()))?;
6456 Ok(TidyFrame::from_df(df))
6457 }
6458
6459 pub fn right_join(
6467 &self,
6468 right: &TidyView,
6469 on: &[(&str, &str)],
6470 suffix: &JoinSuffix,
6471 ) -> Result<NullableFrame, TidyError> {
6472 validate_join_key_types(self, right, on)?;
6474 let swapped_on: Vec<(&str, &str)> = on.iter().map(|&(l, r)| (r, l)).collect();
6476 let (right_rows, left_rows_opt) =
6477 join_match_rows_optional(right, self, &swapped_on, JoinKind::Left)?;
6478 build_right_join_frame(self, right, &left_rows_opt, &right_rows, on, suffix)
6479 }
6480
6481 pub fn full_join(
6487 &self,
6488 right: &TidyView,
6489 on: &[(&str, &str)],
6490 suffix: &JoinSuffix,
6491 ) -> Result<NullableFrame, TidyError> {
6492 validate_join_key_types(self, right, on)?;
6493 build_full_join_frame(self, right, on, suffix)
6494 }
6495
6496 pub fn inner_join_typed(
6504 &self,
6505 right: &TidyView,
6506 on: &[(&str, &str)],
6507 suffix: &JoinSuffix,
6508 ) -> Result<TidyFrame, TidyError> {
6509 validate_join_key_types(self, right, on)?;
6510 let (left_rows, right_rows) = join_match_rows(self, right, on, JoinKind::Inner)?;
6511 build_join_frame_with_suffix(self, right, &left_rows, &right_rows, on, suffix, false)
6512 }
6513
6514 pub fn left_join_typed(
6518 &self,
6519 right: &TidyView,
6520 on: &[(&str, &str)],
6521 suffix: &JoinSuffix,
6522 ) -> Result<TidyFrame, TidyError> {
6523 validate_join_key_types(self, right, on)?;
6524 let (left_rows, right_rows_opt) =
6525 join_match_rows_optional(self, right, on, JoinKind::Left)?;
6526 build_left_join_frame_with_suffix(self, right, &left_rows, &right_rows_opt, on, suffix)
6527 }
6528}
6529
/// Target position for `TidyView::relocate`.
pub enum RelocatePos<'a> {
    /// Place the moved columns ahead of all other projected columns.
    Front,
    /// Place the moved columns after all other projected columns.
    Back,
    /// Insert the moved columns immediately before the named anchor column.
    Before(&'a str),
    /// Insert the moved columns immediately after the named anchor column.
    After(&'a str),
}
6543
6544fn concat_columns(
6547 left: &Column,
6548 left_rows: &[usize],
6549 right: &Column,
6550 right_rows: &[usize],
6551) -> Result<Column, TidyError> {
6552 match (left, right) {
6553 (Column::Int(lv), Column::Int(rv)) => {
6554 let mut out: Vec<i64> = left_rows.iter().map(|&i| lv[i]).collect();
6555 out.extend(right_rows.iter().map(|&i| rv[i]));
6556 Ok(Column::Int(out))
6557 }
6558 (Column::Float(lv), Column::Float(rv)) => {
6559 let mut out: Vec<f64> = left_rows.iter().map(|&i| lv[i]).collect();
6560 out.extend(right_rows.iter().map(|&i| rv[i]));
6561 Ok(Column::Float(out))
6562 }
6563 (Column::Int(lv), Column::Float(rv)) => {
6564 let mut out: Vec<f64> = left_rows.iter().map(|&i| lv[i] as f64).collect();
6565 out.extend(right_rows.iter().map(|&i| rv[i]));
6566 Ok(Column::Float(out))
6567 }
6568 (Column::Float(lv), Column::Int(rv)) => {
6569 let mut out: Vec<f64> = left_rows.iter().map(|&i| lv[i]).collect();
6570 out.extend(right_rows.iter().map(|&i| rv[i] as f64));
6571 Ok(Column::Float(out))
6572 }
6573 (Column::Str(lv), Column::Str(rv)) => {
6574 let mut out: Vec<String> = left_rows.iter().map(|&i| lv[i].clone()).collect();
6575 out.extend(right_rows.iter().map(|&i| rv[i].clone()));
6576 Ok(Column::Str(out))
6577 }
6578 (Column::Bool(lv), Column::Bool(rv)) => {
6579 let mut out: Vec<bool> = left_rows.iter().map(|&i| lv[i]).collect();
6580 out.extend(right_rows.iter().map(|&i| rv[i]));
6581 Ok(Column::Bool(out))
6582 }
6583 _ => Err(TidyError::schema_mismatch(format!(
6584 "type mismatch in bind_rows: {} vs {}",
6585 left.type_name(), right.type_name()
6586 ))),
6587 }
6588}
6589
6590fn validate_join_key_types(
6593 left: &TidyView,
6594 right: &TidyView,
6595 on: &[(&str, &str)],
6596) -> Result<(), TidyError> {
6597 for &(lk, rk) in on {
6598 let l_col = left.base.get_column(lk)
6599 .ok_or_else(|| TidyError::ColumnNotFound(lk.to_string()))?;
6600 let r_col = right.base.get_column(rk)
6601 .ok_or_else(|| TidyError::ColumnNotFound(rk.to_string()))?;
6602 if !join_types_compatible(l_col, r_col) {
6603 return Err(TidyError::join_type_mismatch(lk, l_col.type_name(), r_col.type_name()));
6604 }
6605 }
6606 Ok(())
6607}
6608
6609fn build_join_frame_with_suffix(
6612 left: &TidyView,
6613 right: &TidyView,
6614 left_rows: &[usize],
6615 right_rows: &[usize],
6616 on: &[(&str, &str)],
6617 suffix: &JoinSuffix,
6618 _include_unmatched: bool,
6619) -> Result<TidyFrame, TidyError> {
6620 let right_key_names: std::collections::BTreeSet<&str> =
6621 on.iter().map(|(_, rk)| *rk).collect();
6622
6623 let left_col_names: Vec<String> = left.proj.indices().iter()
6625 .map(|&ci| left.base.columns[ci].0.clone())
6626 .collect();
6627
6628 let mut columns: Vec<(String, Column)> = Vec::new();
6629
6630 for &ci in left.proj.indices() {
6632 let (name, col) = &left.base.columns[ci];
6633 columns.push((name.clone(), gather_column(col, left_rows)));
6634 }
6635
6636 for &ci in right.proj.indices() {
6638 let (name, col) = &right.base.columns[ci];
6639 if right_key_names.contains(name.as_str()) {
6640 continue; }
6642 let out_name = if left_col_names.contains(name) {
6643 format!("{}{}", name, suffix.right)
6644 } else {
6645 name.clone()
6646 };
6647 if left_col_names.contains(name) {
6649 let left_pos = columns.iter().position(|(n, _)| n == name);
6651 if let Some(pos) = left_pos {
6652 let entry = &mut columns[pos];
6653 entry.0 = format!("{}{}", entry.0, suffix.left);
6654 }
6655 }
6656 columns.push((out_name, gather_column(col, right_rows)));
6657 }
6658
6659 let df = DataFrame::from_columns(columns)
6660 .map_err(|e| TidyError::Internal(e.to_string()))?;
6661 Ok(TidyFrame::from_df(df))
6662}
6663
6664fn build_left_join_frame_with_suffix(
6665 left: &TidyView,
6666 right: &TidyView,
6667 left_rows: &[usize],
6668 right_rows_opt: &[Option<usize>],
6669 on: &[(&str, &str)],
6670 suffix: &JoinSuffix,
6671) -> Result<TidyFrame, TidyError> {
6672 let right_key_names: std::collections::BTreeSet<&str> =
6673 on.iter().map(|(_, rk)| *rk).collect();
6674
6675 let left_col_names: Vec<String> = left.proj.indices().iter()
6676 .map(|&ci| left.base.columns[ci].0.clone())
6677 .collect();
6678
6679 let mut columns: Vec<(String, Column)> = Vec::new();
6680
6681 for &ci in left.proj.indices() {
6683 let (name, col) = &left.base.columns[ci];
6684 columns.push((name.clone(), gather_column(col, left_rows)));
6685 }
6686
6687 for &ci in right.proj.indices() {
6689 let (name, col) = &right.base.columns[ci];
6690 if right_key_names.contains(name.as_str()) { continue; }
6691 let out_name = if left_col_names.contains(name) {
6692 let left_pos = columns.iter().position(|(n, _)| n == name);
6694 if let Some(pos) = left_pos {
6695 columns[pos].0 = format!("{}{}", name, suffix.left);
6696 }
6697 format!("{}{}", name, suffix.right)
6698 } else {
6699 name.clone()
6700 };
6701 let new_col = gather_column_nullable(col, right_rows_opt);
6702 columns.push((out_name, new_col));
6703 }
6704
6705 let df = DataFrame::from_columns(columns)
6706 .map_err(|e| TidyError::Internal(e.to_string()))?;
6707 Ok(TidyFrame::from_df(df))
6708}
6709
6710fn build_right_join_frame(
6711 left: &TidyView,
6712 right: &TidyView,
6713 left_rows_opt: &[Option<usize>],
6714 right_rows: &[usize],
6715 on: &[(&str, &str)],
6716 suffix: &JoinSuffix,
6717) -> Result<NullableFrame, TidyError> {
6718 let right_key_names: std::collections::BTreeSet<&str> =
6719 on.iter().map(|(_, rk)| *rk).collect();
6720 let left_key_names: std::collections::BTreeSet<&str> =
6721 on.iter().map(|(lk, _)| *lk).collect();
6722
6723 let right_col_names: Vec<String> = right.proj.indices().iter()
6724 .map(|&ci| right.base.columns[ci].0.clone())
6725 .collect();
6726
6727 let mut columns: Vec<(String, NullCol)> = Vec::new();
6728
6729 for &ci in left.proj.indices() {
6731 let (name, col) = &left.base.columns[ci];
6732 if left_key_names.contains(name.as_str()) { continue; }
6733 let out_name = if right_col_names.contains(name) {
6734 format!("{}{}", name, suffix.left)
6735 } else {
6736 name.clone()
6737 };
6738 let null_col = gather_column_nullable_null(col, left_rows_opt);
6739 columns.push((out_name, null_col));
6740 }
6741
6742 for &ci in right.proj.indices() {
6744 let (name, col) = &right.base.columns[ci];
6745 let out_name = if !right_key_names.contains(name.as_str())
6746 && left.proj.indices().iter().any(|&lci| left.base.columns[lci].0 == *name)
6747 && !left_key_names.contains(name.as_str())
6748 {
6749 format!("{}{}", name, suffix.right)
6750 } else {
6751 name.clone()
6752 };
6753 columns.push((out_name, NullCol::from_column(&gather_column(col, right_rows))));
6754 }
6755
6756 Ok(NullableFrame { columns })
6757}
6758
6759fn build_full_join_frame(
6760 left: &TidyView,
6761 right: &TidyView,
6762 on: &[(&str, &str)],
6763 suffix: &JoinSuffix,
6764) -> Result<NullableFrame, TidyError> {
6765 let (left_key_cols, right_key_cols) = resolve_join_keys(left, right, on)?;
6766 let lookup = build_right_lookup(right, &right_key_cols);
6767
6768 let mut out_left_rows: Vec<usize> = Vec::new();
6770 let mut out_right_rows: Vec<Option<usize>> = Vec::new();
6771 let mut right_matched: Vec<bool> = vec![false; right.base.nrows()];
6772
6773 for l_row in left.mask.iter_indices() {
6774 let key = row_key(&left.base, &left_key_cols, l_row);
6775 let matches = find_matches(&lookup, &key);
6776 if matches.is_empty() {
6777 out_left_rows.push(l_row);
6778 out_right_rows.push(None);
6779 } else {
6780 for r_row in &matches {
6781 out_left_rows.push(l_row);
6782 out_right_rows.push(Some(*r_row));
6783 if *r_row < right_matched.len() {
6784 right_matched[*r_row] = true;
6785 }
6786 }
6787 }
6788 }
6789
6790 let mut unmatched_right: Vec<usize> = Vec::new();
6792 for r_row in right.mask.iter_indices() {
6793 if r_row < right_matched.len() && !right_matched[r_row] {
6794 unmatched_right.push(r_row);
6795 }
6796 }
6797
6798 let right_key_names: std::collections::BTreeSet<&str> =
6799 on.iter().map(|(_, rk)| *rk).collect();
6800 let left_key_names: std::collections::BTreeSet<&str> =
6801 on.iter().map(|(lk, _)| *lk).collect();
6802 let right_col_names: Vec<String> = right.proj.indices().iter()
6803 .map(|&ci| right.base.columns[ci].0.clone())
6804 .collect();
6805
6806 let n_matched = out_left_rows.len();
6807 let n_unmatched_r = unmatched_right.len();
6808 let total = n_matched + n_unmatched_r;
6809
6810 let mut columns: Vec<(String, NullCol)> = Vec::new();
6811
6812 for &ci in left.proj.indices() {
6814 let (name, col) = &left.base.columns[ci];
6815 let out_name = if right_col_names.contains(name) && !left_key_names.contains(name.as_str()) {
6816 format!("{}{}", name, suffix.left)
6817 } else {
6818 name.clone()
6819 };
6820 let mut matched_vals: Vec<Option<usize>> = out_left_rows.iter()
6821 .map(|&r| Some(r))
6822 .collect();
6823 matched_vals.extend(std::iter::repeat(None).take(n_unmatched_r));
6825 assert_eq!(matched_vals.len(), total);
6826 columns.push((out_name, gather_column_nullable_null(col, &matched_vals)));
6827 }
6828
6829 for &ci in right.proj.indices() {
6831 let (name, col) = &right.base.columns[ci];
6832 if right_key_names.contains(name.as_str()) { continue; }
6833 let out_name = if left.proj.indices().iter().any(|&lci| left.base.columns[lci].0 == *name)
6834 && !left_key_names.contains(name.as_str())
6835 {
6836 format!("{}{}", name, suffix.right)
6837 } else {
6838 name.clone()
6839 };
6840
6841 let mut row_opts: Vec<Option<usize>> = out_right_rows.clone();
6842 row_opts.extend(unmatched_right.iter().map(|&r| Some(r)));
6844 assert_eq!(row_opts.len(), total);
6845 columns.push((out_name, gather_column_nullable_null(col, &row_opts)));
6846 }
6847
6848 Ok(NullableFrame { columns })
6854}
6855
6856impl GroupedTidyView {
6859
6860 pub fn mutate_across(&self, specs: &[AcrossSpec]) -> Result<TidyFrame, TidyError> {
6866 self.view.mutate_across(specs)
6869 }
6870
6871 pub fn summarise_across(&self, specs: &[AcrossSpec]) -> Result<TidyFrame, TidyError> {
6878 let n_groups = self.ngroups();
6879
6880 let key_names = &self.index.key_names;
6882 let mut out_cols: Vec<(String, Column)> = Vec::new();
6883
6884 for ki in 0..key_names.len() {
6886 let col_vals: Vec<String> = self.index.groups.iter()
6887 .map(|g| g.key_values[ki].clone())
6888 .collect();
6889 out_cols.push((key_names[ki].clone(), Column::Str(col_vals)));
6890 }
6891
6892 for spec in specs {
6894 for col_name in &spec.cols {
6895 let out_name = spec.output_name(col_name);
6896 if out_cols.iter().any(|(n, _)| n == &out_name) {
6898 return Err(TidyError::DuplicateColumn(out_name));
6899 }
6900
6901 let base_col = self.view.base.get_column(col_name)
6902 .ok_or_else(|| TidyError::ColumnNotFound(col_name.clone()))?;
6903
6904 let mut agg_floats: Vec<f64> = Vec::with_capacity(n_groups);
6906 for group in &self.index.groups {
6907 let group_col = gather_column(base_col, &group.row_indices);
6908 let result_col = (spec.transform.func)(col_name, &group_col)?;
6909 if result_col.len() != 1 {
6910 return Err(TidyError::LengthMismatch {
6911 expected: 1,
6912 got: result_col.len(),
6913 });
6914 }
6915 let v = match &result_col {
6916 Column::Float(v) => v[0],
6917 Column::Int(v) => v[0] as f64,
6918 _ => return Err(TidyError::TypeMismatch {
6919 expected: "Float or Int".into(),
6920 got: result_col.type_name().into(),
6921 }),
6922 };
6923 agg_floats.push(v);
6924 }
6925 out_cols.push((out_name, Column::Float(agg_floats)));
6926 }
6927 }
6928
6929 let df = DataFrame::from_columns(out_cols)
6930 .map_err(|e| TidyError::Internal(e.to_string()))?;
6931 Ok(TidyFrame::from_df(df))
6932 }
6933}
6934
6935mod indexmap_simple {
6940 use super::{Column, DataFrame, DataError};
6941
6942 pub struct IndexMap {
6943 entries: Vec<(String, Column)>,
6944 }
6945
6946 impl IndexMap {
6947 pub fn from_df(df: &DataFrame) -> Self {
6948 Self {
6949 entries: df.columns.iter()
6950 .map(|(n, c)| (n.clone(), c.clone()))
6951 .collect(),
6952 }
6953 }
6954
6955 pub fn insert(&mut self, name: String, col: Column) {
6957 if let Some(pos) = self.entries.iter().position(|(n, _)| n == &name) {
6958 self.entries[pos] = (name, col);
6959 } else {
6960 self.entries.push((name, col));
6961 }
6962 }
6963
6964 pub fn into_df(self) -> Result<DataFrame, DataError> {
6965 DataFrame::from_columns(self.entries)
6966 }
6967 }
6968}
6969
6970impl GroupIndex {
6987 pub fn build_fast<I: IntoIterator<Item = usize>>(
7003 base: &DataFrame,
7004 key_col_indices: &[usize],
7005 visible_rows: I,
7006 key_names: Vec<String>,
7007 ) -> Self {
7008 use std::collections::BTreeMap;
7009
7010 if let Some(cat_keys) = collect_categorical_keys(base, key_col_indices) {
7013 return build_groupindex_categorical(cat_keys, visible_rows, key_names);
7014 }
7015
7016 let mut groups: Vec<GroupMeta> = Vec::new();
7017 let mut key_to_slot: BTreeMap<Vec<String>, usize> = BTreeMap::new();
7018
7019 for row in visible_rows {
7020 let key: Vec<String> = key_col_indices.iter()
7021 .map(|&ci| base.columns[ci].1.get_display(row))
7022 .collect();
7023
7024 if let Some(&slot) = key_to_slot.get(&key) {
7025 groups[slot].row_indices.push(row);
7026 } else {
7027 let slot = groups.len();
7028 let key_values = key.clone();
7029 key_to_slot.insert(key, slot);
7030 groups.push(GroupMeta { key_values, row_indices: vec![row] });
7031 }
7032 }
7033
7034 GroupIndex { groups, key_names }
7035 }
7036}
7037
/// Borrowed view of grouping key columns, available only when every key column
/// is `Column::Categorical`.
///
/// Entry `i` of `levels` and of `codes` both describe the `i`-th key column, in
/// the order `collect_categorical_keys` received the column indices.
pub(crate) struct CategoricalKeys<'a> {
    /// Level strings of each key column; a code `c` decodes to `levels[i][c]`.
    pub(crate) levels: Vec<&'a [String]>,
    /// Per-row codes of each key column (`codes[i][row]`).
    pub(crate) codes: Vec<&'a [u32]>,
}
7062
7063pub(crate) fn collect_categorical_keys<'a>(
7067 base: &'a DataFrame,
7068 key_col_indices: &[usize],
7069) -> Option<CategoricalKeys<'a>> {
7070 if key_col_indices.is_empty() {
7071 return None;
7072 }
7073 let mut levels: Vec<&[String]> = Vec::with_capacity(key_col_indices.len());
7074 let mut codes: Vec<&[u32]> = Vec::with_capacity(key_col_indices.len());
7075 for &ci in key_col_indices {
7076 match &base.columns[ci].1 {
7077 Column::Categorical { levels: l, codes: c } => {
7078 levels.push(l.as_slice());
7079 codes.push(c.as_slice());
7080 }
7081 _ => return None,
7082 }
7083 }
7084 Some(CategoricalKeys { levels, codes })
7085}
7086
7087fn build_groupindex_categorical<I: IntoIterator<Item = usize>>(
7090 cat: CategoricalKeys<'_>,
7091 visible_rows: I,
7092 key_names: Vec<String>,
7093) -> GroupIndex {
7094 use std::collections::BTreeMap;
7095 let nkeys = cat.codes.len();
7096 let mut groups: Vec<GroupMeta> = Vec::new();
7097 let mut key_to_slot: BTreeMap<Vec<u32>, usize> = BTreeMap::new();
7098 let mut key_buf: Vec<u32> = Vec::with_capacity(nkeys);
7099
7100 for row in visible_rows {
7101 key_buf.clear();
7102 for c in &cat.codes {
7103 key_buf.push(c[row]);
7104 }
7105 if let Some(&slot) = key_to_slot.get(&key_buf) {
7106 groups[slot].row_indices.push(row);
7107 } else {
7108 let key_values: Vec<String> = (0..nkeys)
7110 .map(|i| cat.levels[i][key_buf[i] as usize].clone())
7111 .collect();
7112 let slot = groups.len();
7113 key_to_slot.insert(key_buf.clone(), slot);
7114 groups.push(GroupMeta { key_values, row_indices: vec![row] });
7115 }
7116 }
7117
7118 GroupIndex { groups, key_names }
7119}
7120
7121impl TidyView {
7124 pub fn group_by_fast(&self, keys: &[&str]) -> Result<GroupedTidyView, TidyError> {
7129 let mut key_col_indices = Vec::with_capacity(keys.len());
7130 for &key in keys {
7131 let idx = self.base.columns.iter().position(|(n, _)| n == key)
7132 .ok_or_else(|| TidyError::ColumnNotFound(key.to_string()))?;
7133 key_col_indices.push(idx);
7134 }
7135 let key_names: Vec<String> = keys.iter().map(|s| s.to_string()).collect();
7136 let index = GroupIndex::build_fast(&self.base, &key_col_indices, self.mask.iter_indices(), key_names);
7137 Ok(GroupedTidyView { view: self.clone(), index })
7138 }
7139}
7140
/// One aggregation for `TidyView::summarise_streaming`.
///
/// Every variant except `Count` names the source column it reads.
#[derive(Debug, Clone)]
pub enum StreamingAgg {
    /// Number of rows in the group.
    Count,
    /// Compensated (Kahan) sum of the column.
    Sum(String),
    /// Compensated sum divided by the row count; NaN for an empty group.
    Mean(String),
    /// Minimum, ignoring NaN; NaN when no non-NaN value was seen.
    Min(String),
    /// Maximum, ignoring NaN; NaN when no non-NaN value was seen.
    Max(String),
    /// Sample variance (n - 1 denominator, Welford); NaN for fewer than two rows.
    Var(String),
    /// Sample standard deviation (square root of `Var`).
    Sd(String),
}

/// Running per-group state for one `StreamingAgg`.
#[derive(Debug, Clone)]
enum AccState {
    Count {
        n: u64,
    },
    Sum {
        sum: f64,
        /// Kahan compensation term.
        c: f64,
    },
    Mean {
        sum: f64,
        /// Kahan compensation term.
        c: f64,
        n: u64,
    },
    Min {
        cur: f64,
        any: bool,
    },
    Max {
        cur: f64,
        any: bool,
    },
    /// Welford's online mean / sum of squared deviations (used by Var and Sd).
    Welford {
        n: u64,
        mean: f64,
        m2: f64,
    },
}

impl AccState {
    /// Fresh (empty-group) state for `agg`.
    fn from_agg(agg: &StreamingAgg) -> Self {
        match agg {
            StreamingAgg::Count => AccState::Count { n: 0 },
            StreamingAgg::Sum(_) => AccState::Sum { sum: 0.0, c: 0.0 },
            StreamingAgg::Mean(_) => AccState::Mean { sum: 0.0, c: 0.0, n: 0 },
            StreamingAgg::Min(_) => AccState::Min {
                cur: f64::INFINITY,
                any: false,
            },
            StreamingAgg::Max(_) => AccState::Max {
                cur: f64::NEG_INFINITY,
                any: false,
            },
            StreamingAgg::Var(_) | StreamingAgg::Sd(_) => AccState::Welford {
                n: 0,
                mean: 0.0,
                m2: 0.0,
            },
        }
    }

    /// One step of Kahan compensated summation: `sum += x`.
    ///
    /// When the running total becomes non-finite (an infinite or NaN input, or
    /// overflow), the compensation term is reset to 0. Otherwise
    /// `(inf - sum) - y` turns it into NaN or inf and poisons every later step.
    /// For example, `[inf, 1.0]` would sum to NaN instead of inf. Finite sums are
    /// unaffected.
    fn kahan_add(sum: &mut f64, c: &mut f64, x: f64) {
        let y = x - *c;
        let t = *sum + y;
        *c = if t.is_finite() { (t - *sum) - y } else { 0.0 };
        *sum = t;
    }

    /// Fold one input value into the state.
    ///
    /// `Count` ignores `x`. Min/Max skip NaN; all other aggregates include it.
    fn update(&mut self, x: f64) {
        match self {
            AccState::Count { n } => *n += 1,
            AccState::Sum { sum, c } => Self::kahan_add(sum, c, x),
            AccState::Mean { sum, c, n } => {
                Self::kahan_add(sum, c, x);
                *n += 1;
            }
            AccState::Min { cur, any } => {
                if !x.is_nan() && (!*any || x < *cur) {
                    *cur = x;
                    *any = true;
                }
            }
            AccState::Max { cur, any } => {
                if !x.is_nan() && (!*any || x > *cur) {
                    *cur = x;
                    *any = true;
                }
            }
            AccState::Welford { n, mean, m2 } => {
                *n += 1;
                let delta = x - *mean;
                *mean += delta / (*n as f64);
                let delta2 = x - *mean;
                *m2 += delta * delta2;
            }
        }
    }

    /// Produce the final value for `agg`.
    ///
    /// Returns NaN when the state kind does not correspond to `agg`.
    fn finalize(&self, agg: &StreamingAgg) -> f64 {
        match (self, agg) {
            (AccState::Count { n }, StreamingAgg::Count) => *n as f64,
            (AccState::Sum { sum, .. }, StreamingAgg::Sum(_)) => *sum,
            (AccState::Mean { sum, n, .. }, StreamingAgg::Mean(_)) => {
                if *n == 0 {
                    f64::NAN
                } else {
                    *sum / (*n as f64)
                }
            }
            (AccState::Min { cur, any }, StreamingAgg::Min(_))
            | (AccState::Max { cur, any }, StreamingAgg::Max(_)) => {
                if *any {
                    *cur
                } else {
                    f64::NAN
                }
            }
            (AccState::Welford { n, m2, .. }, StreamingAgg::Var(_)) => {
                if *n < 2 {
                    f64::NAN
                } else {
                    *m2 / ((*n - 1) as f64)
                }
            }
            (AccState::Welford { n, m2, .. }, StreamingAgg::Sd(_)) => {
                if *n < 2 {
                    f64::NAN
                } else {
                    (*m2 / ((*n - 1) as f64)).sqrt()
                }
            }
            _ => f64::NAN,
        }
    }
}
7332
7333fn row_as_f64(col: &Column, row: usize) -> f64 {
7335 match col {
7336 Column::Float(v) => v[row],
7337 Column::Int(v) => v[row] as f64,
7338 _ => f64::NAN,
7339 }
7340}
7341
7342impl TidyView {
7343 pub fn summarise_streaming(
7351 &self,
7352 keys: &[&str],
7353 aggs: &[(&str, StreamingAgg)],
7354 ) -> Result<TidyFrame, TidyError> {
7355 {
7357 let mut seen = std::collections::BTreeSet::new();
7358 for &(name, _) in aggs {
7359 if !seen.insert(name) {
7360 return Err(TidyError::DuplicateColumn(name.to_string()));
7361 }
7362 }
7363 for &k in keys {
7364 if seen.contains(k) {
7365 return Err(TidyError::DuplicateColumn(k.to_string()));
7366 }
7367 }
7368 }
7369
7370 let mut key_col_indices = Vec::with_capacity(keys.len());
7372 for &key in keys {
7373 let idx = self
7374 .base
7375 .columns
7376 .iter()
7377 .position(|(n, _)| n == key)
7378 .ok_or_else(|| TidyError::ColumnNotFound(key.to_string()))?;
7379 key_col_indices.push(idx);
7380 }
7381
7382 let agg_col_indices: Vec<Option<usize>> = aggs
7384 .iter()
7385 .map(|(_, agg)| match agg {
7386 StreamingAgg::Count => None,
7387 StreamingAgg::Sum(c)
7388 | StreamingAgg::Mean(c)
7389 | StreamingAgg::Min(c)
7390 | StreamingAgg::Max(c)
7391 | StreamingAgg::Var(c)
7392 | StreamingAgg::Sd(c) => self.base.columns.iter().position(|(n, _)| n == c),
7393 })
7394 .collect();
7395 for (i, (_, agg)) in aggs.iter().enumerate() {
7396 if matches!(agg, StreamingAgg::Count) {
7397 continue;
7398 }
7399 if agg_col_indices[i].is_none() {
7400 let col_name = match agg {
7401 StreamingAgg::Sum(c)
7402 | StreamingAgg::Mean(c)
7403 | StreamingAgg::Min(c)
7404 | StreamingAgg::Max(c)
7405 | StreamingAgg::Var(c)
7406 | StreamingAgg::Sd(c) => c.clone(),
7407 _ => String::new(),
7408 };
7409 return Err(TidyError::ColumnNotFound(col_name));
7410 }
7411 }
7412
7413 let cat = collect_categorical_keys(&self.base, &key_col_indices);
7415
7416 use std::collections::BTreeMap;
7419
7420 let n_aggs = aggs.len();
7421 let init_accs = || -> Vec<AccState> {
7422 aggs.iter().map(|(_, a)| AccState::from_state(a)).collect()
7423 };
7424
7425 fn _unused() {}
7428
7429 let (cat_state, str_state) = if let Some(cat) = cat.as_ref() {
7431 let mut state: BTreeMap<Vec<u32>, Vec<AccState>> = BTreeMap::new();
7432 let mut key_buf: Vec<u32> = Vec::with_capacity(cat.codes.len());
7433 for row in self.mask.iter_indices() {
7434 key_buf.clear();
7435 for c in &cat.codes {
7436 key_buf.push(c[row]);
7437 }
7438 let entry = state
7439 .entry(key_buf.clone())
7440 .or_insert_with(&init_accs);
7441 for (i, (_, agg)) in aggs.iter().enumerate() {
7442 if let Some(col_idx) = agg_col_indices[i] {
7443 entry[i].update(row_as_f64(&self.base.columns[col_idx].1, row));
7444 } else {
7445 entry[i].update(0.0); }
7447 }
7448 }
7449 (Some(state), None)
7450 } else {
7451 let mut state: BTreeMap<Vec<String>, Vec<AccState>> = BTreeMap::new();
7452 for row in self.mask.iter_indices() {
7453 let key: Vec<String> = key_col_indices
7454 .iter()
7455 .map(|&ci| self.base.columns[ci].1.get_display(row))
7456 .collect();
7457 let entry = state.entry(key).or_insert_with(&init_accs);
7458 for (i, (_, agg)) in aggs.iter().enumerate() {
7459 if let Some(col_idx) = agg_col_indices[i] {
7460 entry[i].update(row_as_f64(&self.base.columns[col_idx].1, row));
7461 } else {
7462 entry[i].update(0.0);
7463 }
7464 }
7465 }
7466 (None, Some(state))
7467 };
7468
7469 let n_groups = cat_state
7471 .as_ref()
7472 .map(|s| s.len())
7473 .unwrap_or_else(|| str_state.as_ref().unwrap().len());
7474
7475 let mut result_columns: Vec<(String, Column)> = Vec::with_capacity(keys.len() + n_aggs);
7476
7477 if let Some(state) = &cat_state {
7479 let cat = cat.as_ref().unwrap();
7480 for (ki, &key_col_idx) in key_col_indices.iter().enumerate() {
7481 let mut vals: Vec<String> = Vec::with_capacity(n_groups);
7482 for key_codes in state.keys() {
7483 let code = key_codes[ki] as usize;
7484 vals.push(cat.levels[ki][code].clone());
7485 }
7486 let key_name = self.base.columns[key_col_idx].0.clone();
7487 let levels: Vec<String> = cat.levels[ki].to_vec();
7489 let codes: Vec<u32> = state.keys().map(|k| k[ki]).collect();
7490 result_columns.push((key_name, Column::Categorical { levels, codes }));
7491 let _ = vals;
7492 }
7493 } else {
7494 let state = str_state.as_ref().unwrap();
7495 for (ki, &key_col_idx) in key_col_indices.iter().enumerate() {
7496 let key_name = self.base.columns[key_col_idx].0.clone();
7497 let vals: Vec<String> = state.keys().map(|k| k[ki].clone()).collect();
7498 result_columns.push((key_name, Column::Str(vals)));
7499 }
7500 }
7501
7502 for (i, (out_name, agg)) in aggs.iter().enumerate() {
7504 let vals: Vec<f64> = if let Some(state) = &cat_state {
7505 state.values().map(|accs| accs[i].finalize(agg)).collect()
7506 } else {
7507 str_state
7508 .as_ref()
7509 .unwrap()
7510 .values()
7511 .map(|accs| accs[i].finalize(agg))
7512 .collect()
7513 };
7514 let col = if matches!(agg, StreamingAgg::Count) {
7515 Column::Int(vals.into_iter().map(|x| x as i64).collect())
7516 } else {
7517 Column::Float(vals)
7518 };
7519 result_columns.push((out_name.to_string(), col));
7520 }
7521
7522 let df = DataFrame::from_columns(result_columns)
7523 .map_err(|e| TidyError::Internal(e.to_string()))?;
7524 Ok(TidyFrame::from_df(df))
7525 }
7526}
7527
7528impl AccState {
7529 fn from_state(agg: &StreamingAgg) -> Self {
7530 Self::from_agg(agg)
7531 }
7532}
7533
/// Compact factor (categorical) column.
///
/// Each row stores a `u16` code indexing into `levels`. `encode` caps the level
/// count at 65 535.
#[derive(Clone, Debug)]
pub struct FctColumn {
    /// Distinct level strings; row `i` decodes to `levels[data[i]]`.
    pub levels: Vec<String>,
    /// Per-row level codes.
    pub data: Vec<u16>,
}
7607
7608impl FctColumn {
7609 pub fn encode(strings: &[String]) -> Result<Self, TidyError> {
7616 use std::collections::BTreeMap;
7617 let mut levels: Vec<String> = Vec::new();
7618 let mut level_map: BTreeMap<String, u16> = BTreeMap::new();
7622 let mut data: Vec<u16> = Vec::with_capacity(strings.len());
7623
7624 for s in strings {
7625 let idx = if let Some(&existing) = level_map.get(s.as_str()) {
7626 existing
7627 } else {
7628 let next = levels.len();
7629 if next >= 65_535 {
7630 return Err(TidyError::CapacityExceeded {
7631 limit: 65_535,
7632 got: next + 1,
7633 });
7634 }
7635 let idx = next as u16;
7636 levels.push(s.clone());
7637 level_map.insert(s.clone(), idx);
7638 idx
7639 };
7640 data.push(idx);
7641 }
7642 Ok(FctColumn { levels, data })
7643 }
7644
7645 pub fn encode_from_view(view: &TidyView, col: &str) -> Result<Self, TidyError> {
7647 let base_idx = view.base.columns.iter()
7648 .position(|(n, _)| n == col)
7649 .ok_or_else(|| TidyError::ColumnNotFound(col.to_string()))?;
7650 if !view.proj.indices().contains(&base_idx) {
7652 return Err(TidyError::ColumnNotFound(col.to_string()));
7653 }
7654 let col_data = &view.base.columns[base_idx].1;
7655 let visible: Vec<usize> = view.mask.iter_indices().collect();
7656 let strings: Vec<String> = visible.iter()
7657 .map(|&r| col_data.get_display(r))
7658 .collect();
7659 Self::encode(&strings)
7660 }
7661
7662 pub fn nrows(&self) -> usize { self.data.len() }
7666 pub fn nlevels(&self) -> usize { self.levels.len() }
7668
7669 pub fn decode(&self, i: usize) -> &str {
7671 &self.levels[self.data[i] as usize]
7672 }
7673
7674 pub fn fct_lump(&self, n: usize) -> Result<Self, TidyError> {
7686 if n >= self.levels.len() {
7687 return Ok(self.clone()); }
7689
7690 let mut freq = vec![0usize; self.levels.len()];
7692 for &idx in &self.data {
7693 freq[idx as usize] += 1;
7694 }
7695
7696 let mut ranked: Vec<(usize, usize)> = freq.iter().copied().enumerate().collect();
7699 ranked.sort_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
7700
7701 let mut keep_set: Vec<usize> = ranked[..n].iter().map(|(i, _)| *i).collect();
7703 keep_set.sort_unstable(); let mut other_name = "Other".to_string();
7707 while keep_set.iter().any(|&ki| self.levels[ki] == other_name) {
7708 other_name.push('_');
7709 }
7710
7711 let mut new_levels: Vec<String> = keep_set.iter().map(|&ki| self.levels[ki].clone()).collect();
7713 let other_idx = new_levels.len() as u16;
7714 new_levels.push(other_name);
7715
7716 let mut remap = vec![other_idx; self.levels.len()];
7718 for (new_i, &old_i) in keep_set.iter().enumerate() {
7719 remap[old_i] = new_i as u16;
7720 }
7721
7722 let new_data: Vec<u16> = self.data.iter().map(|&d| remap[d as usize]).collect();
7723 Ok(FctColumn { levels: new_levels, data: new_data })
7724 }
7725
7726 pub fn fct_reorder(&self, summary_vals: &[f64], descending: bool) -> Result<Self, TidyError> {
7735 if summary_vals.len() != self.levels.len() {
7736 return Err(TidyError::LengthMismatch {
7737 expected: self.levels.len(),
7738 got: summary_vals.len(),
7739 });
7740 }
7741 let mut order: Vec<usize> = (0..self.levels.len()).collect();
7745 order.sort_by(|&a, &b| {
7746 let va = summary_vals[a];
7747 let vb = summary_vals[b];
7748 match (va.is_nan(), vb.is_nan()) {
7749 (true, true) => std::cmp::Ordering::Equal,
7750 (true, false) => std::cmp::Ordering::Greater, (false, true) => std::cmp::Ordering::Less, (false, false) => {
7753 let cmp = va.partial_cmp(&vb).unwrap_or(std::cmp::Ordering::Equal);
7754 if descending { cmp.reverse() } else { cmp }
7755 }
7756 }
7757 });
7758
7759 let new_levels: Vec<String> = order.iter().map(|&i| self.levels[i].clone()).collect();
7761
7762 let mut remap = vec![0u16; self.levels.len()];
7764 for (new_i, &old_i) in order.iter().enumerate() {
7765 remap[old_i] = new_i as u16;
7766 }
7767
7768 let new_data: Vec<u16> = self.data.iter().map(|&d| remap[d as usize]).collect();
7769 Ok(FctColumn { levels: new_levels, data: new_data })
7770 }
7771
7772 pub fn fct_reorder_by_col(&self, numeric_col: &Column, descending: bool) -> Result<Self, TidyError> {
7778 if numeric_col.len() != self.data.len() {
7779 return Err(TidyError::LengthMismatch {
7780 expected: self.data.len(),
7781 got: numeric_col.len(),
7782 });
7783 }
7784 let mut sums = vec![0.0f64; self.levels.len()];
7785 let mut counts = vec![0usize; self.levels.len()];
7786 match numeric_col {
7787 Column::Float(v) => {
7788 for (i, &d) in self.data.iter().enumerate() {
7789 let val = v[i];
7790 if !val.is_nan() {
7791 sums[d as usize] += val;
7792 counts[d as usize] += 1;
7793 }
7794 }
7795 }
7796 Column::Int(v) => {
7797 for (i, &d) in self.data.iter().enumerate() {
7798 sums[d as usize] += v[i] as f64;
7799 counts[d as usize] += 1;
7800 }
7801 }
7802 _ => return Err(TidyError::TypeMismatch {
7803 expected: "Float or Int".to_string(),
7804 got: numeric_col.type_name().to_string(),
7805 }),
7806 }
7807 let means: Vec<f64> = sums.iter().zip(counts.iter())
7808 .map(|(&s, &c)| if c == 0 { f64::NAN } else { s / c as f64 })
7809 .collect();
7810 self.fct_reorder(&means, descending)
7811 }
7812
7813 pub fn fct_collapse(&self, mapping: &[(&str, &str)]) -> Result<Self, TidyError> {
7830 if mapping.is_empty() {
7831 return Ok(self.clone());
7832 }
7833 let new_name_for: Vec<String> = self.levels.iter().map(|old| {
7835 if let Some((_, new)) = mapping.iter().find(|(o, _)| *o == old.as_str()) {
7836 new.to_string()
7837 } else {
7838 old.clone()
7839 }
7840 }).collect();
7841
7842 use std::collections::BTreeMap;
7845 let mut new_levels: Vec<String> = Vec::new();
7846 let mut new_name_to_idx: BTreeMap<String, u16> = BTreeMap::new();
7847
7848 let mut old_to_new: Vec<u16> = Vec::with_capacity(self.levels.len());
7849 for name in &new_name_for {
7850 let idx = if let Some(&existing) = new_name_to_idx.get(name.as_str()) {
7851 existing
7852 } else {
7853 let idx = new_levels.len() as u16;
7854 new_levels.push(name.clone());
7855 new_name_to_idx.insert(name.clone(), idx);
7856 idx
7857 };
7858 old_to_new.push(idx);
7859 }
7860
7861 let changed = old_to_new.iter().enumerate().any(|(i, &new)| new != i as u16);
7863 let new_data = if changed {
7864 self.data.iter().map(|&d| old_to_new[d as usize]).collect()
7865 } else {
7866 self.data.clone()
7867 };
7868 Ok(FctColumn { levels: new_levels, data: new_data })
7869 }
7870
7871 pub fn to_str_column(&self) -> Column {
7875 Column::Str(self.data.iter().map(|&d| self.levels[d as usize].clone()).collect())
7876 }
7877
7878 pub fn gather(&self, indices: &[usize]) -> FctColumn {
7880 FctColumn {
7881 levels: self.levels.clone(),
7882 data: indices.iter().map(|&i| self.data[i]).collect(),
7883 }
7884 }
7885}
7886
7887impl TidyError {
7890 pub fn capacity_exceeded(limit: usize, got: usize) -> Self {
7892 TidyError::CapacityExceeded { limit, got }
7893 }
7894}
7895
/// A `FctColumn` paired with a per-row validity mask.
///
/// A row is null when its validity bit is false (see `is_null`).
#[derive(Clone, Debug)]
pub struct NullableFactor {
    /// Codes and levels. The code stored at a null row is only a placeholder
    /// (`encode_nullable` writes 0).
    pub fct: FctColumn,
    /// Per-row validity; `true` means a value is present.
    pub validity: BitMask,
}
7905
7906impl NullableFactor {
7907 pub fn from_fct(fct: FctColumn) -> Self {
7909 let n = fct.nrows();
7910 NullableFactor { fct, validity: BitMask::all_true(n) }
7911 }
7912
7913 pub fn new(fct: FctColumn, validity: BitMask) -> Self {
7915 NullableFactor { fct, validity }
7916 }
7917
7918 pub fn encode_nullable(strings: &[Option<String>]) -> Result<Self, TidyError> {
7922 use std::collections::BTreeMap;
7923 let mut levels: Vec<String> = Vec::new();
7924 let mut level_map: BTreeMap<String, u16> = BTreeMap::new();
7925 let mut data: Vec<u16> = Vec::with_capacity(strings.len());
7926 let mut valid_flags: Vec<bool> = Vec::with_capacity(strings.len());
7927
7928 for opt in strings {
7929 match opt {
7930 None => {
7931 data.push(0); valid_flags.push(false);
7933 }
7934 Some(s) => {
7935 let idx = if let Some(&existing) = level_map.get(s.as_str()) {
7936 existing
7937 } else {
7938 let next = levels.len();
7939 if next >= 65_535 {
7940 return Err(TidyError::CapacityExceeded { limit: 65_535, got: next + 1 });
7941 }
7942 let idx = next as u16;
7943 levels.push(s.clone());
7944 level_map.insert(s.clone(), idx);
7945 idx
7946 };
7947 data.push(idx);
7948 valid_flags.push(true);
7949 }
7950 }
7951 }
7952 let fct = FctColumn { levels, data };
7953 let validity = BitMask::from_bools(&valid_flags);
7954 Ok(NullableFactor { fct, validity })
7955 }
7956
7957 pub fn nrows(&self) -> usize { self.fct.nrows() }
7959 pub fn nlevels(&self) -> usize { self.fct.nlevels() }
7961 pub fn is_null(&self, i: usize) -> bool { !self.validity.get(i) }
7963 pub fn count_valid(&self) -> usize { self.validity.count_ones() }
7965
7966 pub fn decode(&self, i: usize) -> Option<&str> {
7968 if self.is_null(i) { None } else { Some(self.fct.decode(i)) }
7969 }
7970
7971 pub fn fct_lump(&self, n: usize) -> Result<Self, TidyError> {
7973 let lumped = self.fct.fct_lump(n)?;
7974 Ok(NullableFactor { fct: lumped, validity: self.validity.clone() })
7975 }
7976
7977 pub fn fct_reorder(&self, summary_vals: &[f64], descending: bool) -> Result<Self, TidyError> {
7979 let reordered = self.fct.fct_reorder(summary_vals, descending)?;
7980 Ok(NullableFactor { fct: reordered, validity: self.validity.clone() })
7981 }
7982
7983 pub fn fct_collapse(&self, mapping: &[(&str, &str)]) -> Result<Self, TidyError> {
7985 let collapsed = self.fct.fct_collapse(mapping)?;
7986 Ok(NullableFactor { fct: collapsed, validity: self.validity.clone() })
7987 }
7988}
7989
7990impl TidyView {
7993 pub fn fct_encode(&self, col: &str) -> Result<FctColumn, TidyError> {
7998 FctColumn::encode_from_view(self, col)
7999 }
8000
8001 pub fn fct_summary_means(
8006 &self,
8007 fct: &FctColumn,
8008 numeric_col: &str,
8009 ) -> Result<Vec<f64>, TidyError> {
8010 let base_idx = self.base.columns.iter()
8011 .position(|(n, _)| n == numeric_col)
8012 .ok_or_else(|| TidyError::ColumnNotFound(numeric_col.to_string()))?;
8013 let nc = &self.base.columns[base_idx].1;
8014 if nc.len() != fct.nrows() {
8015 return Err(TidyError::LengthMismatch { expected: fct.nrows(), got: nc.len() });
8016 }
8017 match nc {
8019 Column::Float(_) | Column::Int(_) => {}
8020 _ => return Err(TidyError::TypeMismatch {
8021 expected: "Float or Int".to_string(),
8022 got: nc.type_name().to_string(),
8023 }),
8024 }
8025 let mut sums = vec![0.0f64; fct.levels.len()];
8026 let mut counts = vec![0usize; fct.levels.len()];
8027 match nc {
8028 Column::Float(v) => {
8029 for (i, &d) in fct.data.iter().enumerate() {
8030 if !v[i].is_nan() {
8031 sums[d as usize] += v[i];
8032 counts[d as usize] += 1;
8033 }
8034 }
8035 }
8036 Column::Int(v) => {
8037 for (i, &d) in fct.data.iter().enumerate() {
8038 sums[d as usize] += v[i] as f64;
8039 counts[d as usize] += 1;
8040 }
8041 }
8042 _ => unreachable!(),
8043 }
8044 Ok(sums.iter().zip(counts.iter())
8045 .map(|(&s, &c)| if c == 0 { f64::NAN } else { s / c as f64 })
8046 .collect())
8047 }
8048}
8049
/// Label-encode strings.
///
/// The distinct values, sorted lexicographically (byte order), become the
/// levels. Each input value is replaced by its level's index.
pub fn label_encode(col: &[String]) -> (Vec<String>, Vec<u32>) {
    let levels: Vec<String> = col
        .iter()
        .map(String::as_str)
        .collect::<BTreeSet<&str>>()
        .into_iter()
        .map(str::to_owned)
        .collect();

    // `levels` is sorted and duplicate-free, so binary search finds each value.
    let codes: Vec<u32> = col
        .iter()
        .map(|s| {
            let pos = levels
                .binary_search_by(|lvl| lvl.as_str().cmp(s.as_str()))
                .expect("every input value is a level");
            pos as u32
        })
        .collect();
    (levels, codes)
}
8069
/// Encode `col` using the caller-supplied level `order`.
///
/// A value's code is its position in `order`; if `order` repeats a value, the
/// last position wins. Returns `(order, codes)`.
///
/// # Errors
/// A message naming the first value of `col` that is missing from `order`.
pub fn ordinal_encode(col: &[String], order: &[String]) -> Result<(Vec<String>, Vec<u32>), String> {
    let mut rank: BTreeMap<&str, u32> = BTreeMap::new();
    for (i, level) in order.iter().enumerate() {
        rank.insert(level.as_str(), i as u32);
    }

    let codes = col
        .iter()
        .map(|s| {
            rank.get(s.as_str())
                .copied()
                .ok_or_else(|| format!("value {:?} not found in specified order", s))
        })
        .collect::<Result<Vec<u32>, String>>()?;
    Ok((order.to_vec(), codes))
}
8090
/// One-hot encodes categorical codes.
///
/// Produces one boolean indicator column per level: `columns[k][row]` is `true`
/// exactly when `codes[row] == k`. The returned names are a copy of `levels`.
///
/// # Panics
///
/// Panics if any code is `>= levels.len()`.
pub fn one_hot_encode(levels: &[String], codes: &[u32]) -> (Vec<String>, Vec<Vec<bool>>) {
    let mut indicators = vec![vec![false; codes.len()]; levels.len()];
    codes
        .iter()
        .enumerate()
        .for_each(|(row, &code)| indicators[code as usize][row] = true);
    (levels.to_vec(), indicators)
}
8107
#[cfg(test)]
mod rolling_window_tests {
    //! Tests for rolling-window expressions (`DExpr::Rolling*`) and for the
    //! categorical join-key / `CategoricalColumn` conversion helpers.

    use super::*;

    /// Builds a one-column frame holding `vals` as a `Float` column.
    fn make_df(col_name: &str, vals: Vec<f64>) -> DataFrame {
        let columns = vec![(col_name.to_string(), Column::Float(vals))];
        DataFrame { columns }
    }

    /// Unwraps a `Float` column, panicking on any other variant.
    fn expect_floats(col: Column) -> Vec<f64> {
        if let Column::Float(v) = col {
            v
        } else {
            panic!("expected Float column")
        }
    }

    /// Asserts `actual` has the length of `expected` and matches it
    /// element-wise within the absolute tolerance `tol`.
    fn assert_close(actual: &[f64], expected: &[f64], tol: f64) {
        assert_eq!(actual.len(), expected.len());
        for (a, e) in actual.iter().zip(expected) {
            assert!((a - e).abs() < tol);
        }
    }

    #[test]
    fn rolling_sum_basic() {
        let df = make_df("x", vec![1.0, 2.0, 3.0, 4.0, 5.0]);
        let expr = DExpr::RollingSum("x".into(), 3);
        // The first two windows are partial and sum only the rows seen so far.
        let v = expect_floats(eval_expr_column(&df, &expr, 5).unwrap());
        assert_close(&v, &[1.0, 3.0, 6.0, 9.0, 12.0], 1e-12);
    }

    #[test]
    fn rolling_mean_basic() {
        let df = make_df("x", vec![1.0, 2.0, 3.0, 4.0, 5.0]);
        let expr = DExpr::RollingMean("x".into(), 3);
        let v = expect_floats(eval_expr_column(&df, &expr, 5).unwrap());
        assert_close(&v, &[1.0, 1.5, 2.0, 3.0, 4.0], 1e-12);
    }

    #[test]
    fn rolling_min_basic() {
        let df = make_df("x", vec![5.0, 3.0, 4.0, 1.0, 2.0]);
        let expr = DExpr::RollingMin("x".into(), 3);
        let v = expect_floats(eval_expr_column(&df, &expr, 5).unwrap());
        assert_close(&v, &[5.0, 3.0, 3.0, 1.0, 1.0], 1e-12);
    }

    #[test]
    fn rolling_max_basic() {
        let df = make_df("x", vec![1.0, 5.0, 3.0, 2.0, 4.0]);
        let expr = DExpr::RollingMax("x".into(), 3);
        let v = expect_floats(eval_expr_column(&df, &expr, 5).unwrap());
        assert_close(&v, &[1.0, 5.0, 5.0, 5.0, 4.0], 1e-12);
    }

    #[test]
    fn rolling_var_basic() {
        let df = make_df("x", vec![2.0, 4.0, 6.0, 8.0]);
        let expr = DExpr::RollingVar("x".into(), 3);
        let v = expect_floats(eval_expr_column(&df, &expr, 4).unwrap());
        assert_eq!(v.len(), 4);
        // A single-element window is expected to report 0.
        assert_close(&v[..1], &[0.0], 1e-12);
        // Expected values match the (n - 1) sample variance: [2,4] -> 2, [2,4,6] -> 4, [4,6,8] -> 4.
        assert_close(&v[1..], &[2.0, 4.0, 4.0], 1e-10);
    }

    #[test]
    fn rolling_sd_basic() {
        let df = make_df("x", vec![2.0, 4.0, 6.0, 8.0]);
        let expr = DExpr::RollingSd("x".into(), 3);
        let v = expect_floats(eval_expr_column(&df, &expr, 4).unwrap());
        assert_eq!(v.len(), 4);
        assert_close(&v[..1], &[0.0], 1e-12);
        assert_close(&v[1..], &[2.0_f64.sqrt(), 2.0, 2.0], 1e-10);
    }

    #[test]
    fn rolling_window_larger_than_data() {
        let df = make_df("x", vec![1.0, 2.0, 3.0]);
        let expr = DExpr::RollingSum("x".into(), 10);
        let v = expect_floats(eval_expr_column(&df, &expr, 3).unwrap());
        assert_close(&v, &[1.0, 3.0, 6.0], 1e-12);
    }

    #[test]
    fn rolling_window_of_one() {
        let data = [3.0, 1.0, 4.0, 1.0, 5.0];
        let df = make_df("x", data.to_vec());
        let expr_min = DExpr::RollingMin("x".into(), 1);
        let expr_max = DExpr::RollingMax("x".into(), 1);
        let col_min = eval_expr_column(&df, &expr_min, 5).unwrap();
        let col_max = eval_expr_column(&df, &expr_max, 5).unwrap();
        let (mins, maxs) = match (col_min, col_max) {
            (Column::Float(mins), Column::Float(maxs)) => (mins, maxs),
            _ => panic!("expected Float columns"),
        };
        // With a window of one, min and max both reproduce the input.
        for (i, &want) in data.iter().enumerate() {
            assert!((mins[i] - want).abs() < 1e-12, "min[{}]", i);
            assert!((maxs[i] - want).abs() < 1e-12, "max[{}]", i);
        }
    }

    #[test]
    fn rolling_sum_with_nan() {
        let df = make_df("x", vec![1.0, f64::NAN, 3.0, 4.0]);
        let expr = DExpr::RollingSum("x".into(), 2);
        let v = expect_floats(eval_expr_column(&df, &expr, 4).unwrap());
        assert_eq!(v.len(), 4);
        assert!((v[0] - 1.0).abs() < 1e-12);
        // NOTE(review): the window ending at index 3 is [3.0, 4.0] and holds no
        // NaN, yet NaN is expected there too. This pins NaN persisting past the
        // window. Confirm that is the intended semantics.
        assert!(v[1..].iter().all(|x| x.is_nan()));
    }

    #[test]
    fn rolling_determinism() {
        let df = make_df("x", (1..=10).map(|i| i as f64).collect());
        let expr = DExpr::RollingSum("x".into(), 4);
        let runs: Vec<Vec<f64>> = (0..3)
            .map(|_| expect_floats(eval_expr_column(&df, &expr, 10).unwrap()))
            .collect();
        assert_eq!(runs[0], runs[1]);
        assert_eq!(runs[1], runs[2]);
    }

    #[test]
    fn rolling_display() {
        let sum = DExpr::RollingSum("val".into(), 5);
        let mean = DExpr::RollingMean("col".into(), 3);
        assert_eq!(sum.to_string(), "rolling_sum(\"val\", 5)");
        assert_eq!(mean.to_string(), "rolling_mean(\"col\", 3)");
    }

    #[test]
    fn rolling_collect_columns() {
        let expr = DExpr::RollingSum("revenue".into(), 7);
        let mut cols = Vec::new();
        collect_expr_columns(&expr, &mut cols);
        assert_eq!(cols, vec!["revenue".to_string()]);
    }

    #[test]
    fn rolling_not_allowed_in_row_context() {
        let df = make_df("x", vec![1.0, 2.0, 3.0]);
        let expr = DExpr::RollingSum("x".into(), 2);
        assert!(eval_expr_row(&df, &expr, 0).is_err());
    }

    /// Builds a `Categorical` column from string levels and raw codes.
    fn cat_col(levels: &[&str], codes: &[u32]) -> Column {
        let levels = levels.iter().map(|s| s.to_string()).collect();
        Column::Categorical { levels, codes: codes.to_vec() }
    }

    /// Builds a frame whose only column is a categorical key named `k`.
    fn cat_frame(levels: &[&str], codes: &[u32]) -> DataFrame {
        DataFrame::from_columns(vec![("k".into(), cat_col(levels, codes))]).unwrap()
    }

    #[test]
    fn phase4_collect_cat_keys_returns_some_when_all_categorical() {
        let left = cat_frame(&["a", "b", "c"], &[0, 1, 2, 0]);
        let right = cat_frame(&["b", "a"], &[0, 1, 1]);
        let cat = collect_categorical_join_keys(&left, &[0], &right, &[0]).unwrap();
        // Right level "b" is left code 1 and "a" is left code 0.
        assert_eq!(cat.right_to_left[0], vec![Some(1u32), Some(0u32)]);
    }

    #[test]
    fn phase4_collect_cat_keys_returns_none_on_mixed_types() {
        let build = || {
            DataFrame::from_columns(vec![
                ("k".into(), cat_col(&["a"], &[0])),
                ("n".into(), Column::Int(vec![1])),
            ])
            .unwrap()
        };
        let left = build();
        let right = build();
        assert!(collect_categorical_join_keys(&left, &[0, 1], &right, &[0, 1]).is_none());
    }

    #[test]
    fn phase4_collect_cat_keys_unknown_right_level_yields_none_in_remap() {
        let left = cat_frame(&["a", "b"], &[0, 1]);
        let right = cat_frame(&["a", "z"], &[0, 1]);
        let cat = collect_categorical_join_keys(&left, &[0], &right, &[0]).unwrap();
        // "z" has no counterpart on the left, so it maps to None.
        assert_eq!(cat.right_to_left[0], vec![Some(0u32), None]);
    }

    #[test]
    fn phase4_column_to_categorical_column_roundtrip() {
        let original = cat_col(&["red", "green", "blue"], &[0, 1, 2, 1, 0]);
        let cc = original.to_categorical_column().unwrap();
        let restored = Column::from_categorical_column(&cc).unwrap();
        match (&original, &restored) {
            (
                Column::Categorical { levels: before_levels, codes: before_codes },
                Column::Categorical { levels: after_levels, codes: after_codes },
            ) => {
                assert_eq!(before_levels, after_levels);
                assert_eq!(before_codes, after_codes);
            }
            _ => panic!("expected Categorical"),
        }
    }

    #[test]
    fn phase4_column_to_categorical_column_none_for_non_categorical() {
        let non_categorical = vec![
            Column::Int(vec![1, 2, 3]),
            Column::Str(vec!["a".into()]),
            Column::Float(vec![1.0]),
        ];
        for col in non_categorical {
            assert!(col.to_categorical_column().is_none());
        }
    }

    #[test]
    fn phase4_column_from_categorical_column_rejects_nulls() {
        use crate::byte_dict::CategoricalColumn;
        let mut cc = CategoricalColumn::new();
        cc.push(b"a").unwrap();
        cc.push_null();
        cc.push(b"b").unwrap();
        assert!(Column::from_categorical_column(&cc).is_none());
    }

    #[test]
    fn phase4_column_from_categorical_column_rejects_non_utf8() {
        use crate::byte_dict::CategoricalColumn;
        let mut cc = CategoricalColumn::new();
        // 0xFF can never appear in valid UTF-8.
        cc.push(&[0xFFu8]).unwrap();
        assert!(Column::from_categorical_column(&cc).is_none());
    }
}
8430
8431