Skip to main content

cjc_data/
tidy_dispatch.rs

1//! Shared tidy dispatch: maps CJC language method calls on TidyView /
2//! GroupedTidyView values to the concrete cjc_data API.
3//!
4//! Both `cjc-eval` and `cjc-mir-exec` call into `dispatch_tidy_method` and
5//! `dispatch_grouped_method` so that every tidy operation has a single source
6//! of truth.  The executors only need to pattern-match `Value::TidyView` or
7//! `Value::GroupedTidyView` and delegate here.
8//!
9//! # Error handling
10//! All errors are returned as `Err(String)`.  The caller wraps the string
11//! into its own error type (EvalError / MirExecError).
12
13use std::rc::Rc;
14use std::any::Any;
15
16use cjc_runtime::value::Value;
17
18use crate::{
19    ArrangeKey, Column, DExpr, DBinOp, DataFrame, GroupedTidyView,
20    TidyAgg, TidyView,
21};
22
23// ============================================================================
24//  Public entry points
25// ============================================================================
26
27/// Dispatch a method call on a `Value::TidyView`.
28///
29/// Returns `Ok(Some(value))` if the method is known, `Ok(None)` if not
30/// recognised (allows the caller to fall through to other dispatch paths).
31pub fn dispatch_tidy_method(
32    inner: &Rc<dyn Any>,
33    method: &str,
34    args: &[Value],
35) -> Result<Option<Value>, String> {
36    let view = downcast_view(inner)?;
37    match method {
38        // -- shape ----------------------------------------------------------
39        "nrows" => Ok(Some(Value::Int(view.nrows() as i64))),
40        "ncols" => Ok(Some(Value::Int(view.ncols() as i64))),
41        "column_names" => {
42            let names: Vec<Value> = view
43                .column_names()
44                .into_iter()
45                .map(|s| Value::String(Rc::new(s.to_string())))
46                .collect();
47            Ok(Some(Value::Array(Rc::new(names))))
48        }
49
50        // -- filter ---------------------------------------------------------
51        "filter" => {
52            if args.len() != 1 {
53                return Err("TidyView.filter requires 1 argument: predicate DExpr".into());
54            }
55            let predicate = value_to_dexpr(&args[0])?;
56            let new_view = view.filter(&predicate).map_err(|e| format!("{e}"))?;
57            Ok(Some(wrap_view(new_view)))
58        }
59
60        // -- select ---------------------------------------------------------
61        "select" => {
62            if args.len() != 1 {
63                return Err("TidyView.select requires 1 argument: column names array".into());
64            }
65            let cols = value_to_str_vec(&args[0])?;
66            let col_refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
67            let new_view = view.select(&col_refs).map_err(|e| format!("{e}"))?;
68            Ok(Some(wrap_view(new_view)))
69        }
70
71        // -- mutate ---------------------------------------------------------
72        "mutate" => {
73            // mutate(name, expr) or mutate([(name, expr), ...])
74            // We support: mutate("col_name", dexpr_value)
75            if args.len() != 2 {
76                return Err("TidyView.mutate requires 2 arguments: column_name and expression".into());
77            }
78            let col_name = value_to_string(&args[0])?;
79            let expr = value_to_dexpr(&args[1])?;
80            let frame = view.mutate(&[(&col_name, expr)]).map_err(|e| format!("{e}"))?;
81            // mutate returns TidyFrame; convert to TidyView for pipeline continuity
82            Ok(Some(wrap_view(frame.view())))
83        }
84
85        // -- group_by -------------------------------------------------------
86        "group_by" => {
87            if args.len() != 1 {
88                return Err("TidyView.group_by requires 1 argument: key columns array".into());
89            }
90            let keys = value_to_str_vec(&args[0])?;
91            let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
92            let grouped = view.group_by(&key_refs).map_err(|e| format!("{e}"))?;
93            Ok(Some(wrap_grouped(grouped)))
94        }
95
96        // -- arrange --------------------------------------------------------
97        "arrange" => {
98            if args.len() != 1 {
99                return Err("TidyView.arrange requires 1 argument: sort keys array".into());
100            }
101            let keys = value_to_arrange_keys(&args[0])?;
102            let new_view = view.arrange(&keys).map_err(|e| format!("{e}"))?;
103            Ok(Some(wrap_view(new_view)))
104        }
105
106        // -- distinct -------------------------------------------------------
107        "distinct" => {
108            let cols = if args.is_empty() {
109                view.column_names().iter().map(|s| s.to_string()).collect::<Vec<_>>()
110            } else {
111                value_to_str_vec(&args[0])?
112            };
113            let col_refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
114            let new_view = view.distinct(&col_refs).map_err(|e| format!("{e}"))?;
115            Ok(Some(wrap_view(new_view)))
116        }
117
118        // -- slice family ---------------------------------------------------
119        "slice" => {
120            if args.len() != 2 {
121                return Err("TidyView.slice requires 2 arguments: start, end".into());
122            }
123            let start = value_to_usize(&args[0])?;
124            let end = value_to_usize(&args[1])?;
125            Ok(Some(wrap_view(view.slice(start, end))))
126        }
127        "slice_head" => {
128            if args.len() != 1 {
129                return Err("TidyView.slice_head requires 1 argument: n".into());
130            }
131            let n = value_to_usize(&args[0])?;
132            Ok(Some(wrap_view(view.slice_head(n))))
133        }
134        "slice_tail" => {
135            if args.len() != 1 {
136                return Err("TidyView.slice_tail requires 1 argument: n".into());
137            }
138            let n = value_to_usize(&args[0])?;
139            Ok(Some(wrap_view(view.slice_tail(n))))
140        }
141        "slice_sample" => {
142            if args.len() != 2 {
143                return Err("TidyView.slice_sample requires 2 arguments: n, seed".into());
144            }
145            let n = value_to_usize(&args[0])?;
146            let seed = match &args[1] {
147                Value::Int(i) => *i as u64,
148                _ => return Err("slice_sample seed must be Int".into()),
149            };
150            Ok(Some(wrap_view(view.slice_sample(n, seed))))
151        }
152
153        // -- joins ----------------------------------------------------------
154        "inner_join" | "left_join" | "semi_join" | "anti_join" => {
155            dispatch_join(view, args, method)
156        }
157
158        // -- reshape --------------------------------------------------------
159        "pivot_longer" => {
160            if args.len() < 2 || args.len() > 3 {
161                return Err(
162                    "TidyView.pivot_longer requires 2-3 args: cols, names_to, [values_to]".into(),
163                );
164            }
165            let cols = value_to_str_vec(&args[0])?;
166            let col_refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
167            let names_to = value_to_string(&args[1])?;
168            let values_to = if args.len() == 3 {
169                value_to_string(&args[2])?
170            } else {
171                "value".to_string()
172            };
173            let frame = view
174                .pivot_longer(&col_refs, &names_to, &values_to)
175                .map_err(|e| format!("{e}"))?;
176            Ok(Some(wrap_view(frame.view())))
177        }
178        "pivot_wider" => {
179            if args.len() != 3 {
180                return Err(
181                    "TidyView.pivot_wider requires 3 args: id_cols, names_from, values_from"
182                        .into(),
183                );
184            }
185            let id_cols = value_to_str_vec(&args[0])?;
186            let id_refs: Vec<&str> = id_cols.iter().map(|s| s.as_str()).collect();
187            let names_from = value_to_string(&args[1])?;
188            let values_from = value_to_string(&args[2])?;
189            let nullable_frame = view
190                .pivot_wider(&id_refs, &names_from, &values_from)
191                .map_err(|e| format!("{e}"))?;
192            // NullableFrame → fill nulls with defaults → TidyView
193            Ok(Some(wrap_view(nullable_frame.to_tidy_view_filled())))
194        }
195
196        // -- rename / relocate / drop_cols / bind ----------------------------
197        "rename" => {
198            if args.len() != 1 {
199                return Err("TidyView.rename requires 1 argument: array of [old, new] pairs".into());
200            }
201            let pairs = value_to_rename_pairs(&args[0])?;
202            let pair_refs: Vec<(&str, &str)> =
203                pairs.iter().map(|(a, b)| (a.as_str(), b.as_str())).collect();
204            let new_view = view.rename(&pair_refs).map_err(|e| format!("{e}"))?;
205            Ok(Some(wrap_view(new_view)))
206        }
207        "drop_cols" => {
208            if args.len() != 1 {
209                return Err("TidyView.drop_cols requires 1 argument: column names array".into());
210            }
211            let cols = value_to_str_vec(&args[0])?;
212            let col_refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
213            let new_view = view.drop_cols(&col_refs).map_err(|e| format!("{e}"))?;
214            Ok(Some(wrap_view(new_view)))
215        }
216        "bind_rows" => {
217            if args.len() != 1 {
218                return Err("TidyView.bind_rows requires 1 argument: other TidyView".into());
219            }
220            let other_rc = match &args[0] {
221                Value::TidyView(rc) => rc,
222                _ => return Err("bind_rows argument must be a TidyView".into()),
223            };
224            let other = downcast_view(other_rc)?;
225            let frame = view.bind_rows(other).map_err(|e| format!("{e}"))?;
226            Ok(Some(wrap_view(frame.view())))
227        }
228        "bind_cols" => {
229            if args.len() != 1 {
230                return Err("TidyView.bind_cols requires 1 argument: other TidyView".into());
231            }
232            let other_rc = match &args[0] {
233                Value::TidyView(rc) => rc,
234                _ => return Err("bind_cols argument must be a TidyView".into()),
235            };
236            let other = downcast_view(other_rc)?;
237            let frame = view.bind_cols(other).map_err(|e| format!("{e}"))?;
238            Ok(Some(wrap_view(frame.view())))
239        }
240
241        // -- column extraction / tensor -------------------------------------
242        "column" => {
243            if args.len() != 1 {
244                return Err("TidyView.column requires 1 argument: column_name".into());
245            }
246            let name = value_to_string(&args[0])?;
247            let df = view.materialize().map_err(|e| format!("{e}"))?;
248            let col = df
249                .get_column(&name)
250                .ok_or_else(|| format!("column '{}' not found", name))?;
251            Ok(Some(column_to_value(col)))
252        }
253        "to_tensor" => {
254            if args.len() != 1 {
255                return Err("TidyView.to_tensor requires 1 argument: column_names array".into());
256            }
257            let cols = value_to_str_vec(&args[0])?;
258            let col_refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
259            let t = view.to_tensor(&col_refs).map_err(|e| format!("{e}"))?;
260            Ok(Some(Value::Tensor(t)))
261        }
262
263        // -- materialize to DataFrame Struct --------------------------------
264        "collect" => {
265            let df = view.materialize().map_err(|e| format!("{e}"))?;
266            Ok(Some(dataframe_to_value(df)))
267        }
268
269        // -- print (for debugging) ------------------------------------------
270        "print" => {
271            let df = view.materialize().map_err(|e| format!("{e}"))?;
272            let s = format_dataframe(&df);
273            // Returning the formatted string; the caller is responsible for
274            // printing and capturing in output buffer.
275            Ok(Some(Value::String(Rc::new(s))))
276        }
277
278        // -- DataFrame inspection builtins -----------------------------------
279        "head" => {
280            let n = if args.is_empty() { 10 } else {
281                match &args[0] { Value::Int(n) => *n as usize, _ => return Err("head: argument must be Int".into()) }
282            };
283            let sliced = view.slice_head(n);
284            let df = sliced.materialize().map_err(|e| format!("{e}"))?;
285            let s = format_dataframe(&df);
286            Ok(Some(Value::String(Rc::new(s))))
287        }
288        "tail" => {
289            let n = if args.is_empty() { 10 } else {
290                match &args[0] { Value::Int(n) => *n as usize, _ => return Err("tail: argument must be Int".into()) }
291            };
292            let sliced = view.slice_tail(n);
293            let df = sliced.materialize().map_err(|e| format!("{e}"))?;
294            let s = format_dataframe(&df);
295            Ok(Some(Value::String(Rc::new(s))))
296        }
297        "shape" => {
298            let result = Value::Tuple(Rc::new(vec![
299                Value::Int(view.nrows() as i64),
300                Value::Int(view.ncols() as i64),
301            ]));
302            Ok(Some(result))
303        }
304        "columns" => {
305            // Alias for column_names — returns array of column name strings
306            let names: Vec<Value> = view
307                .column_names()
308                .into_iter()
309                .map(|s| Value::String(Rc::new(s.to_string())))
310                .collect();
311            Ok(Some(Value::Array(Rc::new(names))))
312        }
313        "dtypes" => {
314            // Returns a Struct mapping column_name → type_name
315            let df = view.materialize().map_err(|e| format!("{e}"))?;
316            let mut fields = std::collections::BTreeMap::new();
317            for (name, col) in &df.columns {
318                fields.insert(name.clone(), Value::String(Rc::new(col.type_name().to_string())));
319            }
320            Ok(Some(Value::Struct { name: "Dtypes".to_string(), fields }))
321        }
322        "describe" => {
323            let df = view.materialize().map_err(|e| format!("{e}"))?;
324            let s = format_describe(&df);
325            Ok(Some(Value::String(Rc::new(s))))
326        }
327        "glimpse" => {
328            let df = view.materialize().map_err(|e| format!("{e}"))?;
329            let s = format_glimpse(&df);
330            Ok(Some(Value::String(Rc::new(s))))
331        }
332
333        _ => Ok(None), // unknown method — caller falls through
334    }
335}
336
337/// Dispatch a method call on a `Value::GroupedTidyView`.
338pub fn dispatch_grouped_method(
339    inner: &Rc<dyn Any>,
340    method: &str,
341    args: &[Value],
342) -> Result<Option<Value>, String> {
343    let grouped = downcast_grouped(inner)?;
344    match method {
345        "ngroups" => Ok(Some(Value::Int(grouped.ngroups() as i64))),
346
347        "summarise" | "summarize" => {
348            if args.len() % 2 != 0 || args.is_empty() {
349                return Err(
350                    "summarise requires pairs of (name, agg) arguments".into(),
351                );
352            }
353            let mut assignments: Vec<(String, TidyAgg)> = Vec::new();
354            let mut i = 0;
355            while i < args.len() {
356                let name = value_to_string(&args[i])?;
357                let agg = value_to_tidy_agg(&args[i + 1])?;
358                assignments.push((name, agg));
359                i += 2;
360            }
361            let asg_refs: Vec<(&str, TidyAgg)> = assignments
362                .iter()
363                .map(|(n, a)| (n.as_str(), a.clone()))
364                .collect();
365            let frame = grouped.summarise(&asg_refs).map_err(|e| format!("{e}"))?;
366            Ok(Some(wrap_view(frame.view())))
367        }
368
369        "ungroup" => {
370            let view = grouped.clone().ungroup();
371            Ok(Some(wrap_view(view)))
372        }
373
374        _ => Ok(None),
375    }
376}
377
378// ============================================================================
379//  Helpers — Value ↔ cjc_data conversions
380// ============================================================================
381
382fn downcast_view(inner: &Rc<dyn Any>) -> Result<&TidyView, String> {
383    inner
384        .downcast_ref::<TidyView>()
385        .ok_or_else(|| "internal error: TidyView downcast failed".to_string())
386}
387
388fn downcast_grouped(inner: &Rc<dyn Any>) -> Result<&GroupedTidyView, String> {
389    inner
390        .downcast_ref::<GroupedTidyView>()
391        .ok_or_else(|| "internal error: GroupedTidyView downcast failed".to_string())
392}
393
394/// Wrap a `TidyView` into `Value::TidyView`.
395pub fn wrap_view(view: TidyView) -> Value {
396    Value::TidyView(Rc::new(view) as Rc<dyn Any>)
397}
398
399/// Wrap a `GroupedTidyView` into `Value::GroupedTidyView`.
400pub fn wrap_grouped(grouped: GroupedTidyView) -> Value {
401    Value::GroupedTidyView(Rc::new(grouped) as Rc<dyn Any>)
402}
403
404/// Convert `Value::String` → `String`.
405fn value_to_string(v: &Value) -> Result<String, String> {
406    match v {
407        Value::String(s) => Ok(s.as_ref().clone()),
408        _ => Err(format!("expected String, got {}", v.type_name())),
409    }
410}
411
412/// Convert `Value::Int` → `usize`.
413fn value_to_usize(v: &Value) -> Result<usize, String> {
414    match v {
415        Value::Int(i) if *i >= 0 => Ok(*i as usize),
416        Value::Int(i) => Err(format!("expected non-negative Int, got {i}")),
417        _ => Err(format!("expected Int, got {}", v.type_name())),
418    }
419}
420
421/// Convert `Value::Array([String, ...])` → `Vec<String>`.
422fn value_to_str_vec(v: &Value) -> Result<Vec<String>, String> {
423    match v {
424        Value::Array(arr) => arr
425            .iter()
426            .map(|v| match v {
427                Value::String(s) => Ok(s.as_ref().clone()),
428                _ => Err(format!("expected String in array, got {}", v.type_name())),
429            })
430            .collect(),
431        _ => Err(format!("expected Array, got {}", v.type_name())),
432    }
433}
434
435/// Parse a `Value::Struct { name: "DExpr", ... }` into a `DExpr`.
436///
437/// The CJC language constructs DExpr values via helper builtins:
438///   col("name")        → Struct { name: "DExpr", kind: "col", value: "name" }
439///   binop(">", l, r)   → Struct { name: "DExpr", kind: "binop", op: ">", left: l, right: r }
440///   lit_int(42)         → Struct { name: "DExpr", kind: "lit_int", value: 42 }
441///   etc.
442///
443/// For ergonomic use, we also accept raw literals directly:
444///   Value::Int(42)      → DExpr::LitInt(42)
445///   Value::Float(3.14)  → DExpr::LitFloat(3.14)
446///   Value::Bool(true)   → DExpr::LitBool(true)
447///   Value::String("x")  → DExpr::Col("x")   -- shorthand for col("x")
448pub fn value_to_dexpr(v: &Value) -> Result<DExpr, String> {
449    match v {
450        // Literal shorthand
451        Value::Int(i) => Ok(DExpr::LitInt(*i)),
452        Value::Float(f) => Ok(DExpr::LitFloat(*f)),
453        Value::Bool(b) => Ok(DExpr::LitBool(*b)),
454        Value::String(s) => Ok(DExpr::Col(s.as_ref().clone())),
455        // Struct-encoded DExpr
456        Value::Struct { name, fields } if name == "DExpr" => {
457            let kind = fields
458                .get("kind")
459                .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().as_str()) } else { None })
460                .ok_or("DExpr struct missing 'kind' string field")?;
461            match kind {
462                "col" => {
463                    let col_name = fields
464                        .get("value")
465                        .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().clone()) } else { None })
466                        .ok_or("DExpr col missing 'value' string field")?;
467                    Ok(DExpr::Col(col_name))
468                }
469                "lit_int" => {
470                    let val = fields
471                        .get("value")
472                        .and_then(|v| if let Value::Int(i) = v { Some(*i) } else { None })
473                        .ok_or("DExpr lit_int missing 'value' int field")?;
474                    Ok(DExpr::LitInt(val))
475                }
476                "lit_float" => {
477                    let val = fields
478                        .get("value")
479                        .and_then(|v| if let Value::Float(f) = v { Some(*f) } else { None })
480                        .ok_or("DExpr lit_float missing 'value' float field")?;
481                    Ok(DExpr::LitFloat(val))
482                }
483                "lit_bool" => {
484                    let val = fields
485                        .get("value")
486                        .and_then(|v| if let Value::Bool(b) = v { Some(*b) } else { None })
487                        .ok_or("DExpr lit_bool missing 'value' bool field")?;
488                    Ok(DExpr::LitBool(val))
489                }
490                "lit_str" => {
491                    let val = fields
492                        .get("value")
493                        .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().clone()) } else { None })
494                        .ok_or("DExpr lit_str missing 'value' string field")?;
495                    Ok(DExpr::LitStr(val))
496                }
497                "binop" => {
498                    let op_str = fields
499                        .get("op")
500                        .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().as_str()) } else { None })
501                        .ok_or("DExpr binop missing 'op' field")?;
502                    let op = parse_binop(op_str)?;
503                    let left = fields.get("left").ok_or("DExpr binop missing 'left'")?;
504                    let right = fields.get("right").ok_or("DExpr binop missing 'right'")?;
505                    Ok(DExpr::BinOp {
506                        op,
507                        left: Box::new(value_to_dexpr(left)?),
508                        right: Box::new(value_to_dexpr(right)?),
509                    })
510                }
511                "count" => Ok(DExpr::Count),
512                other => Err(format!("unknown DExpr kind: {other}")),
513            }
514        }
515        _ => Err(format!(
516            "cannot convert {} to DExpr (expected DExpr struct, Int, Float, Bool, or String)",
517            v.type_name()
518        )),
519    }
520}
521
522fn parse_binop(s: &str) -> Result<DBinOp, String> {
523    match s {
524        "+" | "add" => Ok(DBinOp::Add),
525        "-" | "sub" => Ok(DBinOp::Sub),
526        "*" | "mul" => Ok(DBinOp::Mul),
527        "/" | "div" => Ok(DBinOp::Div),
528        ">" | "gt" => Ok(DBinOp::Gt),
529        "<" | "lt" => Ok(DBinOp::Lt),
530        ">=" | "ge" => Ok(DBinOp::Ge),
531        "<=" | "le" => Ok(DBinOp::Le),
532        "==" | "eq" => Ok(DBinOp::Eq),
533        "!=" | "ne" => Ok(DBinOp::Ne),
534        "&&" | "and" => Ok(DBinOp::And),
535        "||" | "or" => Ok(DBinOp::Or),
536        other => Err(format!("unknown binop: {other}")),
537    }
538}
539
540/// Parse a `Value::Struct` representing a TidyAgg, e.g.:
541///   Struct { name: "TidyAgg", kind: "sum", col: "salary" }
542///   Struct { name: "TidyAgg", kind: "count" }
543fn value_to_tidy_agg(v: &Value) -> Result<TidyAgg, String> {
544    match v {
545        Value::Struct { name, fields } if name == "TidyAgg" => {
546            let kind = fields
547                .get("kind")
548                .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().as_str()) } else { None })
549                .ok_or("TidyAgg struct missing 'kind' string")?;
550            match kind {
551                "count" => Ok(TidyAgg::Count),
552                "sum" | "mean" | "min" | "max" | "first" | "last"
553                | "median" | "sd" | "var" | "n_distinct" | "iqr" => {
554                    let col = fields
555                        .get("col")
556                        .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().clone()) } else { None })
557                        .ok_or_else(|| format!("TidyAgg {kind} missing 'col' string"))?;
558                    match kind {
559                        "sum" => Ok(TidyAgg::Sum(col)),
560                        "mean" => Ok(TidyAgg::Mean(col)),
561                        "min" => Ok(TidyAgg::Min(col)),
562                        "max" => Ok(TidyAgg::Max(col)),
563                        "first" => Ok(TidyAgg::First(col)),
564                        "last" => Ok(TidyAgg::Last(col)),
565                        "median" => Ok(TidyAgg::Median(col)),
566                        "sd" => Ok(TidyAgg::Sd(col)),
567                        "var" => Ok(TidyAgg::Var(col)),
568                        "n_distinct" => Ok(TidyAgg::NDistinct(col)),
569                        "iqr" => Ok(TidyAgg::Iqr(col)),
570                        _ => unreachable!(),
571                    }
572                }
573                "quantile" => {
574                    let col = fields
575                        .get("col")
576                        .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().clone()) } else { None })
577                        .ok_or("TidyAgg quantile missing 'col' string")?;
578                    let p = fields
579                        .get("p")
580                        .and_then(|v| match v {
581                            Value::Float(f) => Some(*f),
582                            Value::Int(i) => Some(*i as f64),
583                            _ => None,
584                        })
585                        .ok_or("TidyAgg quantile missing 'p' float")?;
586                    Ok(TidyAgg::Quantile(col, p))
587                }
588                other => Err(format!("unknown TidyAgg kind: {other}")),
589            }
590        }
591        _ => Err(format!("expected TidyAgg struct, got {}", v.type_name())),
592    }
593}
594
595/// Parse ArrangeKey array. Each element can be:
596///   - String "col_name"       → ascending
597///   - Struct { name: "ArrangeKey", col: "name", desc: bool }
598fn value_to_arrange_keys(v: &Value) -> Result<Vec<ArrangeKey>, String> {
599    match v {
600        Value::Array(arr) => {
601            let mut keys = Vec::with_capacity(arr.len());
602            for item in arr.iter() {
603                match item {
604                    Value::String(s) => keys.push(ArrangeKey::asc(s)),
605                    Value::Struct { name, fields } if name == "ArrangeKey" => {
606                        let col = fields
607                            .get("col")
608                            .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().as_str()) } else { None })
609                            .ok_or("ArrangeKey missing 'col'")?;
610                        let desc = fields
611                            .get("desc")
612                            .and_then(|v| if let Value::Bool(b) = v { Some(*b) } else { None })
613                            .unwrap_or(false);
614                        keys.push(if desc { ArrangeKey::desc(col) } else { ArrangeKey::asc(col) });
615                    }
616                    _ => return Err(format!("arrange key must be String or ArrangeKey struct, got {}", item.type_name())),
617                }
618            }
619            Ok(keys)
620        }
621        _ => Err(format!("arrange requires Array of keys, got {}", v.type_name())),
622    }
623}
624
625/// Parse rename pairs from `[["old","new"], ["old2","new2"]]`.
626fn value_to_rename_pairs(v: &Value) -> Result<Vec<(String, String)>, String> {
627    match v {
628        Value::Array(arr) => {
629            let mut pairs = Vec::with_capacity(arr.len());
630            for item in arr.iter() {
631                match item {
632                    Value::Array(pair) if pair.len() == 2 => {
633                        let old = value_to_string(&pair[0])?;
634                        let new = value_to_string(&pair[1])?;
635                        pairs.push((old, new));
636                    }
637                    _ => return Err("rename pairs must be arrays of [old, new] strings".into()),
638                }
639            }
640            Ok(pairs)
641        }
642        _ => Err(format!("rename requires Array of pairs, got {}", v.type_name())),
643    }
644}
645
646// ============================================================================
647//  Join dispatcher
648// ============================================================================
649
650/// Dispatch inner_join / left_join / semi_join / anti_join.
651///
652/// The CJC API is: `view.inner_join(other, left_on, right_on)`.
653/// The Rust API is: `view.inner_join(&other, &[(&left_on, &right_on)])`.
654fn dispatch_join(
655    view: &TidyView,
656    args: &[Value],
657    kind: &str,
658) -> Result<Option<Value>, String> {
659    if args.len() != 3 {
660        return Err(format!(
661            "TidyView.{kind} requires 3 args: other_view, left_on, right_on"
662        ));
663    }
664    let other_rc = match &args[0] {
665        Value::TidyView(rc) => rc,
666        _ => return Err(format!("{kind}: first arg must be a TidyView")),
667    };
668    let other = downcast_view(other_rc)?;
669    let left_on = value_to_string(&args[1])?;
670    let right_on = value_to_string(&args[2])?;
671    let on_pairs: Vec<(&str, &str)> = vec![(&left_on, &right_on)];
672
673    match kind {
674        "inner_join" => {
675            let frame = view.inner_join(other, &on_pairs).map_err(|e| format!("{e}"))?;
676            Ok(Some(wrap_view(frame.view())))
677        }
678        "left_join" => {
679            let frame = view.left_join(other, &on_pairs).map_err(|e| format!("{e}"))?;
680            Ok(Some(wrap_view(frame.view())))
681        }
682        "semi_join" => {
683            let new_view = view.semi_join(other, &on_pairs).map_err(|e| format!("{e}"))?;
684            Ok(Some(wrap_view(new_view)))
685        }
686        "anti_join" => {
687            let new_view = view.anti_join(other, &on_pairs).map_err(|e| format!("{e}"))?;
688            Ok(Some(wrap_view(new_view)))
689        }
690        _ => Ok(None),
691    }
692}
693
694// ============================================================================
695//  Column → Value conversion
696// ============================================================================
697
698/// Convert a `Column` to a `Value::Array`.
699fn column_to_value(col: &Column) -> Value {
700    let vals: Vec<Value> = match col {
701        Column::Int(v) => v.iter().map(|i| Value::Int(*i)).collect(),
702        Column::Float(v) => v.iter().map(|f| Value::Float(*f)).collect(),
703        Column::Str(v) => v
704            .iter()
705            .map(|s| Value::String(Rc::new(s.clone())))
706            .collect(),
707        Column::Bool(v) => v.iter().map(|b| Value::Bool(*b)).collect(),
708        Column::Categorical { levels, codes } => codes
709            .iter()
710            .map(|&c| Value::String(Rc::new(levels[c as usize].clone())))
711            .collect(),
712        Column::DateTime(v) => v.iter().map(|i| Value::Int(*i)).collect(),
713    };
714    Value::Array(Rc::new(vals))
715}
716
717// ============================================================================
718//  DataFrame → Value (for .collect())
719// ============================================================================
720
721/// Convert a `DataFrame` to the legacy `Value::Struct { name: "DataFrame" }`
722/// representation used by existing CJC code.
723pub fn dataframe_to_value(df: DataFrame) -> Value {
724    let mut fields = std::collections::BTreeMap::new();
725    let mut col_names: Vec<Value> = Vec::new();
726    let nrows = df.nrows();
727    for (name, col) in &df.columns {
728        col_names.push(Value::String(Rc::new(name.clone())));
729        fields.insert(name.clone(), column_to_value(col));
730    }
731    fields.insert(
732        "__columns".to_string(),
733        Value::Array(Rc::new(col_names)),
734    );
735    fields.insert("__nrows".to_string(), Value::Int(nrows as i64));
736    Value::Struct {
737        name: "DataFrame".to_string(),
738        fields,
739    }
740}
741
742/// Produce a human-readable table-formatted string from a DataFrame.
743fn format_dataframe(df: &DataFrame) -> String {
744    let ncols = df.ncols();
745    let nrows = df.nrows();
746    if ncols == 0 {
747        return "DataFrame(0x0)".to_string();
748    }
749
750    // Column names
751    let names: Vec<&str> = df.columns.iter().map(|(n, _)| n.as_str()).collect();
752
753    // Compute widths
754    let mut widths: Vec<usize> = names.iter().map(|n| n.len()).collect();
755    let display_rows = nrows.min(20); // cap at 20 rows for display
756    let mut cells: Vec<Vec<String>> = Vec::with_capacity(display_rows);
757    for r in 0..display_rows {
758        let mut row: Vec<String> = Vec::with_capacity(ncols);
759        for (ci, (_, col)) in df.columns.iter().enumerate() {
760            let s = col.get_display(r);
761            if s.len() > widths[ci] {
762                widths[ci] = s.len();
763            }
764            row.push(s);
765        }
766        cells.push(row);
767    }
768
769    let mut out = String::new();
770    // Header
771    for (ci, name) in names.iter().enumerate() {
772        if ci > 0 { out.push_str("  "); }
773        out.push_str(&format!("{:>width$}", name, width = widths[ci]));
774    }
775    out.push('\n');
776    // Rows
777    for row in &cells {
778        for (ci, cell) in row.iter().enumerate() {
779            if ci > 0 { out.push_str("  "); }
780            out.push_str(&format!("{:>width$}", cell, width = widths[ci]));
781        }
782        out.push('\n');
783    }
784    if nrows > display_rows {
785        out.push_str(&format!("... ({} more rows)\n", nrows - display_rows));
786    }
787    out
788}
789
790/// Produce a statistical summary (like R's `summary()` or pandas `.describe()`).
791///
792/// For numeric columns: count, mean, std, min, 25%, 50%, 75%, max.
793/// For string/bool columns: count, unique, top (most frequent).
794fn format_describe(df: &DataFrame) -> String {
795    use cjc_repro::KahanAccumulatorF64;
796    let nrows = df.nrows();
797    let mut out = String::new();
798    out.push_str(&format!("DataFrame: {} rows x {} columns\n\n", nrows, df.ncols()));
799
800    for (name, col) in &df.columns {
801        out.push_str(&format!("── {} ({}) ──\n", name, col.type_name()));
802        match col {
803            Column::Int(v) => {
804                if v.is_empty() {
805                    out.push_str("  (empty)\n");
806                    continue;
807                }
808                let mut sorted = v.clone();
809                sorted.sort();
810                let mut acc = KahanAccumulatorF64::new();
811                for &x in v { acc.add(x as f64); }
812                let mean = acc.finalize() / nrows as f64;
813                // Variance via second pass (Welford-like but simple two-pass for determinism)
814                let mut var_acc = KahanAccumulatorF64::new();
815                for &x in v { let d = x as f64 - mean; var_acc.add(d * d); }
816                let std = if nrows > 1 { (var_acc.finalize() / (nrows - 1) as f64).sqrt() } else { 0.0 };
817                out.push_str(&format!("  count: {}\n", nrows));
818                out.push_str(&format!("  mean:  {:.4}\n", mean));
819                out.push_str(&format!("  std:   {:.4}\n", std));
820                out.push_str(&format!("  min:   {}\n", sorted[0]));
821                out.push_str(&format!("  25%:   {}\n", sorted[nrows / 4]));
822                out.push_str(&format!("  50%:   {}\n", sorted[nrows / 2]));
823                out.push_str(&format!("  75%:   {}\n", sorted[3 * nrows / 4]));
824                out.push_str(&format!("  max:   {}\n", sorted[nrows - 1]));
825            }
826            Column::Float(v) => {
827                if v.is_empty() {
828                    out.push_str("  (empty)\n");
829                    continue;
830                }
831                let mut sorted = v.clone();
832                sorted.sort_by(|a, b| a.total_cmp(b));
833                let mut acc = KahanAccumulatorF64::new();
834                for &x in v { acc.add(x); }
835                let mean = acc.finalize() / nrows as f64;
836                let mut var_acc = KahanAccumulatorF64::new();
837                for &x in v { let d = x - mean; var_acc.add(d * d); }
838                let std = if nrows > 1 { (var_acc.finalize() / (nrows - 1) as f64).sqrt() } else { 0.0 };
839                out.push_str(&format!("  count: {}\n", nrows));
840                out.push_str(&format!("  mean:  {:.4}\n", mean));
841                out.push_str(&format!("  std:   {:.4}\n", std));
842                out.push_str(&format!("  min:   {:.4}\n", sorted[0]));
843                out.push_str(&format!("  25%:   {:.4}\n", sorted[nrows / 4]));
844                out.push_str(&format!("  50%:   {:.4}\n", sorted[nrows / 2]));
845                out.push_str(&format!("  75%:   {:.4}\n", sorted[3 * nrows / 4]));
846                out.push_str(&format!("  max:   {:.4}\n", sorted[nrows - 1]));
847            }
848            Column::Str(v) => {
849                let mut freq = std::collections::BTreeMap::new();
850                for s in v { *freq.entry(s.as_str()).or_insert(0usize) += 1; }
851                let unique = freq.len();
852                let top = freq.iter().max_by_key(|(_, &c)| c).map(|(s, _)| *s).unwrap_or("");
853                out.push_str(&format!("  count:  {}\n", nrows));
854                out.push_str(&format!("  unique: {}\n", unique));
855                out.push_str(&format!("  top:    {}\n", top));
856            }
857            Column::Bool(v) => {
858                let trues = v.iter().filter(|&&b| b).count();
859                out.push_str(&format!("  count: {}\n", nrows));
860                out.push_str(&format!("  true:  {}\n", trues));
861                out.push_str(&format!("  false: {}\n", nrows - trues));
862            }
863            Column::Categorical { levels, codes } => {
864                let n_levels = levels.len();
865                let mut freq = std::collections::BTreeMap::new();
866                for &c in codes { *freq.entry(c).or_insert(0usize) += 1; }
867                let top_code = freq.iter().max_by_key(|(_, &c)| c).map(|(&k, _)| k).unwrap_or(0);
868                let top = if (top_code as usize) < levels.len() { &levels[top_code as usize] } else { "?" };
869                out.push_str(&format!("  count:  {}\n", nrows));
870                out.push_str(&format!("  levels: {}\n", n_levels));
871                out.push_str(&format!("  top:    {}\n", top));
872            }
873            Column::DateTime(v) => {
874                if v.is_empty() {
875                    out.push_str("  (empty)\n");
876                    continue;
877                }
878                let mut sorted = v.clone();
879                sorted.sort();
880                out.push_str(&format!("  count: {}\n", nrows));
881                out.push_str(&format!("  min:   {} (epoch ms)\n", sorted[0]));
882                out.push_str(&format!("  max:   {} (epoch ms)\n", sorted[nrows - 1]));
883            }
884        }
885    }
886    out
887}
888
889/// Produce a transposed glimpse (like dplyr::glimpse() or tibble printing).
890///
891/// Shows each column as a row: name, type, and first few values.
892fn format_glimpse(df: &DataFrame) -> String {
893    let nrows = df.nrows();
894    let ncols = df.ncols();
895    let mut out = String::new();
896    out.push_str(&format!("Rows: {}\nColumns: {}\n", nrows, ncols));
897
898    // Find max column name width for alignment
899    let max_name_w = df.columns.iter().map(|(n, _)| n.len()).max().unwrap_or(0);
900    let max_type_w = df.columns.iter().map(|(_, c)| c.type_name().len()).max().unwrap_or(0);
901
902    let preview_count = nrows.min(8);
903    for (name, col) in &df.columns {
904        out.push_str(&format!("$ {:width_n$} <{:width_t$}>  ",
905            name, col.type_name(),
906            width_n = max_name_w, width_t = max_type_w));
907        let mut vals = Vec::with_capacity(preview_count);
908        for i in 0..preview_count {
909            vals.push(col.get_display(i));
910        }
911        out.push_str(&vals.join(", "));
912        if nrows > preview_count {
913            out.push_str(", ...");
914        }
915        out.push('\n');
916    }
917    out
918}
919
920// ============================================================================
921//  DExpr builder builtins (col, binop, agg, etc.)
922// ============================================================================
923
924/// Build a `Value::Struct { name: "DExpr", kind: "col", ... }` from a column name.
925pub fn build_col_expr(name: &str) -> Value {
926    let mut fields = std::collections::BTreeMap::new();
927    fields.insert("kind".to_string(), Value::String(Rc::new("col".to_string())));
928    fields.insert("value".to_string(), Value::String(Rc::new(name.to_string())));
929    Value::Struct { name: "DExpr".to_string(), fields }
930}
931
932/// Build a DExpr binary operation.
933pub fn build_binop_expr(op: &str, left: Value, right: Value) -> Value {
934    let mut fields = std::collections::BTreeMap::new();
935    fields.insert("kind".to_string(), Value::String(Rc::new("binop".to_string())));
936    fields.insert("op".to_string(), Value::String(Rc::new(op.to_string())));
937    fields.insert("left".to_string(), left);
938    fields.insert("right".to_string(), right);
939    Value::Struct { name: "DExpr".to_string(), fields }
940}
941
942/// Build a TidyAgg struct value.
943pub fn build_tidy_agg(kind: &str, col: Option<&str>) -> Value {
944    let mut fields = std::collections::BTreeMap::new();
945    fields.insert("kind".to_string(), Value::String(Rc::new(kind.to_string())));
946    if let Some(c) = col {
947        fields.insert("col".to_string(), Value::String(Rc::new(c.to_string())));
948    }
949    Value::Struct { name: "TidyAgg".to_string(), fields }
950}
951
952/// Build an ArrangeKey struct value.
953pub fn build_arrange_key(col: &str, descending: bool) -> Value {
954    let mut fields = std::collections::BTreeMap::new();
955    fields.insert("col".to_string(), Value::String(Rc::new(col.to_string())));
956    fields.insert("desc".to_string(), Value::Bool(descending));
957    Value::Struct { name: "ArrangeKey".to_string(), fields }
958}
959
960/// Dispatch builder builtins like `col()`, `desc()`, `asc()`, `sum()`, `mean()`, etc.
961/// Returns `Ok(Some(value))` if recognised, `Ok(None)` otherwise.
962pub fn dispatch_tidy_builtin(name: &str, args: &[Value]) -> Result<Option<Value>, String> {
963    match name {
964        // DExpr builders
965        "col" => {
966            if args.len() != 1 {
967                return Err("col() requires 1 argument: column name".into());
968            }
969            let name = value_to_string(&args[0])?;
970            Ok(Some(build_col_expr(&name)))
971        }
972        "desc" => {
973            if args.len() != 1 {
974                return Err("desc() requires 1 argument: column name".into());
975            }
976            let name = value_to_string(&args[0])?;
977            Ok(Some(build_arrange_key(&name, true)))
978        }
979        "asc" => {
980            if args.len() != 1 {
981                return Err("asc() requires 1 argument: column name".into());
982            }
983            let name = value_to_string(&args[0])?;
984            Ok(Some(build_arrange_key(&name, false)))
985        }
986        // DExpr binary op builder
987        "dexpr_binop" => {
988            if args.len() != 3 {
989                return Err("dexpr_binop() requires 3 args: op, left, right".into());
990            }
991            let op = value_to_string(&args[0])?;
992            Ok(Some(build_binop_expr(&op, args[1].clone(), args[2].clone())))
993        }
994
995        // TidyAgg builders
996        "tidy_count" => Ok(Some(build_tidy_agg("count", None))),
997        "tidy_sum" => {
998            if args.len() != 1 { return Err("tidy_sum() requires 1 argument: column name".into()); }
999            let col = value_to_string(&args[0])?;
1000            Ok(Some(build_tidy_agg("sum", Some(&col))))
1001        }
1002        "tidy_mean" => {
1003            if args.len() != 1 { return Err("tidy_mean() requires 1 argument: column name".into()); }
1004            let col = value_to_string(&args[0])?;
1005            Ok(Some(build_tidy_agg("mean", Some(&col))))
1006        }
1007        "tidy_min" => {
1008            if args.len() != 1 { return Err("tidy_min() requires 1 argument: column name".into()); }
1009            let col = value_to_string(&args[0])?;
1010            Ok(Some(build_tidy_agg("min", Some(&col))))
1011        }
1012        "tidy_max" => {
1013            if args.len() != 1 { return Err("tidy_max() requires 1 argument: column name".into()); }
1014            let col = value_to_string(&args[0])?;
1015            Ok(Some(build_tidy_agg("max", Some(&col))))
1016        }
1017        "tidy_first" => {
1018            if args.len() != 1 { return Err("tidy_first() requires 1 argument: column name".into()); }
1019            let col = value_to_string(&args[0])?;
1020            Ok(Some(build_tidy_agg("first", Some(&col))))
1021        }
1022        "tidy_last" => {
1023            if args.len() != 1 { return Err("tidy_last() requires 1 argument: column name".into()); }
1024            let col = value_to_string(&args[0])?;
1025            Ok(Some(build_tidy_agg("last", Some(&col))))
1026        }
1027
1028        // =====================================================================
1029        //  stringr builtins — byte-first string view approach
1030        //
1031        //  CJC strings are UTF-8 byte sequences. These functions operate on the
1032        //  byte representation via cjc-regex's Thompson NFA. Where possible,
1033        //  results are slices (zero-copy views) of the input. Allocation happens
1034        //  only when replacement or splitting creates new buffers.
1035        //
1036        //  Key design point: patterns are compiled fresh per call. For hot-loop
1037        //  use, prefer the compiled Regex value type (regex literal `/pattern/`).
1038        // =====================================================================
1039
1040        "str_detect" => {
1041            // str_detect(haystack, pattern) → bool
1042            if args.len() != 2 { return Err("str_detect requires 2 args: string, pattern".into()); }
1043            let hay = value_to_string(&args[0])?;
1044            let pat = value_to_string(&args[1])?;
1045            let matched = cjc_regex::is_match(&pat, "", hay.as_bytes());
1046            Ok(Some(Value::Bool(matched)))
1047        }
1048        "str_extract" => {
1049            // str_extract(haystack, pattern) → string (first match) or ""
1050            if args.len() != 2 { return Err("str_extract requires 2 args: string, pattern".into()); }
1051            let hay = value_to_string(&args[0])?;
1052            let pat = value_to_string(&args[1])?;
1053            match cjc_regex::find(&pat, "", hay.as_bytes()) {
1054                Some((start, end)) => {
1055                    let slice = &hay.as_bytes()[start..end];
1056                    let s = String::from_utf8_lossy(slice).to_string();
1057                    Ok(Some(Value::String(Rc::new(s))))
1058                }
1059                None => Ok(Some(Value::String(Rc::new(String::new())))),
1060            }
1061        }
1062        "str_extract_all" => {
1063            // str_extract_all(haystack, pattern) → [string]
1064            if args.len() != 2 { return Err("str_extract_all requires 2 args: string, pattern".into()); }
1065            let hay = value_to_string(&args[0])?;
1066            let pat = value_to_string(&args[1])?;
1067            let matches = cjc_regex::find_all(&pat, "", hay.as_bytes());
1068            let vals: Vec<Value> = matches
1069                .iter()
1070                .map(|&(start, end)| {
1071                    let slice = &hay.as_bytes()[start..end];
1072                    Value::String(Rc::new(String::from_utf8_lossy(slice).to_string()))
1073                })
1074                .collect();
1075            Ok(Some(Value::Array(Rc::new(vals))))
1076        }
1077        "str_replace" => {
1078            // str_replace(haystack, pattern, replacement) → string (first match replaced)
1079            if args.len() != 3 { return Err("str_replace requires 3 args: string, pattern, replacement".into()); }
1080            let hay = value_to_string(&args[0])?;
1081            let pat = value_to_string(&args[1])?;
1082            let rep = value_to_string(&args[2])?;
1083            match cjc_regex::find(&pat, "", hay.as_bytes()) {
1084                Some((start, end)) => {
1085                    let mut result = String::with_capacity(hay.len());
1086                    result.push_str(&hay[..start]);
1087                    result.push_str(&rep);
1088                    result.push_str(&hay[end..]);
1089                    Ok(Some(Value::String(Rc::new(result))))
1090                }
1091                None => Ok(Some(Value::String(Rc::new(hay)))),
1092            }
1093        }
1094        "str_replace_all" => {
1095            // str_replace_all(haystack, pattern, replacement) → string (all matches replaced)
1096            if args.len() != 3 { return Err("str_replace_all requires 3 args: string, pattern, replacement".into()); }
1097            let hay = value_to_string(&args[0])?;
1098            let pat = value_to_string(&args[1])?;
1099            let rep = value_to_string(&args[2])?;
1100            let matches = cjc_regex::find_all(&pat, "", hay.as_bytes());
1101            if matches.is_empty() {
1102                return Ok(Some(Value::String(Rc::new(hay))));
1103            }
1104            let mut result = String::with_capacity(hay.len());
1105            let mut last_end = 0;
1106            for &(start, end) in &matches {
1107                result.push_str(&hay[last_end..start]);
1108                result.push_str(&rep);
1109                last_end = end;
1110            }
1111            result.push_str(&hay[last_end..]);
1112            Ok(Some(Value::String(Rc::new(result))))
1113        }
1114        "str_split" => {
1115            // str_split(haystack, pattern) → [string]
1116            if args.len() != 2 { return Err("str_split requires 2 args: string, pattern".into()); }
1117            let hay = value_to_string(&args[0])?;
1118            let pat = value_to_string(&args[1])?;
1119            let spans = cjc_regex::split(&pat, "", hay.as_bytes());
1120            let vals: Vec<Value> = spans
1121                .iter()
1122                .map(|&(start, end)| {
1123                    Value::String(Rc::new(
1124                        String::from_utf8_lossy(&hay.as_bytes()[start..end]).to_string(),
1125                    ))
1126                })
1127                .collect();
1128            Ok(Some(Value::Array(Rc::new(vals))))
1129        }
1130        "str_count" => {
1131            // str_count(haystack, pattern) → int (number of matches)
1132            if args.len() != 2 { return Err("str_count requires 2 args: string, pattern".into()); }
1133            let hay = value_to_string(&args[0])?;
1134            let pat = value_to_string(&args[1])?;
1135            let count = cjc_regex::find_all(&pat, "", hay.as_bytes()).len();
1136            Ok(Some(Value::Int(count as i64)))
1137        }
1138        "str_trim" => {
1139            // str_trim(string) → string with leading/trailing whitespace removed
1140            if args.len() != 1 { return Err("str_trim requires 1 arg: string".into()); }
1141            let s = value_to_string(&args[0])?;
1142            Ok(Some(Value::String(Rc::new(s.trim().to_string()))))
1143        }
1144        "str_to_upper" => {
1145            if args.len() != 1 { return Err("str_to_upper requires 1 arg: string".into()); }
1146            let s = value_to_string(&args[0])?;
1147            Ok(Some(Value::String(Rc::new(s.to_uppercase()))))
1148        }
1149        "str_to_lower" => {
1150            if args.len() != 1 { return Err("str_to_lower requires 1 arg: string".into()); }
1151            let s = value_to_string(&args[0])?;
1152            Ok(Some(Value::String(Rc::new(s.to_lowercase()))))
1153        }
1154        "str_starts" => {
1155            if args.len() != 2 { return Err("str_starts requires 2 args: string, prefix".into()); }
1156            let s = value_to_string(&args[0])?;
1157            let prefix = value_to_string(&args[1])?;
1158            Ok(Some(Value::Bool(s.starts_with(&prefix))))
1159        }
1160        "str_ends" => {
1161            if args.len() != 2 { return Err("str_ends requires 2 args: string, suffix".into()); }
1162            let s = value_to_string(&args[0])?;
1163            let suffix = value_to_string(&args[1])?;
1164            Ok(Some(Value::Bool(s.ends_with(&suffix))))
1165        }
1166        "str_sub" => {
1167            // str_sub(string, start, end) → substring (byte-indexed, clamped)
1168            if args.len() != 3 { return Err("str_sub requires 3 args: string, start, end".into()); }
1169            let s = value_to_string(&args[0])?;
1170            let start = value_to_usize(&args[1])?.min(s.len());
1171            let end = value_to_usize(&args[2])?.min(s.len());
1172            if start > end {
1173                Ok(Some(Value::String(Rc::new(String::new()))))
1174            } else {
1175                // Clamp to char boundaries for safety
1176                let actual_start = clamp_to_char_boundary(&s, start);
1177                let actual_end = clamp_to_char_boundary(&s, end);
1178                Ok(Some(Value::String(Rc::new(s[actual_start..actual_end].to_string()))))
1179            }
1180        }
1181        "str_len" => {
1182            // str_len(string) → int (byte length, consistent with byte-first view)
1183            if args.len() != 1 { return Err("str_len requires 1 arg: string".into()); }
1184            let s = value_to_string(&args[0])?;
1185            Ok(Some(Value::Int(s.len() as i64)))
1186        }
1187
1188        // =====================================================================
1189        //  Stats builtins (operate on Array of numbers)
1190        // =====================================================================
1191
1192        "median" => {
1193            if args.len() != 1 { return Err("median requires 1 arg: numeric array".into()); }
1194            let nums = value_to_f64_vec(&args[0])?;
1195            if nums.is_empty() {
1196                return Ok(Some(Value::Float(f64::NAN)));
1197            }
1198            let mut sorted = nums;
1199            sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
1200            let mid = sorted.len() / 2;
1201            let med = if sorted.len() % 2 == 0 {
1202                (sorted[mid - 1] + sorted[mid]) / 2.0
1203            } else {
1204                sorted[mid]
1205            };
1206            Ok(Some(Value::Float(med)))
1207        }
1208        "sd" => {
1209            // Population standard deviation
1210            if args.len() != 1 { return Err("sd requires 1 arg: numeric array".into()); }
1211            let nums = value_to_f64_vec(&args[0])?;
1212            if nums.len() < 2 {
1213                return Ok(Some(Value::Float(f64::NAN)));
1214            }
1215            let mean = nums.iter().sum::<f64>() / nums.len() as f64;
1216            let var = nums.iter().map(|x| (x - mean) * (x - mean)).sum::<f64>()
1217                / (nums.len() - 1) as f64;
1218            Ok(Some(Value::Float(var.sqrt())))
1219        }
1220        "variance" => {
1221            // Sample variance (N-1 denominator)
1222            if args.len() != 1 { return Err("variance requires 1 arg: numeric array".into()); }
1223            let nums = value_to_f64_vec(&args[0])?;
1224            if nums.len() < 2 {
1225                return Ok(Some(Value::Float(f64::NAN)));
1226            }
1227            let mean = nums.iter().sum::<f64>() / nums.len() as f64;
1228            let var = nums.iter().map(|x| (x - mean) * (x - mean)).sum::<f64>()
1229                / (nums.len() - 1) as f64;
1230            Ok(Some(Value::Float(var)))
1231        }
1232        "n_distinct" => {
1233            // Count distinct values in an array
1234            if args.len() != 1 { return Err("n_distinct requires 1 arg: array".into()); }
1235            match &args[0] {
1236                Value::Array(arr) => {
1237                    let mut seen = std::collections::BTreeSet::new();
1238                    for v in arr.iter() {
1239                        seen.insert(format!("{v}"));
1240                    }
1241                    Ok(Some(Value::Int(seen.len() as i64)))
1242                }
1243                _ => Err(format!("n_distinct expects Array, got {}", args[0].type_name())),
1244            }
1245        }
1246
1247        _ => Ok(None),
1248    }
1249}
1250
/// Round a byte index down to the nearest char boundary of `s`.
///
/// Indices at or beyond the end of the string map to `s.len()`.
fn clamp_to_char_boundary(s: &str, idx: usize) -> usize {
    let capped = idx.min(s.len());
    // Offsets 0 and `s.len()` are always boundaries, so the search always succeeds.
    (0..=capped)
        .rev()
        .find(|&pos| s.is_char_boundary(pos))
        .unwrap_or(0)
}
1262
1263/// Convert a Value::Array of numbers to Vec<f64>.
1264fn value_to_f64_vec(v: &Value) -> Result<Vec<f64>, String> {
1265    match v {
1266        Value::Array(arr) => {
1267            arr.iter()
1268                .map(|v| match v {
1269                    Value::Float(f) => Ok(*f),
1270                    Value::Int(i) => Ok(*i as f64),
1271                    _ => Err(format!("expected numeric value in array, got {}", v.type_name())),
1272                })
1273                .collect()
1274        }
1275        _ => Err(format!("expected Array, got {}", v.type_name())),
1276    }
1277}