Skip to main content

cjc_data/
tidy_dispatch.rs

1//! Shared tidy dispatch: maps CJC language method calls on TidyView /
2//! GroupedTidyView values to the concrete cjc_data API.
3//!
4//! Both `cjc-eval` and `cjc-mir-exec` call into `dispatch_tidy_method` and
5//! `dispatch_grouped_method` so that every tidy operation has a single source
6//! of truth.  The executors only need to pattern-match `Value::TidyView` or
7//! `Value::GroupedTidyView` and delegate here.
8//!
9//! # Error handling
10//! All errors are returned as `Err(String)`.  The caller wraps the string
11//! into its own error type (EvalError / MirExecError).
12
13use std::rc::Rc;
14use std::any::Any;
15
16use cjc_runtime::value::Value;
17
18use crate::{
19    ArrangeKey, Column, DExpr, DBinOp, DataFrame, GroupedTidyView,
20    TidyAgg, TidyView,
21};
22
23// ============================================================================
24//  Public entry points
25// ============================================================================
26
27/// Dispatch a method call on a `Value::TidyView`.
28///
29/// Returns `Ok(Some(value))` if the method is known, `Ok(None)` if not
30/// recognised (allows the caller to fall through to other dispatch paths).
31pub fn dispatch_tidy_method(
32    inner: &Rc<dyn Any>,
33    method: &str,
34    args: &[Value],
35) -> Result<Option<Value>, String> {
36    let view = downcast_view(inner)?;
37    match method {
38        // -- shape ----------------------------------------------------------
39        "nrows" => Ok(Some(Value::Int(view.nrows() as i64))),
40        "ncols" => Ok(Some(Value::Int(view.ncols() as i64))),
41        "column_names" => {
42            let names: Vec<Value> = view
43                .column_names()
44                .into_iter()
45                .map(|s| Value::String(Rc::new(s.to_string())))
46                .collect();
47            Ok(Some(Value::Array(Rc::new(names))))
48        }
49
50        // -- filter ---------------------------------------------------------
51        "filter" => {
52            if args.len() != 1 {
53                return Err("TidyView.filter requires 1 argument: predicate DExpr".into());
54            }
55            let predicate = value_to_dexpr(&args[0])?;
56            let new_view = view.filter(&predicate).map_err(|e| format!("{e}"))?;
57            Ok(Some(wrap_view(new_view)))
58        }
59
60        // -- select ---------------------------------------------------------
61        "select" => {
62            if args.len() != 1 {
63                return Err("TidyView.select requires 1 argument: column names array".into());
64            }
65            let cols = value_to_str_vec(&args[0])?;
66            let col_refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
67            let new_view = view.select(&col_refs).map_err(|e| format!("{e}"))?;
68            Ok(Some(wrap_view(new_view)))
69        }
70
71        // -- mutate ---------------------------------------------------------
72        "mutate" => {
73            // mutate(name, expr) or mutate([(name, expr), ...])
74            // We support: mutate("col_name", dexpr_value)
75            if args.len() != 2 {
76                return Err("TidyView.mutate requires 2 arguments: column_name and expression".into());
77            }
78            let col_name = value_to_string(&args[0])?;
79            let expr = value_to_dexpr(&args[1])?;
80            let frame = view.mutate(&[(&col_name, expr)]).map_err(|e| format!("{e}"))?;
81            // mutate returns TidyFrame; convert to TidyView for pipeline continuity
82            Ok(Some(wrap_view(frame.view())))
83        }
84
85        // -- group_by -------------------------------------------------------
86        "group_by" => {
87            if args.len() != 1 {
88                return Err("TidyView.group_by requires 1 argument: key columns array".into());
89            }
90            let keys = value_to_str_vec(&args[0])?;
91            let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
92            let grouped = view.group_by(&key_refs).map_err(|e| format!("{e}"))?;
93            Ok(Some(wrap_grouped(grouped)))
94        }
95
96        // -- arrange --------------------------------------------------------
97        "arrange" => {
98            if args.len() != 1 {
99                return Err("TidyView.arrange requires 1 argument: sort keys array".into());
100            }
101            let keys = value_to_arrange_keys(&args[0])?;
102            let new_view = view.arrange(&keys).map_err(|e| format!("{e}"))?;
103            Ok(Some(wrap_view(new_view)))
104        }
105
106        // -- distinct -------------------------------------------------------
107        "distinct" => {
108            let cols = if args.is_empty() {
109                view.column_names().iter().map(|s| s.to_string()).collect::<Vec<_>>()
110            } else {
111                value_to_str_vec(&args[0])?
112            };
113            let col_refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
114            let new_view = view.distinct(&col_refs).map_err(|e| format!("{e}"))?;
115            Ok(Some(wrap_view(new_view)))
116        }
117
118        // -- slice family ---------------------------------------------------
119        "slice" => {
120            if args.len() != 2 {
121                return Err("TidyView.slice requires 2 arguments: start, end".into());
122            }
123            let start = value_to_usize(&args[0])?;
124            let end = value_to_usize(&args[1])?;
125            Ok(Some(wrap_view(view.slice(start, end))))
126        }
127        "slice_head" => {
128            if args.len() != 1 {
129                return Err("TidyView.slice_head requires 1 argument: n".into());
130            }
131            let n = value_to_usize(&args[0])?;
132            Ok(Some(wrap_view(view.slice_head(n))))
133        }
134        "slice_tail" => {
135            if args.len() != 1 {
136                return Err("TidyView.slice_tail requires 1 argument: n".into());
137            }
138            let n = value_to_usize(&args[0])?;
139            Ok(Some(wrap_view(view.slice_tail(n))))
140        }
141        "slice_sample" => {
142            if args.len() != 2 {
143                return Err("TidyView.slice_sample requires 2 arguments: n, seed".into());
144            }
145            let n = value_to_usize(&args[0])?;
146            let seed = match &args[1] {
147                Value::Int(i) => *i as u64,
148                _ => return Err("slice_sample seed must be Int".into()),
149            };
150            Ok(Some(wrap_view(view.slice_sample(n, seed))))
151        }
152
153        // -- joins ----------------------------------------------------------
154        "inner_join" | "left_join" | "semi_join" | "anti_join" => {
155            dispatch_join(view, args, method)
156        }
157
158        // -- reshape --------------------------------------------------------
159        "pivot_longer" => {
160            if args.len() < 2 || args.len() > 3 {
161                return Err(
162                    "TidyView.pivot_longer requires 2-3 args: cols, names_to, [values_to]".into(),
163                );
164            }
165            let cols = value_to_str_vec(&args[0])?;
166            let col_refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
167            let names_to = value_to_string(&args[1])?;
168            let values_to = if args.len() == 3 {
169                value_to_string(&args[2])?
170            } else {
171                "value".to_string()
172            };
173            let frame = view
174                .pivot_longer(&col_refs, &names_to, &values_to)
175                .map_err(|e| format!("{e}"))?;
176            Ok(Some(wrap_view(frame.view())))
177        }
178        "pivot_wider" => {
179            if args.len() != 3 {
180                return Err(
181                    "TidyView.pivot_wider requires 3 args: id_cols, names_from, values_from"
182                        .into(),
183                );
184            }
185            let id_cols = value_to_str_vec(&args[0])?;
186            let id_refs: Vec<&str> = id_cols.iter().map(|s| s.as_str()).collect();
187            let names_from = value_to_string(&args[1])?;
188            let values_from = value_to_string(&args[2])?;
189            let nullable_frame = view
190                .pivot_wider(&id_refs, &names_from, &values_from)
191                .map_err(|e| format!("{e}"))?;
192            // NullableFrame → fill nulls with defaults → TidyView
193            Ok(Some(wrap_view(nullable_frame.to_tidy_view_filled())))
194        }
195
196        // -- rename / relocate / drop_cols / bind ----------------------------
197        "rename" => {
198            if args.len() != 1 {
199                return Err("TidyView.rename requires 1 argument: array of [old, new] pairs".into());
200            }
201            let pairs = value_to_rename_pairs(&args[0])?;
202            let pair_refs: Vec<(&str, &str)> =
203                pairs.iter().map(|(a, b)| (a.as_str(), b.as_str())).collect();
204            let new_view = view.rename(&pair_refs).map_err(|e| format!("{e}"))?;
205            Ok(Some(wrap_view(new_view)))
206        }
207        "drop_cols" => {
208            if args.len() != 1 {
209                return Err("TidyView.drop_cols requires 1 argument: column names array".into());
210            }
211            let cols = value_to_str_vec(&args[0])?;
212            let col_refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
213            let new_view = view.drop_cols(&col_refs).map_err(|e| format!("{e}"))?;
214            Ok(Some(wrap_view(new_view)))
215        }
216        "bind_rows" => {
217            if args.len() != 1 {
218                return Err("TidyView.bind_rows requires 1 argument: other TidyView".into());
219            }
220            let other_rc = match &args[0] {
221                Value::TidyView(rc) => rc,
222                _ => return Err("bind_rows argument must be a TidyView".into()),
223            };
224            let other = downcast_view(other_rc)?;
225            let frame = view.bind_rows(other).map_err(|e| format!("{e}"))?;
226            Ok(Some(wrap_view(frame.view())))
227        }
228        "bind_cols" => {
229            if args.len() != 1 {
230                return Err("TidyView.bind_cols requires 1 argument: other TidyView".into());
231            }
232            let other_rc = match &args[0] {
233                Value::TidyView(rc) => rc,
234                _ => return Err("bind_cols argument must be a TidyView".into()),
235            };
236            let other = downcast_view(other_rc)?;
237            let frame = view.bind_cols(other).map_err(|e| format!("{e}"))?;
238            Ok(Some(wrap_view(frame.view())))
239        }
240
241        // -- column extraction / tensor -------------------------------------
242        "column" => {
243            if args.len() != 1 {
244                return Err("TidyView.column requires 1 argument: column_name".into());
245            }
246            let name = value_to_string(&args[0])?;
247            let df = view.materialize().map_err(|e| format!("{e}"))?;
248            let col = df
249                .get_column(&name)
250                .ok_or_else(|| format!("column '{}' not found", name))?;
251            Ok(Some(column_to_value(col)))
252        }
253        "to_tensor" => {
254            if args.len() != 1 {
255                return Err("TidyView.to_tensor requires 1 argument: column_names array".into());
256            }
257            let cols = value_to_str_vec(&args[0])?;
258            let col_refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
259            let t = view.to_tensor(&col_refs).map_err(|e| format!("{e}"))?;
260            Ok(Some(Value::Tensor(t)))
261        }
262
263        // -- materialize to DataFrame Struct --------------------------------
264        "collect" => {
265            let df = view.materialize().map_err(|e| format!("{e}"))?;
266            Ok(Some(dataframe_to_value(df)))
267        }
268
269        // -- print (for debugging) ------------------------------------------
270        "print" => {
271            let df = view.materialize().map_err(|e| format!("{e}"))?;
272            let s = format_dataframe(&df);
273            // Returning the formatted string; the caller is responsible for
274            // printing and capturing in output buffer.
275            Ok(Some(Value::String(Rc::new(s))))
276        }
277
278        _ => Ok(None), // unknown method — caller falls through
279    }
280}
281
282/// Dispatch a method call on a `Value::GroupedTidyView`.
283pub fn dispatch_grouped_method(
284    inner: &Rc<dyn Any>,
285    method: &str,
286    args: &[Value],
287) -> Result<Option<Value>, String> {
288    let grouped = downcast_grouped(inner)?;
289    match method {
290        "ngroups" => Ok(Some(Value::Int(grouped.ngroups() as i64))),
291
292        "summarise" | "summarize" => {
293            if args.len() % 2 != 0 || args.is_empty() {
294                return Err(
295                    "summarise requires pairs of (name, agg) arguments".into(),
296                );
297            }
298            let mut assignments: Vec<(String, TidyAgg)> = Vec::new();
299            let mut i = 0;
300            while i < args.len() {
301                let name = value_to_string(&args[i])?;
302                let agg = value_to_tidy_agg(&args[i + 1])?;
303                assignments.push((name, agg));
304                i += 2;
305            }
306            let asg_refs: Vec<(&str, TidyAgg)> = assignments
307                .iter()
308                .map(|(n, a)| (n.as_str(), a.clone()))
309                .collect();
310            let frame = grouped.summarise(&asg_refs).map_err(|e| format!("{e}"))?;
311            Ok(Some(wrap_view(frame.view())))
312        }
313
314        "ungroup" => {
315            let view = grouped.clone().ungroup();
316            Ok(Some(wrap_view(view)))
317        }
318
319        _ => Ok(None),
320    }
321}
322
323// ============================================================================
324//  Helpers — Value ↔ cjc_data conversions
325// ============================================================================
326
327fn downcast_view(inner: &Rc<dyn Any>) -> Result<&TidyView, String> {
328    inner
329        .downcast_ref::<TidyView>()
330        .ok_or_else(|| "internal error: TidyView downcast failed".to_string())
331}
332
333fn downcast_grouped(inner: &Rc<dyn Any>) -> Result<&GroupedTidyView, String> {
334    inner
335        .downcast_ref::<GroupedTidyView>()
336        .ok_or_else(|| "internal error: GroupedTidyView downcast failed".to_string())
337}
338
339/// Wrap a `TidyView` into `Value::TidyView`.
340pub fn wrap_view(view: TidyView) -> Value {
341    Value::TidyView(Rc::new(view) as Rc<dyn Any>)
342}
343
344/// Wrap a `GroupedTidyView` into `Value::GroupedTidyView`.
345pub fn wrap_grouped(grouped: GroupedTidyView) -> Value {
346    Value::GroupedTidyView(Rc::new(grouped) as Rc<dyn Any>)
347}
348
349/// Convert `Value::String` → `String`.
350fn value_to_string(v: &Value) -> Result<String, String> {
351    match v {
352        Value::String(s) => Ok(s.as_ref().clone()),
353        _ => Err(format!("expected String, got {}", v.type_name())),
354    }
355}
356
357/// Convert `Value::Int` → `usize`.
358fn value_to_usize(v: &Value) -> Result<usize, String> {
359    match v {
360        Value::Int(i) if *i >= 0 => Ok(*i as usize),
361        Value::Int(i) => Err(format!("expected non-negative Int, got {i}")),
362        _ => Err(format!("expected Int, got {}", v.type_name())),
363    }
364}
365
366/// Convert `Value::Array([String, ...])` → `Vec<String>`.
367fn value_to_str_vec(v: &Value) -> Result<Vec<String>, String> {
368    match v {
369        Value::Array(arr) => arr
370            .iter()
371            .map(|v| match v {
372                Value::String(s) => Ok(s.as_ref().clone()),
373                _ => Err(format!("expected String in array, got {}", v.type_name())),
374            })
375            .collect(),
376        _ => Err(format!("expected Array, got {}", v.type_name())),
377    }
378}
379
380/// Parse a `Value::Struct { name: "DExpr", ... }` into a `DExpr`.
381///
382/// The CJC language constructs DExpr values via helper builtins:
383///   col("name")        → Struct { name: "DExpr", kind: "col", value: "name" }
384///   binop(">", l, r)   → Struct { name: "DExpr", kind: "binop", op: ">", left: l, right: r }
385///   lit_int(42)         → Struct { name: "DExpr", kind: "lit_int", value: 42 }
386///   etc.
387///
388/// For ergonomic use, we also accept raw literals directly:
389///   Value::Int(42)      → DExpr::LitInt(42)
390///   Value::Float(3.14)  → DExpr::LitFloat(3.14)
391///   Value::Bool(true)   → DExpr::LitBool(true)
392///   Value::String("x")  → DExpr::Col("x")   -- shorthand for col("x")
393pub fn value_to_dexpr(v: &Value) -> Result<DExpr, String> {
394    match v {
395        // Literal shorthand
396        Value::Int(i) => Ok(DExpr::LitInt(*i)),
397        Value::Float(f) => Ok(DExpr::LitFloat(*f)),
398        Value::Bool(b) => Ok(DExpr::LitBool(*b)),
399        Value::String(s) => Ok(DExpr::Col(s.as_ref().clone())),
400        // Struct-encoded DExpr
401        Value::Struct { name, fields } if name == "DExpr" => {
402            let kind = fields
403                .get("kind")
404                .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().as_str()) } else { None })
405                .ok_or("DExpr struct missing 'kind' string field")?;
406            match kind {
407                "col" => {
408                    let col_name = fields
409                        .get("value")
410                        .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().clone()) } else { None })
411                        .ok_or("DExpr col missing 'value' string field")?;
412                    Ok(DExpr::Col(col_name))
413                }
414                "lit_int" => {
415                    let val = fields
416                        .get("value")
417                        .and_then(|v| if let Value::Int(i) = v { Some(*i) } else { None })
418                        .ok_or("DExpr lit_int missing 'value' int field")?;
419                    Ok(DExpr::LitInt(val))
420                }
421                "lit_float" => {
422                    let val = fields
423                        .get("value")
424                        .and_then(|v| if let Value::Float(f) = v { Some(*f) } else { None })
425                        .ok_or("DExpr lit_float missing 'value' float field")?;
426                    Ok(DExpr::LitFloat(val))
427                }
428                "lit_bool" => {
429                    let val = fields
430                        .get("value")
431                        .and_then(|v| if let Value::Bool(b) = v { Some(*b) } else { None })
432                        .ok_or("DExpr lit_bool missing 'value' bool field")?;
433                    Ok(DExpr::LitBool(val))
434                }
435                "lit_str" => {
436                    let val = fields
437                        .get("value")
438                        .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().clone()) } else { None })
439                        .ok_or("DExpr lit_str missing 'value' string field")?;
440                    Ok(DExpr::LitStr(val))
441                }
442                "binop" => {
443                    let op_str = fields
444                        .get("op")
445                        .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().as_str()) } else { None })
446                        .ok_or("DExpr binop missing 'op' field")?;
447                    let op = parse_binop(op_str)?;
448                    let left = fields.get("left").ok_or("DExpr binop missing 'left'")?;
449                    let right = fields.get("right").ok_or("DExpr binop missing 'right'")?;
450                    Ok(DExpr::BinOp {
451                        op,
452                        left: Box::new(value_to_dexpr(left)?),
453                        right: Box::new(value_to_dexpr(right)?),
454                    })
455                }
456                "count" => Ok(DExpr::Count),
457                other => Err(format!("unknown DExpr kind: {other}")),
458            }
459        }
460        _ => Err(format!(
461            "cannot convert {} to DExpr (expected DExpr struct, Int, Float, Bool, or String)",
462            v.type_name()
463        )),
464    }
465}
466
467fn parse_binop(s: &str) -> Result<DBinOp, String> {
468    match s {
469        "+" | "add" => Ok(DBinOp::Add),
470        "-" | "sub" => Ok(DBinOp::Sub),
471        "*" | "mul" => Ok(DBinOp::Mul),
472        "/" | "div" => Ok(DBinOp::Div),
473        ">" | "gt" => Ok(DBinOp::Gt),
474        "<" | "lt" => Ok(DBinOp::Lt),
475        ">=" | "ge" => Ok(DBinOp::Ge),
476        "<=" | "le" => Ok(DBinOp::Le),
477        "==" | "eq" => Ok(DBinOp::Eq),
478        "!=" | "ne" => Ok(DBinOp::Ne),
479        "&&" | "and" => Ok(DBinOp::And),
480        "||" | "or" => Ok(DBinOp::Or),
481        other => Err(format!("unknown binop: {other}")),
482    }
483}
484
485/// Parse a `Value::Struct` representing a TidyAgg, e.g.:
486///   Struct { name: "TidyAgg", kind: "sum", col: "salary" }
487///   Struct { name: "TidyAgg", kind: "count" }
488fn value_to_tidy_agg(v: &Value) -> Result<TidyAgg, String> {
489    match v {
490        Value::Struct { name, fields } if name == "TidyAgg" => {
491            let kind = fields
492                .get("kind")
493                .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().as_str()) } else { None })
494                .ok_or("TidyAgg struct missing 'kind' string")?;
495            match kind {
496                "count" => Ok(TidyAgg::Count),
497                "sum" | "mean" | "min" | "max" | "first" | "last"
498                | "median" | "sd" | "var" | "n_distinct" | "iqr" => {
499                    let col = fields
500                        .get("col")
501                        .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().clone()) } else { None })
502                        .ok_or_else(|| format!("TidyAgg {kind} missing 'col' string"))?;
503                    match kind {
504                        "sum" => Ok(TidyAgg::Sum(col)),
505                        "mean" => Ok(TidyAgg::Mean(col)),
506                        "min" => Ok(TidyAgg::Min(col)),
507                        "max" => Ok(TidyAgg::Max(col)),
508                        "first" => Ok(TidyAgg::First(col)),
509                        "last" => Ok(TidyAgg::Last(col)),
510                        "median" => Ok(TidyAgg::Median(col)),
511                        "sd" => Ok(TidyAgg::Sd(col)),
512                        "var" => Ok(TidyAgg::Var(col)),
513                        "n_distinct" => Ok(TidyAgg::NDistinct(col)),
514                        "iqr" => Ok(TidyAgg::Iqr(col)),
515                        _ => unreachable!(),
516                    }
517                }
518                "quantile" => {
519                    let col = fields
520                        .get("col")
521                        .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().clone()) } else { None })
522                        .ok_or("TidyAgg quantile missing 'col' string")?;
523                    let p = fields
524                        .get("p")
525                        .and_then(|v| match v {
526                            Value::Float(f) => Some(*f),
527                            Value::Int(i) => Some(*i as f64),
528                            _ => None,
529                        })
530                        .ok_or("TidyAgg quantile missing 'p' float")?;
531                    Ok(TidyAgg::Quantile(col, p))
532                }
533                other => Err(format!("unknown TidyAgg kind: {other}")),
534            }
535        }
536        _ => Err(format!("expected TidyAgg struct, got {}", v.type_name())),
537    }
538}
539
540/// Parse ArrangeKey array. Each element can be:
541///   - String "col_name"       → ascending
542///   - Struct { name: "ArrangeKey", col: "name", desc: bool }
543fn value_to_arrange_keys(v: &Value) -> Result<Vec<ArrangeKey>, String> {
544    match v {
545        Value::Array(arr) => {
546            let mut keys = Vec::with_capacity(arr.len());
547            for item in arr.iter() {
548                match item {
549                    Value::String(s) => keys.push(ArrangeKey::asc(s)),
550                    Value::Struct { name, fields } if name == "ArrangeKey" => {
551                        let col = fields
552                            .get("col")
553                            .and_then(|v| if let Value::String(s) = v { Some(s.as_ref().as_str()) } else { None })
554                            .ok_or("ArrangeKey missing 'col'")?;
555                        let desc = fields
556                            .get("desc")
557                            .and_then(|v| if let Value::Bool(b) = v { Some(*b) } else { None })
558                            .unwrap_or(false);
559                        keys.push(if desc { ArrangeKey::desc(col) } else { ArrangeKey::asc(col) });
560                    }
561                    _ => return Err(format!("arrange key must be String or ArrangeKey struct, got {}", item.type_name())),
562                }
563            }
564            Ok(keys)
565        }
566        _ => Err(format!("arrange requires Array of keys, got {}", v.type_name())),
567    }
568}
569
570/// Parse rename pairs from `[["old","new"], ["old2","new2"]]`.
571fn value_to_rename_pairs(v: &Value) -> Result<Vec<(String, String)>, String> {
572    match v {
573        Value::Array(arr) => {
574            let mut pairs = Vec::with_capacity(arr.len());
575            for item in arr.iter() {
576                match item {
577                    Value::Array(pair) if pair.len() == 2 => {
578                        let old = value_to_string(&pair[0])?;
579                        let new = value_to_string(&pair[1])?;
580                        pairs.push((old, new));
581                    }
582                    _ => return Err("rename pairs must be arrays of [old, new] strings".into()),
583                }
584            }
585            Ok(pairs)
586        }
587        _ => Err(format!("rename requires Array of pairs, got {}", v.type_name())),
588    }
589}
590
591// ============================================================================
592//  Join dispatcher
593// ============================================================================
594
595/// Dispatch inner_join / left_join / semi_join / anti_join.
596///
597/// The CJC API is: `view.inner_join(other, left_on, right_on)`.
598/// The Rust API is: `view.inner_join(&other, &[(&left_on, &right_on)])`.
599fn dispatch_join(
600    view: &TidyView,
601    args: &[Value],
602    kind: &str,
603) -> Result<Option<Value>, String> {
604    if args.len() != 3 {
605        return Err(format!(
606            "TidyView.{kind} requires 3 args: other_view, left_on, right_on"
607        ));
608    }
609    let other_rc = match &args[0] {
610        Value::TidyView(rc) => rc,
611        _ => return Err(format!("{kind}: first arg must be a TidyView")),
612    };
613    let other = downcast_view(other_rc)?;
614    let left_on = value_to_string(&args[1])?;
615    let right_on = value_to_string(&args[2])?;
616    let on_pairs: Vec<(&str, &str)> = vec![(&left_on, &right_on)];
617
618    match kind {
619        "inner_join" => {
620            let frame = view.inner_join(other, &on_pairs).map_err(|e| format!("{e}"))?;
621            Ok(Some(wrap_view(frame.view())))
622        }
623        "left_join" => {
624            let frame = view.left_join(other, &on_pairs).map_err(|e| format!("{e}"))?;
625            Ok(Some(wrap_view(frame.view())))
626        }
627        "semi_join" => {
628            let new_view = view.semi_join(other, &on_pairs).map_err(|e| format!("{e}"))?;
629            Ok(Some(wrap_view(new_view)))
630        }
631        "anti_join" => {
632            let new_view = view.anti_join(other, &on_pairs).map_err(|e| format!("{e}"))?;
633            Ok(Some(wrap_view(new_view)))
634        }
635        _ => Ok(None),
636    }
637}
638
639// ============================================================================
640//  Column → Value conversion
641// ============================================================================
642
643/// Convert a `Column` to a `Value::Array`.
644fn column_to_value(col: &Column) -> Value {
645    let vals: Vec<Value> = match col {
646        Column::Int(v) => v.iter().map(|i| Value::Int(*i)).collect(),
647        Column::Float(v) => v.iter().map(|f| Value::Float(*f)).collect(),
648        Column::Str(v) => v
649            .iter()
650            .map(|s| Value::String(Rc::new(s.clone())))
651            .collect(),
652        Column::Bool(v) => v.iter().map(|b| Value::Bool(*b)).collect(),
653        Column::Categorical { levels, codes } => codes
654            .iter()
655            .map(|&c| Value::String(Rc::new(levels[c as usize].clone())))
656            .collect(),
657        Column::DateTime(v) => v.iter().map(|i| Value::Int(*i)).collect(),
658    };
659    Value::Array(Rc::new(vals))
660}
661
662// ============================================================================
663//  DataFrame → Value (for .collect())
664// ============================================================================
665
666/// Convert a `DataFrame` to the legacy `Value::Struct { name: "DataFrame" }`
667/// representation used by existing CJC code.
668pub fn dataframe_to_value(df: DataFrame) -> Value {
669    let mut fields = std::collections::BTreeMap::new();
670    let mut col_names: Vec<Value> = Vec::new();
671    let nrows = df.nrows();
672    for (name, col) in &df.columns {
673        col_names.push(Value::String(Rc::new(name.clone())));
674        fields.insert(name.clone(), column_to_value(col));
675    }
676    fields.insert(
677        "__columns".to_string(),
678        Value::Array(Rc::new(col_names)),
679    );
680    fields.insert("__nrows".to_string(), Value::Int(nrows as i64));
681    Value::Struct {
682        name: "DataFrame".to_string(),
683        fields,
684    }
685}
686
687/// Produce a human-readable table-formatted string from a DataFrame.
688fn format_dataframe(df: &DataFrame) -> String {
689    let ncols = df.ncols();
690    let nrows = df.nrows();
691    if ncols == 0 {
692        return "DataFrame(0x0)".to_string();
693    }
694
695    // Column names
696    let names: Vec<&str> = df.columns.iter().map(|(n, _)| n.as_str()).collect();
697
698    // Compute widths
699    let mut widths: Vec<usize> = names.iter().map(|n| n.len()).collect();
700    let display_rows = nrows.min(20); // cap at 20 rows for display
701    let mut cells: Vec<Vec<String>> = Vec::with_capacity(display_rows);
702    for r in 0..display_rows {
703        let mut row: Vec<String> = Vec::with_capacity(ncols);
704        for (ci, (_, col)) in df.columns.iter().enumerate() {
705            let s = col.get_display(r);
706            if s.len() > widths[ci] {
707                widths[ci] = s.len();
708            }
709            row.push(s);
710        }
711        cells.push(row);
712    }
713
714    let mut out = String::new();
715    // Header
716    for (ci, name) in names.iter().enumerate() {
717        if ci > 0 { out.push_str("  "); }
718        out.push_str(&format!("{:>width$}", name, width = widths[ci]));
719    }
720    out.push('\n');
721    // Rows
722    for row in &cells {
723        for (ci, cell) in row.iter().enumerate() {
724            if ci > 0 { out.push_str("  "); }
725            out.push_str(&format!("{:>width$}", cell, width = widths[ci]));
726        }
727        out.push('\n');
728    }
729    if nrows > display_rows {
730        out.push_str(&format!("... ({} more rows)\n", nrows - display_rows));
731    }
732    out
733}
734
735// ============================================================================
736//  DExpr builder builtins (col, binop, agg, etc.)
737// ============================================================================
738
739/// Build a `Value::Struct { name: "DExpr", kind: "col", ... }` from a column name.
740pub fn build_col_expr(name: &str) -> Value {
741    let mut fields = std::collections::BTreeMap::new();
742    fields.insert("kind".to_string(), Value::String(Rc::new("col".to_string())));
743    fields.insert("value".to_string(), Value::String(Rc::new(name.to_string())));
744    Value::Struct { name: "DExpr".to_string(), fields }
745}
746
747/// Build a DExpr binary operation.
748pub fn build_binop_expr(op: &str, left: Value, right: Value) -> Value {
749    let mut fields = std::collections::BTreeMap::new();
750    fields.insert("kind".to_string(), Value::String(Rc::new("binop".to_string())));
751    fields.insert("op".to_string(), Value::String(Rc::new(op.to_string())));
752    fields.insert("left".to_string(), left);
753    fields.insert("right".to_string(), right);
754    Value::Struct { name: "DExpr".to_string(), fields }
755}
756
757/// Build a TidyAgg struct value.
758pub fn build_tidy_agg(kind: &str, col: Option<&str>) -> Value {
759    let mut fields = std::collections::BTreeMap::new();
760    fields.insert("kind".to_string(), Value::String(Rc::new(kind.to_string())));
761    if let Some(c) = col {
762        fields.insert("col".to_string(), Value::String(Rc::new(c.to_string())));
763    }
764    Value::Struct { name: "TidyAgg".to_string(), fields }
765}
766
767/// Build an ArrangeKey struct value.
768pub fn build_arrange_key(col: &str, descending: bool) -> Value {
769    let mut fields = std::collections::BTreeMap::new();
770    fields.insert("col".to_string(), Value::String(Rc::new(col.to_string())));
771    fields.insert("desc".to_string(), Value::Bool(descending));
772    Value::Struct { name: "ArrangeKey".to_string(), fields }
773}
774
775/// Dispatch builder builtins like `col()`, `desc()`, `asc()`, `sum()`, `mean()`, etc.
776/// Returns `Ok(Some(value))` if recognised, `Ok(None)` otherwise.
777pub fn dispatch_tidy_builtin(name: &str, args: &[Value]) -> Result<Option<Value>, String> {
778    match name {
779        // DExpr builders
780        "col" => {
781            if args.len() != 1 {
782                return Err("col() requires 1 argument: column name".into());
783            }
784            let name = value_to_string(&args[0])?;
785            Ok(Some(build_col_expr(&name)))
786        }
787        "desc" => {
788            if args.len() != 1 {
789                return Err("desc() requires 1 argument: column name".into());
790            }
791            let name = value_to_string(&args[0])?;
792            Ok(Some(build_arrange_key(&name, true)))
793        }
794        "asc" => {
795            if args.len() != 1 {
796                return Err("asc() requires 1 argument: column name".into());
797            }
798            let name = value_to_string(&args[0])?;
799            Ok(Some(build_arrange_key(&name, false)))
800        }
801        // DExpr binary op builder
802        "dexpr_binop" => {
803            if args.len() != 3 {
804                return Err("dexpr_binop() requires 3 args: op, left, right".into());
805            }
806            let op = value_to_string(&args[0])?;
807            Ok(Some(build_binop_expr(&op, args[1].clone(), args[2].clone())))
808        }
809
810        // TidyAgg builders
811        "tidy_count" => Ok(Some(build_tidy_agg("count", None))),
812        "tidy_sum" => {
813            if args.len() != 1 { return Err("tidy_sum() requires 1 argument: column name".into()); }
814            let col = value_to_string(&args[0])?;
815            Ok(Some(build_tidy_agg("sum", Some(&col))))
816        }
817        "tidy_mean" => {
818            if args.len() != 1 { return Err("tidy_mean() requires 1 argument: column name".into()); }
819            let col = value_to_string(&args[0])?;
820            Ok(Some(build_tidy_agg("mean", Some(&col))))
821        }
822        "tidy_min" => {
823            if args.len() != 1 { return Err("tidy_min() requires 1 argument: column name".into()); }
824            let col = value_to_string(&args[0])?;
825            Ok(Some(build_tidy_agg("min", Some(&col))))
826        }
827        "tidy_max" => {
828            if args.len() != 1 { return Err("tidy_max() requires 1 argument: column name".into()); }
829            let col = value_to_string(&args[0])?;
830            Ok(Some(build_tidy_agg("max", Some(&col))))
831        }
832        "tidy_first" => {
833            if args.len() != 1 { return Err("tidy_first() requires 1 argument: column name".into()); }
834            let col = value_to_string(&args[0])?;
835            Ok(Some(build_tidy_agg("first", Some(&col))))
836        }
837        "tidy_last" => {
838            if args.len() != 1 { return Err("tidy_last() requires 1 argument: column name".into()); }
839            let col = value_to_string(&args[0])?;
840            Ok(Some(build_tidy_agg("last", Some(&col))))
841        }
842
843        // =====================================================================
844        //  stringr builtins — byte-first string view approach
845        //
846        //  CJC strings are UTF-8 byte sequences. These functions operate on the
847        //  byte representation via cjc-regex's Thompson NFA. Where possible,
848        //  results are slices (zero-copy views) of the input. Allocation happens
849        //  only when replacement or splitting creates new buffers.
850        //
851        //  Key design point: patterns are compiled fresh per call. For hot-loop
852        //  use, prefer the compiled Regex value type (regex literal `/pattern/`).
853        // =====================================================================
854
855        "str_detect" => {
856            // str_detect(haystack, pattern) → bool
857            if args.len() != 2 { return Err("str_detect requires 2 args: string, pattern".into()); }
858            let hay = value_to_string(&args[0])?;
859            let pat = value_to_string(&args[1])?;
860            let matched = cjc_regex::is_match(&pat, "", hay.as_bytes());
861            Ok(Some(Value::Bool(matched)))
862        }
863        "str_extract" => {
864            // str_extract(haystack, pattern) → string (first match) or ""
865            if args.len() != 2 { return Err("str_extract requires 2 args: string, pattern".into()); }
866            let hay = value_to_string(&args[0])?;
867            let pat = value_to_string(&args[1])?;
868            match cjc_regex::find(&pat, "", hay.as_bytes()) {
869                Some((start, end)) => {
870                    let slice = &hay.as_bytes()[start..end];
871                    let s = String::from_utf8_lossy(slice).to_string();
872                    Ok(Some(Value::String(Rc::new(s))))
873                }
874                None => Ok(Some(Value::String(Rc::new(String::new())))),
875            }
876        }
877        "str_extract_all" => {
878            // str_extract_all(haystack, pattern) → [string]
879            if args.len() != 2 { return Err("str_extract_all requires 2 args: string, pattern".into()); }
880            let hay = value_to_string(&args[0])?;
881            let pat = value_to_string(&args[1])?;
882            let matches = cjc_regex::find_all(&pat, "", hay.as_bytes());
883            let vals: Vec<Value> = matches
884                .iter()
885                .map(|&(start, end)| {
886                    let slice = &hay.as_bytes()[start..end];
887                    Value::String(Rc::new(String::from_utf8_lossy(slice).to_string()))
888                })
889                .collect();
890            Ok(Some(Value::Array(Rc::new(vals))))
891        }
892        "str_replace" => {
893            // str_replace(haystack, pattern, replacement) → string (first match replaced)
894            if args.len() != 3 { return Err("str_replace requires 3 args: string, pattern, replacement".into()); }
895            let hay = value_to_string(&args[0])?;
896            let pat = value_to_string(&args[1])?;
897            let rep = value_to_string(&args[2])?;
898            match cjc_regex::find(&pat, "", hay.as_bytes()) {
899                Some((start, end)) => {
900                    let mut result = String::with_capacity(hay.len());
901                    result.push_str(&hay[..start]);
902                    result.push_str(&rep);
903                    result.push_str(&hay[end..]);
904                    Ok(Some(Value::String(Rc::new(result))))
905                }
906                None => Ok(Some(Value::String(Rc::new(hay)))),
907            }
908        }
909        "str_replace_all" => {
910            // str_replace_all(haystack, pattern, replacement) → string (all matches replaced)
911            if args.len() != 3 { return Err("str_replace_all requires 3 args: string, pattern, replacement".into()); }
912            let hay = value_to_string(&args[0])?;
913            let pat = value_to_string(&args[1])?;
914            let rep = value_to_string(&args[2])?;
915            let matches = cjc_regex::find_all(&pat, "", hay.as_bytes());
916            if matches.is_empty() {
917                return Ok(Some(Value::String(Rc::new(hay))));
918            }
919            let mut result = String::with_capacity(hay.len());
920            let mut last_end = 0;
921            for &(start, end) in &matches {
922                result.push_str(&hay[last_end..start]);
923                result.push_str(&rep);
924                last_end = end;
925            }
926            result.push_str(&hay[last_end..]);
927            Ok(Some(Value::String(Rc::new(result))))
928        }
929        "str_split" => {
930            // str_split(haystack, pattern) → [string]
931            if args.len() != 2 { return Err("str_split requires 2 args: string, pattern".into()); }
932            let hay = value_to_string(&args[0])?;
933            let pat = value_to_string(&args[1])?;
934            let spans = cjc_regex::split(&pat, "", hay.as_bytes());
935            let vals: Vec<Value> = spans
936                .iter()
937                .map(|&(start, end)| {
938                    Value::String(Rc::new(
939                        String::from_utf8_lossy(&hay.as_bytes()[start..end]).to_string(),
940                    ))
941                })
942                .collect();
943            Ok(Some(Value::Array(Rc::new(vals))))
944        }
945        "str_count" => {
946            // str_count(haystack, pattern) → int (number of matches)
947            if args.len() != 2 { return Err("str_count requires 2 args: string, pattern".into()); }
948            let hay = value_to_string(&args[0])?;
949            let pat = value_to_string(&args[1])?;
950            let count = cjc_regex::find_all(&pat, "", hay.as_bytes()).len();
951            Ok(Some(Value::Int(count as i64)))
952        }
953        "str_trim" => {
954            // str_trim(string) → string with leading/trailing whitespace removed
955            if args.len() != 1 { return Err("str_trim requires 1 arg: string".into()); }
956            let s = value_to_string(&args[0])?;
957            Ok(Some(Value::String(Rc::new(s.trim().to_string()))))
958        }
959        "str_to_upper" => {
960            if args.len() != 1 { return Err("str_to_upper requires 1 arg: string".into()); }
961            let s = value_to_string(&args[0])?;
962            Ok(Some(Value::String(Rc::new(s.to_uppercase()))))
963        }
964        "str_to_lower" => {
965            if args.len() != 1 { return Err("str_to_lower requires 1 arg: string".into()); }
966            let s = value_to_string(&args[0])?;
967            Ok(Some(Value::String(Rc::new(s.to_lowercase()))))
968        }
969        "str_starts" => {
970            if args.len() != 2 { return Err("str_starts requires 2 args: string, prefix".into()); }
971            let s = value_to_string(&args[0])?;
972            let prefix = value_to_string(&args[1])?;
973            Ok(Some(Value::Bool(s.starts_with(&prefix))))
974        }
975        "str_ends" => {
976            if args.len() != 2 { return Err("str_ends requires 2 args: string, suffix".into()); }
977            let s = value_to_string(&args[0])?;
978            let suffix = value_to_string(&args[1])?;
979            Ok(Some(Value::Bool(s.ends_with(&suffix))))
980        }
981        "str_sub" => {
982            // str_sub(string, start, end) → substring (byte-indexed, clamped)
983            if args.len() != 3 { return Err("str_sub requires 3 args: string, start, end".into()); }
984            let s = value_to_string(&args[0])?;
985            let start = value_to_usize(&args[1])?.min(s.len());
986            let end = value_to_usize(&args[2])?.min(s.len());
987            if start > end {
988                Ok(Some(Value::String(Rc::new(String::new()))))
989            } else {
990                // Clamp to char boundaries for safety
991                let actual_start = clamp_to_char_boundary(&s, start);
992                let actual_end = clamp_to_char_boundary(&s, end);
993                Ok(Some(Value::String(Rc::new(s[actual_start..actual_end].to_string()))))
994            }
995        }
996        "str_len" => {
997            // str_len(string) → int (byte length, consistent with byte-first view)
998            if args.len() != 1 { return Err("str_len requires 1 arg: string".into()); }
999            let s = value_to_string(&args[0])?;
1000            Ok(Some(Value::Int(s.len() as i64)))
1001        }
1002
1003        // =====================================================================
1004        //  Stats builtins (operate on Array of numbers)
1005        // =====================================================================
1006
1007        "median" => {
1008            if args.len() != 1 { return Err("median requires 1 arg: numeric array".into()); }
1009            let nums = value_to_f64_vec(&args[0])?;
1010            if nums.is_empty() {
1011                return Ok(Some(Value::Float(f64::NAN)));
1012            }
1013            let mut sorted = nums;
1014            sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
1015            let mid = sorted.len() / 2;
1016            let med = if sorted.len() % 2 == 0 {
1017                (sorted[mid - 1] + sorted[mid]) / 2.0
1018            } else {
1019                sorted[mid]
1020            };
1021            Ok(Some(Value::Float(med)))
1022        }
1023        "sd" => {
1024            // Population standard deviation
1025            if args.len() != 1 { return Err("sd requires 1 arg: numeric array".into()); }
1026            let nums = value_to_f64_vec(&args[0])?;
1027            if nums.len() < 2 {
1028                return Ok(Some(Value::Float(f64::NAN)));
1029            }
1030            let mean = nums.iter().sum::<f64>() / nums.len() as f64;
1031            let var = nums.iter().map(|x| (x - mean) * (x - mean)).sum::<f64>()
1032                / (nums.len() - 1) as f64;
1033            Ok(Some(Value::Float(var.sqrt())))
1034        }
1035        "variance" => {
1036            // Sample variance (N-1 denominator)
1037            if args.len() != 1 { return Err("variance requires 1 arg: numeric array".into()); }
1038            let nums = value_to_f64_vec(&args[0])?;
1039            if nums.len() < 2 {
1040                return Ok(Some(Value::Float(f64::NAN)));
1041            }
1042            let mean = nums.iter().sum::<f64>() / nums.len() as f64;
1043            let var = nums.iter().map(|x| (x - mean) * (x - mean)).sum::<f64>()
1044                / (nums.len() - 1) as f64;
1045            Ok(Some(Value::Float(var)))
1046        }
1047        "n_distinct" => {
1048            // Count distinct values in an array
1049            if args.len() != 1 { return Err("n_distinct requires 1 arg: array".into()); }
1050            match &args[0] {
1051                Value::Array(arr) => {
1052                    let mut seen = std::collections::HashSet::new();
1053                    for v in arr.iter() {
1054                        seen.insert(format!("{v}"));
1055                    }
1056                    Ok(Some(Value::Int(seen.len() as i64)))
1057                }
1058                _ => Err(format!("n_distinct expects Array, got {}", args[0].type_name())),
1059            }
1060        }
1061
1062        _ => Ok(None),
1063    }
1064}
1065
/// Round a byte index down to the closest UTF-8 char boundary of `s`.
///
/// An index at or beyond the end of `s` yields `s.len()`.
fn clamp_to_char_boundary(s: &str, idx: usize) -> usize {
    let upper = idx.min(s.len());
    // Offset 0 is always a boundary, so the search cannot come up empty.
    (0..=upper)
        .rev()
        .find(|&pos| s.is_char_boundary(pos))
        .unwrap_or(0)
}
1077
1078/// Convert a Value::Array of numbers to Vec<f64>.
1079fn value_to_f64_vec(v: &Value) -> Result<Vec<f64>, String> {
1080    match v {
1081        Value::Array(arr) => {
1082            arr.iter()
1083                .map(|v| match v {
1084                    Value::Float(f) => Ok(*f),
1085                    Value::Int(i) => Ok(*i as f64),
1086                    _ => Err(format!("expected numeric value in array, got {}", v.type_name())),
1087                })
1088                .collect()
1089        }
1090        _ => Err(format!("expected Array, got {}", v.type_name())),
1091    }
1092}