Skip to main content

cjc_data/
tidy_dispatch.rs

1//! Shared tidy dispatch: maps CJC language method calls on TidyView /
2//! GroupedTidyView values to the concrete cjc_data API.
3//!
4//! Both `cjc-eval` and `cjc-mir-exec` call into `dispatch_tidy_method` and
5//! `dispatch_grouped_method` so that every tidy operation has a single source
6//! of truth.  The executors only need to pattern-match `Value::TidyView` or
7//! `Value::GroupedTidyView` and delegate here.
8//!
9//! # Error handling
10//! All errors are returned as `Err(String)`.  The caller wraps the string
11//! into its own error type (EvalError / MirExecError).
12
13use std::rc::Rc;
14use std::any::Any;
15
16use cjc_runtime::value::Value;
17
18use crate::{
19    ArrangeKey, Column, CsvConfig, CsvReader, DExpr, DBinOp, DataFrame, GroupedTidyView,
20    TidyAgg, TidyView,
21};
22
23// ============================================================================
24//  Public entry points
25// ============================================================================
26
27/// Dispatch a method call on a `Value::TidyView`.
28///
29/// Returns `Ok(Some(value))` if the method is known, `Ok(None)` if not
30/// recognised (allows the caller to fall through to other dispatch paths).
31pub fn dispatch_tidy_method(
32    inner: &Rc<dyn Any>,
33    method: &str,
34    args: &[Value],
35) -> Result<Option<Value>, String> {
36    let view = downcast_view(inner)?;
37    match method {
38        // -- shape ----------------------------------------------------------
39        "nrows" => Ok(Some(Value::Int(view.nrows() as i64))),
40        "ncols" => Ok(Some(Value::Int(view.ncols() as i64))),
41        "column_names" => {
42            let names: Vec<Value> = view
43                .column_names()
44                .into_iter()
45                .map(|s| Value::String(Rc::new(s.to_string())))
46                .collect();
47            Ok(Some(Value::Array(Rc::new(names))))
48        }
49
50        // -- filter ---------------------------------------------------------
51        "filter" => {
52            if args.len() != 1 {
53                return Err("TidyView.filter requires 1 argument: predicate DExpr".into());
54            }
55            let predicate = value_to_dexpr(&args[0])?;
56            let new_view = view.filter(&predicate).map_err(|e| format!("{e}"))?;
57            Ok(Some(wrap_view(new_view)))
58        }
59
60        // -- select ---------------------------------------------------------
61        "select" => {
62            if args.len() != 1 {
63                return Err("TidyView.select requires 1 argument: column names array".into());
64            }
65            let cols = value_to_str_vec(&args[0])?;
66            let col_refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
67            let new_view = view.select(&col_refs).map_err(|e| format!("{e}"))?;
68            Ok(Some(wrap_view(new_view)))
69        }
70
71        // -- mutate ---------------------------------------------------------
72        "mutate" => {
73            // mutate(name, expr) or mutate([(name, expr), ...])
74            // We support: mutate("col_name", dexpr_value)
75            if args.len() != 2 {
76                return Err("TidyView.mutate requires 2 arguments: column_name and expression".into());
77            }
78            let col_name = value_to_string(&args[0])?;
79            let expr = value_to_dexpr(&args[1])?;
80            let frame = view.mutate(&[(&col_name, expr)]).map_err(|e| format!("{e}"))?;
81            // mutate returns TidyFrame; convert to TidyView for pipeline continuity
82            Ok(Some(wrap_view(frame.view())))
83        }
84
85        // -- group_by -------------------------------------------------------
86        "group_by" => {
87            if args.len() != 1 {
88                return Err("TidyView.group_by requires 1 argument: key columns array".into());
89            }
90            let keys = value_to_str_vec(&args[0])?;
91            let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
92            let grouped = view.group_by(&key_refs).map_err(|e| format!("{e}"))?;
93            Ok(Some(wrap_grouped(grouped)))
94        }
95
96        // -- arrange --------------------------------------------------------
97        "arrange" => {
98            if args.len() != 1 {
99                return Err("TidyView.arrange requires 1 argument: sort keys array".into());
100            }
101            let keys = value_to_arrange_keys(&args[0])?;
102            let new_view = view.arrange(&keys).map_err(|e| format!("{e}"))?;
103            Ok(Some(wrap_view(new_view)))
104        }
105
106        // -- distinct -------------------------------------------------------
107        "distinct" => {
108            let cols = if args.is_empty() {
109                view.column_names().iter().map(|s| s.to_string()).collect::<Vec<_>>()
110            } else {
111                value_to_str_vec(&args[0])?
112            };
113            let col_refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
114            let new_view = view.distinct(&col_refs).map_err(|e| format!("{e}"))?;
115            Ok(Some(wrap_view(new_view)))
116        }
117
118        // -- slice family ---------------------------------------------------
119        "slice" => {
120            if args.len() != 2 {
121                return Err("TidyView.slice requires 2 arguments: start, end".into());
122            }
123            let start = value_to_usize(&args[0])?;
124            let end = value_to_usize(&args[1])?;
125            Ok(Some(wrap_view(view.slice(start, end))))
126        }
127        "slice_head" => {
128            if args.len() != 1 {
129                return Err("TidyView.slice_head requires 1 argument: n".into());
130            }
131            let n = value_to_usize(&args[0])?;
132            Ok(Some(wrap_view(view.slice_head(n))))
133        }
134        "slice_tail" => {
135            if args.len() != 1 {
136                return Err("TidyView.slice_tail requires 1 argument: n".into());
137            }
138            let n = value_to_usize(&args[0])?;
139            Ok(Some(wrap_view(view.slice_tail(n))))
140        }
141        "slice_sample" => {
142            if args.len() != 2 {
143                return Err("TidyView.slice_sample requires 2 arguments: n, seed".into());
144            }
145            let n = value_to_usize(&args[0])?;
146            let seed = match &args[1] {
147                Value::Int(i) => *i as u64,
148                _ => return Err("slice_sample seed must be Int".into()),
149            };
150            Ok(Some(wrap_view(view.slice_sample(n, seed))))
151        }
152
153        // -- joins ----------------------------------------------------------
154        "inner_join" | "left_join" | "semi_join" | "anti_join" | "full_join" => {
155            dispatch_join(view, args, method)
156        }
157
158        // -- reshape --------------------------------------------------------
159        "pivot_longer" => {
160            if args.len() < 2 || args.len() > 3 {
161                return Err(
162                    "TidyView.pivot_longer requires 2-3 args: cols, names_to, [values_to]".into(),
163                );
164            }
165            let cols = value_to_str_vec(&args[0])?;
166            let col_refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
167            let names_to = value_to_string(&args[1])?;
168            let values_to = if args.len() == 3 {
169                value_to_string(&args[2])?
170            } else {
171                "value".to_string()
172            };
173            let frame = view
174                .pivot_longer(&col_refs, &names_to, &values_to)
175                .map_err(|e| format!("{e}"))?;
176            Ok(Some(wrap_view(frame.view())))
177        }
178        "pivot_wider" => {
179            if args.len() != 3 {
180                return Err(
181                    "TidyView.pivot_wider requires 3 args: id_cols, names_from, values_from"
182                        .into(),
183                );
184            }
185            let id_cols = value_to_str_vec(&args[0])?;
186            let id_refs: Vec<&str> = id_cols.iter().map(|s| s.as_str()).collect();
187            let names_from = value_to_string(&args[1])?;
188            let values_from = value_to_string(&args[2])?;
189            let nullable_frame = view
190                .pivot_wider(&id_refs, &names_from, &values_from)
191                .map_err(|e| format!("{e}"))?;
192            // NullableFrame → fill nulls with defaults → TidyView
193            Ok(Some(wrap_view(nullable_frame.to_tidy_view_filled())))
194        }
195
196        // -- rename / relocate / drop_cols / bind ----------------------------
197        "rename" => {
198            if args.len() != 1 {
199                return Err("TidyView.rename requires 1 argument: array of [old, new] pairs".into());
200            }
201            let pairs = value_to_rename_pairs(&args[0])?;
202            let pair_refs: Vec<(&str, &str)> =
203                pairs.iter().map(|(a, b)| (a.as_str(), b.as_str())).collect();
204            let new_view = view.rename(&pair_refs).map_err(|e| format!("{e}"))?;
205            Ok(Some(wrap_view(new_view)))
206        }
207        "drop_cols" => {
208            if args.len() != 1 {
209                return Err("TidyView.drop_cols requires 1 argument: column names array".into());
210            }
211            let cols = value_to_str_vec(&args[0])?;
212            let col_refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
213            let new_view = view.drop_cols(&col_refs).map_err(|e| format!("{e}"))?;
214            Ok(Some(wrap_view(new_view)))
215        }
216        "bind_rows" => {
217            if args.len() != 1 {
218                return Err("TidyView.bind_rows requires 1 argument: other TidyView".into());
219            }
220            let other_rc = match &args[0] {
221                Value::TidyView(rc) => rc,
222                _ => return Err("bind_rows argument must be a TidyView".into()),
223            };
224            let other = downcast_view(other_rc)?;
225            let frame = view.bind_rows(other).map_err(|e| format!("{e}"))?;
226            Ok(Some(wrap_view(frame.view())))
227        }
228        "bind_cols" => {
229            if args.len() != 1 {
230                return Err("TidyView.bind_cols requires 1 argument: other TidyView".into());
231            }
232            let other_rc = match &args[0] {
233                Value::TidyView(rc) => rc,
234                _ => return Err("bind_cols argument must be a TidyView".into()),
235            };
236            let other = downcast_view(other_rc)?;
237            let frame = view.bind_cols(other).map_err(|e| format!("{e}"))?;
238            Ok(Some(wrap_view(frame.view())))
239        }
240
241        // -- column extraction / tensor -------------------------------------
242        "column" => {
243            if args.len() != 1 {
244                return Err("TidyView.column requires 1 argument: column_name".into());
245            }
246            let name = value_to_string(&args[0])?;
247            let df = view.materialize().map_err(|e| format!("{e}"))?;
248            let col = df
249                .get_column(&name)
250                .ok_or_else(|| format!("column '{}' not found", name))?;
251            Ok(Some(column_to_value(col)))
252        }
253        "to_tensor" => {
254            if args.len() != 1 {
255                return Err("TidyView.to_tensor requires 1 argument: column_names array".into());
256            }
257            let cols = value_to_str_vec(&args[0])?;
258            let col_refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
259            let t = view.to_tensor(&col_refs).map_err(|e| format!("{e}"))?;
260            Ok(Some(Value::Tensor(t)))
261        }
262
263        // -- materialize to DataFrame Struct --------------------------------
264        "collect" => {
265            let df = view.materialize().map_err(|e| format!("{e}"))?;
266            Ok(Some(dataframe_to_value(df)))
267        }
268
269        // -- print (for debugging) ------------------------------------------
270        "print" => {
271            let df = view.materialize().map_err(|e| format!("{e}"))?;
272            let s = format_dataframe(&df);
273            // Returning the formatted string; the caller is responsible for
274            // printing and capturing in output buffer.
275            Ok(Some(Value::String(Rc::new(s))))
276        }
277
278        // -- DataFrame inspection builtins -----------------------------------
279        "head" => {
280            let n = if args.is_empty() { 10 } else {
281                match &args[0] { Value::Int(n) => *n as usize, _ => return Err("head: argument must be Int".into()) }
282            };
283            let sliced = view.slice_head(n);
284            let df = sliced.materialize().map_err(|e| format!("{e}"))?;
285            let s = format_dataframe(&df);
286            Ok(Some(Value::String(Rc::new(s))))
287        }
288        "tail" => {
289            let n = if args.is_empty() { 10 } else {
290                match &args[0] { Value::Int(n) => *n as usize, _ => return Err("tail: argument must be Int".into()) }
291            };
292            let sliced = view.slice_tail(n);
293            let df = sliced.materialize().map_err(|e| format!("{e}"))?;
294            let s = format_dataframe(&df);
295            Ok(Some(Value::String(Rc::new(s))))
296        }
297        "shape" => {
298            let result = Value::Tuple(Rc::new(vec![
299                Value::Int(view.nrows() as i64),
300                Value::Int(view.ncols() as i64),
301            ]));
302            Ok(Some(result))
303        }
304        "columns" => {
305            // Alias for column_names — returns array of column name strings
306            let names: Vec<Value> = view
307                .column_names()
308                .into_iter()
309                .map(|s| Value::String(Rc::new(s.to_string())))
310                .collect();
311            Ok(Some(Value::Array(Rc::new(names))))
312        }
313        "dtypes" => {
314            // Returns a Struct mapping column_name → type_name
315            let df = view.materialize().map_err(|e| format!("{e}"))?;
316            let mut fields = std::collections::BTreeMap::new();
317            for (name, col) in &df.columns {
318                fields.insert(name.clone(), Value::String(Rc::new(col.type_name().to_string())));
319            }
320            Ok(Some(Value::Struct { name: "Dtypes".to_string(), fields }))
321        }
322        "describe" => {
323            let df = view.materialize().map_err(|e| format!("{e}"))?;
324            let s = format_describe(&df);
325            Ok(Some(Value::String(Rc::new(s))))
326        }
327        "glimpse" => {
328            let df = view.materialize().map_err(|e| format!("{e}"))?;
329            let s = format_glimpse(&df);
330            Ok(Some(Value::String(Rc::new(s))))
331        }
332
333        _ => Ok(None), // unknown method — caller falls through
334    }
335}
336
337/// Dispatch a method call on a `Value::GroupedTidyView`.
338pub fn dispatch_grouped_method(
339    inner: &Rc<dyn Any>,
340    method: &str,
341    args: &[Value],
342) -> Result<Option<Value>, String> {
343    let grouped = downcast_grouped(inner)?;
344    match method {
345        "ngroups" => Ok(Some(Value::Int(grouped.ngroups() as i64))),
346
347        "summarise" | "summarize" => {
348            if args.len() % 2 != 0 || args.is_empty() {
349                return Err(
350                    "summarise requires pairs of (name, agg) arguments".into(),
351                );
352            }
353            let mut assignments: Vec<(String, TidyAgg)> = Vec::new();
354            let mut i = 0;
355            while i < args.len() {
356                let name = value_to_string(&args[i])?;
357                let agg = value_to_tidy_agg(&args[i + 1])?;
358                assignments.push((name, agg));
359                i += 2;
360            }
361            let asg_refs: Vec<(&str, TidyAgg)> = assignments
362                .iter()
363                .map(|(n, a)| (n.as_str(), a.clone()))
364                .collect();
365            let frame = grouped.summarise(&asg_refs).map_err(|e| format!("{e}"))?;
366            Ok(Some(wrap_view(frame.view())))
367        }
368
369        "ungroup" => {
370            let view = grouped.clone().ungroup();
371            Ok(Some(wrap_view(view)))
372        }
373
374        _ => Ok(None),
375    }
376}
377
378// ============================================================================
379//  Helpers — Value ↔ cjc_data conversions
380// ============================================================================
381
382fn downcast_view(inner: &Rc<dyn Any>) -> Result<&TidyView, String> {
383    inner
384        .downcast_ref::<TidyView>()
385        .ok_or_else(|| "internal error: TidyView downcast failed".to_string())
386}
387
388fn downcast_grouped(inner: &Rc<dyn Any>) -> Result<&GroupedTidyView, String> {
389    inner
390        .downcast_ref::<GroupedTidyView>()
391        .ok_or_else(|| "internal error: GroupedTidyView downcast failed".to_string())
392}
393
394/// Wrap a `TidyView` into `Value::TidyView`.
395pub fn wrap_view(view: TidyView) -> Value {
396    Value::TidyView(Rc::new(view) as Rc<dyn Any>)
397}
398
399/// Wrap a `GroupedTidyView` into `Value::GroupedTidyView`.
400pub fn wrap_grouped(grouped: GroupedTidyView) -> Value {
401    Value::GroupedTidyView(Rc::new(grouped) as Rc<dyn Any>)
402}
403
404/// Convert `Value::String` → `String`.
405fn value_to_string(v: &Value) -> Result<String, String> {
406    match v {
407        Value::String(s) => Ok(s.as_ref().clone()),
408        _ => Err(format!("expected String, got {}", v.type_name())),
409    }
410}
411
412/// Convert `Value::Int` → `usize`.
413fn value_to_usize(v: &Value) -> Result<usize, String> {
414    match v {
415        Value::Int(i) if *i >= 0 => Ok(*i as usize),
416        Value::Int(i) => Err(format!("expected non-negative Int, got {i}")),
417        _ => Err(format!("expected Int, got {}", v.type_name())),
418    }
419}
420
421/// Convert `Value::Array([String, ...])` → `Vec<String>`.
422fn value_to_str_vec(v: &Value) -> Result<Vec<String>, String> {
423    match v {
424        Value::Array(arr) => arr
425            .iter()
426            .map(|v| match v {
427                Value::String(s) => Ok(s.as_ref().clone()),
428                _ => Err(format!("expected String in array, got {}", v.type_name())),
429            })
430            .collect(),
431        _ => Err(format!("expected Array, got {}", v.type_name())),
432    }
433}
434
435/// Parse a `Value::Struct { name: "DExpr", ... }` into a `DExpr`.
436///
437/// The CJC language constructs DExpr values via helper builtins:
438///   col("name")        → Struct { name: "DExpr", kind: "col", value: "name" }
439///   binop(">", l, r)   → Struct { name: "DExpr", kind: "binop", op: ">", left: l, right: r }
440///   lit_int(42)         → Struct { name: "DExpr", kind: "lit_int", value: 42 }
441///   etc.
442///
443/// For ergonomic use, we also accept raw literals directly:
444///   Value::Int(42)      → DExpr::LitInt(42)
445///   Value::Float(3.14)  → DExpr::LitFloat(3.14)
446///   Value::Bool(true)   → DExpr::LitBool(true)
447///   Value::String("x")  → DExpr::Col("x")   -- shorthand for col("x")
448pub fn value_to_dexpr(v: &Value) -> Result<DExpr, String> {
449    match v {
450        // Literal shorthand
451        Value::Int(i) => Ok(DExpr::LitInt(*i)),
452        Value::Float(f) => Ok(DExpr::LitFloat(*f)),
453        Value::Bool(b) => Ok(DExpr::LitBool(*b)),
454        Value::String(s) => Ok(DExpr::Col(s.as_ref().clone())),
455        // Struct-encoded DExpr
456        Value::Struct { name, fields } if name == "DExpr" => {
457            let kind = fields
458                .get("kind")
459                .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().as_str()) } else { None })
460                .ok_or("DExpr struct missing 'kind' string field")?;
461            match kind {
462                "col" => {
463                    let col_name = fields
464                        .get("value")
465                        .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().clone()) } else { None })
466                        .ok_or("DExpr col missing 'value' string field")?;
467                    Ok(DExpr::Col(col_name))
468                }
469                "lit_int" => {
470                    let val = fields
471                        .get("value")
472                        .and_then(|v| if let Value::Int(i) = v { Some(*i) } else { None })
473                        .ok_or("DExpr lit_int missing 'value' int field")?;
474                    Ok(DExpr::LitInt(val))
475                }
476                "lit_float" => {
477                    let val = fields
478                        .get("value")
479                        .and_then(|v| if let Value::Float(f) = v { Some(*f) } else { None })
480                        .ok_or("DExpr lit_float missing 'value' float field")?;
481                    Ok(DExpr::LitFloat(val))
482                }
483                "lit_bool" => {
484                    let val = fields
485                        .get("value")
486                        .and_then(|v| if let Value::Bool(b) = v { Some(*b) } else { None })
487                        .ok_or("DExpr lit_bool missing 'value' bool field")?;
488                    Ok(DExpr::LitBool(val))
489                }
490                "lit_str" => {
491                    let val = fields
492                        .get("value")
493                        .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().clone()) } else { None })
494                        .ok_or("DExpr lit_str missing 'value' string field")?;
495                    Ok(DExpr::LitStr(val))
496                }
497                "binop" => {
498                    let op_str = fields
499                        .get("op")
500                        .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().as_str()) } else { None })
501                        .ok_or("DExpr binop missing 'op' field")?;
502                    let op = parse_binop(op_str)?;
503                    let left = fields.get("left").ok_or("DExpr binop missing 'left'")?;
504                    let right = fields.get("right").ok_or("DExpr binop missing 'right'")?;
505                    Ok(DExpr::BinOp {
506                        op,
507                        left: Box::new(value_to_dexpr(left)?),
508                        right: Box::new(value_to_dexpr(right)?),
509                    })
510                }
511                "count" => Ok(DExpr::Count),
512                other => Err(format!("unknown DExpr kind: {other}")),
513            }
514        }
515        _ => Err(format!(
516            "cannot convert {} to DExpr (expected DExpr struct, Int, Float, Bool, or String)",
517            v.type_name()
518        )),
519    }
520}
521
522fn parse_binop(s: &str) -> Result<DBinOp, String> {
523    match s {
524        "+" | "add" => Ok(DBinOp::Add),
525        "-" | "sub" => Ok(DBinOp::Sub),
526        "*" | "mul" => Ok(DBinOp::Mul),
527        "/" | "div" => Ok(DBinOp::Div),
528        ">" | "gt" => Ok(DBinOp::Gt),
529        "<" | "lt" => Ok(DBinOp::Lt),
530        ">=" | "ge" => Ok(DBinOp::Ge),
531        "<=" | "le" => Ok(DBinOp::Le),
532        "==" | "eq" => Ok(DBinOp::Eq),
533        "!=" | "ne" => Ok(DBinOp::Ne),
534        "&&" | "and" => Ok(DBinOp::And),
535        "||" | "or" => Ok(DBinOp::Or),
536        other => Err(format!("unknown binop: {other}")),
537    }
538}
539
540/// Parse a `Value::Struct` representing a TidyAgg, e.g.:
541///   Struct { name: "TidyAgg", kind: "sum", col: "salary" }
542///   Struct { name: "TidyAgg", kind: "count" }
543fn value_to_tidy_agg(v: &Value) -> Result<TidyAgg, String> {
544    match v {
545        Value::Struct { name, fields } if name == "TidyAgg" => {
546            let kind = fields
547                .get("kind")
548                .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().as_str()) } else { None })
549                .ok_or("TidyAgg struct missing 'kind' string")?;
550            match kind {
551                "count" => Ok(TidyAgg::Count),
552                "sum" | "mean" | "min" | "max" | "first" | "last"
553                | "median" | "sd" | "var" | "n_distinct" | "iqr" => {
554                    let col = fields
555                        .get("col")
556                        .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().clone()) } else { None })
557                        .ok_or_else(|| format!("TidyAgg {kind} missing 'col' string"))?;
558                    match kind {
559                        "sum" => Ok(TidyAgg::Sum(col)),
560                        "mean" => Ok(TidyAgg::Mean(col)),
561                        "min" => Ok(TidyAgg::Min(col)),
562                        "max" => Ok(TidyAgg::Max(col)),
563                        "first" => Ok(TidyAgg::First(col)),
564                        "last" => Ok(TidyAgg::Last(col)),
565                        "median" => Ok(TidyAgg::Median(col)),
566                        "sd" => Ok(TidyAgg::Sd(col)),
567                        "var" => Ok(TidyAgg::Var(col)),
568                        "n_distinct" => Ok(TidyAgg::NDistinct(col)),
569                        "iqr" => Ok(TidyAgg::Iqr(col)),
570                        _ => unreachable!(),
571                    }
572                }
573                "quantile" => {
574                    let col = fields
575                        .get("col")
576                        .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().clone()) } else { None })
577                        .ok_or("TidyAgg quantile missing 'col' string")?;
578                    let p = fields
579                        .get("p")
580                        .and_then(|v| match v {
581                            Value::Float(f) => Some(*f),
582                            Value::Int(i) => Some(*i as f64),
583                            _ => None,
584                        })
585                        .ok_or("TidyAgg quantile missing 'p' float")?;
586                    Ok(TidyAgg::Quantile(col, p))
587                }
588                other => Err(format!("unknown TidyAgg kind: {other}")),
589            }
590        }
591        _ => Err(format!("expected TidyAgg struct, got {}", v.type_name())),
592    }
593}
594
595/// Parse ArrangeKey array. Each element can be:
596///   - String "col_name"       → ascending
597///   - Struct { name: "ArrangeKey", col: "name", desc: bool }
598fn value_to_arrange_keys(v: &Value) -> Result<Vec<ArrangeKey>, String> {
599    match v {
600        Value::Array(arr) => {
601            let mut keys = Vec::with_capacity(arr.len());
602            for item in arr.iter() {
603                match item {
604                    Value::String(s) => keys.push(ArrangeKey::asc(s)),
605                    Value::Struct { name, fields } if name == "ArrangeKey" => {
606                        let col = fields
607                            .get("col")
608                            .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().as_str()) } else { None })
609                            .ok_or("ArrangeKey missing 'col'")?;
610                        let desc = fields
611                            .get("desc")
612                            .and_then(|v| if let Value::Bool(b) = v { Some(*b) } else { None })
613                            .unwrap_or(false);
614                        keys.push(if desc { ArrangeKey::desc(col) } else { ArrangeKey::asc(col) });
615                    }
616                    _ => return Err(format!("arrange key must be String or ArrangeKey struct, got {}", item.type_name())),
617                }
618            }
619            Ok(keys)
620        }
621        _ => Err(format!("arrange requires Array of keys, got {}", v.type_name())),
622    }
623}
624
625/// Parse rename pairs from `[["old","new"], ["old2","new2"]]`.
626fn value_to_rename_pairs(v: &Value) -> Result<Vec<(String, String)>, String> {
627    match v {
628        Value::Array(arr) => {
629            let mut pairs = Vec::with_capacity(arr.len());
630            for item in arr.iter() {
631                match item {
632                    Value::Array(pair) if pair.len() == 2 => {
633                        let old = value_to_string(&pair[0])?;
634                        let new = value_to_string(&pair[1])?;
635                        pairs.push((old, new));
636                    }
637                    _ => return Err("rename pairs must be arrays of [old, new] strings".into()),
638                }
639            }
640            Ok(pairs)
641        }
642        _ => Err(format!("rename requires Array of pairs, got {}", v.type_name())),
643    }
644}
645
646// ============================================================================
647//  Join dispatcher
648// ============================================================================
649
650/// Dispatch inner_join / left_join / semi_join / anti_join.
651///
652/// The CJC API is: `view.inner_join(other, left_on, right_on)`.
653/// The Rust API is: `view.inner_join(&other, &[(&left_on, &right_on)])`.
654fn dispatch_join(
655    view: &TidyView,
656    args: &[Value],
657    kind: &str,
658) -> Result<Option<Value>, String> {
659    if args.len() != 3 {
660        return Err(format!(
661            "TidyView.{kind} requires 3 args: other_view, left_on, right_on"
662        ));
663    }
664    let other_rc = match &args[0] {
665        Value::TidyView(rc) => rc,
666        _ => return Err(format!("{kind}: first arg must be a TidyView")),
667    };
668    let other = downcast_view(other_rc)?;
669    let left_on = value_to_string(&args[1])?;
670    let right_on = value_to_string(&args[2])?;
671    let on_pairs: Vec<(&str, &str)> = vec![(&left_on, &right_on)];
672
673    match kind {
674        "inner_join" => {
675            let frame = view.inner_join(other, &on_pairs).map_err(|e| format!("{e}"))?;
676            Ok(Some(wrap_view(frame.view())))
677        }
678        "left_join" => {
679            let frame = view.left_join(other, &on_pairs).map_err(|e| format!("{e}"))?;
680            Ok(Some(wrap_view(frame.view())))
681        }
682        "semi_join" => {
683            let new_view = view.semi_join(other, &on_pairs).map_err(|e| format!("{e}"))?;
684            Ok(Some(wrap_view(new_view)))
685        }
686        "anti_join" => {
687            let new_view = view.anti_join(other, &on_pairs).map_err(|e| format!("{e}"))?;
688            Ok(Some(wrap_view(new_view)))
689        }
690        "full_join" => {
691            let suffix = crate::JoinSuffix::default();
692            let nullable_frame = view.full_join(other, &on_pairs, &suffix).map_err(|e| format!("{e}"))?;
693            Ok(Some(wrap_view(nullable_frame.to_tidy_view_filled())))
694        }
695        _ => Ok(None),
696    }
697}
698
699// ============================================================================
700//  Column → Value conversion
701// ============================================================================
702
703/// Convert a `Column` to a `Value::Array`.
704fn column_to_value(col: &Column) -> Value {
705    let vals: Vec<Value> = match col {
706        Column::Int(v) => v.iter().map(|i| Value::Int(*i)).collect(),
707        Column::Float(v) => v.iter().map(|f| Value::Float(*f)).collect(),
708        Column::Str(v) => v
709            .iter()
710            .map(|s| Value::String(Rc::new(s.clone())))
711            .collect(),
712        Column::Bool(v) => v.iter().map(|b| Value::Bool(*b)).collect(),
713        Column::Categorical { levels, codes } => codes
714            .iter()
715            .map(|&c| Value::String(Rc::new(levels[c as usize].clone())))
716            .collect(),
717        Column::DateTime(v) => v.iter().map(|i| Value::Int(*i)).collect(),
718    };
719    Value::Array(Rc::new(vals))
720}
721
722// ============================================================================
723//  DataFrame → Value (for .collect())
724// ============================================================================
725
726/// Convert a `DataFrame` to the legacy `Value::Struct { name: "DataFrame" }`
727/// representation used by existing CJC code.
728pub fn dataframe_to_value(df: DataFrame) -> Value {
729    let mut fields = std::collections::BTreeMap::new();
730    let mut col_names: Vec<Value> = Vec::new();
731    let nrows = df.nrows();
732    for (name, col) in &df.columns {
733        col_names.push(Value::String(Rc::new(name.clone())));
734        fields.insert(name.clone(), column_to_value(col));
735    }
736    fields.insert(
737        "__columns".to_string(),
738        Value::Array(Rc::new(col_names)),
739    );
740    fields.insert("__nrows".to_string(), Value::Int(nrows as i64));
741    Value::Struct {
742        name: "DataFrame".to_string(),
743        fields,
744    }
745}
746
747/// Produce a human-readable table-formatted string from a DataFrame.
748fn format_dataframe(df: &DataFrame) -> String {
749    let ncols = df.ncols();
750    let nrows = df.nrows();
751    if ncols == 0 {
752        return "DataFrame(0x0)".to_string();
753    }
754
755    // Column names
756    let names: Vec<&str> = df.columns.iter().map(|(n, _)| n.as_str()).collect();
757
758    // Compute widths
759    let mut widths: Vec<usize> = names.iter().map(|n| n.len()).collect();
760    let display_rows = nrows.min(20); // cap at 20 rows for display
761    let mut cells: Vec<Vec<String>> = Vec::with_capacity(display_rows);
762    for r in 0..display_rows {
763        let mut row: Vec<String> = Vec::with_capacity(ncols);
764        for (ci, (_, col)) in df.columns.iter().enumerate() {
765            let s = col.get_display(r);
766            if s.len() > widths[ci] {
767                widths[ci] = s.len();
768            }
769            row.push(s);
770        }
771        cells.push(row);
772    }
773
774    let mut out = String::new();
775    // Header
776    for (ci, name) in names.iter().enumerate() {
777        if ci > 0 { out.push_str("  "); }
778        out.push_str(&format!("{:>width$}", name, width = widths[ci]));
779    }
780    out.push('\n');
781    // Rows
782    for row in &cells {
783        for (ci, cell) in row.iter().enumerate() {
784            if ci > 0 { out.push_str("  "); }
785            out.push_str(&format!("{:>width$}", cell, width = widths[ci]));
786        }
787        out.push('\n');
788    }
789    if nrows > display_rows {
790        out.push_str(&format!("... ({} more rows)\n", nrows - display_rows));
791    }
792    out
793}
794
795/// Produce a statistical summary (like R's `summary()` or pandas `.describe()`).
796///
797/// For numeric columns: count, mean, std, min, 25%, 50%, 75%, max.
798/// For string/bool columns: count, unique, top (most frequent).
799fn format_describe(df: &DataFrame) -> String {
800    use cjc_repro::KahanAccumulatorF64;
801    let nrows = df.nrows();
802    let mut out = String::new();
803    out.push_str(&format!("DataFrame: {} rows x {} columns\n\n", nrows, df.ncols()));
804
805    for (name, col) in &df.columns {
806        out.push_str(&format!("── {} ({}) ──\n", name, col.type_name()));
807        match col {
808            Column::Int(v) => {
809                if v.is_empty() {
810                    out.push_str("  (empty)\n");
811                    continue;
812                }
813                let mut sorted = v.clone();
814                sorted.sort();
815                let mut acc = KahanAccumulatorF64::new();
816                for &x in v { acc.add(x as f64); }
817                let mean = acc.finalize() / nrows as f64;
818                // Variance via second pass (Welford-like but simple two-pass for determinism)
819                let mut var_acc = KahanAccumulatorF64::new();
820                for &x in v { let d = x as f64 - mean; var_acc.add(d * d); }
821                let std = if nrows > 1 { (var_acc.finalize() / (nrows - 1) as f64).sqrt() } else { 0.0 };
822                out.push_str(&format!("  count: {}\n", nrows));
823                out.push_str(&format!("  mean:  {:.4}\n", mean));
824                out.push_str(&format!("  std:   {:.4}\n", std));
825                out.push_str(&format!("  min:   {}\n", sorted[0]));
826                out.push_str(&format!("  25%:   {}\n", sorted[nrows / 4]));
827                out.push_str(&format!("  50%:   {}\n", sorted[nrows / 2]));
828                out.push_str(&format!("  75%:   {}\n", sorted[3 * nrows / 4]));
829                out.push_str(&format!("  max:   {}\n", sorted[nrows - 1]));
830            }
831            Column::Float(v) => {
832                if v.is_empty() {
833                    out.push_str("  (empty)\n");
834                    continue;
835                }
836                let mut sorted = v.clone();
837                sorted.sort_by(|a, b| a.total_cmp(b));
838                let mut acc = KahanAccumulatorF64::new();
839                for &x in v { acc.add(x); }
840                let mean = acc.finalize() / nrows as f64;
841                let mut var_acc = KahanAccumulatorF64::new();
842                for &x in v { let d = x - mean; var_acc.add(d * d); }
843                let std = if nrows > 1 { (var_acc.finalize() / (nrows - 1) as f64).sqrt() } else { 0.0 };
844                out.push_str(&format!("  count: {}\n", nrows));
845                out.push_str(&format!("  mean:  {:.4}\n", mean));
846                out.push_str(&format!("  std:   {:.4}\n", std));
847                out.push_str(&format!("  min:   {:.4}\n", sorted[0]));
848                out.push_str(&format!("  25%:   {:.4}\n", sorted[nrows / 4]));
849                out.push_str(&format!("  50%:   {:.4}\n", sorted[nrows / 2]));
850                out.push_str(&format!("  75%:   {:.4}\n", sorted[3 * nrows / 4]));
851                out.push_str(&format!("  max:   {:.4}\n", sorted[nrows - 1]));
852            }
853            Column::Str(v) => {
854                let mut freq = std::collections::BTreeMap::new();
855                for s in v { *freq.entry(s.as_str()).or_insert(0usize) += 1; }
856                let unique = freq.len();
857                let top = freq.iter().max_by_key(|(_, &c)| c).map(|(s, _)| *s).unwrap_or("");
858                out.push_str(&format!("  count:  {}\n", nrows));
859                out.push_str(&format!("  unique: {}\n", unique));
860                out.push_str(&format!("  top:    {}\n", top));
861            }
862            Column::Bool(v) => {
863                let trues = v.iter().filter(|&&b| b).count();
864                out.push_str(&format!("  count: {}\n", nrows));
865                out.push_str(&format!("  true:  {}\n", trues));
866                out.push_str(&format!("  false: {}\n", nrows - trues));
867            }
868            Column::Categorical { levels, codes } => {
869                let n_levels = levels.len();
870                let mut freq = std::collections::BTreeMap::new();
871                for &c in codes { *freq.entry(c).or_insert(0usize) += 1; }
872                let top_code = freq.iter().max_by_key(|(_, &c)| c).map(|(&k, _)| k).unwrap_or(0);
873                let top = if (top_code as usize) < levels.len() { &levels[top_code as usize] } else { "?" };
874                out.push_str(&format!("  count:  {}\n", nrows));
875                out.push_str(&format!("  levels: {}\n", n_levels));
876                out.push_str(&format!("  top:    {}\n", top));
877            }
878            Column::DateTime(v) => {
879                if v.is_empty() {
880                    out.push_str("  (empty)\n");
881                    continue;
882                }
883                let mut sorted = v.clone();
884                sorted.sort();
885                out.push_str(&format!("  count: {}\n", nrows));
886                out.push_str(&format!("  min:   {} (epoch ms)\n", sorted[0]));
887                out.push_str(&format!("  max:   {} (epoch ms)\n", sorted[nrows - 1]));
888            }
889        }
890    }
891    out
892}
893
894/// Produce a transposed glimpse (like dplyr::glimpse() or tibble printing).
895///
896/// Shows each column as a row: name, type, and first few values.
897fn format_glimpse(df: &DataFrame) -> String {
898    let nrows = df.nrows();
899    let ncols = df.ncols();
900    let mut out = String::new();
901    out.push_str(&format!("Rows: {}\nColumns: {}\n", nrows, ncols));
902
903    // Find max column name width for alignment
904    let max_name_w = df.columns.iter().map(|(n, _)| n.len()).max().unwrap_or(0);
905    let max_type_w = df.columns.iter().map(|(_, c)| c.type_name().len()).max().unwrap_or(0);
906
907    let preview_count = nrows.min(8);
908    for (name, col) in &df.columns {
909        out.push_str(&format!("$ {:width_n$} <{:width_t$}>  ",
910            name, col.type_name(),
911            width_n = max_name_w, width_t = max_type_w));
912        let mut vals = Vec::with_capacity(preview_count);
913        for i in 0..preview_count {
914            vals.push(col.get_display(i));
915        }
916        out.push_str(&vals.join(", "));
917        if nrows > preview_count {
918            out.push_str(", ...");
919        }
920        out.push('\n');
921    }
922    out
923}
924
925// ============================================================================
926//  DExpr builder builtins (col, binop, agg, etc.)
927// ============================================================================
928
929/// Build a `Value::Struct { name: "DExpr", kind: "col", ... }` from a column name.
930pub fn build_col_expr(name: &str) -> Value {
931    let mut fields = std::collections::BTreeMap::new();
932    fields.insert("kind".to_string(), Value::String(Rc::new("col".to_string())));
933    fields.insert("value".to_string(), Value::String(Rc::new(name.to_string())));
934    Value::Struct { name: "DExpr".to_string(), fields }
935}
936
937/// Build a DExpr binary operation.
938pub fn build_binop_expr(op: &str, left: Value, right: Value) -> Value {
939    let mut fields = std::collections::BTreeMap::new();
940    fields.insert("kind".to_string(), Value::String(Rc::new("binop".to_string())));
941    fields.insert("op".to_string(), Value::String(Rc::new(op.to_string())));
942    fields.insert("left".to_string(), left);
943    fields.insert("right".to_string(), right);
944    Value::Struct { name: "DExpr".to_string(), fields }
945}
946
947/// Build a TidyAgg struct value.
948pub fn build_tidy_agg(kind: &str, col: Option<&str>) -> Value {
949    let mut fields = std::collections::BTreeMap::new();
950    fields.insert("kind".to_string(), Value::String(Rc::new(kind.to_string())));
951    if let Some(c) = col {
952        fields.insert("col".to_string(), Value::String(Rc::new(c.to_string())));
953    }
954    Value::Struct { name: "TidyAgg".to_string(), fields }
955}
956
957/// Build an ArrangeKey struct value.
958pub fn build_arrange_key(col: &str, descending: bool) -> Value {
959    let mut fields = std::collections::BTreeMap::new();
960    fields.insert("col".to_string(), Value::String(Rc::new(col.to_string())));
961    fields.insert("desc".to_string(), Value::Bool(descending));
962    Value::Struct { name: "ArrangeKey".to_string(), fields }
963}
964
965/// Dispatch builder builtins like `col()`, `desc()`, `asc()`, `sum()`, `mean()`, etc.
966/// Returns `Ok(Some(value))` if recognised, `Ok(None)` otherwise.
967pub fn dispatch_tidy_builtin(name: &str, args: &[Value]) -> Result<Option<Value>, String> {
968    match name {
969        // DExpr builders
970        "col" => {
971            if args.len() != 1 {
972                return Err("col() requires 1 argument: column name".into());
973            }
974            let name = value_to_string(&args[0])?;
975            Ok(Some(build_col_expr(&name)))
976        }
977        "desc" => {
978            if args.len() != 1 {
979                return Err("desc() requires 1 argument: column name".into());
980            }
981            let name = value_to_string(&args[0])?;
982            Ok(Some(build_arrange_key(&name, true)))
983        }
984        "asc" => {
985            if args.len() != 1 {
986                return Err("asc() requires 1 argument: column name".into());
987            }
988            let name = value_to_string(&args[0])?;
989            Ok(Some(build_arrange_key(&name, false)))
990        }
991        // DExpr binary op builder
992        "dexpr_binop" => {
993            if args.len() != 3 {
994                return Err("dexpr_binop() requires 3 args: op, left, right".into());
995            }
996            let op = value_to_string(&args[0])?;
997            Ok(Some(build_binop_expr(&op, args[1].clone(), args[2].clone())))
998        }
999
1000        // TidyAgg builders
1001        "tidy_count" => Ok(Some(build_tidy_agg("count", None))),
1002        "tidy_sum" => {
1003            if args.len() != 1 { return Err("tidy_sum() requires 1 argument: column name".into()); }
1004            let col = value_to_string(&args[0])?;
1005            Ok(Some(build_tidy_agg("sum", Some(&col))))
1006        }
1007        "tidy_mean" => {
1008            if args.len() != 1 { return Err("tidy_mean() requires 1 argument: column name".into()); }
1009            let col = value_to_string(&args[0])?;
1010            Ok(Some(build_tidy_agg("mean", Some(&col))))
1011        }
1012        "tidy_min" => {
1013            if args.len() != 1 { return Err("tidy_min() requires 1 argument: column name".into()); }
1014            let col = value_to_string(&args[0])?;
1015            Ok(Some(build_tidy_agg("min", Some(&col))))
1016        }
1017        "tidy_max" => {
1018            if args.len() != 1 { return Err("tidy_max() requires 1 argument: column name".into()); }
1019            let col = value_to_string(&args[0])?;
1020            Ok(Some(build_tidy_agg("max", Some(&col))))
1021        }
1022        "tidy_first" => {
1023            if args.len() != 1 { return Err("tidy_first() requires 1 argument: column name".into()); }
1024            let col = value_to_string(&args[0])?;
1025            Ok(Some(build_tidy_agg("first", Some(&col))))
1026        }
1027        "tidy_last" => {
1028            if args.len() != 1 { return Err("tidy_last() requires 1 argument: column name".into()); }
1029            let col = value_to_string(&args[0])?;
1030            Ok(Some(build_tidy_agg("last", Some(&col))))
1031        }
1032
1033        // =====================================================================
1034        //  stringr builtins — byte-first string view approach
1035        //
1036        //  CJC strings are UTF-8 byte sequences. These functions operate on the
1037        //  byte representation via cjc-regex's Thompson NFA. Where possible,
1038        //  results are slices (zero-copy views) of the input. Allocation happens
1039        //  only when replacement or splitting creates new buffers.
1040        //
1041        //  Key design point: patterns are compiled fresh per call. For hot-loop
1042        //  use, prefer the compiled Regex value type (regex literal `/pattern/`).
1043        // =====================================================================
1044
1045        "str_detect" => {
1046            // str_detect(haystack, pattern) → bool
1047            if args.len() != 2 { return Err("str_detect requires 2 args: string, pattern".into()); }
1048            let hay = value_to_string(&args[0])?;
1049            let pat = value_to_string(&args[1])?;
1050            let matched = cjc_regex::is_match(&pat, "", hay.as_bytes());
1051            Ok(Some(Value::Bool(matched)))
1052        }
1053        "str_extract" => {
1054            // str_extract(haystack, pattern) → string (first match) or ""
1055            if args.len() != 2 { return Err("str_extract requires 2 args: string, pattern".into()); }
1056            let hay = value_to_string(&args[0])?;
1057            let pat = value_to_string(&args[1])?;
1058            match cjc_regex::find(&pat, "", hay.as_bytes()) {
1059                Some((start, end)) => {
1060                    let slice = &hay.as_bytes()[start..end];
1061                    let s = String::from_utf8_lossy(slice).to_string();
1062                    Ok(Some(Value::String(Rc::new(s))))
1063                }
1064                None => Ok(Some(Value::String(Rc::new(String::new())))),
1065            }
1066        }
1067        "str_extract_all" => {
1068            // str_extract_all(haystack, pattern) → [string]
1069            if args.len() != 2 { return Err("str_extract_all requires 2 args: string, pattern".into()); }
1070            let hay = value_to_string(&args[0])?;
1071            let pat = value_to_string(&args[1])?;
1072            let matches = cjc_regex::find_all(&pat, "", hay.as_bytes());
1073            let vals: Vec<Value> = matches
1074                .iter()
1075                .map(|&(start, end)| {
1076                    let slice = &hay.as_bytes()[start..end];
1077                    Value::String(Rc::new(String::from_utf8_lossy(slice).to_string()))
1078                })
1079                .collect();
1080            Ok(Some(Value::Array(Rc::new(vals))))
1081        }
1082        "str_replace" => {
1083            // str_replace(haystack, pattern, replacement) → string (first match replaced)
1084            if args.len() != 3 { return Err("str_replace requires 3 args: string, pattern, replacement".into()); }
1085            let hay = value_to_string(&args[0])?;
1086            let pat = value_to_string(&args[1])?;
1087            let rep = value_to_string(&args[2])?;
1088            match cjc_regex::find(&pat, "", hay.as_bytes()) {
1089                Some((start, end)) => {
1090                    let mut result = String::with_capacity(hay.len());
1091                    result.push_str(&hay[..start]);
1092                    result.push_str(&rep);
1093                    result.push_str(&hay[end..]);
1094                    Ok(Some(Value::String(Rc::new(result))))
1095                }
1096                None => Ok(Some(Value::String(Rc::new(hay)))),
1097            }
1098        }
1099        "str_replace_all" => {
1100            // str_replace_all(haystack, pattern, replacement) → string (all matches replaced)
1101            if args.len() != 3 { return Err("str_replace_all requires 3 args: string, pattern, replacement".into()); }
1102            let hay = value_to_string(&args[0])?;
1103            let pat = value_to_string(&args[1])?;
1104            let rep = value_to_string(&args[2])?;
1105            let matches = cjc_regex::find_all(&pat, "", hay.as_bytes());
1106            if matches.is_empty() {
1107                return Ok(Some(Value::String(Rc::new(hay))));
1108            }
1109            let mut result = String::with_capacity(hay.len());
1110            let mut last_end = 0;
1111            for &(start, end) in &matches {
1112                result.push_str(&hay[last_end..start]);
1113                result.push_str(&rep);
1114                last_end = end;
1115            }
1116            result.push_str(&hay[last_end..]);
1117            Ok(Some(Value::String(Rc::new(result))))
1118        }
1119        "str_split" => {
1120            // str_split(haystack, pattern) → [string]
1121            if args.len() != 2 { return Err("str_split requires 2 args: string, pattern".into()); }
1122            let hay = value_to_string(&args[0])?;
1123            let pat = value_to_string(&args[1])?;
1124            let spans = cjc_regex::split(&pat, "", hay.as_bytes());
1125            let vals: Vec<Value> = spans
1126                .iter()
1127                .map(|&(start, end)| {
1128                    Value::String(Rc::new(
1129                        String::from_utf8_lossy(&hay.as_bytes()[start..end]).to_string(),
1130                    ))
1131                })
1132                .collect();
1133            Ok(Some(Value::Array(Rc::new(vals))))
1134        }
1135        "str_count" => {
1136            // str_count(haystack, pattern) → int (number of matches)
1137            if args.len() != 2 { return Err("str_count requires 2 args: string, pattern".into()); }
1138            let hay = value_to_string(&args[0])?;
1139            let pat = value_to_string(&args[1])?;
1140            let count = cjc_regex::find_all(&pat, "", hay.as_bytes()).len();
1141            Ok(Some(Value::Int(count as i64)))
1142        }
1143        "str_trim" => {
1144            // str_trim(string) → string with leading/trailing whitespace removed
1145            if args.len() != 1 { return Err("str_trim requires 1 arg: string".into()); }
1146            let s = value_to_string(&args[0])?;
1147            Ok(Some(Value::String(Rc::new(s.trim().to_string()))))
1148        }
1149        "str_to_upper" => {
1150            if args.len() != 1 { return Err("str_to_upper requires 1 arg: string".into()); }
1151            let s = value_to_string(&args[0])?;
1152            Ok(Some(Value::String(Rc::new(s.to_uppercase()))))
1153        }
1154        "str_to_lower" => {
1155            if args.len() != 1 { return Err("str_to_lower requires 1 arg: string".into()); }
1156            let s = value_to_string(&args[0])?;
1157            Ok(Some(Value::String(Rc::new(s.to_lowercase()))))
1158        }
1159        "str_starts" => {
1160            if args.len() != 2 { return Err("str_starts requires 2 args: string, prefix".into()); }
1161            let s = value_to_string(&args[0])?;
1162            let prefix = value_to_string(&args[1])?;
1163            Ok(Some(Value::Bool(s.starts_with(&prefix))))
1164        }
1165        "str_ends" => {
1166            if args.len() != 2 { return Err("str_ends requires 2 args: string, suffix".into()); }
1167            let s = value_to_string(&args[0])?;
1168            let suffix = value_to_string(&args[1])?;
1169            Ok(Some(Value::Bool(s.ends_with(&suffix))))
1170        }
1171        "str_sub" => {
1172            // str_sub(string, start, end) → substring (byte-indexed, clamped)
1173            if args.len() != 3 { return Err("str_sub requires 3 args: string, start, end".into()); }
1174            let s = value_to_string(&args[0])?;
1175            let start = value_to_usize(&args[1])?.min(s.len());
1176            let end = value_to_usize(&args[2])?.min(s.len());
1177            if start > end {
1178                Ok(Some(Value::String(Rc::new(String::new()))))
1179            } else {
1180                // Clamp to char boundaries for safety
1181                let actual_start = clamp_to_char_boundary(&s, start);
1182                let actual_end = clamp_to_char_boundary(&s, end);
1183                Ok(Some(Value::String(Rc::new(s[actual_start..actual_end].to_string()))))
1184            }
1185        }
1186        "str_len" => {
1187            // str_len(string) → int (byte length, consistent with byte-first view)
1188            if args.len() != 1 { return Err("str_len requires 1 arg: string".into()); }
1189            let s = value_to_string(&args[0])?;
1190            Ok(Some(Value::Int(s.len() as i64)))
1191        }
1192
1193        // =====================================================================
1194        //  Stats builtins (operate on Array of numbers)
1195        // =====================================================================
1196
1197        "median" => {
1198            if args.len() != 1 { return Err("median requires 1 arg: numeric array".into()); }
1199            let nums = value_to_f64_vec(&args[0])?;
1200            if nums.is_empty() {
1201                return Ok(Some(Value::Float(f64::NAN)));
1202            }
1203            let mut sorted = nums;
1204            sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
1205            let mid = sorted.len() / 2;
1206            let med = if sorted.len() % 2 == 0 {
1207                (sorted[mid - 1] + sorted[mid]) / 2.0
1208            } else {
1209                sorted[mid]
1210            };
1211            Ok(Some(Value::Float(med)))
1212        }
1213        "sd" => {
1214            // Sample standard deviation (N-1 denominator)
1215            if args.len() != 1 { return Err("sd requires 1 arg: numeric array".into()); }
1216            let nums = value_to_f64_vec(&args[0])?;
1217            if nums.len() < 2 {
1218                return Ok(Some(Value::Float(f64::NAN)));
1219            }
1220            let mean = nums.iter().sum::<f64>() / nums.len() as f64;
1221            let var = nums.iter().map(|x| (x - mean) * (x - mean)).sum::<f64>()
1222                / (nums.len() - 1) as f64;
1223            Ok(Some(Value::Float(var.sqrt())))
1224        }
1225        "variance" => {
1226            // Sample variance (N-1 denominator)
1227            if args.len() != 1 { return Err("variance requires 1 arg: numeric array".into()); }
1228            let nums = value_to_f64_vec(&args[0])?;
1229            if nums.len() < 2 {
1230                return Ok(Some(Value::Float(f64::NAN)));
1231            }
1232            let mean = nums.iter().sum::<f64>() / nums.len() as f64;
1233            let var = nums.iter().map(|x| (x - mean) * (x - mean)).sum::<f64>()
1234                / (nums.len() - 1) as f64;
1235            Ok(Some(Value::Float(var)))
1236        }
1237        "n_distinct" => {
1238            // Count distinct values in an array
1239            if args.len() != 1 { return Err("n_distinct requires 1 arg: array".into()); }
1240            match &args[0] {
1241                Value::Array(arr) => {
1242                    let mut seen = std::collections::BTreeSet::new();
1243                    for v in arr.iter() {
1244                        seen.insert(format!("{v}"));
1245                    }
1246                    Ok(Some(Value::Int(seen.len() as i64)))
1247                }
1248                _ => Err(format!("n_distinct expects Array, got {}", args[0].type_name())),
1249            }
1250        }
1251
1252        // =====================================================================
1253        //  DataFrame free-standing builtins (ITEM 1)
1254        //
1255        //  These wrap TidyView method calls so CJC code can write:
1256        //    pivot_wider(df, ["id"], "measure", "value")
1257        //  instead of (or in addition to) the method form:
1258        //    df.pivot_wider(["id"], "measure", "value")
1259        //
1260        //  All take a `Value::TidyView` as their first argument and re-use the
1261        //  existing method dispatch internally. This keeps the implementation a
1262        //  single source of truth.
1263        // =====================================================================
1264
1265        // ------------------------------------------------------------------
1266        // df_read_csv(path) or df_read_csv(path, delimiter) → TidyView
1267        // ------------------------------------------------------------------
1268        "df_read_csv" => {
1269            if args.len() < 1 || args.len() > 2 {
1270                return Err("df_read_csv requires 1-2 arguments (path[, delimiter])".into());
1271            }
1272            let path = match &args[0] {
1273                Value::String(s) => s.as_ref().clone(),
1274                _ => return Err(format!("df_read_csv: path must be String, got {}", args[0].type_name())),
1275            };
1276            let delim: u8 = if args.len() == 2 {
1277                match &args[1] {
1278                    Value::String(s) if !s.is_empty() => s.as_bytes()[0],
1279                    _ => return Err("df_read_csv: delimiter must be a non-empty String".into()),
1280                }
1281            } else {
1282                b','
1283            };
1284            let bytes = std::fs::read(&path)
1285                .map_err(|e| format!("df_read_csv: {}", e))?;
1286            let config = CsvConfig { delimiter: delim, ..CsvConfig::default() };
1287            let df = CsvReader::new(config)
1288                .parse(&bytes)
1289                .map_err(|e| format!("df_read_csv: {}", e))?;
1290            Ok(Some(wrap_view(TidyView::from_df(df))))
1291        }
1292
1293        // ------------------------------------------------------------------
1294        // pivot_wider(df, id_cols, names_from, values_from) → TidyView
1295        // ------------------------------------------------------------------
1296        "pivot_wider" => {
1297            if args.len() != 4 {
1298                return Err(
1299                    "pivot_wider requires 4 arguments (df, id_cols, names_from, values_from)".into(),
1300                );
1301            }
1302            let view = value_to_tidy_view(&args[0])?;
1303            let id_cols = value_to_str_vec(&args[1])?;
1304            let id_refs: Vec<&str> = id_cols.iter().map(|s| s.as_str()).collect();
1305            let names_from = value_to_string(&args[2])?;
1306            let values_from = value_to_string(&args[3])?;
1307            let nullable_frame = view
1308                .pivot_wider(&id_refs, &names_from, &values_from)
1309                .map_err(|e| format!("{e}"))?;
1310            Ok(Some(wrap_view(nullable_frame.to_tidy_view_filled())))
1311        }
1312
1313        // ------------------------------------------------------------------
1314        // pivot_longer(df, cols, names_to, values_to) → TidyView
1315        // ------------------------------------------------------------------
1316        "pivot_longer" => {
1317            if args.len() < 3 || args.len() > 4 {
1318                return Err(
1319                    "pivot_longer requires 3-4 arguments (df, cols, names_to[, values_to])".into(),
1320                );
1321            }
1322            let view = value_to_tidy_view(&args[0])?;
1323            let cols = value_to_str_vec(&args[1])?;
1324            let col_refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
1325            let names_to = value_to_string(&args[2])?;
1326            let values_to = if args.len() == 4 {
1327                value_to_string(&args[3])?
1328            } else {
1329                "value".to_string()
1330            };
1331            let frame = view
1332                .pivot_longer(&col_refs, &names_to, &values_to)
1333                .map_err(|e| format!("{e}"))?;
1334            Ok(Some(wrap_view(frame.view())))
1335        }
1336
1337        // ------------------------------------------------------------------
1338        // df_distinct(df) or df_distinct(df, cols) → TidyView
1339        // ------------------------------------------------------------------
1340        "df_distinct" => {
1341            if args.is_empty() || args.len() > 2 {
1342                return Err("df_distinct requires 1-2 arguments (df[, cols])".into());
1343            }
1344            let view = value_to_tidy_view(&args[0])?;
1345            let cols = if args.len() == 2 {
1346                value_to_str_vec(&args[1])?
1347            } else {
1348                view.column_names().iter().map(|s| s.to_string()).collect()
1349            };
1350            let col_refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
1351            let new_view = view.distinct(&col_refs).map_err(|e| format!("{e}"))?;
1352            Ok(Some(wrap_view(new_view)))
1353        }
1354
1355        // ------------------------------------------------------------------
1356        // df_rename(df, old_name, new_name) → TidyView
1357        // ------------------------------------------------------------------
1358        "df_rename" => {
1359            if args.len() != 3 {
1360                return Err("df_rename requires 3 arguments (df, old_name, new_name)".into());
1361            }
1362            let view = value_to_tidy_view(&args[0])?;
1363            let old = value_to_string(&args[1])?;
1364            let new = value_to_string(&args[2])?;
1365            let pair_refs: Vec<(&str, &str)> = vec![(&old, &new)];
1366            let new_view = view.rename(&pair_refs).map_err(|e| format!("{e}"))?;
1367            Ok(Some(wrap_view(new_view)))
1368        }
1369
1370        // ------------------------------------------------------------------
1371        // df_anti_join(df1, df2, on) → TidyView
1372        // df_semi_join(df1, df2, on) → TidyView
1373        // df_full_join(df1, df2, on) → TidyView
1374        //
1375        // `on` = String (single key, same name in both) or
1376        //        Array of Strings (multi-key, same names in both).
1377        // ------------------------------------------------------------------
1378        "df_anti_join" | "df_semi_join" | "df_full_join" => {
1379            if args.len() != 3 {
1380                return Err(format!(
1381                    "{name} requires 3 arguments (df1, df2, on)"
1382                ));
1383            }
1384            let left = value_to_tidy_view(&args[0])?;
1385            let right_rc = match &args[1] {
1386                Value::TidyView(rc) => rc,
1387                _ => return Err(format!("{name}: second argument must be a TidyView")),
1388            };
1389            let right_inner: &Rc<dyn std::any::Any> = right_rc;
1390            let right = right_inner
1391                .downcast_ref::<TidyView>()
1392                .ok_or_else(|| "internal: TidyView downcast failed".to_string())?;
1393            // Parse `on`: single string or array of strings
1394            let on_keys: Vec<String> = match &args[2] {
1395                Value::String(s) => vec![s.as_ref().clone()],
1396                Value::Array(arr) => arr
1397                    .iter()
1398                    .map(|v| match v {
1399                        Value::String(s) => Ok(s.as_ref().clone()),
1400                        _ => Err(format!("on: expected String keys, got {}", v.type_name())),
1401                    })
1402                    .collect::<Result<Vec<_>, _>>()?,
1403                _ => return Err(format!("{name}: `on` must be String or Array of Strings")),
1404            };
1405            let on_pairs: Vec<(&str, &str)> = on_keys.iter().map(|k| (k.as_str(), k.as_str())).collect();
1406            match name {
1407                "df_anti_join" => {
1408                    let new_view = left.anti_join(right, &on_pairs).map_err(|e| format!("{e}"))?;
1409                    Ok(Some(wrap_view(new_view)))
1410                }
1411                "df_semi_join" => {
1412                    let new_view = left.semi_join(right, &on_pairs).map_err(|e| format!("{e}"))?;
1413                    Ok(Some(wrap_view(new_view)))
1414                }
1415                "df_full_join" => {
1416                    let suffix = crate::JoinSuffix::default();
1417                    let nullable_frame = left.full_join(right, &on_pairs, &suffix)
1418                        .map_err(|e| format!("{e}"))?;
1419                    Ok(Some(wrap_view(nullable_frame.to_tidy_view_filled())))
1420                }
1421                _ => Ok(None),
1422            }
1423        }
1424
1425        // ------------------------------------------------------------------
1426        // df_fill_na(df, col_name, fill_val) → TidyView
1427        //
1428        // Fills NA/null values in the specified column with `fill_val`.
1429        // Works by materializing, patching the column, and re-wrapping.
1430        // ------------------------------------------------------------------
1431        "df_fill_na" => {
1432            if args.len() != 3 {
1433                return Err("df_fill_na requires 3 arguments (df, col_name, fill_val)".into());
1434            }
1435            let view = value_to_tidy_view(&args[0])?;
1436            let col_name = value_to_string(&args[1])?;
1437            let fill_val = &args[2];
1438
1439            let mut df = view.materialize().map_err(|e| format!("{e}"))?;
1440            let col_idx = df.columns.iter().position(|(n, _)| n == &col_name)
1441                .ok_or_else(|| format!("df_fill_na: column '{}' not found", col_name))?;
1442
1443            let filled_col = match &df.columns[col_idx].1 {
1444                Column::Int(v) => {
1445                    // Int columns have no inline NA representation in the
1446                    // dense storage; NullableColumn nulls are materialised as 0
1447                    // by to_tidy_view_filled.  Accept the argument for API
1448                    // consistency but leave the column unchanged.
1449                    let _fill = match fill_val {
1450                        Value::Int(i) => *i,
1451                        Value::Float(f) => *f as i64,
1452                        _ => return Err("df_fill_na: fill value must be numeric for Int column".into()),
1453                    };
1454                    Column::Int(v.clone())
1455                }
1456                Column::Float(v) => {
1457                    let fill = match fill_val {
1458                        Value::Float(f) => *f,
1459                        Value::Int(i) => *i as f64,
1460                        _ => return Err("df_fill_na: fill value must be numeric for Float column".into()),
1461                    };
1462                    Column::Float(v.iter().map(|&x| if x.is_nan() { fill } else { x }).collect())
1463                }
1464                Column::Str(v) => {
1465                    let fill = match fill_val {
1466                        Value::String(s) => s.as_ref().clone(),
1467                        other => format!("{other}"),
1468                    };
1469                    Column::Str(v.iter().map(|s| {
1470                        if s == "NA" || s.is_empty() { fill.clone() } else { s.clone() }
1471                    }).collect())
1472                }
1473                Column::Bool(v) => Column::Bool(v.clone()),
1474                Column::Categorical { levels, codes } => Column::Categorical { levels: levels.clone(), codes: codes.clone() },
1475                Column::DateTime(v) => Column::DateTime(v.clone()),
1476            };
1477            df.columns[col_idx].1 = filled_col;
1478            Ok(Some(wrap_view(TidyView::from_df(df))))
1479        }
1480
1481        // ------------------------------------------------------------------
1482        // df_drop_na(df) or df_drop_na(df, cols) → TidyView
1483        //
1484        // Drops rows that contain NA in the specified columns (all by default).
1485        // Uses a filter predicate over the visible rows.
1486        // ------------------------------------------------------------------
1487        "df_drop_na" => {
1488            if args.is_empty() || args.len() > 2 {
1489                return Err("df_drop_na requires 1-2 arguments (df[, cols])".into());
1490            }
1491            let view = value_to_tidy_view(&args[0])?;
1492            let target_cols: Vec<String> = if args.len() == 2 {
1493                value_to_str_vec(&args[1])?
1494            } else {
1495                view.column_names().iter().map(|s| s.to_string()).collect()
1496            };
1497
1498            // Materialise once, then filter row by row
1499            let df = view.materialize().map_err(|e| format!("{e}"))?;
1500            let nrows = df.nrows();
1501
1502            // For each target column, find which rows are NA
1503            let mut keep = vec![true; nrows];
1504            for col_name in &target_cols {
1505                if let Some(col) = df.get_column(col_name) {
1506                    for r in 0..nrows {
1507                        if !keep[r] { continue; }
1508                        let na = match col {
1509                            Column::Float(v) => v[r].is_nan(),
1510                            Column::Str(v) => v[r] == "NA" || v[r].is_empty(),
1511                            _ => false,
1512                        };
1513                        if na { keep[r] = false; }
1514                    }
1515                } else {
1516                    return Err(format!("df_drop_na: column '{}' not found", col_name));
1517                }
1518            }
1519
1520            // Build new DataFrame from kept rows
1521            let mut new_cols: Vec<(String, Column)> = Vec::with_capacity(df.columns.len());
1522            for (name, col) in &df.columns {
1523                let new_col = match col {
1524                    Column::Int(v)       => Column::Int(v.iter().enumerate().filter(|(r, _)| keep[*r]).map(|(_, x)| *x).collect()),
1525                    Column::Float(v)     => Column::Float(v.iter().enumerate().filter(|(r, _)| keep[*r]).map(|(_, x)| *x).collect()),
1526                    Column::Str(v)       => Column::Str(v.iter().enumerate().filter(|(r, _)| keep[*r]).map(|(_, x)| x.clone()).collect()),
1527                    Column::Bool(v)      => Column::Bool(v.iter().enumerate().filter(|(r, _)| keep[*r]).map(|(_, x)| *x).collect()),
1528                    Column::DateTime(v)  => Column::DateTime(v.iter().enumerate().filter(|(r, _)| keep[*r]).map(|(_, x)| *x).collect()),
1529                    Column::Categorical { levels, codes } => Column::Categorical {
1530                        levels: levels.clone(),
1531                        codes: codes.iter().enumerate().filter(|(r, _)| keep[*r]).map(|(_, x)| *x).collect(),
1532                    },
1533                };
1534                new_cols.push((name.clone(), new_col));
1535            }
1536            let new_df = DataFrame::from_columns(new_cols)
1537                .map_err(|e| format!("df_drop_na: {e}"))?;
1538            Ok(Some(wrap_view(TidyView::from_df(new_df))))
1539        }
1540
1541        _ => Ok(None),
1542    }
1543}
1544
1545/// Helper: extract a `&TidyView` reference from a `Value::TidyView`.
1546fn value_to_tidy_view(v: &Value) -> Result<&TidyView, String> {
1547    match v {
1548        Value::TidyView(rc) => rc
1549            .downcast_ref::<TidyView>()
1550            .ok_or_else(|| "internal: TidyView downcast failed".to_string()),
1551        _ => Err(format!(
1552            "expected TidyView (use df.view() to convert a DataFrame), got {}",
1553            v.type_name()
1554        )),
1555    }
1556}
1557
/// Snap a byte offset in `s` down to the nearest UTF-8 char boundary.
///
/// Offsets at or beyond the end of the string saturate to `s.len()`.
/// Otherwise the result is the largest boundary that is `<= idx`
/// (offset 0 is always a boundary, so the search cannot fail).
fn clamp_to_char_boundary(s: &str, idx: usize) -> usize {
    // Saturate first so the search below never starts past the end.
    let start = idx.min(s.len());

    // Walk backwards from `start` until we land on a boundary.
    (0..=start)
        .rev()
        .find(|&pos| s.is_char_boundary(pos))
        .unwrap_or(0)
}
1569
1570/// Convert a Value::Array of numbers to Vec<f64>.
1571fn value_to_f64_vec(v: &Value) -> Result<Vec<f64>, String> {
1572    match v {
1573        Value::Array(arr) => {
1574            arr.iter()
1575                .map(|v| match v {
1576                    Value::Float(f) => Ok(*f),
1577                    Value::Int(i) => Ok(*i as f64),
1578                    _ => Err(format!("expected numeric value in array, got {}", v.type_name())),
1579                })
1580                .collect()
1581        }
1582        _ => Err(format!("expected Array, got {}", v.type_name())),
1583    }
1584}