Skip to main content

cjc_data/
tidy_dispatch.rs

1//! Shared tidy dispatch: maps CJC language method calls on TidyView /
2//! GroupedTidyView values to the concrete cjc_data API.
3//!
4//! Both `cjc-eval` and `cjc-mir-exec` call into `dispatch_tidy_method` and
5//! `dispatch_grouped_method` so that every tidy operation has a single source
6//! of truth.  The executors only need to pattern-match `Value::TidyView` or
7//! `Value::GroupedTidyView` and delegate here.
8//!
9//! # Error handling
10//! All errors are returned as `Err(String)`.  The caller wraps the string
11//! into its own error type (EvalError / MirExecError).
12
13use std::rc::Rc;
14use std::any::Any;
15
16use cjc_runtime::value::Value;
17
18use crate::{
19    ArrangeKey, Column, CsvConfig, CsvReader, DExpr, DBinOp, DataFrame, GroupedTidyView,
20    TidyAgg, TidyView,
21};
22
23// ============================================================================
24//  Public entry points
25// ============================================================================
26
27/// Dispatch a method call on a `Value::TidyView`.
28///
29/// Returns `Ok(Some(value))` if the method is known, `Ok(None)` if not
30/// recognised (allows the caller to fall through to other dispatch paths).
31pub fn dispatch_tidy_method(
32    inner: &Rc<dyn Any>,
33    method: &str,
34    args: &[Value],
35) -> Result<Option<Value>, String> {
36    let view = downcast_view(inner)?;
37    match method {
38        // -- shape ----------------------------------------------------------
39        "nrows" => Ok(Some(Value::Int(view.nrows() as i64))),
40        "ncols" => Ok(Some(Value::Int(view.ncols() as i64))),
41        "column_names" => {
42            let names: Vec<Value> = view
43                .column_names()
44                .into_iter()
45                .map(|s| Value::String(Rc::new(s.to_string())))
46                .collect();
47            Ok(Some(Value::Array(Rc::new(names))))
48        }
49
50        // -- filter ---------------------------------------------------------
51        "filter" => {
52            if args.len() != 1 {
53                return Err("TidyView.filter requires 1 argument: predicate DExpr".into());
54            }
55            let predicate = value_to_dexpr(&args[0])?;
56            let new_view = view.filter(&predicate).map_err(|e| format!("{e}"))?;
57            Ok(Some(wrap_view(new_view)))
58        }
59
60        // -- select ---------------------------------------------------------
61        "select" => {
62            if args.len() != 1 {
63                return Err("TidyView.select requires 1 argument: column names array".into());
64            }
65            let cols = value_to_str_vec(&args[0])?;
66            let col_refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
67            let new_view = view.select(&col_refs).map_err(|e| format!("{e}"))?;
68            Ok(Some(wrap_view(new_view)))
69        }
70
71        // -- mutate ---------------------------------------------------------
72        "mutate" => {
73            // mutate(name, expr) or mutate([(name, expr), ...])
74            // We support: mutate("col_name", dexpr_value)
75            if args.len() != 2 {
76                return Err("TidyView.mutate requires 2 arguments: column_name and expression".into());
77            }
78            let col_name = value_to_string(&args[0])?;
79            let expr = value_to_dexpr(&args[1])?;
80            let frame = view.mutate(&[(&col_name, expr)]).map_err(|e| format!("{e}"))?;
81            // mutate returns TidyFrame; convert to TidyView for pipeline continuity
82            Ok(Some(wrap_view(frame.view())))
83        }
84
85        // -- group_by -------------------------------------------------------
86        "group_by" => {
87            if args.len() != 1 {
88                return Err("TidyView.group_by requires 1 argument: key columns array".into());
89            }
90            let keys = value_to_str_vec(&args[0])?;
91            let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
92            let grouped = view.group_by(&key_refs).map_err(|e| format!("{e}"))?;
93            Ok(Some(wrap_grouped(grouped)))
94        }
95
96        // -- arrange --------------------------------------------------------
97        "arrange" => {
98            if args.len() != 1 {
99                return Err("TidyView.arrange requires 1 argument: sort keys array".into());
100            }
101            let keys = value_to_arrange_keys(&args[0])?;
102            let new_view = view.arrange(&keys).map_err(|e| format!("{e}"))?;
103            Ok(Some(wrap_view(new_view)))
104        }
105
106        // -- distinct -------------------------------------------------------
107        "distinct" => {
108            let cols = if args.is_empty() {
109                view.column_names().iter().map(|s| s.to_string()).collect::<Vec<_>>()
110            } else {
111                value_to_str_vec(&args[0])?
112            };
113            let col_refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
114            let new_view = view.distinct(&col_refs).map_err(|e| format!("{e}"))?;
115            Ok(Some(wrap_view(new_view)))
116        }
117
118        // -- slice family ---------------------------------------------------
119        "slice" => {
120            if args.len() != 2 {
121                return Err("TidyView.slice requires 2 arguments: start, end".into());
122            }
123            let start = value_to_usize(&args[0])?;
124            let end = value_to_usize(&args[1])?;
125            Ok(Some(wrap_view(view.slice(start, end))))
126        }
127        "slice_head" => {
128            if args.len() != 1 {
129                return Err("TidyView.slice_head requires 1 argument: n".into());
130            }
131            let n = value_to_usize(&args[0])?;
132            Ok(Some(wrap_view(view.slice_head(n))))
133        }
134        "slice_tail" => {
135            if args.len() != 1 {
136                return Err("TidyView.slice_tail requires 1 argument: n".into());
137            }
138            let n = value_to_usize(&args[0])?;
139            Ok(Some(wrap_view(view.slice_tail(n))))
140        }
141        "slice_sample" => {
142            if args.len() != 2 {
143                return Err("TidyView.slice_sample requires 2 arguments: n, seed".into());
144            }
145            let n = value_to_usize(&args[0])?;
146            let seed = match &args[1] {
147                Value::Int(i) => *i as u64,
148                _ => return Err("slice_sample seed must be Int".into()),
149            };
150            Ok(Some(wrap_view(view.slice_sample(n, seed))))
151        }
152
153        // -- joins ----------------------------------------------------------
154        "inner_join" | "left_join" | "semi_join" | "anti_join" | "full_join" => {
155            dispatch_join(view, args, method)
156        }
157
158        // -- reshape --------------------------------------------------------
159        "pivot_longer" => {
160            if args.len() < 2 || args.len() > 3 {
161                return Err(
162                    "TidyView.pivot_longer requires 2-3 args: cols, names_to, [values_to]".into(),
163                );
164            }
165            let cols = value_to_str_vec(&args[0])?;
166            let col_refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
167            let names_to = value_to_string(&args[1])?;
168            let values_to = if args.len() == 3 {
169                value_to_string(&args[2])?
170            } else {
171                "value".to_string()
172            };
173            let frame = view
174                .pivot_longer(&col_refs, &names_to, &values_to)
175                .map_err(|e| format!("{e}"))?;
176            Ok(Some(wrap_view(frame.view())))
177        }
178        "pivot_wider" => {
179            if args.len() != 3 {
180                return Err(
181                    "TidyView.pivot_wider requires 3 args: id_cols, names_from, values_from"
182                        .into(),
183                );
184            }
185            let id_cols = value_to_str_vec(&args[0])?;
186            let id_refs: Vec<&str> = id_cols.iter().map(|s| s.as_str()).collect();
187            let names_from = value_to_string(&args[1])?;
188            let values_from = value_to_string(&args[2])?;
189            let nullable_frame = view
190                .pivot_wider(&id_refs, &names_from, &values_from)
191                .map_err(|e| format!("{e}"))?;
192            // NullableFrame → fill nulls with defaults → TidyView
193            Ok(Some(wrap_view(nullable_frame.to_tidy_view_filled())))
194        }
195
196        // -- rename / relocate / drop_cols / bind ----------------------------
197        "rename" => {
198            if args.len() != 1 {
199                return Err("TidyView.rename requires 1 argument: array of [old, new] pairs".into());
200            }
201            let pairs = value_to_rename_pairs(&args[0])?;
202            let pair_refs: Vec<(&str, &str)> =
203                pairs.iter().map(|(a, b)| (a.as_str(), b.as_str())).collect();
204            let new_view = view.rename(&pair_refs).map_err(|e| format!("{e}"))?;
205            Ok(Some(wrap_view(new_view)))
206        }
207        "drop_cols" => {
208            if args.len() != 1 {
209                return Err("TidyView.drop_cols requires 1 argument: column names array".into());
210            }
211            let cols = value_to_str_vec(&args[0])?;
212            let col_refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
213            let new_view = view.drop_cols(&col_refs).map_err(|e| format!("{e}"))?;
214            Ok(Some(wrap_view(new_view)))
215        }
216        "bind_rows" => {
217            if args.len() != 1 {
218                return Err("TidyView.bind_rows requires 1 argument: other TidyView".into());
219            }
220            let other_rc = match &args[0] {
221                Value::TidyView(rc) => rc,
222                _ => return Err("bind_rows argument must be a TidyView".into()),
223            };
224            let other = downcast_view(other_rc)?;
225            let frame = view.bind_rows(other).map_err(|e| format!("{e}"))?;
226            Ok(Some(wrap_view(frame.view())))
227        }
228        "bind_cols" => {
229            if args.len() != 1 {
230                return Err("TidyView.bind_cols requires 1 argument: other TidyView".into());
231            }
232            let other_rc = match &args[0] {
233                Value::TidyView(rc) => rc,
234                _ => return Err("bind_cols argument must be a TidyView".into()),
235            };
236            let other = downcast_view(other_rc)?;
237            let frame = view.bind_cols(other).map_err(|e| format!("{e}"))?;
238            Ok(Some(wrap_view(frame.view())))
239        }
240
241        // -- column extraction / tensor -------------------------------------
242        "column" => {
243            if args.len() != 1 {
244                return Err("TidyView.column requires 1 argument: column_name".into());
245            }
246            let name = value_to_string(&args[0])?;
247            let df = view.materialize().map_err(|e| format!("{e}"))?;
248            let col = df
249                .get_column(&name)
250                .ok_or_else(|| format!("column '{}' not found", name))?;
251            Ok(Some(column_to_value(col)))
252        }
253        "to_tensor" => {
254            if args.len() != 1 {
255                return Err("TidyView.to_tensor requires 1 argument: column_names array".into());
256            }
257            let cols = value_to_str_vec(&args[0])?;
258            let col_refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
259            let t = view.to_tensor(&col_refs).map_err(|e| format!("{e}"))?;
260            Ok(Some(Value::Tensor(t)))
261        }
262
263        // -- materialize to DataFrame Struct --------------------------------
264        "collect" => {
265            let df = view.materialize().map_err(|e| format!("{e}"))?;
266            Ok(Some(dataframe_to_value(df)))
267        }
268
269        // -- print (for debugging) ------------------------------------------
270        "print" => {
271            let df = view.materialize().map_err(|e| format!("{e}"))?;
272            let s = format_dataframe(&df);
273            // Returning the formatted string; the caller is responsible for
274            // printing and capturing in output buffer.
275            Ok(Some(Value::String(Rc::new(s))))
276        }
277
278        // -- DataFrame inspection builtins -----------------------------------
279        "head" => {
280            let n = if args.is_empty() { 10 } else {
281                match &args[0] { Value::Int(n) => *n as usize, _ => return Err("head: argument must be Int".into()) }
282            };
283            let sliced = view.slice_head(n);
284            let df = sliced.materialize().map_err(|e| format!("{e}"))?;
285            let s = format_dataframe(&df);
286            Ok(Some(Value::String(Rc::new(s))))
287        }
288        "tail" => {
289            let n = if args.is_empty() { 10 } else {
290                match &args[0] { Value::Int(n) => *n as usize, _ => return Err("tail: argument must be Int".into()) }
291            };
292            let sliced = view.slice_tail(n);
293            let df = sliced.materialize().map_err(|e| format!("{e}"))?;
294            let s = format_dataframe(&df);
295            Ok(Some(Value::String(Rc::new(s))))
296        }
297        "shape" => {
298            let result = Value::Tuple(Rc::new(vec![
299                Value::Int(view.nrows() as i64),
300                Value::Int(view.ncols() as i64),
301            ]));
302            Ok(Some(result))
303        }
304        "columns" => {
305            // Alias for column_names — returns array of column name strings
306            let names: Vec<Value> = view
307                .column_names()
308                .into_iter()
309                .map(|s| Value::String(Rc::new(s.to_string())))
310                .collect();
311            Ok(Some(Value::Array(Rc::new(names))))
312        }
313        "dtypes" => {
314            // Returns a Struct mapping column_name → type_name
315            let df = view.materialize().map_err(|e| format!("{e}"))?;
316            let mut fields = std::collections::BTreeMap::new();
317            for (name, col) in &df.columns {
318                fields.insert(name.clone(), Value::String(Rc::new(col.type_name().to_string())));
319            }
320            Ok(Some(Value::Struct { name: "Dtypes".to_string(), fields }))
321        }
322        "describe" => {
323            let df = view.materialize().map_err(|e| format!("{e}"))?;
324            let s = format_describe(&df);
325            Ok(Some(Value::String(Rc::new(s))))
326        }
327        "glimpse" => {
328            let df = view.materialize().map_err(|e| format!("{e}"))?;
329            let s = format_glimpse(&df);
330            Ok(Some(Value::String(Rc::new(s))))
331        }
332
333        _ => Ok(None), // unknown method — caller falls through
334    }
335}
336
337/// Dispatch a method call on a `Value::GroupedTidyView`.
338pub fn dispatch_grouped_method(
339    inner: &Rc<dyn Any>,
340    method: &str,
341    args: &[Value],
342) -> Result<Option<Value>, String> {
343    let grouped = downcast_grouped(inner)?;
344    match method {
345        "ngroups" => Ok(Some(Value::Int(grouped.ngroups() as i64))),
346
347        "summarise" | "summarize" => {
348            if args.len() % 2 != 0 || args.is_empty() {
349                return Err(
350                    "summarise requires pairs of (name, agg) arguments".into(),
351                );
352            }
353            let mut assignments: Vec<(String, TidyAgg)> = Vec::new();
354            let mut i = 0;
355            while i < args.len() {
356                let name = value_to_string(&args[i])?;
357                let agg = value_to_tidy_agg(&args[i + 1])?;
358                assignments.push((name, agg));
359                i += 2;
360            }
361            let asg_refs: Vec<(&str, TidyAgg)> = assignments
362                .iter()
363                .map(|(n, a)| (n.as_str(), a.clone()))
364                .collect();
365            let frame = grouped.summarise(&asg_refs).map_err(|e| format!("{e}"))?;
366            Ok(Some(wrap_view(frame.view())))
367        }
368
369        "ungroup" => {
370            let view = grouped.clone().ungroup();
371            Ok(Some(wrap_view(view)))
372        }
373
374        _ => Ok(None),
375    }
376}
377
378// ============================================================================
379//  Helpers — Value ↔ cjc_data conversions
380// ============================================================================
381
382fn downcast_view(inner: &Rc<dyn Any>) -> Result<&TidyView, String> {
383    inner
384        .downcast_ref::<TidyView>()
385        .ok_or_else(|| "internal error: TidyView downcast failed".to_string())
386}
387
388fn downcast_grouped(inner: &Rc<dyn Any>) -> Result<&GroupedTidyView, String> {
389    inner
390        .downcast_ref::<GroupedTidyView>()
391        .ok_or_else(|| "internal error: GroupedTidyView downcast failed".to_string())
392}
393
394/// Wrap a `TidyView` into `Value::TidyView`.
395pub fn wrap_view(view: TidyView) -> Value {
396    Value::TidyView(Rc::new(view) as Rc<dyn Any>)
397}
398
399/// Wrap a `GroupedTidyView` into `Value::GroupedTidyView`.
400pub fn wrap_grouped(grouped: GroupedTidyView) -> Value {
401    Value::GroupedTidyView(Rc::new(grouped) as Rc<dyn Any>)
402}
403
404/// Convert `Value::String` → `String`.
405fn value_to_string(v: &Value) -> Result<String, String> {
406    match v {
407        Value::String(s) => Ok(s.as_ref().clone()),
408        _ => Err(format!("expected String, got {}", v.type_name())),
409    }
410}
411
412/// Convert `Value::Int` → `usize`.
413fn value_to_usize(v: &Value) -> Result<usize, String> {
414    match v {
415        Value::Int(i) if *i >= 0 => Ok(*i as usize),
416        Value::Int(i) => Err(format!("expected non-negative Int, got {i}")),
417        _ => Err(format!("expected Int, got {}", v.type_name())),
418    }
419}
420
421/// Convert `Value::Array([String, ...])` → `Vec<String>`.
422fn value_to_str_vec(v: &Value) -> Result<Vec<String>, String> {
423    match v {
424        Value::Array(arr) => arr
425            .iter()
426            .map(|v| match v {
427                Value::String(s) => Ok(s.as_ref().clone()),
428                _ => Err(format!("expected String in array, got {}", v.type_name())),
429            })
430            .collect(),
431        _ => Err(format!("expected Array, got {}", v.type_name())),
432    }
433}
434
435/// Parse a `Value::Struct { name: "DExpr", ... }` into a `DExpr`.
436///
437/// The CJC language constructs DExpr values via helper builtins:
438///   col("name")        → Struct { name: "DExpr", kind: "col", value: "name" }
439///   binop(">", l, r)   → Struct { name: "DExpr", kind: "binop", op: ">", left: l, right: r }
440///   lit_int(42)         → Struct { name: "DExpr", kind: "lit_int", value: 42 }
441///   etc.
442///
443/// For ergonomic use, we also accept raw literals directly:
444///   Value::Int(42)      → DExpr::LitInt(42)
445///   Value::Float(3.14)  → DExpr::LitFloat(3.14)
446///   Value::Bool(true)   → DExpr::LitBool(true)
447///   Value::String("x")  → DExpr::Col("x")   -- shorthand for col("x")
448pub fn value_to_dexpr(v: &Value) -> Result<DExpr, String> {
449    match v {
450        // Literal shorthand
451        Value::Int(i) => Ok(DExpr::LitInt(*i)),
452        Value::Float(f) => Ok(DExpr::LitFloat(*f)),
453        Value::Bool(b) => Ok(DExpr::LitBool(*b)),
454        Value::String(s) => Ok(DExpr::Col(s.as_ref().clone())),
455        // Struct-encoded DExpr
456        Value::Struct { name, fields } if name == "DExpr" => {
457            let kind = fields
458                .get("kind")
459                .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().as_str()) } else { None })
460                .ok_or("DExpr struct missing 'kind' string field")?;
461            match kind {
462                "col" => {
463                    let col_name = fields
464                        .get("value")
465                        .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().clone()) } else { None })
466                        .ok_or("DExpr col missing 'value' string field")?;
467                    Ok(DExpr::Col(col_name))
468                }
469                "lit_int" => {
470                    let val = fields
471                        .get("value")
472                        .and_then(|v| if let Value::Int(i) = v { Some(*i) } else { None })
473                        .ok_or("DExpr lit_int missing 'value' int field")?;
474                    Ok(DExpr::LitInt(val))
475                }
476                "lit_float" => {
477                    let val = fields
478                        .get("value")
479                        .and_then(|v| if let Value::Float(f) = v { Some(*f) } else { None })
480                        .ok_or("DExpr lit_float missing 'value' float field")?;
481                    Ok(DExpr::LitFloat(val))
482                }
483                "lit_bool" => {
484                    let val = fields
485                        .get("value")
486                        .and_then(|v| if let Value::Bool(b) = v { Some(*b) } else { None })
487                        .ok_or("DExpr lit_bool missing 'value' bool field")?;
488                    Ok(DExpr::LitBool(val))
489                }
490                "lit_str" => {
491                    let val = fields
492                        .get("value")
493                        .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().clone()) } else { None })
494                        .ok_or("DExpr lit_str missing 'value' string field")?;
495                    Ok(DExpr::LitStr(val))
496                }
497                "binop" => {
498                    let op_str = fields
499                        .get("op")
500                        .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().as_str()) } else { None })
501                        .ok_or("DExpr binop missing 'op' field")?;
502                    let op = parse_binop(op_str)?;
503                    let left = fields.get("left").ok_or("DExpr binop missing 'left'")?;
504                    let right = fields.get("right").ok_or("DExpr binop missing 'right'")?;
505                    Ok(DExpr::BinOp {
506                        op,
507                        left: Box::new(value_to_dexpr(left)?),
508                        right: Box::new(value_to_dexpr(right)?),
509                    })
510                }
511                "count" => Ok(DExpr::Count),
512                other => Err(format!("unknown DExpr kind: {other}")),
513            }
514        }
515        _ => Err(format!(
516            "cannot convert {} to DExpr (expected DExpr struct, Int, Float, Bool, or String)",
517            v.type_name()
518        )),
519    }
520}
521
522fn parse_binop(s: &str) -> Result<DBinOp, String> {
523    match s {
524        "+" | "add" => Ok(DBinOp::Add),
525        "-" | "sub" => Ok(DBinOp::Sub),
526        "*" | "mul" => Ok(DBinOp::Mul),
527        "/" | "div" => Ok(DBinOp::Div),
528        ">" | "gt" => Ok(DBinOp::Gt),
529        "<" | "lt" => Ok(DBinOp::Lt),
530        ">=" | "ge" => Ok(DBinOp::Ge),
531        "<=" | "le" => Ok(DBinOp::Le),
532        "==" | "eq" => Ok(DBinOp::Eq),
533        "!=" | "ne" => Ok(DBinOp::Ne),
534        "&&" | "and" => Ok(DBinOp::And),
535        "||" | "or" => Ok(DBinOp::Or),
536        other => Err(format!("unknown binop: {other}")),
537    }
538}
539
540/// Parse a `Value::Struct` representing a TidyAgg, e.g.:
541///   Struct { name: "TidyAgg", kind: "sum", col: "salary" }
542///   Struct { name: "TidyAgg", kind: "count" }
543fn value_to_tidy_agg(v: &Value) -> Result<TidyAgg, String> {
544    match v {
545        Value::Struct { name, fields } if name == "TidyAgg" => {
546            let kind = fields
547                .get("kind")
548                .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().as_str()) } else { None })
549                .ok_or("TidyAgg struct missing 'kind' string")?;
550            match kind {
551                "count" => Ok(TidyAgg::Count),
552                "sum" | "mean" | "min" | "max" | "first" | "last"
553                | "median" | "sd" | "var" | "n_distinct" | "iqr" => {
554                    let col = fields
555                        .get("col")
556                        .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().clone()) } else { None })
557                        .ok_or_else(|| format!("TidyAgg {kind} missing 'col' string"))?;
558                    match kind {
559                        "sum" => Ok(TidyAgg::Sum(col)),
560                        "mean" => Ok(TidyAgg::Mean(col)),
561                        "min" => Ok(TidyAgg::Min(col)),
562                        "max" => Ok(TidyAgg::Max(col)),
563                        "first" => Ok(TidyAgg::First(col)),
564                        "last" => Ok(TidyAgg::Last(col)),
565                        "median" => Ok(TidyAgg::Median(col)),
566                        "sd" => Ok(TidyAgg::Sd(col)),
567                        "var" => Ok(TidyAgg::Var(col)),
568                        "n_distinct" => Ok(TidyAgg::NDistinct(col)),
569                        "iqr" => Ok(TidyAgg::Iqr(col)),
570                        _ => unreachable!(),
571                    }
572                }
573                "quantile" => {
574                    let col = fields
575                        .get("col")
576                        .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().clone()) } else { None })
577                        .ok_or("TidyAgg quantile missing 'col' string")?;
578                    let p = fields
579                        .get("p")
580                        .and_then(|v| match v {
581                            Value::Float(f) => Some(*f),
582                            Value::Int(i) => Some(*i as f64),
583                            _ => None,
584                        })
585                        .ok_or("TidyAgg quantile missing 'p' float")?;
586                    Ok(TidyAgg::Quantile(col, p))
587                }
588                other => Err(format!("unknown TidyAgg kind: {other}")),
589            }
590        }
591        _ => Err(format!("expected TidyAgg struct, got {}", v.type_name())),
592    }
593}
594
595/// Parse ArrangeKey array. Each element can be:
596///   - String "col_name"       → ascending
597///   - Struct { name: "ArrangeKey", col: "name", desc: bool }
598fn value_to_arrange_keys(v: &Value) -> Result<Vec<ArrangeKey>, String> {
599    match v {
600        Value::Array(arr) => {
601            let mut keys = Vec::with_capacity(arr.len());
602            for item in arr.iter() {
603                match item {
604                    Value::String(s) => keys.push(ArrangeKey::asc(s)),
605                    Value::Struct { name, fields } if name == "ArrangeKey" => {
606                        let col = fields
607                            .get("col")
608                            .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().as_str()) } else { None })
609                            .ok_or("ArrangeKey missing 'col'")?;
610                        let desc = fields
611                            .get("desc")
612                            .and_then(|v| if let Value::Bool(b) = v { Some(*b) } else { None })
613                            .unwrap_or(false);
614                        keys.push(if desc { ArrangeKey::desc(col) } else { ArrangeKey::asc(col) });
615                    }
616                    _ => return Err(format!("arrange key must be String or ArrangeKey struct, got {}", item.type_name())),
617                }
618            }
619            Ok(keys)
620        }
621        _ => Err(format!("arrange requires Array of keys, got {}", v.type_name())),
622    }
623}
624
625/// Parse rename pairs from `[["old","new"], ["old2","new2"]]`.
626fn value_to_rename_pairs(v: &Value) -> Result<Vec<(String, String)>, String> {
627    match v {
628        Value::Array(arr) => {
629            let mut pairs = Vec::with_capacity(arr.len());
630            for item in arr.iter() {
631                match item {
632                    Value::Array(pair) if pair.len() == 2 => {
633                        let old = value_to_string(&pair[0])?;
634                        let new = value_to_string(&pair[1])?;
635                        pairs.push((old, new));
636                    }
637                    _ => return Err("rename pairs must be arrays of [old, new] strings".into()),
638                }
639            }
640            Ok(pairs)
641        }
642        _ => Err(format!("rename requires Array of pairs, got {}", v.type_name())),
643    }
644}
645
646// ============================================================================
647//  Join dispatcher
648// ============================================================================
649
650/// Dispatch inner_join / left_join / semi_join / anti_join.
651///
652/// The CJC API is: `view.inner_join(other, left_on, right_on)`.
653/// The Rust API is: `view.inner_join(&other, &[(&left_on, &right_on)])`.
654fn dispatch_join(
655    view: &TidyView,
656    args: &[Value],
657    kind: &str,
658) -> Result<Option<Value>, String> {
659    if args.len() != 3 {
660        return Err(format!(
661            "TidyView.{kind} requires 3 args: other_view, left_on, right_on"
662        ));
663    }
664    let other_rc = match &args[0] {
665        Value::TidyView(rc) => rc,
666        _ => return Err(format!("{kind}: first arg must be a TidyView")),
667    };
668    let other = downcast_view(other_rc)?;
669    let left_on = value_to_string(&args[1])?;
670    let right_on = value_to_string(&args[2])?;
671    let on_pairs: Vec<(&str, &str)> = vec![(&left_on, &right_on)];
672
673    match kind {
674        "inner_join" => {
675            let frame = view.inner_join(other, &on_pairs).map_err(|e| format!("{e}"))?;
676            Ok(Some(wrap_view(frame.view())))
677        }
678        "left_join" => {
679            let frame = view.left_join(other, &on_pairs).map_err(|e| format!("{e}"))?;
680            Ok(Some(wrap_view(frame.view())))
681        }
682        "semi_join" => {
683            let new_view = view.semi_join(other, &on_pairs).map_err(|e| format!("{e}"))?;
684            Ok(Some(wrap_view(new_view)))
685        }
686        "anti_join" => {
687            let new_view = view.anti_join(other, &on_pairs).map_err(|e| format!("{e}"))?;
688            Ok(Some(wrap_view(new_view)))
689        }
690        "full_join" => {
691            let suffix = crate::JoinSuffix::default();
692            let nullable_frame = view.full_join(other, &on_pairs, &suffix).map_err(|e| format!("{e}"))?;
693            Ok(Some(wrap_view(nullable_frame.to_tidy_view_filled())))
694        }
695        _ => Ok(None),
696    }
697}
698
699// ============================================================================
700//  Column → Value conversion
701// ============================================================================
702
703/// Convert a `Column` to a `Value::Array`.
704fn column_to_value(col: &Column) -> Value {
705    if matches!(col, Column::CategoricalAdaptive(_)) {
706        return column_to_value(&col.to_legacy_categorical());
707    }
708    let vals: Vec<Value> = match col {
709        Column::Int(v) => v.iter().map(|i| Value::Int(*i)).collect(),
710        Column::Float(v) => v.iter().map(|f| Value::Float(*f)).collect(),
711        Column::Str(v) => v
712            .iter()
713            .map(|s| Value::String(Rc::new(s.clone())))
714            .collect(),
715        Column::Bool(v) => v.iter().map(|b| Value::Bool(*b)).collect(),
716        Column::Categorical { levels, codes } => codes
717            .iter()
718            .map(|&c| Value::String(Rc::new(levels[c as usize].clone())))
719            .collect(),
720        Column::DateTime(v) => v.iter().map(|i| Value::Int(*i)).collect(),
721        Column::CategoricalAdaptive(_) => unreachable!("handled by early return"),
722    };
723    Value::Array(Rc::new(vals))
724}
725
726// ============================================================================
727//  DataFrame → Value (for .collect())
728// ============================================================================
729
730/// Convert a `DataFrame` to the legacy `Value::Struct { name: "DataFrame" }`
731/// representation used by existing CJC code.
732pub fn dataframe_to_value(df: DataFrame) -> Value {
733    let mut fields = std::collections::BTreeMap::new();
734    let mut col_names: Vec<Value> = Vec::new();
735    let nrows = df.nrows();
736    for (name, col) in &df.columns {
737        col_names.push(Value::String(Rc::new(name.clone())));
738        fields.insert(name.clone(), column_to_value(col));
739    }
740    fields.insert(
741        "__columns".to_string(),
742        Value::Array(Rc::new(col_names)),
743    );
744    fields.insert("__nrows".to_string(), Value::Int(nrows as i64));
745    Value::Struct {
746        name: "DataFrame".to_string(),
747        fields,
748    }
749}
750
751/// Produce a human-readable table-formatted string from a DataFrame.
752fn format_dataframe(df: &DataFrame) -> String {
753    let ncols = df.ncols();
754    let nrows = df.nrows();
755    if ncols == 0 {
756        return "DataFrame(0x0)".to_string();
757    }
758
759    // Column names
760    let names: Vec<&str> = df.columns.iter().map(|(n, _)| n.as_str()).collect();
761
762    // Compute widths
763    let mut widths: Vec<usize> = names.iter().map(|n| n.len()).collect();
764    let display_rows = nrows.min(20); // cap at 20 rows for display
765    let mut cells: Vec<Vec<String>> = Vec::with_capacity(display_rows);
766    for r in 0..display_rows {
767        let mut row: Vec<String> = Vec::with_capacity(ncols);
768        for (ci, (_, col)) in df.columns.iter().enumerate() {
769            let s = col.get_display(r);
770            if s.len() > widths[ci] {
771                widths[ci] = s.len();
772            }
773            row.push(s);
774        }
775        cells.push(row);
776    }
777
778    let mut out = String::new();
779    // Header
780    for (ci, name) in names.iter().enumerate() {
781        if ci > 0 { out.push_str("  "); }
782        out.push_str(&format!("{:>width$}", name, width = widths[ci]));
783    }
784    out.push('\n');
785    // Rows
786    for row in &cells {
787        for (ci, cell) in row.iter().enumerate() {
788            if ci > 0 { out.push_str("  "); }
789            out.push_str(&format!("{:>width$}", cell, width = widths[ci]));
790        }
791        out.push('\n');
792    }
793    if nrows > display_rows {
794        out.push_str(&format!("... ({} more rows)\n", nrows - display_rows));
795    }
796    out
797}
798
799/// Produce a statistical summary (like R's `summary()` or pandas `.describe()`).
800///
801/// For numeric columns: count, mean, std, min, 25%, 50%, 75%, max.
802/// For string/bool columns: count, unique, top (most frequent).
803fn format_describe(df: &DataFrame) -> String {
804    use cjc_repro::KahanAccumulatorF64;
805    let nrows = df.nrows();
806    let mut out = String::new();
807    out.push_str(&format!("DataFrame: {} rows x {} columns\n\n", nrows, df.ncols()));
808
809    for (name, col) in &df.columns {
810        out.push_str(&format!("── {} ({}) ──\n", name, col.type_name()));
811        match col {
812            Column::Int(v) => {
813                if v.is_empty() {
814                    out.push_str("  (empty)\n");
815                    continue;
816                }
817                let mut sorted = v.clone();
818                sorted.sort();
819                let mut acc = KahanAccumulatorF64::new();
820                for &x in v { acc.add(x as f64); }
821                let mean = acc.finalize() / nrows as f64;
822                // Variance via second pass (Welford-like but simple two-pass for determinism)
823                let mut var_acc = KahanAccumulatorF64::new();
824                for &x in v { let d = x as f64 - mean; var_acc.add(d * d); }
825                let std = if nrows > 1 { (var_acc.finalize() / (nrows - 1) as f64).sqrt() } else { 0.0 };
826                out.push_str(&format!("  count: {}\n", nrows));
827                out.push_str(&format!("  mean:  {:.4}\n", mean));
828                out.push_str(&format!("  std:   {:.4}\n", std));
829                out.push_str(&format!("  min:   {}\n", sorted[0]));
830                out.push_str(&format!("  25%:   {}\n", sorted[nrows / 4]));
831                out.push_str(&format!("  50%:   {}\n", sorted[nrows / 2]));
832                out.push_str(&format!("  75%:   {}\n", sorted[3 * nrows / 4]));
833                out.push_str(&format!("  max:   {}\n", sorted[nrows - 1]));
834            }
835            Column::Float(v) => {
836                if v.is_empty() {
837                    out.push_str("  (empty)\n");
838                    continue;
839                }
840                let mut sorted = v.clone();
841                sorted.sort_by(|a, b| a.total_cmp(b));
842                let mut acc = KahanAccumulatorF64::new();
843                for &x in v { acc.add(x); }
844                let mean = acc.finalize() / nrows as f64;
845                let mut var_acc = KahanAccumulatorF64::new();
846                for &x in v { let d = x - mean; var_acc.add(d * d); }
847                let std = if nrows > 1 { (var_acc.finalize() / (nrows - 1) as f64).sqrt() } else { 0.0 };
848                out.push_str(&format!("  count: {}\n", nrows));
849                out.push_str(&format!("  mean:  {:.4}\n", mean));
850                out.push_str(&format!("  std:   {:.4}\n", std));
851                out.push_str(&format!("  min:   {:.4}\n", sorted[0]));
852                out.push_str(&format!("  25%:   {:.4}\n", sorted[nrows / 4]));
853                out.push_str(&format!("  50%:   {:.4}\n", sorted[nrows / 2]));
854                out.push_str(&format!("  75%:   {:.4}\n", sorted[3 * nrows / 4]));
855                out.push_str(&format!("  max:   {:.4}\n", sorted[nrows - 1]));
856            }
857            Column::Str(v) => {
858                let mut freq = std::collections::BTreeMap::new();
859                for s in v { *freq.entry(s.as_str()).or_insert(0usize) += 1; }
860                let unique = freq.len();
861                let top = freq.iter().max_by_key(|(_, &c)| c).map(|(s, _)| *s).unwrap_or("");
862                out.push_str(&format!("  count:  {}\n", nrows));
863                out.push_str(&format!("  unique: {}\n", unique));
864                out.push_str(&format!("  top:    {}\n", top));
865            }
866            Column::Bool(v) => {
867                let trues = v.iter().filter(|&&b| b).count();
868                out.push_str(&format!("  count: {}\n", nrows));
869                out.push_str(&format!("  true:  {}\n", trues));
870                out.push_str(&format!("  false: {}\n", nrows - trues));
871            }
872            Column::Categorical { levels, codes } => {
873                let n_levels = levels.len();
874                let mut freq = std::collections::BTreeMap::new();
875                for &c in codes { *freq.entry(c).or_insert(0usize) += 1; }
876                let top_code = freq.iter().max_by_key(|(_, &c)| c).map(|(&k, _)| k).unwrap_or(0);
877                let top = if (top_code as usize) < levels.len() { &levels[top_code as usize] } else { "?" };
878                out.push_str(&format!("  count:  {}\n", nrows));
879                out.push_str(&format!("  levels: {}\n", n_levels));
880                out.push_str(&format!("  top:    {}\n", top));
881            }
882            Column::DateTime(v) => {
883                if v.is_empty() {
884                    out.push_str("  (empty)\n");
885                    continue;
886                }
887                let mut sorted = v.clone();
888                sorted.sort();
889                out.push_str(&format!("  count: {}\n", nrows));
890                out.push_str(&format!("  min:   {} (epoch ms)\n", sorted[0]));
891                out.push_str(&format!("  max:   {} (epoch ms)\n", sorted[nrows - 1]));
892            }
893            Column::CategoricalAdaptive(cc) => {
894                let n_levels = cc.dictionary().len();
895                out.push_str(&format!("  count:  {}\n", nrows));
896                out.push_str(&format!("  levels: {} (adaptive, {}-byte codes)\n",
897                    n_levels, cc.codes().width_bytes()));
898            }
899        }
900    }
901    out
902}
903
904/// Produce a transposed glimpse (like dplyr::glimpse() or tibble printing).
905///
906/// Shows each column as a row: name, type, and first few values.
907fn format_glimpse(df: &DataFrame) -> String {
908    let nrows = df.nrows();
909    let ncols = df.ncols();
910    let mut out = String::new();
911    out.push_str(&format!("Rows: {}\nColumns: {}\n", nrows, ncols));
912
913    // Find max column name width for alignment
914    let max_name_w = df.columns.iter().map(|(n, _)| n.len()).max().unwrap_or(0);
915    let max_type_w = df.columns.iter().map(|(_, c)| c.type_name().len()).max().unwrap_or(0);
916
917    let preview_count = nrows.min(8);
918    for (name, col) in &df.columns {
919        out.push_str(&format!("$ {:width_n$} <{:width_t$}>  ",
920            name, col.type_name(),
921            width_n = max_name_w, width_t = max_type_w));
922        let mut vals = Vec::with_capacity(preview_count);
923        for i in 0..preview_count {
924            vals.push(col.get_display(i));
925        }
926        out.push_str(&vals.join(", "));
927        if nrows > preview_count {
928            out.push_str(", ...");
929        }
930        out.push('\n');
931    }
932    out
933}
934
935// ============================================================================
936//  DExpr builder builtins (col, binop, agg, etc.)
937// ============================================================================
938
939/// Build a `Value::Struct { name: "DExpr", kind: "col", ... }` from a column name.
940pub fn build_col_expr(name: &str) -> Value {
941    let mut fields = std::collections::BTreeMap::new();
942    fields.insert("kind".to_string(), Value::String(Rc::new("col".to_string())));
943    fields.insert("value".to_string(), Value::String(Rc::new(name.to_string())));
944    Value::Struct { name: "DExpr".to_string(), fields }
945}
946
947/// Build a DExpr binary operation.
948pub fn build_binop_expr(op: &str, left: Value, right: Value) -> Value {
949    let mut fields = std::collections::BTreeMap::new();
950    fields.insert("kind".to_string(), Value::String(Rc::new("binop".to_string())));
951    fields.insert("op".to_string(), Value::String(Rc::new(op.to_string())));
952    fields.insert("left".to_string(), left);
953    fields.insert("right".to_string(), right);
954    Value::Struct { name: "DExpr".to_string(), fields }
955}
956
957/// Build a TidyAgg struct value.
958pub fn build_tidy_agg(kind: &str, col: Option<&str>) -> Value {
959    let mut fields = std::collections::BTreeMap::new();
960    fields.insert("kind".to_string(), Value::String(Rc::new(kind.to_string())));
961    if let Some(c) = col {
962        fields.insert("col".to_string(), Value::String(Rc::new(c.to_string())));
963    }
964    Value::Struct { name: "TidyAgg".to_string(), fields }
965}
966
967/// Build an ArrangeKey struct value.
968pub fn build_arrange_key(col: &str, descending: bool) -> Value {
969    let mut fields = std::collections::BTreeMap::new();
970    fields.insert("col".to_string(), Value::String(Rc::new(col.to_string())));
971    fields.insert("desc".to_string(), Value::Bool(descending));
972    Value::Struct { name: "ArrangeKey".to_string(), fields }
973}
974
975/// Dispatch builder builtins like `col()`, `desc()`, `asc()`, `sum()`, `mean()`, etc.
976/// Returns `Ok(Some(value))` if recognised, `Ok(None)` otherwise.
977pub fn dispatch_tidy_builtin(name: &str, args: &[Value]) -> Result<Option<Value>, String> {
978    match name {
979        // DExpr builders
980        "col" => {
981            if args.len() != 1 {
982                return Err("col() requires 1 argument: column name".into());
983            }
984            let name = value_to_string(&args[0])?;
985            Ok(Some(build_col_expr(&name)))
986        }
987        "desc" => {
988            if args.len() != 1 {
989                return Err("desc() requires 1 argument: column name".into());
990            }
991            let name = value_to_string(&args[0])?;
992            Ok(Some(build_arrange_key(&name, true)))
993        }
994        "asc" => {
995            if args.len() != 1 {
996                return Err("asc() requires 1 argument: column name".into());
997            }
998            let name = value_to_string(&args[0])?;
999            Ok(Some(build_arrange_key(&name, false)))
1000        }
1001        // DExpr binary op builder
1002        "dexpr_binop" => {
1003            if args.len() != 3 {
1004                return Err("dexpr_binop() requires 3 args: op, left, right".into());
1005            }
1006            let op = value_to_string(&args[0])?;
1007            Ok(Some(build_binop_expr(&op, args[1].clone(), args[2].clone())))
1008        }
1009
1010        // TidyAgg builders
1011        "tidy_count" => Ok(Some(build_tidy_agg("count", None))),
1012        "tidy_sum" => {
1013            if args.len() != 1 { return Err("tidy_sum() requires 1 argument: column name".into()); }
1014            let col = value_to_string(&args[0])?;
1015            Ok(Some(build_tidy_agg("sum", Some(&col))))
1016        }
1017        "tidy_mean" => {
1018            if args.len() != 1 { return Err("tidy_mean() requires 1 argument: column name".into()); }
1019            let col = value_to_string(&args[0])?;
1020            Ok(Some(build_tidy_agg("mean", Some(&col))))
1021        }
1022        "tidy_min" => {
1023            if args.len() != 1 { return Err("tidy_min() requires 1 argument: column name".into()); }
1024            let col = value_to_string(&args[0])?;
1025            Ok(Some(build_tidy_agg("min", Some(&col))))
1026        }
1027        "tidy_max" => {
1028            if args.len() != 1 { return Err("tidy_max() requires 1 argument: column name".into()); }
1029            let col = value_to_string(&args[0])?;
1030            Ok(Some(build_tidy_agg("max", Some(&col))))
1031        }
1032        "tidy_first" => {
1033            if args.len() != 1 { return Err("tidy_first() requires 1 argument: column name".into()); }
1034            let col = value_to_string(&args[0])?;
1035            Ok(Some(build_tidy_agg("first", Some(&col))))
1036        }
1037        "tidy_last" => {
1038            if args.len() != 1 { return Err("tidy_last() requires 1 argument: column name".into()); }
1039            let col = value_to_string(&args[0])?;
1040            Ok(Some(build_tidy_agg("last", Some(&col))))
1041        }
1042
1043        // =====================================================================
1044        //  stringr builtins — byte-first string view approach
1045        //
1046        //  CJC strings are UTF-8 byte sequences. These functions operate on the
1047        //  byte representation via cjc-regex's Thompson NFA. Where possible,
1048        //  results are slices (zero-copy views) of the input. Allocation happens
1049        //  only when replacement or splitting creates new buffers.
1050        //
1051        //  Key design point: patterns are compiled fresh per call. For hot-loop
1052        //  use, prefer the compiled Regex value type (regex literal `/pattern/`).
1053        // =====================================================================
1054
1055        "str_detect" => {
1056            // str_detect(haystack, pattern) → bool
1057            if args.len() != 2 { return Err("str_detect requires 2 args: string, pattern".into()); }
1058            let hay = value_to_string(&args[0])?;
1059            let pat = value_to_string(&args[1])?;
1060            let matched = cjc_regex::is_match(&pat, "", hay.as_bytes());
1061            Ok(Some(Value::Bool(matched)))
1062        }
1063        "str_extract" => {
1064            // str_extract(haystack, pattern) → string (first match) or ""
1065            if args.len() != 2 { return Err("str_extract requires 2 args: string, pattern".into()); }
1066            let hay = value_to_string(&args[0])?;
1067            let pat = value_to_string(&args[1])?;
1068            match cjc_regex::find(&pat, "", hay.as_bytes()) {
1069                Some((start, end)) => {
1070                    let slice = &hay.as_bytes()[start..end];
1071                    let s = String::from_utf8_lossy(slice).to_string();
1072                    Ok(Some(Value::String(Rc::new(s))))
1073                }
1074                None => Ok(Some(Value::String(Rc::new(String::new())))),
1075            }
1076        }
1077        "str_extract_all" => {
1078            // str_extract_all(haystack, pattern) → [string]
1079            if args.len() != 2 { return Err("str_extract_all requires 2 args: string, pattern".into()); }
1080            let hay = value_to_string(&args[0])?;
1081            let pat = value_to_string(&args[1])?;
1082            let matches = cjc_regex::find_all(&pat, "", hay.as_bytes());
1083            let vals: Vec<Value> = matches
1084                .iter()
1085                .map(|&(start, end)| {
1086                    let slice = &hay.as_bytes()[start..end];
1087                    Value::String(Rc::new(String::from_utf8_lossy(slice).to_string()))
1088                })
1089                .collect();
1090            Ok(Some(Value::Array(Rc::new(vals))))
1091        }
1092        "str_replace" => {
1093            // str_replace(haystack, pattern, replacement) → string (first match replaced)
1094            if args.len() != 3 { return Err("str_replace requires 3 args: string, pattern, replacement".into()); }
1095            let hay = value_to_string(&args[0])?;
1096            let pat = value_to_string(&args[1])?;
1097            let rep = value_to_string(&args[2])?;
1098            match cjc_regex::find(&pat, "", hay.as_bytes()) {
1099                Some((start, end)) => {
1100                    let mut result = String::with_capacity(hay.len());
1101                    result.push_str(&hay[..start]);
1102                    result.push_str(&rep);
1103                    result.push_str(&hay[end..]);
1104                    Ok(Some(Value::String(Rc::new(result))))
1105                }
1106                None => Ok(Some(Value::String(Rc::new(hay)))),
1107            }
1108        }
1109        "str_replace_all" => {
1110            // str_replace_all(haystack, pattern, replacement) → string (all matches replaced)
1111            if args.len() != 3 { return Err("str_replace_all requires 3 args: string, pattern, replacement".into()); }
1112            let hay = value_to_string(&args[0])?;
1113            let pat = value_to_string(&args[1])?;
1114            let rep = value_to_string(&args[2])?;
1115            let matches = cjc_regex::find_all(&pat, "", hay.as_bytes());
1116            if matches.is_empty() {
1117                return Ok(Some(Value::String(Rc::new(hay))));
1118            }
1119            let mut result = String::with_capacity(hay.len());
1120            let mut last_end = 0;
1121            for &(start, end) in &matches {
1122                result.push_str(&hay[last_end..start]);
1123                result.push_str(&rep);
1124                last_end = end;
1125            }
1126            result.push_str(&hay[last_end..]);
1127            Ok(Some(Value::String(Rc::new(result))))
1128        }
1129        "str_split" => {
1130            // str_split(haystack, pattern) → [string]
1131            if args.len() != 2 { return Err("str_split requires 2 args: string, pattern".into()); }
1132            let hay = value_to_string(&args[0])?;
1133            let pat = value_to_string(&args[1])?;
1134            let spans = cjc_regex::split(&pat, "", hay.as_bytes());
1135            let vals: Vec<Value> = spans
1136                .iter()
1137                .map(|&(start, end)| {
1138                    Value::String(Rc::new(
1139                        String::from_utf8_lossy(&hay.as_bytes()[start..end]).to_string(),
1140                    ))
1141                })
1142                .collect();
1143            Ok(Some(Value::Array(Rc::new(vals))))
1144        }
1145        "str_count" => {
1146            // str_count(haystack, pattern) → int (number of matches)
1147            if args.len() != 2 { return Err("str_count requires 2 args: string, pattern".into()); }
1148            let hay = value_to_string(&args[0])?;
1149            let pat = value_to_string(&args[1])?;
1150            let count = cjc_regex::find_all(&pat, "", hay.as_bytes()).len();
1151            Ok(Some(Value::Int(count as i64)))
1152        }
1153        "str_trim" => {
1154            // str_trim(string) → string with leading/trailing whitespace removed
1155            if args.len() != 1 { return Err("str_trim requires 1 arg: string".into()); }
1156            let s = value_to_string(&args[0])?;
1157            Ok(Some(Value::String(Rc::new(s.trim().to_string()))))
1158        }
1159        "str_to_upper" => {
1160            if args.len() != 1 { return Err("str_to_upper requires 1 arg: string".into()); }
1161            let s = value_to_string(&args[0])?;
1162            Ok(Some(Value::String(Rc::new(s.to_uppercase()))))
1163        }
1164        "str_to_lower" => {
1165            if args.len() != 1 { return Err("str_to_lower requires 1 arg: string".into()); }
1166            let s = value_to_string(&args[0])?;
1167            Ok(Some(Value::String(Rc::new(s.to_lowercase()))))
1168        }
1169        "str_starts" => {
1170            if args.len() != 2 { return Err("str_starts requires 2 args: string, prefix".into()); }
1171            let s = value_to_string(&args[0])?;
1172            let prefix = value_to_string(&args[1])?;
1173            Ok(Some(Value::Bool(s.starts_with(&prefix))))
1174        }
1175        "str_ends" => {
1176            if args.len() != 2 { return Err("str_ends requires 2 args: string, suffix".into()); }
1177            let s = value_to_string(&args[0])?;
1178            let suffix = value_to_string(&args[1])?;
1179            Ok(Some(Value::Bool(s.ends_with(&suffix))))
1180        }
1181        "str_sub" => {
1182            // str_sub(string, start, end) → substring (byte-indexed, clamped)
1183            if args.len() != 3 { return Err("str_sub requires 3 args: string, start, end".into()); }
1184            let s = value_to_string(&args[0])?;
1185            let start = value_to_usize(&args[1])?.min(s.len());
1186            let end = value_to_usize(&args[2])?.min(s.len());
1187            if start > end {
1188                Ok(Some(Value::String(Rc::new(String::new()))))
1189            } else {
1190                // Clamp to char boundaries for safety
1191                let actual_start = clamp_to_char_boundary(&s, start);
1192                let actual_end = clamp_to_char_boundary(&s, end);
1193                Ok(Some(Value::String(Rc::new(s[actual_start..actual_end].to_string()))))
1194            }
1195        }
1196        "str_len" => {
1197            // str_len(string) → int (byte length, consistent with byte-first view)
1198            if args.len() != 1 { return Err("str_len requires 1 arg: string".into()); }
1199            let s = value_to_string(&args[0])?;
1200            Ok(Some(Value::Int(s.len() as i64)))
1201        }
1202
1203        // =====================================================================
1204        //  Stats builtins (operate on Array of numbers)
1205        // =====================================================================
1206
1207        "median" => {
1208            if args.len() != 1 { return Err("median requires 1 arg: numeric array".into()); }
1209            let nums = value_to_f64_vec(&args[0])?;
1210            if nums.is_empty() {
1211                return Ok(Some(Value::Float(f64::NAN)));
1212            }
1213            let mut sorted = nums;
1214            sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
1215            let mid = sorted.len() / 2;
1216            let med = if sorted.len() % 2 == 0 {
1217                (sorted[mid - 1] + sorted[mid]) / 2.0
1218            } else {
1219                sorted[mid]
1220            };
1221            Ok(Some(Value::Float(med)))
1222        }
1223        "sd" => {
            // Sample standard deviation (N-1 denominator)
1225            if args.len() != 1 { return Err("sd requires 1 arg: numeric array".into()); }
1226            let nums = value_to_f64_vec(&args[0])?;
1227            if nums.len() < 2 {
1228                return Ok(Some(Value::Float(f64::NAN)));
1229            }
1230            let mean = nums.iter().sum::<f64>() / nums.len() as f64;
1231            let var = nums.iter().map(|x| (x - mean) * (x - mean)).sum::<f64>()
1232                / (nums.len() - 1) as f64;
1233            Ok(Some(Value::Float(var.sqrt())))
1234        }
1235        "variance" => {
1236            // Sample variance (N-1 denominator)
1237            if args.len() != 1 { return Err("variance requires 1 arg: numeric array".into()); }
1238            let nums = value_to_f64_vec(&args[0])?;
1239            if nums.len() < 2 {
1240                return Ok(Some(Value::Float(f64::NAN)));
1241            }
1242            let mean = nums.iter().sum::<f64>() / nums.len() as f64;
1243            let var = nums.iter().map(|x| (x - mean) * (x - mean)).sum::<f64>()
1244                / (nums.len() - 1) as f64;
1245            Ok(Some(Value::Float(var)))
1246        }
1247        "n_distinct" => {
1248            // Count distinct values in an array
1249            if args.len() != 1 { return Err("n_distinct requires 1 arg: array".into()); }
1250            match &args[0] {
1251                Value::Array(arr) => {
1252                    let mut seen = std::collections::BTreeSet::new();
1253                    for v in arr.iter() {
1254                        seen.insert(format!("{v}"));
1255                    }
1256                    Ok(Some(Value::Int(seen.len() as i64)))
1257                }
1258                _ => Err(format!("n_distinct expects Array, got {}", args[0].type_name())),
1259            }
1260        }
1261
1262        // =====================================================================
1263        //  DataFrame free-standing builtins (ITEM 1)
1264        //
1265        //  These wrap TidyView method calls so CJC code can write:
1266        //    pivot_wider(df, ["id"], "measure", "value")
1267        //  instead of (or in addition to) the method form:
1268        //    df.pivot_wider(["id"], "measure", "value")
1269        //
1270        //  All take a `Value::TidyView` as their first argument and re-use the
1271        //  existing method dispatch internally. This keeps the implementation a
1272        //  single source of truth.
1273        // =====================================================================
1274
1275        // ------------------------------------------------------------------
1276        // df_read_csv(path) or df_read_csv(path, delimiter) → TidyView
1277        // ------------------------------------------------------------------
1278        "df_read_csv" => {
1279            if args.len() < 1 || args.len() > 2 {
1280                return Err("df_read_csv requires 1-2 arguments (path[, delimiter])".into());
1281            }
1282            let path = match &args[0] {
1283                Value::String(s) => s.as_ref().clone(),
1284                _ => return Err(format!("df_read_csv: path must be String, got {}", args[0].type_name())),
1285            };
1286            let delim: u8 = if args.len() == 2 {
1287                match &args[1] {
1288                    Value::String(s) if !s.is_empty() => s.as_bytes()[0],
1289                    _ => return Err("df_read_csv: delimiter must be a non-empty String".into()),
1290                }
1291            } else {
1292                b','
1293            };
1294            let bytes = std::fs::read(&path)
1295                .map_err(|e| format!("df_read_csv: {}", e))?;
1296            let config = CsvConfig { delimiter: delim, ..CsvConfig::default() };
1297            let df = CsvReader::new(config)
1298                .parse(&bytes)
1299                .map_err(|e| format!("df_read_csv: {}", e))?;
1300            Ok(Some(wrap_view(TidyView::from_df(df))))
1301        }
1302
1303        // ------------------------------------------------------------------
1304        // pivot_wider(df, id_cols, names_from, values_from) → TidyView
1305        // ------------------------------------------------------------------
1306        "pivot_wider" => {
1307            if args.len() != 4 {
1308                return Err(
1309                    "pivot_wider requires 4 arguments (df, id_cols, names_from, values_from)".into(),
1310                );
1311            }
1312            let view = value_to_tidy_view(&args[0])?;
1313            let id_cols = value_to_str_vec(&args[1])?;
1314            let id_refs: Vec<&str> = id_cols.iter().map(|s| s.as_str()).collect();
1315            let names_from = value_to_string(&args[2])?;
1316            let values_from = value_to_string(&args[3])?;
1317            let nullable_frame = view
1318                .pivot_wider(&id_refs, &names_from, &values_from)
1319                .map_err(|e| format!("{e}"))?;
1320            Ok(Some(wrap_view(nullable_frame.to_tidy_view_filled())))
1321        }
1322
1323        // ------------------------------------------------------------------
1324        // pivot_longer(df, cols, names_to, values_to) → TidyView
1325        // ------------------------------------------------------------------
1326        "pivot_longer" => {
1327            if args.len() < 3 || args.len() > 4 {
1328                return Err(
1329                    "pivot_longer requires 3-4 arguments (df, cols, names_to[, values_to])".into(),
1330                );
1331            }
1332            let view = value_to_tidy_view(&args[0])?;
1333            let cols = value_to_str_vec(&args[1])?;
1334            let col_refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
1335            let names_to = value_to_string(&args[2])?;
1336            let values_to = if args.len() == 4 {
1337                value_to_string(&args[3])?
1338            } else {
1339                "value".to_string()
1340            };
1341            let frame = view
1342                .pivot_longer(&col_refs, &names_to, &values_to)
1343                .map_err(|e| format!("{e}"))?;
1344            Ok(Some(wrap_view(frame.view())))
1345        }
1346
1347        // ------------------------------------------------------------------
1348        // df_distinct(df) or df_distinct(df, cols) → TidyView
1349        // ------------------------------------------------------------------
1350        "df_distinct" => {
1351            if args.is_empty() || args.len() > 2 {
1352                return Err("df_distinct requires 1-2 arguments (df[, cols])".into());
1353            }
1354            let view = value_to_tidy_view(&args[0])?;
1355            let cols = if args.len() == 2 {
1356                value_to_str_vec(&args[1])?
1357            } else {
1358                view.column_names().iter().map(|s| s.to_string()).collect()
1359            };
1360            let col_refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
1361            let new_view = view.distinct(&col_refs).map_err(|e| format!("{e}"))?;
1362            Ok(Some(wrap_view(new_view)))
1363        }
1364
1365        // ------------------------------------------------------------------
1366        // df_rename(df, old_name, new_name) → TidyView
1367        // ------------------------------------------------------------------
1368        "df_rename" => {
1369            if args.len() != 3 {
1370                return Err("df_rename requires 3 arguments (df, old_name, new_name)".into());
1371            }
1372            let view = value_to_tidy_view(&args[0])?;
1373            let old = value_to_string(&args[1])?;
1374            let new = value_to_string(&args[2])?;
1375            let pair_refs: Vec<(&str, &str)> = vec![(&old, &new)];
1376            let new_view = view.rename(&pair_refs).map_err(|e| format!("{e}"))?;
1377            Ok(Some(wrap_view(new_view)))
1378        }
1379
1380        // ------------------------------------------------------------------
1381        // df_anti_join(df1, df2, on) → TidyView
1382        // df_semi_join(df1, df2, on) → TidyView
1383        // df_full_join(df1, df2, on) → TidyView
1384        //
1385        // `on` = String (single key, same name in both) or
1386        //        Array of Strings (multi-key, same names in both).
1387        // ------------------------------------------------------------------
1388        "df_anti_join" | "df_semi_join" | "df_full_join" => {
1389            if args.len() != 3 {
1390                return Err(format!(
1391                    "{name} requires 3 arguments (df1, df2, on)"
1392                ));
1393            }
1394            let left = value_to_tidy_view(&args[0])?;
1395            let right_rc = match &args[1] {
1396                Value::TidyView(rc) => rc,
1397                _ => return Err(format!("{name}: second argument must be a TidyView")),
1398            };
1399            let right_inner: &Rc<dyn std::any::Any> = right_rc;
1400            let right = right_inner
1401                .downcast_ref::<TidyView>()
1402                .ok_or_else(|| "internal: TidyView downcast failed".to_string())?;
1403            // Parse `on`: single string or array of strings
1404            let on_keys: Vec<String> = match &args[2] {
1405                Value::String(s) => vec![s.as_ref().clone()],
1406                Value::Array(arr) => arr
1407                    .iter()
1408                    .map(|v| match v {
1409                        Value::String(s) => Ok(s.as_ref().clone()),
1410                        _ => Err(format!("on: expected String keys, got {}", v.type_name())),
1411                    })
1412                    .collect::<Result<Vec<_>, _>>()?,
1413                _ => return Err(format!("{name}: `on` must be String or Array of Strings")),
1414            };
1415            let on_pairs: Vec<(&str, &str)> = on_keys.iter().map(|k| (k.as_str(), k.as_str())).collect();
1416            match name {
1417                "df_anti_join" => {
1418                    let new_view = left.anti_join(right, &on_pairs).map_err(|e| format!("{e}"))?;
1419                    Ok(Some(wrap_view(new_view)))
1420                }
1421                "df_semi_join" => {
1422                    let new_view = left.semi_join(right, &on_pairs).map_err(|e| format!("{e}"))?;
1423                    Ok(Some(wrap_view(new_view)))
1424                }
1425                "df_full_join" => {
1426                    let suffix = crate::JoinSuffix::default();
1427                    let nullable_frame = left.full_join(right, &on_pairs, &suffix)
1428                        .map_err(|e| format!("{e}"))?;
1429                    Ok(Some(wrap_view(nullable_frame.to_tidy_view_filled())))
1430                }
1431                _ => Ok(None),
1432            }
1433        }
1434
1435        // ------------------------------------------------------------------
1436        // df_fill_na(df, col_name, fill_val) → TidyView
1437        //
1438        // Fills NA/null values in the specified column with `fill_val`.
1439        // Works by materializing, patching the column, and re-wrapping.
1440        // ------------------------------------------------------------------
1441        "df_fill_na" => {
1442            if args.len() != 3 {
1443                return Err("df_fill_na requires 3 arguments (df, col_name, fill_val)".into());
1444            }
1445            let view = value_to_tidy_view(&args[0])?;
1446            let col_name = value_to_string(&args[1])?;
1447            let fill_val = &args[2];
1448
1449            let mut df = view.materialize().map_err(|e| format!("{e}"))?;
1450            let col_idx = df.columns.iter().position(|(n, _)| n == &col_name)
1451                .ok_or_else(|| format!("df_fill_na: column '{}' not found", col_name))?;
1452
1453            let filled_col = match &df.columns[col_idx].1 {
1454                Column::Int(v) => {
1455                    // Int columns have no inline NA representation in the
1456                    // dense storage; NullableColumn nulls are materialised as 0
1457                    // by to_tidy_view_filled.  Accept the argument for API
1458                    // consistency but leave the column unchanged.
1459                    let _fill = match fill_val {
1460                        Value::Int(i) => *i,
1461                        Value::Float(f) => *f as i64,
1462                        _ => return Err("df_fill_na: fill value must be numeric for Int column".into()),
1463                    };
1464                    Column::Int(v.clone())
1465                }
1466                Column::Float(v) => {
1467                    let fill = match fill_val {
1468                        Value::Float(f) => *f,
1469                        Value::Int(i) => *i as f64,
1470                        _ => return Err("df_fill_na: fill value must be numeric for Float column".into()),
1471                    };
1472                    Column::Float(v.iter().map(|&x| if x.is_nan() { fill } else { x }).collect())
1473                }
1474                Column::Str(v) => {
1475                    let fill = match fill_val {
1476                        Value::String(s) => s.as_ref().clone(),
1477                        other => format!("{other}"),
1478                    };
1479                    Column::Str(v.iter().map(|s| {
1480                        if s == "NA" || s.is_empty() { fill.clone() } else { s.clone() }
1481                    }).collect())
1482                }
1483                Column::Bool(v) => Column::Bool(v.clone()),
1484                Column::Categorical { levels, codes } => Column::Categorical { levels: levels.clone(), codes: codes.clone() },
1485                Column::CategoricalAdaptive(_) => df.columns[col_idx].1.to_legacy_categorical(),
1486                Column::DateTime(v) => Column::DateTime(v.clone()),
1487            };
1488            df.columns[col_idx].1 = filled_col;
1489            Ok(Some(wrap_view(TidyView::from_df(df))))
1490        }
1491
1492        // ------------------------------------------------------------------
1493        // df_drop_na(df) or df_drop_na(df, cols) → TidyView
1494        //
1495        // Drops rows that contain NA in the specified columns (all by default).
1496        // Uses a filter predicate over the visible rows.
1497        // ------------------------------------------------------------------
1498        "df_drop_na" => {
1499            if args.is_empty() || args.len() > 2 {
1500                return Err("df_drop_na requires 1-2 arguments (df[, cols])".into());
1501            }
1502            let view = value_to_tidy_view(&args[0])?;
1503            let target_cols: Vec<String> = if args.len() == 2 {
1504                value_to_str_vec(&args[1])?
1505            } else {
1506                view.column_names().iter().map(|s| s.to_string()).collect()
1507            };
1508
1509            // Materialise once, then filter row by row
1510            let df = view.materialize().map_err(|e| format!("{e}"))?;
1511            let nrows = df.nrows();
1512
1513            // For each target column, find which rows are NA
1514            let mut keep = vec![true; nrows];
1515            for col_name in &target_cols {
1516                if let Some(col) = df.get_column(col_name) {
1517                    for r in 0..nrows {
1518                        if !keep[r] { continue; }
1519                        let na = match col {
1520                            Column::Float(v) => v[r].is_nan(),
1521                            Column::Str(v) => v[r] == "NA" || v[r].is_empty(),
1522                            _ => false,
1523                        };
1524                        if na { keep[r] = false; }
1525                    }
1526                } else {
1527                    return Err(format!("df_drop_na: column '{}' not found", col_name));
1528                }
1529            }
1530
1531            // Build new DataFrame from kept rows
1532            let mut new_cols: Vec<(String, Column)> = Vec::with_capacity(df.columns.len());
1533            for (name, col) in &df.columns {
1534                let legacy_owned;
1535                let col_ref: &Column = if matches!(col, Column::CategoricalAdaptive(_)) {
1536                    legacy_owned = col.to_legacy_categorical();
1537                    &legacy_owned
1538                } else {
1539                    col
1540                };
1541                let new_col = match col_ref {
1542                    Column::Int(v)       => Column::Int(v.iter().enumerate().filter(|(r, _)| keep[*r]).map(|(_, x)| *x).collect()),
1543                    Column::Float(v)     => Column::Float(v.iter().enumerate().filter(|(r, _)| keep[*r]).map(|(_, x)| *x).collect()),
1544                    Column::Str(v)       => Column::Str(v.iter().enumerate().filter(|(r, _)| keep[*r]).map(|(_, x)| x.clone()).collect()),
1545                    Column::Bool(v)      => Column::Bool(v.iter().enumerate().filter(|(r, _)| keep[*r]).map(|(_, x)| *x).collect()),
1546                    Column::DateTime(v)  => Column::DateTime(v.iter().enumerate().filter(|(r, _)| keep[*r]).map(|(_, x)| *x).collect()),
1547                    Column::Categorical { levels, codes } => Column::Categorical {
1548                        levels: levels.clone(),
1549                        codes: codes.iter().enumerate().filter(|(r, _)| keep[*r]).map(|(_, x)| *x).collect(),
1550                    },
1551                    Column::CategoricalAdaptive(_) => unreachable!("converted via legacy_owned"),
1552                };
1553                new_cols.push((name.clone(), new_col));
1554            }
1555            let new_df = DataFrame::from_columns(new_cols)
1556                .map_err(|e| format!("df_drop_na: {e}"))?;
1557            Ok(Some(wrap_view(TidyView::from_df(new_df))))
1558        }
1559
1560        _ => Ok(None),
1561    }
1562}
1563
1564/// Helper: extract a `&TidyView` reference from a `Value::TidyView`.
1565fn value_to_tidy_view(v: &Value) -> Result<&TidyView, String> {
1566    match v {
1567        Value::TidyView(rc) => rc
1568            .downcast_ref::<TidyView>()
1569            .ok_or_else(|| "internal: TidyView downcast failed".to_string()),
1570        _ => Err(format!(
1571            "expected TidyView (use df.view() to convert a DataFrame), got {}",
1572            v.type_name()
1573        )),
1574    }
1575}
1576
/// Round a byte offset down to the nearest UTF-8 char boundary of `s`.
///
/// Offsets at or beyond the end of the string clamp to `s.len()`. An offset
/// that falls inside a multi-byte character is moved back to the first byte
/// of that character.
fn clamp_to_char_boundary(s: &str, idx: usize) -> usize {
    // Cap at the string length first; `s.len()` is always a boundary.
    let capped = idx.min(s.len());
    // Scan backwards from the capped offset; offset 0 is always a boundary,
    // so the search cannot come up empty in practice.
    (0..=capped)
        .rev()
        .find(|&pos| s.is_char_boundary(pos))
        .unwrap_or(0)
}
1588
1589/// Convert a Value::Array of numbers to Vec<f64>.
1590fn value_to_f64_vec(v: &Value) -> Result<Vec<f64>, String> {
1591    match v {
1592        Value::Array(arr) => {
1593            arr.iter()
1594                .map(|v| match v {
1595                    Value::Float(f) => Ok(*f),
1596                    Value::Int(i) => Ok(*i as f64),
1597                    _ => Err(format!("expected numeric value in array, got {}", v.type_name())),
1598                })
1599                .collect()
1600        }
1601        _ => Err(format!("expected Array, got {}", v.type_name())),
1602    }
1603}