Skip to main content

robin_sparkless_polars/plan/
expr.rs

1//! Expression interpreter: turn serialized expression trees (JSON/serde) into Polars Expr.
2//! Used by the plan interpreter for filter, select, and withColumn payloads.
3
4use polars::prelude::{DataType, Expr, Series, col, lit};
5use serde_json::Value;
6use std::error::Error;
7use std::fmt;
8
/// Failure raised while parsing or interpreting a serialized plan expression.
///
/// Wraps a single human-readable message; the message is what `Display` prints.
#[derive(Debug)]
pub struct PlanExprError(String);

impl fmt::Display for PlanExprError {
    /// Renders the wrapped message, honouring any width/fill flags on the formatter.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}

// No underlying source error is tracked, so the default `Error` methods suffice.
impl Error for PlanExprError {}
20
21/// Convert a serialized expression tree (JSON Value) into a Polars Expr.
22/// Supports: bare string (column reference), col, lit, comparison ops (eq, ne, gt, ge, lt, le),
23/// logical (and, or), not, and a subset of functions.
24/// (Fixes #644: accept bare string as column reference so embedders can pass column names.)
25pub fn expr_from_value(v: &Value) -> Result<Expr, PlanExprError> {
26    // Bare string: treat as column reference (PySpark parity: select/filter with column name).
27    if let Some(name) = v.as_str() {
28        return Ok(col(name));
29    }
30
31    let obj = v.as_object().ok_or_else(|| {
32        PlanExprError("expression must be a JSON object or column name string".to_string())
33    })?;
34
35    // Column reference: {"col": "name"}
36    if let Some(name) = obj.get("col").and_then(Value::as_str) {
37        return Ok(col(name));
38    }
39
40    // Literal: {"lit": <value>}
41    if let Some(lit_val) = obj.get("lit") {
42        return lit_from_value(lit_val);
43    }
44
45    // Binary op: {"op": "gt"|"eq"|..., "left": <expr>, "right": <expr>}
46    if let Some(op) = obj.get("op").and_then(Value::as_str) {
47        match op {
48            "eq" | "ne" | "gt" | "ge" | "lt" | "le" => {
49                let left_v = obj
50                    .get("left")
51                    .ok_or_else(|| PlanExprError(format!("op '{op}' requires 'left'")))?;
52                let right_v = obj
53                    .get("right")
54                    .ok_or_else(|| PlanExprError(format!("op '{op}' requires 'right'")))?;
55                let l = expr_from_value(left_v)?;
56                let r = expr_from_value(right_v)?;
57
58                // Best-effort type hints: literals we can infer directly; columns are left
59                // without types here and will be handled by the DataFrame-level rewriter.
60                let infer_lit_type = |e: &Expr| -> Option<DataType> {
61                    if let Expr::Literal(lv) = e {
62                        let dt = lv.get_datatype();
63                        if matches!(dt, DataType::Unknown(_)) {
64                            None
65                        } else {
66                            Some(dt)
67                        }
68                    } else {
69                        None
70                    }
71                };
72
73                let l_ty = infer_lit_type(&l);
74                let r_ty = infer_lit_type(&r);
75
76                // #612: When one side is a literal and the other is a column, assume column is String
77                // so string–numeric coercion is applied (PySpark parity: df.filter(df["s"] == 123)).
78                use crate::type_coercion::{CompareOp, coerce_for_pyspark_comparison};
79                let op_enum = match op {
80                    "eq" => CompareOp::Eq,
81                    "ne" => CompareOp::NotEq,
82                    "gt" => CompareOp::Gt,
83                    "ge" => CompareOp::GtEq,
84                    "lt" => CompareOp::Lt,
85                    "le" => CompareOp::LtEq,
86                    _ => unreachable!(),
87                };
88                let (lt, rt) = match (&l_ty, &r_ty) {
89                    (Some(lt), Some(rt)) => (lt.clone(), rt.clone()),
90                    (Some(lt), None) => (lt.clone(), DataType::String),
91                    (None, Some(rt)) => (DataType::String, rt.clone()),
92                    (None, None) => {
93                        // No type info; skip coercion (may error at runtime).
94                        return Ok(match op {
95                            "eq" => l.eq(r),
96                            "ne" => l.neq(r),
97                            "gt" => l.gt(r),
98                            "ge" => l.gt_eq(r),
99                            "lt" => l.lt(r),
100                            "le" => l.lt_eq(r),
101                            _ => unreachable!(),
102                        });
103                    }
104                };
105                let expr =
106                    match coerce_for_pyspark_comparison(l.clone(), r.clone(), &lt, &rt, &op_enum) {
107                        Ok((lc, rc)) => match op {
108                            "eq" => lc.eq(rc),
109                            "ne" => lc.neq(rc),
110                            "gt" => lc.gt(rc),
111                            "ge" => lc.gt_eq(rc),
112                            "lt" => lc.lt(rc),
113                            "le" => lc.lt_eq(rc),
114                            _ => unreachable!(),
115                        },
116                        Err(_) => match op {
117                            "eq" => l.eq(r),
118                            "ne" => l.neq(r),
119                            "gt" => l.gt(r),
120                            "ge" => l.gt_eq(r),
121                            "lt" => l.lt(r),
122                            "le" => l.lt_eq(r),
123                            _ => unreachable!(),
124                        },
125                    };
126
127                return Ok(expr);
128            }
129            "eq_null_safe" => {
130                let left = obj.get("left").ok_or_else(|| {
131                    PlanExprError("op 'eq_null_safe' requires 'left'".to_string())
132                })?;
133                let right = obj.get("right").ok_or_else(|| {
134                    PlanExprError("op 'eq_null_safe' requires 'right'".to_string())
135                })?;
136                let l = expr_from_value(left)?;
137                let r = expr_from_value(right)?;
138                return eq_null_safe_expr(l, r);
139            }
140            "and" => {
141                let left = obj
142                    .get("left")
143                    .ok_or_else(|| PlanExprError("op 'and' requires 'left'".to_string()))?;
144                let right = obj
145                    .get("right")
146                    .ok_or_else(|| PlanExprError("op 'and' requires 'right'".to_string()))?;
147                return Ok(expr_from_value(left)?.and(expr_from_value(right)?));
148            }
149            "or" => {
150                let left = obj
151                    .get("left")
152                    .ok_or_else(|| PlanExprError("op 'or' requires 'left'".to_string()))?;
153                let right = obj
154                    .get("right")
155                    .ok_or_else(|| PlanExprError("op 'or' requires 'right'".to_string()))?;
156                return Ok(expr_from_value(left)?.or(expr_from_value(right)?));
157            }
158            "not" => {
159                let arg = obj
160                    .get("arg")
161                    .ok_or_else(|| PlanExprError("op 'not' requires 'arg'".to_string()))?;
162                return Ok(expr_from_value(arg)?.not());
163            }
164            "between" => {
165                let left_v = obj
166                    .get("left")
167                    .ok_or_else(|| PlanExprError("op 'between' requires 'left'".to_string()))?;
168                let lower_v = obj
169                    .get("lower")
170                    .ok_or_else(|| PlanExprError("op 'between' requires 'lower'".to_string()))?;
171                let upper_v = obj
172                    .get("upper")
173                    .ok_or_else(|| PlanExprError("op 'between' requires 'upper'".to_string()))?;
174                let left = expr_from_value(left_v)?;
175                let lower = expr_from_value(lower_v)?;
176                let upper = expr_from_value(upper_v)?;
177                // #628: Apply string–numeric coercion so col("val").between(1, 10) works when val is string.
178                let infer_lit_type = |e: &Expr| -> Option<DataType> {
179                    if let Expr::Literal(lv) = e {
180                        let dt = lv.get_datatype();
181                        if matches!(dt, DataType::Unknown(_)) {
182                            None
183                        } else {
184                            Some(dt)
185                        }
186                    } else {
187                        None
188                    }
189                };
190                let lower_ty = infer_lit_type(&lower);
191                let upper_ty = infer_lit_type(&upper);
192                use crate::type_coercion::{CompareOp, coerce_for_pyspark_comparison};
193                // Assume column is String when unknown so string–numeric coercion is applied (#628).
194                let lt = DataType::String;
195                let rt_lower = lower_ty.unwrap_or(DataType::String);
196                let rt_upper = upper_ty.unwrap_or(DataType::String);
197                let (left_c, lower_c) = match coerce_for_pyspark_comparison(
198                    left.clone(),
199                    lower.clone(),
200                    &lt,
201                    &rt_lower,
202                    &CompareOp::GtEq,
203                ) {
204                    Ok((a, b)) => (a, b),
205                    Err(_) => (left.clone(), lower),
206                };
207                let upper_clone = upper.clone();
208                let (left_cc, upper_c) = match coerce_for_pyspark_comparison(
209                    left_c.clone(),
210                    upper,
211                    &lt,
212                    &rt_upper,
213                    &CompareOp::LtEq,
214                ) {
215                    Ok((a, b)) => (a, b),
216                    Err(_) => (left_c.clone(), upper_clone),
217                };
218                return Ok(left_cc.clone().gt_eq(lower_c).and(left_cc.lt_eq(upper_c)));
219            }
220            "**" | "pow" => {
221                let left_v = obj
222                    .get("left")
223                    .ok_or_else(|| PlanExprError(format!("op '{op}' requires 'left'")))?;
224                let right_v = obj
225                    .get("right")
226                    .ok_or_else(|| PlanExprError(format!("op '{op}' requires 'right'")))?;
227                let l = expr_from_value(left_v)?;
228                let r = expr_from_value(right_v)?;
229                let left_col = expr_to_column(l);
230                let right_col = expr_to_column(r);
231                return Ok(left_col.pow_with(&right_col).into_expr());
232            }
233            "isin" => {
234                // {"op": "isin", "left": <col_expr>, "right": <list_expr>} or
235                // {"op": "isin", "left": <col_expr>, "values": [<lit>, ...]}
236                // Empty list -> lit(false) for all rows (issue #518)
237                let left_v = obj
238                    .get("left")
239                    .ok_or_else(|| PlanExprError("op 'isin' requires 'left'".to_string()))?;
240                let left_expr = expr_from_value(left_v)?;
241                let values_opt =
242                    if let Some(values_arr) = obj.get("values").and_then(Value::as_array) {
243                        try_values_for_isin(values_arr)?
244                    } else if let Some(right_v) = obj.get("right") {
245                        try_values_from_plan_value(right_v)?
246                    } else {
247                        return Err(PlanExprError(
248                            "op 'isin' requires 'right' or 'values'".to_string(),
249                        ));
250                    };
251                return Ok(match values_opt {
252                    None => lit(false),
253                    Some(values_expr) => left_expr.is_in(values_expr, false),
254                });
255            }
256            "getItem" => {
257                // {"op": "getItem", "left": <col_expr>, "right": <index_lit>} - PySpark 0-based
258                let left_v = obj
259                    .get("left")
260                    .ok_or_else(|| PlanExprError("op 'getItem' requires 'left'".to_string()))?;
261                let right_v = obj
262                    .get("right")
263                    .ok_or_else(|| PlanExprError("op 'getItem' requires 'right'".to_string()))?;
264                let col_expr = expr_from_value(left_v)?;
265                let idx = lit_as_i64(right_v)?;
266                let col_c = expr_to_column(col_expr);
267                return Ok(col_c.get_item(idx).into_expr());
268            }
269            "startswith" => {
270                // {"op": "startswith", "left": col, "right": {"lit": "prefix"}}
271                let left_v = obj
272                    .get("left")
273                    .ok_or_else(|| PlanExprError("op 'startswith' requires 'left'".to_string()))?;
274                let right_v = obj
275                    .get("right")
276                    .ok_or_else(|| PlanExprError("op 'startswith' requires 'right'".to_string()))?;
277                let col_expr = expr_from_value(left_v)?;
278                let prefix = lit_as_string(right_v)?;
279                let col_c = expr_to_column(col_expr);
280                return Ok(crate::functions::startswith(&col_c, &prefix).into_expr());
281            }
282            "is_null" => {
283                let arg = obj
284                    .get("arg")
285                    .ok_or_else(|| PlanExprError("op 'is_null' requires 'arg'".to_string()))?;
286                return Ok(expr_from_value(arg)?.is_null());
287            }
288            "is_not_null" => {
289                let arg = obj
290                    .get("arg")
291                    .ok_or_else(|| PlanExprError("op 'is_not_null' requires 'arg'".to_string()))?;
292                return Ok(expr_from_value(arg)?.is_not_null());
293            }
294            "regexp_extract" => {
295                // {"op": "regexp_extract", "left": col, "pattern": {"lit": "..."}, "group": {"lit": 0}}
296                // or {"op": "regexp_extract", "args": [col, pattern, group]}
297                if let Some(args) = obj.get("args").and_then(Value::as_array) {
298                    require_args_min("regexp_extract", args, 3)?;
299                    let col_expr = expr_from_value(&args[0])?;
300                    let pattern = lit_as_string(&args[1])?;
301                    let group_idx = lit_as_usize(&args[2])?;
302                    let col_c = expr_to_column(col_expr);
303                    return Ok(
304                        crate::functions::regexp_extract(&col_c, &pattern, group_idx).into_expr(),
305                    );
306                }
307                let left_v = obj.get("left").ok_or_else(|| {
308                    PlanExprError("op 'regexp_extract' requires 'left'".to_string())
309                })?;
310                let col_expr = expr_from_value(left_v)?;
311                let pattern_v =
312                    obj.get("pattern")
313                        .or_else(|| obj.get("right"))
314                        .ok_or_else(|| {
315                            PlanExprError(
316                                "op 'regexp_extract' requires 'pattern' or 'right'".to_string(),
317                            )
318                        })?;
319                let pattern = lit_as_string(pattern_v)?;
320                let group_idx = obj
321                    .get("group")
322                    .and_then(|v| lit_as_usize(v).ok())
323                    .unwrap_or(0);
324                let col_c = expr_to_column(col_expr);
325                return Ok(
326                    crate::functions::regexp_extract(&col_c, &pattern, group_idx).into_expr(),
327                );
328            }
329            "regexp_replace" => {
330                // {"op": "regexp_replace", "left": col, "pattern": {"lit": "..."}, "replacement": {"lit": "..."}}
331                // or {"op": "regexp_replace", "args": [col, pattern, replacement]} (issue #528)
332                if let Some(args) = obj.get("args").and_then(Value::as_array) {
333                    require_args_min("regexp_replace", args, 3)?;
334                    let col_expr = expr_from_value(&args[0])?;
335                    let pattern = lit_as_string(&args[1])?;
336                    let replacement = lit_as_string(&args[2])?;
337                    let col_c = expr_to_column(col_expr);
338                    return Ok(
339                        crate::functions::regexp_replace(&col_c, &pattern, &replacement)
340                            .into_expr(),
341                    );
342                }
343                let left_v = obj.get("left").ok_or_else(|| {
344                    PlanExprError("op 'regexp_replace' requires 'left'".to_string())
345                })?;
346                let col_expr = expr_from_value(left_v)?;
347                let pattern =
348                    lit_as_string(obj.get("pattern").or_else(|| obj.get("right")).ok_or_else(
349                        || {
350                            PlanExprError(
351                                "op 'regexp_replace' requires 'pattern' or 'right'".to_string(),
352                            )
353                        },
354                    )?)?;
355                let replacement = lit_as_string(obj.get("replacement").ok_or_else(|| {
356                    PlanExprError("op 'regexp_replace' requires 'replacement'".to_string())
357                })?)?;
358                let col_c = expr_to_column(col_expr);
359                return Ok(
360                    crate::functions::regexp_replace(&col_c, &pattern, &replacement).into_expr(),
361                );
362            }
363            "create_map" | "createMap" => {
364                // {"op": "create_map"|"createMap", "args": [key1, val1, key2, val2, ...]} (issue #542)
365                let args_arr = obj.get("args").and_then(Value::as_array).ok_or_else(|| {
366                    PlanExprError("op 'create_map'/'createMap' requires 'args' array".to_string())
367                })?;
368                let exprs: Result<Vec<Expr>, _> = args_arr.iter().map(expr_from_value).collect();
369                let cols: Vec<crate::Column> = exprs?.into_iter().map(expr_to_column).collect();
370                let refs: Vec<&crate::Column> = cols.iter().collect();
371                return Ok(crate::functions::create_map(&refs)
372                    .map_err(|e| PlanExprError(e.to_string()))?
373                    .into_expr());
374            }
375            "add" | "+" => {
376                // {"op": "add", "left": <expr>, "right": <expr>} - e.g. row_number().over(w) + 10
377                let left_v = obj
378                    .get("left")
379                    .ok_or_else(|| PlanExprError("op 'add' requires 'left'".to_string()))?;
380                let right_v = obj
381                    .get("right")
382                    .ok_or_else(|| PlanExprError("op 'add' requires 'right'".to_string()))?;
383                let l = expr_from_value(left_v)?;
384                let r = expr_from_value(right_v)?;
385                let a = expr_to_column(l);
386                let b = expr_to_column(r);
387                return Ok(a.add_pyspark(&b).into_expr());
388            }
389            "sub" | "minus" | "-" => {
390                // {"op": "sub", "left": <expr>, "right": <expr>} — e.g. (1 - col("x")) #556
391                let left_v = obj
392                    .get("left")
393                    .ok_or_else(|| PlanExprError("op 'sub' requires 'left'".to_string()))?;
394                let right_v = obj
395                    .get("right")
396                    .ok_or_else(|| PlanExprError("op 'sub' requires 'right'".to_string()))?;
397                let l = expr_from_value(left_v)?;
398                let r = expr_from_value(right_v)?;
399                let a = expr_to_column(l);
400                let b = expr_to_column(r);
401                return Ok(a.subtract(&b).into_expr());
402            }
403            "mul" | "*" => {
404                // {"op": "mul", "left": <expr>, "right": <expr>} — e.g. (100 * col("x")) #556
405                let left_v = obj
406                    .get("left")
407                    .ok_or_else(|| PlanExprError("op 'mul' requires 'left'".to_string()))?;
408                let right_v = obj
409                    .get("right")
410                    .ok_or_else(|| PlanExprError("op 'mul' requires 'right'".to_string()))?;
411                let l = expr_from_value(left_v)?;
412                let r = expr_from_value(right_v)?;
413                let a = expr_to_column(l);
414                let b = expr_to_column(r);
415                return Ok(a.multiply(&b).into_expr());
416            }
417            "udf" => {
418                // {"op": "udf", "udf"|"name": "udf_name", "args": [<expr>, ...]} (issue #545)
419                let udf_name = obj
420                    .get("udf")
421                    .or_else(|| obj.get("name"))
422                    .and_then(Value::as_str)
423                    .ok_or_else(|| {
424                        PlanExprError("op 'udf' requires 'udf' or 'name'".to_string())
425                    })?;
426                let args = obj
427                    .get("args")
428                    .and_then(Value::as_array)
429                    .ok_or_else(|| PlanExprError("op 'udf' requires 'args' array".to_string()))?;
430                let col = column_from_udf_call(udf_name, args)?;
431                if col.udf_call.is_some() {
432                    return Err(PlanExprError(
433                        "Python/Vectorized UDFs are only supported in withColumn/select, not in filter/plan expressions"
434                            .into(),
435                    ));
436                }
437                return Ok(col.expr().clone());
438            }
439            // #547, #554, #583: Sparkless may send functions as op with "args" (same semantics as fn)
440            "translate" | "substring_index" | "substringIndex" | "levenshtein" | "soundex"
441            | "crc32" | "xxhash64" | "get_json_object" | "getJsonObject" | "json_tuple"
442            | "jsonTuple" | "regexp_extract_all" | "regexpExtractAll" | "date_trunc"
443            | "dateTrunc" | "to_date" | "toDate" | "format_string" | "formatString" | "log"
444            | "explode" | "explode_outer" | "explodeOuter" | "concat" | "contains" => {
445                let args = obj
446                    .get("args")
447                    .and_then(Value::as_array)
448                    .ok_or_else(|| PlanExprError(format!("op '{op}' requires 'args' array")))?;
449                let fn_name = match op {
450                    "substringIndex" => "substring_index",
451                    "getJsonObject" => "get_json_object",
452                    "jsonTuple" => "json_tuple",
453                    "regexpExtractAll" => "regexp_extract_all",
454                    "dateTrunc" => "date_trunc",
455                    "toDate" => "to_date",
456                    "formatString" => "format_string",
457                    "explodeOuter" => "explode_outer",
458                    other => other,
459                };
460                return expr_from_fn(fn_name, args);
461            }
462            _ => {
463                return Err(PlanExprError(format!("unsupported expression op: {op}")));
464            }
465        }
466    }
467
468    // UDF call: {"udf": "name", "args": [<expr>, ...]} - returns Expr for Rust UDF only (Python UDF needs Column path)
469    if let Some(udf_name) = obj.get("udf").and_then(Value::as_str) {
470        let args = obj
471            .get("args")
472            .and_then(Value::as_array)
473            .ok_or_else(|| PlanExprError("udf requires 'args' array".to_string()))?;
474        let col = column_from_udf_call(udf_name, args)?;
475        if col.udf_call.is_some() {
476            return Err(PlanExprError(
477                "Python/Vectorized UDFs are only supported in withColumn/select, not in filter/plan expressions"
478                    .into(),
479            ));
480        }
481        return Ok(col.expr().clone());
482    }
483
484    // Function call: {"fn"|"function": "upper"|...|"row_number", "args": [<expr>, ...], optional "window": {...}}
485    // Sparkless may send "function" instead of "fn" (issue #517). Window fns allow empty args.
486    let fn_name = obj
487        .get("fn")
488        .or_else(|| obj.get("function"))
489        .and_then(Value::as_str);
490    if let Some(fn_name) = fn_name {
491        if let Some(window_val) = obj.get("window") {
492            return expr_from_window_fn(
493                fn_name,
494                window_val,
495                obj.get("args").and_then(Value::as_array),
496            );
497        }
498        let args = obj
499            .get("args")
500            .and_then(Value::as_array)
501            .ok_or_else(|| PlanExprError(format!("fn '{fn_name}' requires 'args' array")))?;
502        return expr_from_fn(fn_name, args);
503    }
504
505    // type: "window" - Sparkless: {"type": "window", "fn"|"function": "row_number", "window": {...}}
506    if let Some(typ) = obj.get("type").and_then(Value::as_str) {
507        if typ == "window" {
508            let fn_name = obj
509                .get("fn")
510                .or_else(|| obj.get("function"))
511                .and_then(Value::as_str)
512                .ok_or_else(|| {
513                    PlanExprError("type window requires 'fn' or 'function'".to_string())
514                })?;
515            let window_val = obj
516                .get("window")
517                .ok_or_else(|| PlanExprError("type window requires 'window'".to_string()))?;
518            let args = obj.get("args").and_then(Value::as_array);
519            return expr_from_window_fn(fn_name, window_val, args);
520        }
521    }
522
523    Err(PlanExprError(
524        "expression must have 'col', 'lit', 'op', 'fn', or 'type'".to_string(),
525    ))
526}
527
528/// Extract column name from window spec item: "col" or {"col": "name"} or {"col": "name", "asc": true} (issue #517).
529fn window_col_from_value(x: &Value) -> Option<String> {
530    if let Some(s) = x.as_str() {
531        return Some(s.to_string());
532    }
533    if let Some(obj) = x.as_object() {
534        if let Some(name) = obj.get("col").and_then(Value::as_str) {
535            return Some(name.to_string());
536        }
537    }
538    None
539}
540
541/// Parse window spec object into (order_col_names, partition_by_col_names).
542/// order_by / partition_by can be arrays of "col" or {"col": "name", "asc": true}.
543/// Does not apply fallback; callers that need order column use order_cols.or(part_cols) (issue #517).
544fn parse_window_spec(v: &Value) -> Result<(Vec<String>, Vec<String>), PlanExprError> {
545    let obj = v
546        .as_object()
547        .ok_or_else(|| PlanExprError("window spec must be object".to_string()))?;
548    let order_arr = obj.get("order_by").and_then(Value::as_array);
549    let part_arr = obj.get("partition_by").and_then(Value::as_array);
550    let order_cols: Vec<String> = order_arr
551        .map(|a| a.iter().filter_map(window_col_from_value).collect())
552        .unwrap_or_default();
553    let part_cols: Vec<String> = part_arr
554        .map(|a| a.iter().filter_map(window_col_from_value).collect())
555        .unwrap_or_default();
556    Ok((order_cols, part_cols))
557}
558
/// Pick the columns a window should be ordered by: the explicit `order_by` names when any
/// were given, otherwise the `partition_by` names (issue #517). Returns an owned copy.
fn window_order_cols(order_cols: &[String], part_cols: &[String]) -> Vec<String> {
    let chosen = match order_cols {
        [] => part_cols,
        explicit => explicit,
    };
    chosen.to_vec()
}
567
568/// Build window expression for row_number (issue #517, #520).
569fn expr_from_row_number_window(v: &Value) -> Result<Expr, PlanExprError> {
570    let (order_cols, part_cols) = parse_window_spec(v)?;
571    let part_refs: Vec<&str> = part_cols.iter().map(|s| s.as_str()).collect();
572    let effective_order = window_order_cols(&order_cols, &part_cols);
573    let order_col = if effective_order.is_empty() {
574        crate::Column::from_expr(lit(1i32), None)
575    } else {
576        crate::Column::new(effective_order[0].clone())
577    };
578    let rn = order_col.row_number(false).over(&part_refs);
579    Ok(rn.into_expr())
580}
581
582/// Dispatch window fn by name: row_number, rank, dense_rank, percent_rank, ntile, lag, lead, sum, avg (issues #517, #521).
583fn expr_from_window_fn(
584    fn_name: &str,
585    window_val: &Value,
586    args: Option<&Vec<Value>>,
587) -> Result<Expr, PlanExprError> {
588    use crate::Column;
589    let (order_cols, part_cols) = parse_window_spec(window_val)?;
590    let part_refs: Vec<&str> = part_cols.iter().map(|s| s.as_str()).collect();
591    let effective_order = window_order_cols(&order_cols, &part_cols);
592    let empty: &[Value] = &[];
593    let args: &[Value] = args.map_or(empty, |v| v);
594    let order_col = if effective_order.is_empty() {
595        Column::from_expr(lit(1i32), None)
596    } else {
597        Column::new(effective_order[0].clone())
598    };
599
600    match fn_name {
601        "row_number" => expr_from_row_number_window(window_val),
602        "rank" => {
603            let c = order_col.rank(false).over(&part_refs);
604            Ok(c.into_expr())
605        }
606        "dense_rank" => {
607            let c = order_col.dense_rank(false).over(&part_refs);
608            Ok(c.into_expr())
609        }
610        "percent_rank" => {
611            let c = order_col.percent_rank(&part_refs, false);
612            Ok(c.into_expr())
613        }
614        "ntile" => {
615            let n = args
616                .first()
617                .and_then(|v| v.get("lit").and_then(Value::as_i64))
618                .or_else(|| args.first().and_then(Value::as_i64))
619                .ok_or_else(|| {
620                    PlanExprError("ntile window requires n (number of buckets)".to_string())
621                })? as u32;
622            let c = order_col.ntile(n.max(1), &part_refs, false);
623            Ok(c.into_expr())
624        }
625        "lag" => {
626            let n = args
627                .get(1)
628                .and_then(|v| v.get("lit").and_then(Value::as_i64))
629                .or_else(|| args.get(1).and_then(Value::as_i64))
630                .unwrap_or(1);
631            let col_expr =
632                expr_to_column(expr_from_value(args.first().ok_or_else(|| {
633                    PlanExprError("lag window requires column arg".to_string())
634                })?)?);
635            let c = col_expr.lag(n).over(&part_refs);
636            Ok(c.into_expr())
637        }
638        "lead" => {
639            let n = args
640                .get(1)
641                .and_then(|v| v.get("lit").and_then(Value::as_i64))
642                .or_else(|| args.get(1).and_then(Value::as_i64))
643                .unwrap_or(1);
644            let col_expr =
645                expr_to_column(expr_from_value(args.first().ok_or_else(|| {
646                    PlanExprError("lead window requires column arg".to_string())
647                })?)?);
648            let c = col_expr.lead(n).over(&part_refs);
649            Ok(c.into_expr())
650        }
651        "sum" => {
652            let col_expr =
653                expr_to_column(expr_from_value(args.first().ok_or_else(|| {
654                    PlanExprError("sum window requires column arg".to_string())
655                })?)?);
656            let sum_expr = col_expr.expr().clone().sum();
657            let partition_exprs: Vec<Expr> = part_refs.iter().map(|s| col(*s)).collect();
658            Ok(sum_expr.over(partition_exprs))
659        }
660        "avg" | "mean" => {
661            let col_expr =
662                expr_to_column(expr_from_value(args.first().ok_or_else(|| {
663                    PlanExprError("avg window requires column arg".to_string())
664                })?)?);
665            let mean_expr = col_expr.expr().clone().mean();
666            let partition_exprs: Vec<Expr> = part_refs.iter().map(|s| col(*s)).collect();
667            Ok(mean_expr.over(partition_exprs))
668        }
669        "approx_count_distinct" => {
670            let col_expr = expr_to_column(expr_from_value(args.first().ok_or_else(|| {
671                PlanExprError("approx_count_distinct window requires column arg".to_string())
672            })?)?);
673            let n_unique_expr = col_expr.expr().clone().n_unique().cast(DataType::Int64);
674            let partition_exprs: Vec<Expr> = part_refs.iter().map(|s| col(*s)).collect();
675            Ok(n_unique_expr.over(partition_exprs))
676        }
677        _ => Err(PlanExprError(format!(
678            "unsupported window fn '{fn_name}' (supported: row_number, rank, dense_rank, percent_rank, ntile, lag, lead, sum, avg, approx_count_distinct)"
679        ))),
680    }
681}
682
683fn lit_from_value(v: &Value) -> Result<Expr, PlanExprError> {
684    use polars::prelude::{NULL, lit};
685    if v.is_null() {
686        return Ok(lit(NULL));
687    }
688    if let Some(n) = v.as_i64() {
689        return Ok(lit(n));
690    }
691    if let Some(n) = v.as_f64() {
692        return Ok(lit(n));
693    }
694    if let Some(b) = v.as_bool() {
695        return Ok(lit(b));
696    }
697    if let Some(s) = v.as_str() {
698        return Ok(lit(s));
699    }
700    Err(PlanExprError("unsupported literal type".to_string()))
701}
702
703// --- Literal extraction from {"lit": value} (for function args) ---
704
705fn lit_as_string(v: &Value) -> Result<String, PlanExprError> {
706    let lit_val = v
707        .get("lit")
708        .ok_or_else(|| PlanExprError("expected literal".to_string()))?;
709    if lit_val.is_null() {
710        return Err(PlanExprError("literal string cannot be null".to_string()));
711    }
712    if let Some(s) = lit_val.as_str() {
713        return Ok(s.to_string());
714    }
715    if let Some(n) = lit_val.as_i64() {
716        return Ok(n.to_string());
717    }
718    if let Some(n) = lit_val.as_f64() {
719        return Ok(n.to_string());
720    }
721    if let Some(b) = lit_val.as_bool() {
722        return Ok(b.to_string());
723    }
724    Err(PlanExprError(
725        "literal must be string, number, or bool".to_string(),
726    ))
727}
728
729fn lit_as_i64(v: &Value) -> Result<i64, PlanExprError> {
730    let lit_val = v
731        .get("lit")
732        .ok_or_else(|| PlanExprError("expected literal".to_string()))?;
733    lit_val
734        .as_i64()
735        .ok_or_else(|| PlanExprError("literal must be integer".to_string()))
736}
737
738fn lit_as_i32(v: &Value) -> Result<i32, PlanExprError> {
739    let n = lit_as_i64(v)?;
740    n.try_into()
741        .map_err(|_| PlanExprError("literal out of i32 range".to_string()))
742}
743
744fn lit_as_u32(v: &Value) -> Result<u32, PlanExprError> {
745    let lit_val = v
746        .get("lit")
747        .ok_or_else(|| PlanExprError("expected literal".to_string()))?;
748    if let Some(n) = lit_val.as_u64() {
749        return n
750            .try_into()
751            .map_err(|_| PlanExprError("literal out of u32 range".to_string()));
752    }
753    if let Some(n) = lit_val.as_i64() {
754        return (n as u64)
755            .try_into()
756            .map_err(|_| PlanExprError("literal out of u32 range".to_string()));
757    }
758    Err(PlanExprError("literal must be number".to_string()))
759}
760
761fn lit_as_f64(v: &Value) -> Result<f64, PlanExprError> {
762    let lit_val = v
763        .get("lit")
764        .ok_or_else(|| PlanExprError("expected literal".to_string()))?;
765    if let Some(n) = lit_val.as_f64() {
766        return Ok(n);
767    }
768    if let Some(n) = lit_val.as_i64() {
769        return Ok(n as f64);
770    }
771    Err(PlanExprError("literal must be number".to_string()))
772}
773
774#[allow(dead_code)]
775fn lit_as_bool(v: &Value) -> Result<bool, PlanExprError> {
776    let lit_val = v
777        .get("lit")
778        .ok_or_else(|| PlanExprError("expected literal".to_string()))?;
779    lit_val
780        .as_bool()
781        .ok_or_else(|| PlanExprError("literal must be boolean".to_string()))
782}
783
784fn lit_as_usize(v: &Value) -> Result<usize, PlanExprError> {
785    let n = lit_as_i64(v)?;
786    if n < 0 {
787        return Err(PlanExprError(
788            "literal must be non-negative for usize".to_string(),
789        ));
790    }
791    n.try_into()
792        .map_err(|_| PlanExprError("literal out of usize range".to_string()))
793}
794
795/// Extract values for isin from JSON array. Each element: {"lit": v} or plain v. Returns Expr for is_in.
796/// When arr is empty or parses to no values (e.g. all nulls), returns Ok(None) — caller uses lit(false)
797/// so that col.isin([]) is false for all rows (PySpark parity; issue #518).
798/// Type of series (issue #581): when all values are integers use Int64 series so Int64 column
799/// isin(1, 3) works; when all are floats use Float64; otherwise use String series for mixed/string.
800fn try_values_for_isin(arr: &[Value]) -> Result<Option<Expr>, PlanExprError> {
801    if arr.is_empty() {
802        return Ok(None);
803    }
804    let mut str_vals: Vec<String> = Vec::new();
805    let mut int_vals: Vec<i64> = Vec::new();
806    let mut float_vals: Vec<f64> = Vec::new();
807    let mut has_string = false;
808    let mut has_float = false;
809    for v in arr {
810        let lit_val = if let Some(obj) = v.as_object() {
811            obj.get("lit").unwrap_or(v)
812        } else {
813            v
814        };
815        if lit_val.is_null() {
816            continue;
817        }
818        if let Some(s) = lit_val.as_str() {
819            str_vals.push(s.to_string());
820            has_string = true;
821        } else if let Some(n) = lit_val.as_i64() {
822            int_vals.push(n);
823            str_vals.push(n.to_string());
824        } else if let Some(n) = lit_val.as_f64() {
825            float_vals.push(n);
826            str_vals.push(n.to_string());
827            has_float = true;
828        }
829    }
830    if str_vals.is_empty() {
831        return Ok(None);
832    }
833    let s: Series = if has_string {
834        Series::from_iter(str_vals.iter().map(|x| x.as_str()))
835    } else if !has_float && int_vals.len() == str_vals.len() {
836        Series::from_iter(int_vals)
837    } else if float_vals.len() == str_vals.len() {
838        Series::from_iter(float_vals)
839    } else {
840        Series::from_iter(str_vals.iter().map(|x| x.as_str()))
841    };
842    Ok(Some(lit(s)))
843}
844
845/// Extract values for isin from plan value. Returns None when empty (caller uses lit(false)).
846fn try_values_from_plan_value(v: &Value) -> Result<Option<Expr>, PlanExprError> {
847    if let Some(lit_val) = v.get("lit") {
848        if let Some(arr) = lit_val.as_array() {
849            return try_values_for_isin(arr);
850        }
851        // Single literal - wrap in array
852        #[allow(clippy::cloned_ref_to_slice_refs)]
853        return try_values_for_isin(&[v.clone()]);
854    }
855    if let Some(arr) = v.as_array() {
856        return try_values_for_isin(arr);
857    }
858    Err(PlanExprError(
859        "isin right/values must be array or {lit: [...]}".to_string(),
860    ))
861}
862
863/// Optional string literal: if `args[i]` is missing or null, return None; else require {"lit": "..."}.
864fn arg_lit_opt_str(args: &[Value], i: usize) -> Result<Option<String>, PlanExprError> {
865    let v = match args.get(i) {
866        Some(x) => x,
867        None => return Ok(None),
868    };
869    if v.is_null() {
870        return Ok(None);
871    }
872    if let Some(obj) = v.as_object() {
873        if obj.get("lit").is_some() {
874            return Ok(Some(lit_as_string(v)?));
875        }
876    }
877    Ok(None)
878}
879
880fn arg_expr(args: &[Value], i: usize) -> Result<Expr, PlanExprError> {
881    let v = args
882        .get(i)
883        .ok_or_else(|| PlanExprError(format!("fn requires argument at index {i}")))?;
884    expr_from_value(v)
885}
886
887/// Accept string literal in either form: bare JSON string or {"lit": "..."} (issue #582).
888fn arg_lit_str(args: &[Value], i: usize) -> Result<String, PlanExprError> {
889    let v = args
890        .get(i)
891        .ok_or_else(|| PlanExprError(format!("fn requires string literal at index {i}")))?;
892    if let Some(s) = v.as_str() {
893        return Ok(s.to_string());
894    }
895    lit_as_string(v)
896}
897
898fn arg_lit_i64(args: &[Value], i: usize) -> Result<i64, PlanExprError> {
899    let v = args
900        .get(i)
901        .ok_or_else(|| PlanExprError(format!("fn requires integer literal at index {i}")))?;
902    lit_as_i64(v)
903}
904
905fn arg_lit_i32(args: &[Value], i: usize) -> Result<i32, PlanExprError> {
906    let v = args
907        .get(i)
908        .ok_or_else(|| PlanExprError(format!("fn requires integer literal at index {i}")))?;
909    lit_as_i32(v)
910}
911
912fn arg_lit_u32(args: &[Value], i: usize) -> Result<u32, PlanExprError> {
913    let v = args
914        .get(i)
915        .ok_or_else(|| PlanExprError(format!("fn requires non-negative integer at index {i}")))?;
916    lit_as_u32(v)
917}
918
919fn arg_lit_f64(args: &[Value], i: usize) -> Result<f64, PlanExprError> {
920    let v = args
921        .get(i)
922        .ok_or_else(|| PlanExprError(format!("fn requires number literal at index {i}")))?;
923    lit_as_f64(v)
924}
925
926/// Accept non-negative integer in either form: bare JSON number or {"lit": n} (issue #582).
927fn arg_lit_usize(args: &[Value], i: usize) -> Result<usize, PlanExprError> {
928    let v = args
929        .get(i)
930        .ok_or_else(|| PlanExprError(format!("fn requires non-negative integer at index {i}")))?;
931    if let Some(n) = v.as_i64() {
932        if n < 0 {
933            return Err(PlanExprError(
934                "literal must be non-negative for usize".to_string(),
935            ));
936        }
937        return n
938            .try_into()
939            .map_err(|_| PlanExprError("literal out of usize range".to_string()));
940    }
941    if let Some(n) = v.as_u64() {
942        return n
943            .try_into()
944            .map_err(|_| PlanExprError("literal out of usize range".to_string()));
945    }
946    lit_as_usize(v)
947}
948
949/// Get optional i64 from `args[i]` if present and a literal.
950fn opt_lit_i64(args: &[Value], i: usize) -> Option<i64> {
951    let v = args.get(i)?;
952    v.get("lit").and_then(Value::as_i64)
953}
954
955/// Get optional u64 from args (e.g. for rand(seed)).
956#[allow(dead_code)]
957fn opt_lit_u64(args: &[Value], i: usize) -> Option<u64> {
958    let v = args.get(i)?;
959    if let Some(n) = v.get("lit").and_then(Value::as_i64) {
960        if n >= 0 {
961            return Some(n as u64);
962        }
963        return Some((-n) as u64); // allow negative seed as unsigned
964    }
965    v.get("lit").and_then(Value::as_u64)
966}
967
968fn expr_to_column(expr: Expr) -> crate::Column {
969    crate::Column::from_expr(expr, None)
970}
971
972/// Null-safe equality: (a <=> b) is true when both null, or both non-null and equal.
973/// Applies PySpark-style type coercion (e.g. string vs int) so eq_null_safe matches PySpark (issue #266).
974fn eq_null_safe_expr(left: Expr, right: Expr) -> Result<Expr, PlanExprError> {
975    use polars::prelude::*;
976    let (left_c, right_c) = crate::type_coercion::coerce_for_pyspark_eq_null_safe(left, right)
977        .map_err(|e| PlanExprError(e.to_string()))?;
978    let left_null = left_c.clone().is_null();
979    let right_null = right_c.clone().is_null();
980    let both_null = left_null.clone().and(right_null.clone());
981    let both_non_null = left_null.not().and(right_null.not());
982    let eq_result = left_c.eq(right_c);
983    Ok(when(both_null)
984        .then(lit(true))
985        .when(both_non_null)
986        .then(eq_result)
987        .otherwise(lit(false)))
988}
989
/// Find the byte index of the `)` that closes the `(` located at byte index `start`.
///
/// Returns `None` when `start` is out of bounds, when the byte at `start` is not `(`, or when
/// the parenthesis is never closed. Nested parentheses are balanced; quoting is not considered.
fn matching_paren(s: &str, start: usize) -> Option<usize> {
    let bytes = s.as_bytes();
    if bytes.get(start).copied() != Some(b'(') {
        return None;
    }
    // `start` is in bounds here, so the tail slice below is always valid (possibly empty).
    let tail_start = start + 1;
    let mut open: u32 = 1;
    for (offset, &byte) in bytes[tail_start..].iter().enumerate() {
        if byte == b'(' {
            open += 1;
        } else if byte == b')' {
            open -= 1;
            if open == 0 {
                return Some(tail_start + offset);
            }
        }
    }
    None
}
1010
1011/// Parse a single part (column name or literal) for concat/concat_ws.
1012fn concat_part_to_expr(part: &str) -> Expr {
1013    let part = part.trim();
1014    if part.is_empty() {
1015        return lit("");
1016    }
1017    if (part.starts_with('"') && part.ends_with('"'))
1018        || (part.starts_with('\'') && part.ends_with('\''))
1019    {
1020        let inner = part[1..part.len() - 1].trim_matches(['\'', '"']);
1021        return lit(inner);
1022    }
1023    col(part)
1024}
1025
1026/// Try to parse a select-item string as concat(...) or concat_ws(...) expression.
1027/// Used when Sparkless sends e.g. "concat(first_name, , last_name)" as a column name;
1028/// we treat it as an expression and evaluate it. Returns None if s doesn't look like concat/concat_ws.
1029pub fn try_parse_concat_expr_from_string(s: &str) -> Option<Expr> {
1030    use polars::prelude::concat_str;
1031    let s = s.trim();
1032    // concat(...)
1033    if s.starts_with("concat(") {
1034        let close = matching_paren(s, 6)?; // 6 = len("concat")
1035        if close != s.len() - 1 {
1036            return None;
1037        }
1038        let inner = s[7..close].trim();
1039        let parts: Vec<&str> = inner.split(',').map(str::trim).collect();
1040        if parts.is_empty() {
1041            return None;
1042        }
1043        let exprs: Vec<Expr> = parts.iter().map(|p| concat_part_to_expr(p)).collect();
1044        return Some(concat_str(&exprs, "", false));
1045    }
1046    // concat_ws(sep, ...)
1047    if s.starts_with("concat_ws(") {
1048        let close = matching_paren(s, 10)?; // 10 = len("concat_ws")
1049        if close != s.len() - 1 {
1050            return None;
1051        }
1052        let inner = s[10..close].trim();
1053        let parts: Vec<&str> = inner.split(',').map(str::trim).collect();
1054        if parts.len() < 2 {
1055            return None;
1056        }
1057        let sep = parts[0].trim_matches(['\'', '"']);
1058        let exprs: Vec<Expr> = parts
1059            .iter()
1060            .skip(1)
1061            .map(|p| concat_part_to_expr(p))
1062            .collect();
1063        if exprs.is_empty() {
1064            return None;
1065        }
1066        return Some(concat_str(&exprs, sep, false));
1067    }
1068    None
1069}
1070
1071/// Build a Column from a UDF call. Used by expr_from_value and apply_op (withColumn).
1072/// Returns Column; caller checks udf_call for Python UDF (needs with_column, not with_column_expr).
1073pub fn column_from_udf_call(
1074    udf_name: &str,
1075    args: &[Value],
1076) -> Result<crate::Column, PlanExprError> {
1077    use crate::Column;
1078    let cols: Vec<Column> = args
1079        .iter()
1080        .map(|v| expr_from_value(v).map(expr_to_column))
1081        .collect::<Result<Vec<_>, _>>()?;
1082    crate::functions::call_udf(udf_name, &cols).map_err(|e| PlanExprError(e.to_string()))
1083}
1084
1085/// Try to parse a UDF expression and build Column. Supports {"udf": "name", "args": [...]}
1086/// and {"fn": "call_udf", "args": [{"lit": "name"}, ...]}. Returns None if not a UDF expression.
1087pub fn try_column_from_udf_value(v: &Value) -> Option<Result<crate::Column, PlanExprError>> {
1088    let obj = v.as_object()?;
1089    let (udf_name, args) = if let Some(name) = obj.get("udf").and_then(Value::as_str) {
1090        let args = obj.get("args")?.as_array()?;
1091        (name.to_string(), args)
1092    } else if obj.get("fn").and_then(Value::as_str) == Some("call_udf") {
1093        let args = obj.get("args")?.as_array()?;
1094        if args.is_empty() {
1095            return Some(Err(PlanExprError(
1096                "call_udf requires at least name and one arg".into(),
1097            )));
1098        }
1099        let name = match lit_as_string(&args[0]) {
1100            Ok(n) => n,
1101            Err(e) => return Some(Err(e)),
1102        };
1103        let rest: &[Value] = &args[1..];
1104        return Some(column_from_udf_call(&name, rest));
1105    } else {
1106        return None;
1107    };
1108    Some(column_from_udf_call(&udf_name, args))
1109}
1110
1111fn expr_from_fn(name: &str, args: &[Value]) -> Result<Expr, PlanExprError> {
1112    use crate::Column;
1113    #[allow(unused_imports)]
1114    use crate::functions::{
1115        add_months, array_agg, array_append, array_compact, array_contains, array_distinct,
1116        array_except, array_insert, array_intersect, array_join, array_prepend, array_remove,
1117        array_slice, array_sort, array_sum, array_union, arrays_overlap, arrays_zip, ascii,
1118        assert_true, atan2, base64, bin, bit_and, bit_count, bit_get, bit_length, bit_or, bit_xor,
1119        bitwise_not, bround, btrim, cast, cbrt, ceiling, char as rs_char, chr, coalesce, concat,
1120        concat_ws, contains, conv, cos, cosh, cot, crc32, csc, curdate, current_catalog,
1121        current_database, current_date, current_schema, current_timestamp, current_timezone,
1122        current_user, date_add, date_diff, date_format, date_from_unix_date, date_part, date_sub,
1123        date_trunc, dateadd, datediff, datepart, day, dayname, dayofmonth, dayofweek, dayofyear,
1124        days, decode, degrees, e, element_at, elt, encode, endswith, equal_null, exp,
1125        explode_outer, extract, factorial, find_in_set, floor, format_number, format_string,
1126        from_unixtime, from_utc_timestamp, get, get_json_object, getbit, greatest, hash, hex, hour,
1127        hypot, ilike, initcap, input_file_name, instr, isnan, json_tuple, last_day, lcase, least,
1128        left, length, like, lit_str, ln, localtimestamp, locate, log, log1p, log2, log10, lower,
1129        lpad, make_date, make_interval, make_timestamp, make_timestamp_ntz, mask, md5, minute,
1130        monotonically_increasing_id, month, months_between, nanvl, negate, negative, next_day, now,
1131        nullif, nvl, nvl2, octet_length, overlay, parse_url, pi, pmod, positive, pow, power,
1132        quarter, radians, raise_error, rand, randn, regexp, regexp_count, regexp_extract,
1133        regexp_extract_all, regexp_instr, regexp_like, regexp_replace, regexp_substr, repeat,
1134        replace, reverse, right, rint, rlike, round, rpad, sec, second, sha1, sha2, shift_left,
1135        shift_right, signum, sin, sinh, size, soundex, spark_partition_id, split, split_part, sqrt,
1136        startswith, str_to_map, struct_, substr, substring, substring_index, tan, tanh,
1137        timestamp_micros, timestamp_millis, timestamp_seconds, timestampadd, timestampdiff,
1138        to_binary, to_char, to_date, to_degrees, to_radians, to_timestamp, to_unix_timestamp,
1139        to_utc_timestamp, to_varchar, translate, trim, trunc, try_add, try_cast, try_divide,
1140        try_element_at, try_multiply, try_subtract, try_to_binary, try_to_number, try_to_timestamp,
1141        typeof_, ucase, unbase64, unhex, unix_date, unix_micros, unix_millis, unix_seconds,
1142        unix_timestamp, unix_timestamp_now, upper, url_decode, url_encode, user, version, weekday,
1143        weekofyear, when_then_otherwise_null, width_bucket, xxhash64, year,
1144    };
1145
1146    match name {
1147        "call_udf" => {
1148            if args.is_empty() {
1149                return Err(PlanExprError(
1150                    "call_udf requires at least name and one arg".into(),
1151                ));
1152            }
1153            let udf_name = lit_as_string(&args[0])?;
1154            let col = column_from_udf_call(&udf_name, &args[1..])?;
1155            if col.udf_call.is_some() {
1156                return Err(PlanExprError(
1157                    "Python/Vectorized UDFs are only supported in withColumn/select, not in filter/plan expressions"
1158                        .into(),
1159                ));
1160            }
1161            Ok(col.expr().clone())
1162        }
1163        "upper" => {
1164            require_args(name, args, 1)?;
1165            let c = expr_to_column(arg_expr(args, 0)?);
1166            Ok(upper(&c).into_expr())
1167        }
1168        "lower" => {
1169            require_args(name, args, 1)?;
1170            let c = expr_to_column(arg_expr(args, 0)?);
1171            Ok(lower(&c).into_expr())
1172        }
1173        "coalesce" => {
1174            if args.is_empty() {
1175                return Err(PlanExprError(format!(
1176                    "fn '{name}' requires at least one argument"
1177                )));
1178            }
1179            let exprs: Result<Vec<Expr>, _> = args.iter().map(expr_from_value).collect();
1180            let exprs = exprs?;
1181            Ok(polars::prelude::coalesce(&exprs))
1182        }
1183        "when" => {
1184            if args.len() != 2 {
1185                return Err(PlanExprError(format!(
1186                    "fn '{name}' two-arg form requires [condition, then_expr]"
1187                )));
1188            }
1189            let cond = expr_to_column(arg_expr(args, 0)?);
1190            let then_val = expr_to_column(arg_expr(args, 1)?);
1191            Ok(when_then_otherwise_null(&cond, &then_val).into_expr())
1192        }
1193        // --- String ---
1194        "length" | "char_length" | "character_length" => {
1195            require_args(name, args, 1)?;
1196            Ok(length(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1197        }
1198        "trim" => {
1199            require_args(name, args, 1)?;
1200            Ok(trim(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1201        }
1202        "ltrim" => {
1203            require_args(name, args, 1)?;
1204            Ok(crate::functions::ltrim(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1205        }
1206        "rtrim" => {
1207            require_args(name, args, 1)?;
1208            Ok(crate::functions::rtrim(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1209        }
1210        "btrim" => {
1211            require_args_min(name, args, 1)?;
1212            let c = expr_to_column(arg_expr(args, 0)?);
1213            let trim_str: Option<String> = arg_lit_opt_str(args, 1)?;
1214            Ok(btrim(&c, trim_str.as_deref()).into_expr())
1215        }
1216        "substring" | "substr" => {
1217            require_args_min(name, args, 2)?;
1218            let c = expr_to_column(arg_expr(args, 0)?);
1219            let start = arg_lit_i64(args, 1)?;
1220            let len = opt_lit_i64(args, 2);
1221            Ok(substring(&c, start, len).into_expr())
1222        }
1223        "concat" => {
1224            if args.is_empty() {
1225                return Err(PlanExprError(format!(
1226                    "fn '{name}' requires at least one argument"
1227                )));
1228            }
1229            let exprs: Result<Vec<Expr>, _> = args.iter().map(expr_from_value).collect();
1230            let cols: Vec<Column> = exprs?.into_iter().map(expr_to_column).collect();
1231            let refs: Vec<&Column> = cols.iter().collect();
1232            Ok(concat(&refs).into_expr())
1233        }
1234        "concat_ws" => {
1235            require_args_min(name, args, 2)?;
1236            let sep = arg_lit_str(args, 0)?;
1237            let exprs: Result<Vec<Expr>, _> = args.iter().skip(1).map(expr_from_value).collect();
1238            let cols: Vec<Column> = exprs?.into_iter().map(expr_to_column).collect();
1239            let refs: Vec<&Column> = cols.iter().collect();
1240            Ok(concat_ws(&sep, &refs).into_expr())
1241        }
1242        "initcap" => {
1243            require_args(name, args, 1)?;
1244            Ok(initcap(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1245        }
1246        "repeat" => {
1247            require_args(name, args, 2)?;
1248            let c = expr_to_column(arg_expr(args, 0)?);
1249            let n = arg_lit_i32(args, 1)?;
1250            Ok(repeat(&c, n).into_expr())
1251        }
1252        "reverse" => {
1253            require_args(name, args, 1)?;
1254            Ok(reverse(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1255        }
1256        "instr" => {
1257            require_args(name, args, 2)?;
1258            let c = expr_to_column(arg_expr(args, 0)?);
1259            let substr = arg_lit_str(args, 1)?;
1260            Ok(instr(&c, &substr).into_expr())
1261        }
1262        "position" => {
1263            require_args_min(name, args, 2)?;
1264            let substr = arg_lit_str(args, 0)?;
1265            let c = expr_to_column(arg_expr(args, 1)?);
1266            let pos = opt_lit_i64(args, 2).unwrap_or(1);
1267            Ok(locate(&substr, &c, pos).into_expr())
1268        }
1269        "locate" => {
1270            require_args_min(name, args, 2)?;
1271            let substr = arg_lit_str(args, 0)?;
1272            let c = expr_to_column(arg_expr(args, 1)?);
1273            let pos = opt_lit_i64(args, 2).unwrap_or(1);
1274            Ok(locate(&substr, &c, pos).into_expr())
1275        }
1276        "ascii" => {
1277            require_args(name, args, 1)?;
1278            Ok(ascii(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1279        }
1280        "format_number" => {
1281            require_args(name, args, 2)?;
1282            let c = expr_to_column(arg_expr(args, 0)?);
1283            let decimals = arg_lit_u32(args, 1)?;
1284            Ok(format_number(&c, decimals).into_expr())
1285        }
1286        "overlay" => {
1287            require_args_min(name, args, 4)?;
1288            let c = expr_to_column(arg_expr(args, 0)?);
1289            let replace_str = arg_lit_str(args, 1)?;
1290            let pos = arg_lit_i64(args, 2)?;
1291            let len = arg_lit_i64(args, 3)?;
1292            Ok(overlay(&c, &replace_str, pos, len).into_expr())
1293        }
1294        "char" => {
1295            require_args(name, args, 1)?;
1296            Ok(rs_char(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1297        }
1298        "chr" => {
1299            require_args(name, args, 1)?;
1300            Ok(chr(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1301        }
1302        "base64" => {
1303            require_args(name, args, 1)?;
1304            Ok(base64(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1305        }
1306        "unbase64" => {
1307            require_args(name, args, 1)?;
1308            Ok(unbase64(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1309        }
1310        "sha1" => {
1311            require_args(name, args, 1)?;
1312            Ok(sha1(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1313        }
1314        "sha2" => {
1315            require_args(name, args, 2)?;
1316            let c = expr_to_column(arg_expr(args, 0)?);
1317            let bits = arg_lit_i32(args, 1)?;
1318            Ok(sha2(&c, bits).into_expr())
1319        }
1320        "md5" => {
1321            require_args(name, args, 1)?;
1322            Ok(md5(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1323        }
1324        "lpad" => {
1325            require_args(name, args, 3)?;
1326            let c = expr_to_column(arg_expr(args, 0)?);
1327            let len = arg_lit_i32(args, 1)?;
1328            let pad = arg_lit_str(args, 2)?;
1329            Ok(lpad(&c, len, &pad).into_expr())
1330        }
1331        "rpad" => {
1332            require_args(name, args, 3)?;
1333            let c = expr_to_column(arg_expr(args, 0)?);
1334            let len = arg_lit_i32(args, 1)?;
1335            let pad = arg_lit_str(args, 2)?;
1336            Ok(rpad(&c, len, &pad).into_expr())
1337        }
1338        "translate" => {
1339            require_args(name, args, 3)?;
1340            let c = expr_to_column(arg_expr(args, 0)?);
1341            let from_str = arg_lit_str(args, 1)?;
1342            let to_str = arg_lit_str(args, 2)?;
1343            Ok(translate(&c, &from_str, &to_str).into_expr())
1344        }
1345        "substring_index" => {
1346            require_args(name, args, 3)?;
1347            let c = expr_to_column(arg_expr(args, 0)?);
1348            let delim = arg_lit_str(args, 1)?;
1349            let count = arg_lit_i64(args, 2)?;
1350            Ok(substring_index(&c, &delim, count).into_expr())
1351        }
1352        "left" => {
1353            require_args(name, args, 2)?;
1354            let c = expr_to_column(arg_expr(args, 0)?);
1355            let n = arg_lit_i64(args, 1)?;
1356            Ok(left(&c, n).into_expr())
1357        }
1358        "right" => {
1359            require_args(name, args, 2)?;
1360            let c = expr_to_column(arg_expr(args, 0)?);
1361            let n = arg_lit_i64(args, 1)?;
1362            Ok(right(&c, n).into_expr())
1363        }
1364        "replace" => {
1365            require_args(name, args, 3)?;
1366            let c = expr_to_column(arg_expr(args, 0)?);
1367            let search = arg_lit_str(args, 1)?;
1368            let replacement = arg_lit_str(args, 2)?;
1369            Ok(replace(&c, &search, &replacement).into_expr())
1370        }
1371        "startswith" => {
1372            require_args(name, args, 2)?;
1373            let c = expr_to_column(arg_expr(args, 0)?);
1374            let prefix = arg_lit_str(args, 1)?;
1375            Ok(startswith(&c, &prefix).into_expr())
1376        }
1377        "endswith" => {
1378            require_args(name, args, 2)?;
1379            let c = expr_to_column(arg_expr(args, 0)?);
1380            let suffix = arg_lit_str(args, 1)?;
1381            Ok(endswith(&c, &suffix).into_expr())
1382        }
1383        "contains" => {
1384            require_args(name, args, 2)?;
1385            let c = expr_to_column(arg_expr(args, 0)?);
1386            let substring = arg_lit_str(args, 1)?;
1387            Ok(contains(&c, &substring).into_expr())
1388        }
1389        "like" => {
1390            require_args_min(name, args, 2)?;
1391            let c = expr_to_column(arg_expr(args, 0)?);
1392            let pattern = arg_lit_str(args, 1)?;
1393            let escape = arg_lit_opt_str(args, 2)?.and_then(|s| s.chars().next());
1394            Ok(like(&c, &pattern, escape).into_expr())
1395        }
1396        "ilike" => {
1397            require_args_min(name, args, 2)?;
1398            let c = expr_to_column(arg_expr(args, 0)?);
1399            let pattern = arg_lit_str(args, 1)?;
1400            let escape = arg_lit_opt_str(args, 2)?.and_then(|s| s.chars().next());
1401            Ok(ilike(&c, &pattern, escape).into_expr())
1402        }
1403        "rlike" | "regexp" => {
1404            require_args(name, args, 2)?;
1405            let c = expr_to_column(arg_expr(args, 0)?);
1406            let pattern = arg_lit_str(args, 1)?;
1407            Ok(rlike(&c, &pattern).into_expr())
1408        }
1409        "soundex" => {
1410            require_args(name, args, 1)?;
1411            Ok(soundex(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1412        }
1413        "levenshtein" => {
1414            require_args(name, args, 2)?;
1415            let a = expr_to_column(arg_expr(args, 0)?);
1416            let b = expr_to_column(arg_expr(args, 1)?);
1417            Ok(crate::functions::levenshtein(&a, &b).into_expr())
1418        }
1419        "crc32" => {
1420            require_args(name, args, 1)?;
1421            Ok(crc32(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1422        }
1423        "xxhash64" => {
1424            require_args(name, args, 1)?;
1425            Ok(xxhash64(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1426        }
1427        "regexp_extract" => {
1428            // Plan execution requires literal pattern and group index (issue #523).
1429            require_args(name, args, 3)?;
1430            let c = expr_to_column(arg_expr(args, 0)?);
1431            let pattern = arg_lit_str(args, 1).map_err(|_| {
1432                PlanExprError(
1433                    "regexp_extract in plan requires literal pattern at arg 2 (column refs not supported)".into(),
1434                )
1435            })?;
1436            let group_index = arg_lit_usize(args, 2).map_err(|_| {
1437                PlanExprError(
1438                    "regexp_extract in plan requires literal group index at arg 3 (column refs not supported)".into(),
1439                )
1440            })?;
1441            Ok(regexp_extract(&c, &pattern, group_index).into_expr())
1442        }
1443        "regexp_replace" => {
1444            require_args(name, args, 3)?;
1445            let c = expr_to_column(arg_expr(args, 0)?);
1446            let pattern = arg_lit_str(args, 1)?;
1447            let replacement = arg_lit_str(args, 2)?;
1448            Ok(regexp_replace(&c, &pattern, &replacement).into_expr())
1449        }
1450        "regexp_extract_all" => {
1451            require_args(name, args, 2)?;
1452            let c = expr_to_column(arg_expr(args, 0)?);
1453            let pattern = arg_lit_str(args, 1)?;
1454            Ok(regexp_extract_all(&c, &pattern).into_expr())
1455        }
1456        "regexp_like" => {
1457            require_args(name, args, 2)?;
1458            let c = expr_to_column(arg_expr(args, 0)?);
1459            let pattern = arg_lit_str(args, 1)?;
1460            Ok(regexp_like(&c, &pattern).into_expr())
1461        }
1462        "regexp_count" => {
1463            require_args(name, args, 2)?;
1464            let c = expr_to_column(arg_expr(args, 0)?);
1465            let pattern = arg_lit_str(args, 1)?;
1466            Ok(regexp_count(&c, &pattern).into_expr())
1467        }
1468        "regexp_substr" => {
1469            require_args(name, args, 2)?;
1470            let c = expr_to_column(arg_expr(args, 0)?);
1471            let pattern = arg_lit_str(args, 1)?;
1472            Ok(regexp_substr(&c, &pattern).into_expr())
1473        }
1474        "regexp_instr" => {
1475            require_args_min(name, args, 2)?;
1476            let c = expr_to_column(arg_expr(args, 0)?);
1477            let pattern = arg_lit_str(args, 1)?;
1478            let group_idx = args.get(2).and_then(|v| lit_as_usize(v).ok());
1479            Ok(regexp_instr(&c, &pattern, group_idx).into_expr())
1480        }
1481        "split" => {
1482            require_args_min(name, args, 2)?;
1483            if args.len() > 3 {
1484                return Err(PlanExprError("split takes at most 3 arguments".to_string()));
1485            }
1486            let c = expr_to_column(arg_expr(args, 0)?);
1487            let delimiter = arg_lit_str(args, 1)?;
1488            let limit = args
1489                .get(2)
1490                .and_then(|v| lit_as_i64(v).ok())
1491                .map(|n| n as i32);
1492            Ok(split(&c, &delimiter, limit).into_expr())
1493        }
1494        "split_part" => {
1495            require_args(name, args, 3)?;
1496            let c = expr_to_column(arg_expr(args, 0)?);
1497            let delimiter = arg_lit_str(args, 1)?;
1498            let part_num = arg_lit_i64(args, 2)?;
1499            Ok(split_part(&c, &delimiter, part_num).into_expr())
1500        }
1501        "find_in_set" => {
1502            require_args(name, args, 2)?;
1503            let str_col = expr_to_column(arg_expr(args, 0)?);
1504            let set_col = expr_to_column(arg_expr(args, 1)?);
1505            Ok(find_in_set(&str_col, &set_col).into_expr())
1506        }
1507        "format_string" | "printf" => {
1508            require_args_min(name, args, 2)?;
1509            let format_str = arg_lit_str(args, 0)?;
1510            let exprs: Result<Vec<Expr>, _> = args.iter().skip(1).map(expr_from_value).collect();
1511            let cols: Vec<Column> = exprs?.into_iter().map(expr_to_column).collect();
1512            let refs: Vec<&Column> = cols.iter().collect();
1513            Ok(format_string(&format_str, &refs).into_expr())
1514        }
1515        "lcase" => {
1516            require_args(name, args, 1)?;
1517            Ok(lcase(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1518        }
1519        "ucase" => {
1520            require_args(name, args, 1)?;
1521            Ok(ucase(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1522        }
1523        "mask" => {
1524            require_args_min(name, args, 1)?;
1525            let c = expr_to_column(arg_expr(args, 0)?);
1526            let u = args
1527                .get(1)
1528                .and_then(|v| lit_as_string(v).ok())
1529                .and_then(|s| s.chars().next());
1530            let l = args
1531                .get(2)
1532                .and_then(|v| lit_as_string(v).ok())
1533                .and_then(|s| s.chars().next());
1534            let d = args
1535                .get(3)
1536                .and_then(|v| lit_as_string(v).ok())
1537                .and_then(|s| s.chars().next());
1538            let o = args
1539                .get(4)
1540                .and_then(|v| lit_as_string(v).ok())
1541                .and_then(|s| s.chars().next());
1542            Ok(mask(&c, u, l, d, o).into_expr())
1543        }
1544        "str_to_map" => {
1545            require_args_min(name, args, 1)?;
1546            let c = expr_to_column(arg_expr(args, 0)?);
1547            let pair_delim: Option<String> = arg_lit_opt_str(args, 1)?;
1548            let key_value_delim: Option<String> = arg_lit_opt_str(args, 2)?;
1549            Ok(str_to_map(&c, pair_delim.as_deref(), key_value_delim.as_deref()).into_expr())
1550        }
1551        "get_json_object" => {
1552            require_args(name, args, 2)?;
1553            let c = expr_to_column(arg_expr(args, 0)?);
1554            let path = arg_lit_str(args, 1)?;
1555            Ok(get_json_object(&c, &path).into_expr())
1556        }
1557        "json_tuple" => {
1558            require_args_min(name, args, 2)?;
1559            let c = expr_to_column(arg_expr(args, 0)?);
1560            let keys: Vec<String> = args[1..]
1561                .iter()
1562                .map(lit_as_string)
1563                .collect::<Result<Vec<_>, _>>()?;
1564            let key_refs: Vec<&str> = keys.iter().map(String::as_str).collect();
1565            Ok(json_tuple(&c, &key_refs).into_expr())
1566        }
1567        "isin" => {
1568            // {"fn": "isin", "args": [col_expr, lit1, lit2, ...]} - col.isin(1, 3)
1569            // Empty list or [col, {"lit": null}] -> lit(false) (issue #518)
1570            require_args_min(name, args, 1)?;
1571            let col_expr = arg_expr(args, 0)?;
1572            let values_opt = try_values_for_isin(&args[1..])?;
1573            Ok(match values_opt {
1574                None => lit(false),
1575                Some(values_expr) => col_expr.is_in(values_expr, false),
1576            })
1577        }
1578        _ => expr_from_fn_rest(name, args),
1579    }
1580}
1581
1582fn expr_from_fn_rest(name: &str, args: &[Value]) -> Result<Expr, PlanExprError> {
1583    use crate::Column;
1584    #[allow(unused_imports)]
1585    use crate::functions::{
1586        abs, acos, add_months, array, array_agg, array_append, array_compact, array_contains,
1587        array_distinct, array_except, array_insert, array_intersect, array_join, array_max,
1588        array_min, array_prepend, array_remove, array_size, array_slice, array_sort, array_sum,
1589        array_union, arrays_overlap, arrays_zip, asin, atan, atan2, bround, cast, cbrt, ceiling,
1590        cos, cosh, cot, create_map, csc, curdate, current_catalog, current_database, current_date,
1591        current_schema, current_timestamp, current_timezone, current_user, date_add, date_diff,
1592        date_format, date_from_unix_date, date_part, date_sub, date_trunc, dateadd, datediff,
1593        datepart, day, dayname, dayofmonth, dayofweek, dayofyear, days, decode, degrees, e,
1594        element_at, encode, equal_null, exp, explode, explode_outer, expm1, extract, factorial,
1595        floor, from_unixtime, from_utc_timestamp, get, get_json_object, greatest, grouping,
1596        grouping_id, hash, hour, hours, hypot, input_file_name, last_day, least, localtimestamp,
1597        log, log1p, log2, log10, make_date, make_interval, make_timestamp, make_timestamp_ntz,
1598        map_keys, map_values, minute, minutes, monotonically_increasing_id, month, months,
1599        months_between, negate, next_day, now, nullif, nvl, nvl2, parse_url, pi, pmod, positive,
1600        pow, quarter, radians, rint, round, sec, second, shift_left, shift_right, signum, sin,
1601        sinh, size, spark_partition_id, sqrt, tan, tanh, timestamp_micros, timestamp_millis,
1602        timestamp_seconds, timestampadd, timestampdiff, to_binary, to_char, to_date, to_degrees,
1603        to_number, to_radians, to_timestamp, to_unix_timestamp, to_utc_timestamp, to_varchar,
1604        trunc, try_add, try_cast, try_divide, try_element_at, try_multiply, try_subtract,
1605        try_to_number, try_to_timestamp, typeof_, unix_date, unix_micros, unix_millis,
1606        unix_seconds, unix_timestamp, unix_timestamp_now, user, weekday, weekofyear, width_bucket,
1607        year, years,
1608    };
1609
1610    // --- Math / numeric ---
1611    match name {
1612        "abs" => {
1613            require_args(name, args, 1)?;
1614            Ok(abs(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1615        }
1616        "ceil" | "ceiling" => {
1617            require_args(name, args, 1)?;
1618            Ok(ceiling(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1619        }
1620        "floor" => {
1621            require_args(name, args, 1)?;
1622            Ok(floor(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1623        }
1624        "round" => {
1625            require_args_min(name, args, 1)?;
1626            let c = expr_to_column(arg_expr(args, 0)?);
1627            let decimals = opt_lit_i64(args, 1).map(|n| n as u32).unwrap_or(0);
1628            Ok(round(&c, decimals).into_expr())
1629        }
1630        "bround" => {
1631            require_args_min(name, args, 1)?;
1632            let c = expr_to_column(arg_expr(args, 0)?);
1633            let scale = opt_lit_i64(args, 1).unwrap_or(0) as i32;
1634            Ok(bround(&c, scale).into_expr())
1635        }
1636        "negate" | "negative" => {
1637            require_args(name, args, 1)?;
1638            Ok(negate(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1639        }
1640        "positive" => {
1641            require_args(name, args, 1)?;
1642            Ok(positive(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1643        }
1644        "sqrt" => {
1645            require_args(name, args, 1)?;
1646            Ok(sqrt(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1647        }
1648        "pow" | "power" => {
1649            require_args(name, args, 2)?;
1650            let c = expr_to_column(arg_expr(args, 0)?);
1651            let exp_val = arg_lit_i64(args, 1)?;
1652            Ok(pow(&c, exp_val).into_expr())
1653        }
1654        "exp" => {
1655            require_args(name, args, 1)?;
1656            Ok(exp(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1657        }
1658        "log" | "ln" => {
1659            if args.len() == 1 {
1660                Ok(log(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1661            } else if args.len() == 2 {
1662                let col_expr = expr_to_column(arg_expr(args, 0)?);
1663                let base = match &args[1] {
1664                    Value::Number(n) => n
1665                        .as_f64()
1666                        .ok_or_else(|| PlanExprError("log base must be a number".to_string()))?,
1667                    _ => return Err(PlanExprError("log base must be a number".to_string())),
1668                };
1669                Ok(crate::functions::log_with_base(&col_expr, base).into_expr())
1670            } else {
1671                Err(PlanExprError(format!(
1672                    "fn '{name}' requires 1 or 2 arguments"
1673                )))
1674            }
1675        }
1676        "sin" => {
1677            require_args(name, args, 1)?;
1678            Ok(sin(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1679        }
1680        "cos" => {
1681            require_args(name, args, 1)?;
1682            Ok(cos(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1683        }
1684        "tan" => {
1685            require_args(name, args, 1)?;
1686            Ok(tan(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1687        }
1688        "asin" => {
1689            require_args(name, args, 1)?;
1690            Ok(crate::functions::asin(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1691        }
1692        "acos" => {
1693            require_args(name, args, 1)?;
1694            Ok(crate::functions::acos(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1695        }
1696        "atan" => {
1697            require_args(name, args, 1)?;
1698            Ok(crate::functions::atan(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1699        }
1700        "atan2" => {
1701            require_args(name, args, 2)?;
1702            let y = expr_to_column(arg_expr(args, 0)?);
1703            let x = expr_to_column(arg_expr(args, 1)?);
1704            Ok(atan2(&y, &x).into_expr())
1705        }
1706        "degrees" => {
1707            require_args(name, args, 1)?;
1708            Ok(degrees(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1709        }
1710        "radians" => {
1711            require_args(name, args, 1)?;
1712            Ok(radians(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1713        }
1714        "signum" | "sign" => {
1715            require_args(name, args, 1)?;
1716            Ok(signum(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1717        }
1718        "cot" => {
1719            require_args(name, args, 1)?;
1720            Ok(cot(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1721        }
1722        "csc" => {
1723            require_args(name, args, 1)?;
1724            Ok(csc(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1725        }
1726        "sec" => {
1727            require_args(name, args, 1)?;
1728            Ok(sec(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1729        }
1730        "e" => {
1731            if !args.is_empty() {
1732                return Err(PlanExprError("fn 'e' takes no arguments".to_string()));
1733            }
1734            Ok(e().into_expr())
1735        }
1736        "pi" => {
1737            if !args.is_empty() {
1738                return Err(PlanExprError("fn 'pi' takes no arguments".to_string()));
1739            }
1740            Ok(pi().into_expr())
1741        }
1742        "pmod" => {
1743            require_args(name, args, 2)?;
1744            let a = expr_to_column(arg_expr(args, 0)?);
1745            let b = expr_to_column(arg_expr(args, 1)?);
1746            Ok(pmod(&a, &b).into_expr())
1747        }
1748        "factorial" => {
1749            require_args(name, args, 1)?;
1750            Ok(factorial(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1751        }
1752        "hypot" => {
1753            require_args(name, args, 2)?;
1754            let x = expr_to_column(arg_expr(args, 0)?);
1755            let y = expr_to_column(arg_expr(args, 1)?);
1756            Ok(hypot(&x, &y).into_expr())
1757        }
1758        "cosh" => {
1759            require_args(name, args, 1)?;
1760            Ok(cosh(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1761        }
1762        "sinh" => {
1763            require_args(name, args, 1)?;
1764            Ok(sinh(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1765        }
1766        "tanh" => {
1767            require_args(name, args, 1)?;
1768            Ok(tanh(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1769        }
1770        "cbrt" => {
1771            require_args(name, args, 1)?;
1772            Ok(cbrt(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1773        }
1774        "expm1" => {
1775            require_args(name, args, 1)?;
1776            Ok(crate::functions::expm1(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1777        }
1778        "log1p" => {
1779            require_args(name, args, 1)?;
1780            Ok(log1p(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1781        }
1782        "log10" => {
1783            require_args(name, args, 1)?;
1784            Ok(log10(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1785        }
1786        "log2" => {
1787            require_args(name, args, 1)?;
1788            Ok(log2(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1789        }
1790        "rint" => {
1791            require_args(name, args, 1)?;
1792            Ok(crate::functions::rint(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1793        }
1794        "to_degrees" => {
1795            require_args(name, args, 1)?;
1796            Ok(to_degrees(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1797        }
1798        "to_radians" => {
1799            require_args(name, args, 1)?;
1800            Ok(to_radians(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1801        }
1802        // --- Type / conditional ---
1803        "cast" => {
1804            require_args(name, args, 2)?;
1805            let c = expr_to_column(arg_expr(args, 0)?);
1806            let type_name = arg_lit_str(args, 1)?;
1807            Ok(cast(&c, &type_name).map_err(PlanExprError)?.into_expr())
1808        }
1809        "try_cast" => {
1810            require_args(name, args, 2)?;
1811            let c = expr_to_column(arg_expr(args, 0)?);
1812            let type_name = arg_lit_str(args, 1)?;
1813            Ok(try_cast(&c, &type_name).map_err(PlanExprError)?.into_expr())
1814        }
1815        "nvl" | "ifnull" => {
1816            require_args(name, args, 2)?;
1817            let a = expr_to_column(arg_expr(args, 0)?);
1818            let b = expr_to_column(arg_expr(args, 1)?);
1819            Ok(nvl(&a, &b).into_expr())
1820        }
1821        "nullif" => {
1822            require_args(name, args, 2)?;
1823            let a = expr_to_column(arg_expr(args, 0)?);
1824            let b = expr_to_column(arg_expr(args, 1)?);
1825            Ok(nullif(&a, &b).into_expr())
1826        }
1827        "greatest" => {
1828            require_args_min(name, args, 1)?;
1829            let exprs: Result<Vec<Expr>, _> = args.iter().map(expr_from_value).collect();
1830            let cols: Vec<Column> = exprs?.into_iter().map(expr_to_column).collect();
1831            let refs: Vec<&Column> = cols.iter().collect();
1832            Ok(greatest(&refs).map_err(PlanExprError)?.into_expr())
1833        }
1834        "least" => {
1835            require_args_min(name, args, 1)?;
1836            let exprs: Result<Vec<Expr>, _> = args.iter().map(expr_from_value).collect();
1837            let cols: Vec<Column> = exprs?.into_iter().map(expr_to_column).collect();
1838            let refs: Vec<&Column> = cols.iter().collect();
1839            Ok(least(&refs).map_err(PlanExprError)?.into_expr())
1840        }
1841        "typeof" => {
1842            require_args(name, args, 1)?;
1843            Ok(typeof_(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1844        }
1845        "try_divide" => {
1846            require_args(name, args, 2)?;
1847            let a = expr_to_column(arg_expr(args, 0)?);
1848            let b = expr_to_column(arg_expr(args, 1)?);
1849            Ok(try_divide(&a, &b).into_expr())
1850        }
1851        // PySpark-style arithmetic with string/numeric coercion (issue #201)
1852        "add" | "+" => {
1853            require_args(name, args, 2)?;
1854            let a = expr_to_column(arg_expr(args, 0)?);
1855            let b = expr_to_column(arg_expr(args, 1)?);
1856            Ok(a.add_pyspark(&b).into_expr())
1857        }
1858        "subtract" | "-" => {
1859            require_args(name, args, 2)?;
1860            let a = expr_to_column(arg_expr(args, 0)?);
1861            let b = expr_to_column(arg_expr(args, 1)?);
1862            Ok(a.subtract_pyspark(&b).into_expr())
1863        }
1864        "multiply" | "*" => {
1865            require_args(name, args, 2)?;
1866            let a = expr_to_column(arg_expr(args, 0)?);
1867            let b = expr_to_column(arg_expr(args, 1)?);
1868            Ok(a.multiply_pyspark(&b).into_expr())
1869        }
1870        "divide" | "/" => {
1871            require_args(name, args, 2)?;
1872            let a = expr_to_column(arg_expr(args, 0)?);
1873            let b = expr_to_column(arg_expr(args, 1)?);
1874            Ok(a.divide_pyspark(&b).into_expr())
1875        }
1876        "mod" | "remainder" | "%" => {
1877            require_args(name, args, 2)?;
1878            let a = expr_to_column(arg_expr(args, 0)?);
1879            let b = expr_to_column(arg_expr(args, 1)?);
1880            Ok(a.mod_pyspark(&b).into_expr())
1881        }
1882        "try_add" => {
1883            require_args(name, args, 2)?;
1884            let a = expr_to_column(arg_expr(args, 0)?);
1885            let b = expr_to_column(arg_expr(args, 1)?);
1886            Ok(try_add(&a, &b).into_expr())
1887        }
1888        "try_subtract" => {
1889            require_args(name, args, 2)?;
1890            let a = expr_to_column(arg_expr(args, 0)?);
1891            let b = expr_to_column(arg_expr(args, 1)?);
1892            Ok(try_subtract(&a, &b).into_expr())
1893        }
1894        "try_multiply" => {
1895            require_args(name, args, 2)?;
1896            let a = expr_to_column(arg_expr(args, 0)?);
1897            let b = expr_to_column(arg_expr(args, 1)?);
1898            Ok(try_multiply(&a, &b).into_expr())
1899        }
1900        "width_bucket" => {
1901            require_args(name, args, 4)?;
1902            let val = expr_to_column(arg_expr(args, 0)?);
1903            let min_val = arg_lit_f64(args, 1)?;
1904            let max_val = arg_lit_f64(args, 2)?;
1905            let num_bucket = arg_lit_i64(args, 3)?;
1906            if num_bucket <= 0 {
1907                return Err(PlanExprError(
1908                    "width_bucket: num_bucket must be positive".into(),
1909                ));
1910            }
1911            Ok(width_bucket(&val, min_val, max_val, num_bucket).into_expr())
1912        }
1913        "equal_null" => {
1914            require_args(name, args, 2)?;
1915            let a = expr_to_column(arg_expr(args, 0)?);
1916            let b = expr_to_column(arg_expr(args, 1)?);
1917            Ok(equal_null(&a, &b).into_expr())
1918        }
1919        // --- Datetime ---
1920        "year" => {
1921            require_args(name, args, 1)?;
1922            Ok(year(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1923        }
1924        "month" => {
1925            require_args(name, args, 1)?;
1926            Ok(month(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1927        }
1928        "day" | "dayofmonth" => {
1929            require_args(name, args, 1)?;
1930            Ok(day(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1931        }
1932        "hour" => {
1933            require_args(name, args, 1)?;
1934            Ok(hour(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1935        }
1936        "minute" => {
1937            require_args(name, args, 1)?;
1938            Ok(minute(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1939        }
1940        "second" => {
1941            require_args(name, args, 1)?;
1942            Ok(second(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1943        }
1944        "quarter" => {
1945            require_args(name, args, 1)?;
1946            Ok(quarter(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1947        }
1948        "weekofyear" => {
1949            require_args(name, args, 1)?;
1950            Ok(weekofyear(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1951        }
1952        "dayofweek" => {
1953            require_args(name, args, 1)?;
1954            Ok(dayofweek(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1955        }
1956        "dayofyear" => {
1957            require_args(name, args, 1)?;
1958            Ok(dayofyear(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1959        }
1960        "to_date" => {
1961            require_args_min(name, args, 1)?;
1962            if args.len() > 2 {
1963                return Err(PlanExprError(format!(
1964                    "fn '{name}' takes at most 2 argument(s)"
1965                )));
1966            }
1967            let col = expr_to_column(arg_expr(args, 0)?);
1968            let format_str = if args.len() == 2 {
1969                Some(arg_lit_str(args, 1)?)
1970            } else {
1971                None
1972            };
1973            to_date(&col, format_str.as_deref())
1974                .map_err(PlanExprError)
1975                .map(|c| c.into_expr())
1976        }
1977        "date_format" => {
1978            require_args(name, args, 2)?;
1979            let c = expr_to_column(arg_expr(args, 0)?);
1980            let format = arg_lit_str(args, 1)?;
1981            Ok(date_format(&c, &format).into_expr())
1982        }
1983        "date_add" => {
1984            require_args(name, args, 2)?;
1985            let c = expr_to_column(arg_expr(args, 0)?);
1986            let n = arg_lit_i32(args, 1)?;
1987            Ok(date_add(&c, n).into_expr())
1988        }
1989        "date_sub" => {
1990            require_args(name, args, 2)?;
1991            let c = expr_to_column(arg_expr(args, 0)?);
1992            let n = arg_lit_i32(args, 1)?;
1993            Ok(date_sub(&c, n).into_expr())
1994        }
1995        "datediff" | "date_diff" => {
1996            require_args(name, args, 2)?;
1997            let end = expr_to_column(arg_expr(args, 0)?);
1998            let start = expr_to_column(arg_expr(args, 1)?);
1999            Ok(datediff(&end, &start).into_expr())
2000        }
2001        "last_day" => {
2002            require_args(name, args, 1)?;
2003            Ok(last_day(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2004        }
2005        "trunc" => {
2006            require_args(name, args, 2)?;
2007            let c = expr_to_column(arg_expr(args, 0)?);
2008            let format = arg_lit_str(args, 1)?;
2009            Ok(trunc(&c, &format).into_expr())
2010        }
2011        "date_trunc" => {
2012            require_args(name, args, 2)?;
2013            let format = arg_lit_str(args, 0)?;
2014            let c = expr_to_column(arg_expr(args, 1)?);
2015            Ok(date_trunc(&format, &c).into_expr())
2016        }
2017        "add_months" => {
2018            require_args(name, args, 2)?;
2019            let c = expr_to_column(arg_expr(args, 0)?);
2020            let n = arg_lit_i32(args, 1)?;
2021            Ok(add_months(&c, n).into_expr())
2022        }
2023        "months_between" => {
2024            require_args_min(name, args, 2)?;
2025            let end = expr_to_column(arg_expr(args, 0)?);
2026            let start = expr_to_column(arg_expr(args, 1)?);
2027            let round_off = args
2028                .get(2)
2029                .and_then(|v| v.get("lit").and_then(Value::as_bool))
2030                .unwrap_or(true);
2031            Ok(months_between(&end, &start, round_off).into_expr())
2032        }
2033        "next_day" => {
2034            require_args(name, args, 2)?;
2035            let c = expr_to_column(arg_expr(args, 0)?);
2036            let day_of_week = arg_lit_str(args, 1)?;
2037            Ok(next_day(&c, &day_of_week).into_expr())
2038        }
2039        "unix_timestamp" => {
2040            if args.is_empty() {
2041                Ok(unix_timestamp_now().into_expr())
2042            } else {
2043                require_args_min(name, args, 1)?;
2044                let c = expr_to_column(arg_expr(args, 0)?);
2045                let format: Option<String> = arg_lit_opt_str(args, 1)?;
2046                Ok(unix_timestamp(&c, format.as_deref()).into_expr())
2047            }
2048        }
2049        "from_unixtime" => {
2050            require_args_min(name, args, 1)?;
2051            let c = expr_to_column(arg_expr(args, 0)?);
2052            let format: Option<String> = arg_lit_opt_str(args, 1)?;
2053            Ok(from_unixtime(&c, format.as_deref()).into_expr())
2054        }
2055        "to_unix_timestamp" => {
2056            require_args_min(name, args, 1)?;
2057            let c = expr_to_column(arg_expr(args, 0)?);
2058            let format: Option<String> = arg_lit_opt_str(args, 1)?;
2059            Ok(to_unix_timestamp(&c, format.as_deref()).into_expr())
2060        }
2061        "make_date" => {
2062            require_args(name, args, 3)?;
2063            let y = expr_to_column(arg_expr(args, 0)?);
2064            let m = expr_to_column(arg_expr(args, 1)?);
2065            let d = expr_to_column(arg_expr(args, 2)?);
2066            Ok(make_date(&y, &m, &d).into_expr())
2067        }
2068        "make_timestamp" => {
2069            require_args_min(name, args, 6)?;
2070            let y = expr_to_column(arg_expr(args, 0)?);
2071            let mo = expr_to_column(arg_expr(args, 1)?);
2072            let d = expr_to_column(arg_expr(args, 2)?);
2073            let h = expr_to_column(arg_expr(args, 3)?);
2074            let mi = expr_to_column(arg_expr(args, 4)?);
2075            let s = expr_to_column(arg_expr(args, 5)?);
2076            let tz: Option<String> = arg_lit_opt_str(args, 6)?;
2077            Ok(make_timestamp(&y, &mo, &d, &h, &mi, &s, tz.as_deref()).into_expr())
2078        }
2079        "make_timestamp_ntz" => {
2080            require_args(name, args, 6)?;
2081            let y = expr_to_column(arg_expr(args, 0)?);
2082            let mo = expr_to_column(arg_expr(args, 1)?);
2083            let d = expr_to_column(arg_expr(args, 2)?);
2084            let h = expr_to_column(arg_expr(args, 3)?);
2085            let mi = expr_to_column(arg_expr(args, 4)?);
2086            let s = expr_to_column(arg_expr(args, 5)?);
2087            Ok(make_timestamp_ntz(&y, &mo, &d, &h, &mi, &s).into_expr())
2088        }
2089        "timestampadd" => {
2090            require_args(name, args, 3)?;
2091            let unit = arg_lit_str(args, 0)?;
2092            let amount = expr_to_column(arg_expr(args, 1)?);
2093            let ts = expr_to_column(arg_expr(args, 2)?);
2094            Ok(timestampadd(&unit, &amount, &ts).into_expr())
2095        }
2096        "timestampdiff" => {
2097            require_args(name, args, 3)?;
2098            let unit = arg_lit_str(args, 0)?;
2099            let start = expr_to_column(arg_expr(args, 1)?);
2100            let end = expr_to_column(arg_expr(args, 2)?);
2101            Ok(timestampdiff(&unit, &start, &end).into_expr())
2102        }
2103        "days" => {
2104            require_args(name, args, 1)?;
2105            let n = arg_lit_i64(args, 0)?;
2106            Ok(days(n).into_expr())
2107        }
2108        "hours" => {
2109            require_args(name, args, 1)?;
2110            let n = arg_lit_i64(args, 0)?;
2111            Ok(hours(n).into_expr())
2112        }
2113        "minutes" => {
2114            require_args(name, args, 1)?;
2115            let n = arg_lit_i64(args, 0)?;
2116            Ok(minutes(n).into_expr())
2117        }
2118        "months" => {
2119            require_args(name, args, 1)?;
2120            let n = arg_lit_i64(args, 0)?;
2121            Ok(months(n).into_expr())
2122        }
2123        "years" => {
2124            require_args(name, args, 1)?;
2125            let n = arg_lit_i64(args, 0)?;
2126            Ok(years(n).into_expr())
2127        }
2128        "from_utc_timestamp" => {
2129            require_args(name, args, 2)?;
2130            let c = expr_to_column(arg_expr(args, 0)?);
2131            let tz = arg_lit_str(args, 1)?;
2132            Ok(from_utc_timestamp(&c, &tz).into_expr())
2133        }
2134        "to_utc_timestamp" => {
2135            require_args(name, args, 2)?;
2136            let c = expr_to_column(arg_expr(args, 0)?);
2137            let tz = arg_lit_str(args, 1)?;
2138            Ok(to_utc_timestamp(&c, &tz).into_expr())
2139        }
2140        "convert_timezone" => {
2141            require_args(name, args, 3)?;
2142            let source_tz = arg_lit_str(args, 0)?;
2143            let target_tz = arg_lit_str(args, 1)?;
2144            let c = expr_to_column(arg_expr(args, 2)?);
2145            Ok(crate::functions::convert_timezone(&source_tz, &target_tz, &c).into_expr())
2146        }
2147        "current_date" | "curdate" => {
2148            if !args.is_empty() {
2149                return Err(PlanExprError(format!("fn '{name}' takes no arguments")));
2150            }
2151            Ok(current_date().into_expr())
2152        }
2153        "current_timestamp" | "now" => {
2154            if !args.is_empty() {
2155                return Err(PlanExprError(format!("fn '{name}' takes no arguments")));
2156            }
2157            Ok(current_timestamp().into_expr())
2158        }
2159        "localtimestamp" => {
2160            if !args.is_empty() {
2161                return Err(PlanExprError(
2162                    "fn 'localtimestamp' takes no arguments".to_string(),
2163                ));
2164            }
2165            Ok(localtimestamp().into_expr())
2166        }
2167        "extract" | "date_part" | "datepart" => {
2168            require_args(name, args, 2)?;
2169            let c = expr_to_column(arg_expr(args, 0)?);
2170            let field = arg_lit_str(args, 1)?;
2171            Ok(extract(&c, &field).into_expr())
2172        }
2173        "dateadd" => {
2174            require_args(name, args, 2)?;
2175            let c = expr_to_column(arg_expr(args, 0)?);
2176            let n = arg_lit_i32(args, 1)?;
2177            Ok(dateadd(&c, n).into_expr())
2178        }
2179        "unix_micros" | "unix_millis" | "unix_seconds" => {
2180            require_args(name, args, 1)?;
2181            let c = expr_to_column(arg_expr(args, 0)?);
2182            let out = match name {
2183                "unix_micros" => unix_micros(&c),
2184                "unix_millis" => unix_millis(&c),
2185                _ => unix_seconds(&c),
2186            };
2187            Ok(out.into_expr())
2188        }
2189        "dayname" => {
2190            require_args(name, args, 1)?;
2191            Ok(dayname(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2192        }
2193        "weekday" => {
2194            require_args(name, args, 1)?;
2195            Ok(weekday(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2196        }
2197        "timestamp_seconds" | "timestamp_millis" | "timestamp_micros" => {
2198            require_args(name, args, 1)?;
2199            let c = expr_to_column(arg_expr(args, 0)?);
2200            let out = match name {
2201                "timestamp_seconds" => timestamp_seconds(&c),
2202                "timestamp_millis" => timestamp_millis(&c),
2203                _ => timestamp_micros(&c),
2204            };
2205            Ok(out.into_expr())
2206        }
2207        "unix_date" => {
2208            require_args(name, args, 1)?;
2209            Ok(unix_date(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2210        }
2211        "date_from_unix_date" => {
2212            require_args(name, args, 1)?;
2213            Ok(date_from_unix_date(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2214        }
2215        "to_char" | "to_varchar" => {
2216            require_args_min(name, args, 1)?;
2217            let c = expr_to_column(arg_expr(args, 0)?);
2218            let format: Option<String> = arg_lit_opt_str(args, 1)?;
2219            Ok(to_char(&c, format.as_deref())
2220                .map_err(PlanExprError)?
2221                .into_expr())
2222        }
2223        "to_timestamp" => {
2224            require_args_min(name, args, 1)?;
2225            let c = expr_to_column(arg_expr(args, 0)?);
2226            let format: Option<String> = arg_lit_opt_str(args, 1)?;
2227            Ok(to_timestamp(&c, format.as_deref())
2228                .map_err(PlanExprError)?
2229                .into_expr())
2230        }
2231        "try_to_timestamp" => {
2232            require_args_min(name, args, 1)?;
2233            let c = expr_to_column(arg_expr(args, 0)?);
2234            let format: Option<String> = arg_lit_opt_str(args, 1)?;
2235            Ok(try_to_timestamp(&c, format.as_deref())
2236                .map_err(PlanExprError)?
2237                .into_expr())
2238        }
2239        "to_number" | "try_to_number" => {
2240            require_args_min(name, args, 1)?;
2241            let c = expr_to_column(arg_expr(args, 0)?);
2242            let format: Option<String> = arg_lit_opt_str(args, 1)?;
2243            let out = if name == "to_number" {
2244                to_number(&c, format.as_deref()).map_err(PlanExprError)?
2245            } else {
2246                try_to_number(&c, format.as_deref()).map_err(PlanExprError)?
2247            };
2248            Ok(out.into_expr())
2249        }
2250        "current_timezone" => {
2251            if !args.is_empty() {
2252                return Err(PlanExprError(
2253                    "fn 'current_timezone' takes no arguments".to_string(),
2254                ));
2255            }
2256            Ok(current_timezone().into_expr())
2257        }
2258        // --- Zero-arg JVM/runtime stubs ---
2259        "spark_partition_id"
2260        | "input_file_name"
2261        | "monotonically_increasing_id"
2262        | "current_catalog"
2263        | "current_database"
2264        | "current_schema"
2265        | "current_user"
2266        | "user" => {
2267            if !args.is_empty() {
2268                return Err(PlanExprError(format!("fn '{name}' takes no arguments")));
2269            }
2270            let out = match name {
2271                "spark_partition_id" => spark_partition_id(),
2272                "input_file_name" => input_file_name(),
2273                "monotonically_increasing_id" => monotonically_increasing_id(),
2274                "current_catalog" => current_catalog(),
2275                "current_database" => current_database(),
2276                "current_schema" => current_schema(),
2277                "current_user" => current_user(),
2278                "user" => user(),
2279                _ => current_catalog(), // unreachable
2280            };
2281            Ok(out.into_expr())
2282        }
2283        "hash" => {
2284            require_args_min(name, args, 1)?;
2285            let exprs: Result<Vec<Expr>, _> = args.iter().map(expr_from_value).collect();
2286            let cols: Vec<Column> = exprs?.into_iter().map(expr_to_column).collect();
2287            let refs: Vec<&Column> = cols.iter().collect();
2288            Ok(crate::functions::hash(&refs).into_expr())
2289        }
2290        "shift_left" => {
2291            require_args(name, args, 2)?;
2292            let c = expr_to_column(arg_expr(args, 0)?);
2293            let n = arg_lit_i32(args, 1)?;
2294            Ok(shift_left(&c, n).into_expr())
2295        }
2296        "shift_right" => {
2297            require_args(name, args, 2)?;
2298            let c = expr_to_column(arg_expr(args, 0)?);
2299            let n = arg_lit_i32(args, 1)?;
2300            Ok(shift_right(&c, n).into_expr())
2301        }
2302        "version" => {
2303            if !args.is_empty() {
2304                return Err(PlanExprError("fn 'version' takes no arguments".to_string()));
2305            }
2306            Ok(crate::functions::version().into_expr())
2307        }
2308        // --- Array / list ---
2309        "array" => {
2310            let exprs: Result<Vec<Expr>, _> = args.iter().map(expr_from_value).collect();
2311            let cols: Vec<Column> = exprs?.into_iter().map(expr_to_column).collect();
2312            let refs: Vec<&Column> = cols.iter().collect();
2313            Ok(array(&refs)
2314                .map_err(|e| PlanExprError(e.to_string()))?
2315                .into_expr())
2316        }
2317        "array_max" => {
2318            require_args(name, args, 1)?;
2319            Ok(crate::functions::array_max(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2320        }
2321        "array_min" => {
2322            require_args(name, args, 1)?;
2323            Ok(crate::functions::array_min(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2324        }
2325        "array_size" | "size" | "cardinality" => {
2326            require_args(name, args, 1)?;
2327            let c = expr_to_column(arg_expr(args, 0)?);
2328            Ok(array_size(&c).into_expr())
2329        }
2330        "element_at" => {
2331            require_args(name, args, 2)?;
2332            let c = expr_to_column(arg_expr(args, 0)?);
2333            let idx = arg_lit_i64(args, 1)?;
2334            Ok(element_at(&c, idx).into_expr())
2335        }
2336        "try_element_at" => {
2337            require_args(name, args, 2)?;
2338            let c = expr_to_column(arg_expr(args, 0)?);
2339            let idx = arg_lit_i64(args, 1)?;
2340            Ok(try_element_at(&c, idx).into_expr())
2341        }
2342        "array_contains" => {
2343            require_args(name, args, 2)?;
2344            let arr = expr_to_column(arg_expr(args, 0)?);
2345            let val = expr_to_column(arg_expr(args, 1)?);
2346            Ok(array_contains(&arr, &val).into_expr())
2347        }
2348        "array_join" => {
2349            require_args(name, args, 2)?;
2350            let c = expr_to_column(arg_expr(args, 0)?);
2351            let sep = arg_lit_str(args, 1)?;
2352            Ok(array_join(&c, &sep).into_expr())
2353        }
2354        "array_sort" => {
2355            require_args(name, args, 1)?;
2356            Ok(array_sort(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2357        }
2358        "array_distinct" => {
2359            require_args(name, args, 1)?;
2360            Ok(array_distinct(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2361        }
2362        "array_slice" => {
2363            require_args_min(name, args, 2)?;
2364            let c = expr_to_column(arg_expr(args, 0)?);
2365            let start = arg_lit_i64(args, 1)?;
2366            let length = opt_lit_i64(args, 2);
2367            Ok(array_slice(&c, start, length).into_expr())
2368        }
2369        "array_compact" => {
2370            require_args(name, args, 1)?;
2371            Ok(array_compact(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2372        }
2373        "array_remove" => {
2374            require_args(name, args, 2)?;
2375            let arr = expr_to_column(arg_expr(args, 0)?);
2376            let val = expr_to_column(arg_expr(args, 1)?);
2377            Ok(array_remove(&arr, &val).into_expr())
2378        }
2379        "explode" => {
2380            require_args(name, args, 1)?;
2381            Ok(explode(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2382        }
2383        "explode_outer" => {
2384            require_args(name, args, 1)?;
2385            Ok(explode_outer(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2386        }
2387        "inline" => {
2388            require_args(name, args, 1)?;
2389            Ok(crate::functions::inline(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2390        }
2391        "inline_outer" => {
2392            require_args(name, args, 1)?;
2393            Ok(crate::functions::inline_outer(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2394        }
2395        "sequence" => {
2396            require_args_min(name, args, 2)?;
2397            let start = expr_to_column(arg_expr(args, 0)?);
2398            let stop = expr_to_column(arg_expr(args, 1)?);
2399            let step = if args.len() > 2 {
2400                Some(expr_to_column(arg_expr(args, 2)?))
2401            } else {
2402                None
2403            };
2404            Ok(crate::functions::sequence(&start, &stop, step.as_ref()).into_expr())
2405        }
2406        "shuffle" => {
2407            require_args(name, args, 1)?;
2408            Ok(crate::functions::shuffle(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2409        }
2410        "array_position" => {
2411            require_args(name, args, 2)?;
2412            let arr = expr_to_column(arg_expr(args, 0)?);
2413            let val = expr_to_column(arg_expr(args, 1)?);
2414            Ok(crate::functions::array_position(&arr, &val).into_expr())
2415        }
2416        "array_append" => {
2417            require_args(name, args, 2)?;
2418            let arr = expr_to_column(arg_expr(args, 0)?);
2419            let elem = expr_to_column(arg_expr(args, 1)?);
2420            Ok(array_append(&arr, &elem).into_expr())
2421        }
2422        "array_prepend" => {
2423            require_args(name, args, 2)?;
2424            let arr = expr_to_column(arg_expr(args, 0)?);
2425            let elem = expr_to_column(arg_expr(args, 1)?);
2426            Ok(array_prepend(&arr, &elem).into_expr())
2427        }
2428        "array_insert" => {
2429            require_args(name, args, 3)?;
2430            let arr = expr_to_column(arg_expr(args, 0)?);
2431            let pos = expr_to_column(arg_expr(args, 1)?);
2432            let elem = expr_to_column(arg_expr(args, 2)?);
2433            Ok(array_insert(&arr, &pos, &elem).into_expr())
2434        }
2435        "array_except" => {
2436            require_args(name, args, 2)?;
2437            let a = expr_to_column(arg_expr(args, 0)?);
2438            let b = expr_to_column(arg_expr(args, 1)?);
2439            Ok(array_except(&a, &b).into_expr())
2440        }
2441        "array_intersect" => {
2442            require_args(name, args, 2)?;
2443            let a = expr_to_column(arg_expr(args, 0)?);
2444            let b = expr_to_column(arg_expr(args, 1)?);
2445            Ok(array_intersect(&a, &b).into_expr())
2446        }
2447        "array_union" => {
2448            require_args(name, args, 2)?;
2449            let a = expr_to_column(arg_expr(args, 0)?);
2450            let b = expr_to_column(arg_expr(args, 1)?);
2451            Ok(array_union(&a, &b).into_expr())
2452        }
2453        "arrays_overlap" => {
2454            require_args(name, args, 2)?;
2455            let a = expr_to_column(arg_expr(args, 0)?);
2456            let b = expr_to_column(arg_expr(args, 1)?);
2457            Ok(arrays_overlap(&a, &b).into_expr())
2458        }
2459        "arrays_zip" => {
2460            require_args(name, args, 2)?;
2461            let a = expr_to_column(arg_expr(args, 0)?);
2462            let b = expr_to_column(arg_expr(args, 1)?);
2463            Ok(arrays_zip(&a, &b).into_expr())
2464        }
2465        "array_agg" => {
2466            require_args(name, args, 1)?;
2467            Ok(array_agg(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2468        }
2469        "array_sum" => {
2470            require_args(name, args, 1)?;
2471            Ok(array_sum(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2472        }
2473        // --- Map / struct ---
2474        "create_map" | "createMap" => {
2475            // PySpark F.create_map(key1, val1, ...) or empty map {} per row (#512, #542).
2476            let exprs: Result<Vec<Expr>, _> = args.iter().map(expr_from_value).collect();
2477            let cols: Vec<Column> = exprs?.into_iter().map(expr_to_column).collect();
2478            let refs: Vec<&Column> = cols.iter().collect();
2479            Ok(create_map(&refs)
2480                .map_err(|e| PlanExprError(e.to_string()))?
2481                .into_expr())
2482        }
2483        "map_keys" => {
2484            require_args(name, args, 1)?;
2485            Ok(map_keys(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2486        }
2487        "map_values" => {
2488            require_args(name, args, 1)?;
2489            Ok(map_values(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2490        }
2491        "get" => {
2492            require_args(name, args, 2)?;
2493            let map_col = expr_to_column(arg_expr(args, 0)?);
2494            let key = expr_to_column(arg_expr(args, 1)?);
2495            Ok(get(&map_col, &key).into_expr())
2496        }
2497        "get_field" | "getField" => {
2498            // Struct field by name (PySpark Column.getField). Used e.g. to assert withField result (issue #541).
2499            require_args(name, args, 2)?;
2500            let struct_col = expr_to_column(arg_expr(args, 0)?);
2501            let field_name = lit_as_string(&args[1])?;
2502            Ok(struct_col.get_field(&field_name).into_expr())
2503        }
2504        "get_item" => {
2505            // Array: get_item(col, 0); map: get_item(col, "key") (issue #522)
2506            require_args(name, args, 2)?;
2507            let col_c = expr_to_column(arg_expr(args, 0)?);
2508            let second = &args[1];
2509            if let Some(idx) = second.get("lit").and_then(|v| v.as_i64()) {
2510                Ok(col_c.get_item(idx).into_expr())
2511            } else {
2512                let key = expr_to_column(arg_expr(args, 1)?);
2513                Ok(get(&col_c, &key).into_expr())
2514            }
2515        }
2516        "struct" => {
2517            // struct(col1, col2, ...) (issue #527)
2518            require_args_min(name, args, 1)?;
2519            let cols: Vec<crate::Column> = (0..args.len())
2520                .map(|i| arg_expr(args, i).map(expr_to_column))
2521                .collect::<Result<Vec<_>, _>>()?;
2522            let refs: Vec<&crate::Column> = cols.iter().collect();
2523            Ok(crate::functions::struct_(&refs).into_expr())
2524        }
2525        "named_struct" => {
2526            // named_struct("name1", col1, "name2", col2, ...) (issue #527)
2527            require_args_min(name, args, 2)?;
2528            if !args.len().is_multiple_of(2) {
2529                return Err(PlanExprError(
2530                    "named_struct requires even number of args (name, value pairs)".into(),
2531                ));
2532            }
2533            let mut names: Vec<String> = Vec::new();
2534            let mut cols: Vec<crate::Column> = Vec::new();
2535            for i in (0..args.len()).step_by(2) {
2536                names.push(lit_as_string(&args[i])?);
2537                cols.push(expr_to_column(arg_expr(args, i + 1)?));
2538            }
2539            let refs: Vec<(&str, &crate::Column)> =
2540                names.iter().map(|s| s.as_str()).zip(cols.iter()).collect();
2541            Ok(crate::functions::named_struct(&refs).into_expr())
2542        }
2543        "with_field" | "withField" => {
2544            // withField(struct_col, field_name, value) - PySpark struct field add/replace (issue #541)
2545            require_args(name, args, 3)?;
2546            let struct_col = expr_to_column(arg_expr(args, 0)?);
2547            let field_name = lit_as_string(&args[1])?;
2548            let value_col = expr_to_column(arg_expr(args, 2)?);
2549            let out = struct_col
2550                .try_with_field(&field_name, &value_col)
2551                .map_err(|e| PlanExprError(format!("with_field: {e}")))?;
2552            Ok(out.into_expr())
2553        }
2554        "nvl2" => {
2555            require_args(name, args, 3)?;
2556            let col1 = expr_to_column(arg_expr(args, 0)?);
2557            let col2 = expr_to_column(arg_expr(args, 1)?);
2558            let col3 = expr_to_column(arg_expr(args, 2)?);
2559            Ok(nvl2(&col1, &col2, &col3).into_expr())
2560        }
2561        _ => Err(PlanExprError(format!("unsupported function: {name}"))),
2562    }
2563}
2564
2565fn require_args(name: &str, args: &[Value], n: usize) -> Result<(), PlanExprError> {
2566    if args.len() != n {
2567        return Err(PlanExprError(format!(
2568            "fn '{name}' requires exactly {n} argument(s)"
2569        )));
2570    }
2571    Ok(())
2572}
2573
2574fn require_args_min(name: &str, args: &[Value], n: usize) -> Result<(), PlanExprError> {
2575    if args.len() < n {
2576        return Err(PlanExprError(format!(
2577            "fn '{name}' requires at least {n} argument(s)"
2578        )));
2579    }
2580    Ok(())
2581}
2582
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Runs `expr_from_value` on `spec` and returns the resulting expression.
    /// Panics on error; `#[track_caller]` attributes the panic to the calling test.
    #[track_caller]
    fn parse(spec: serde_json::Value) -> Expr {
        expr_from_value(&spec).unwrap()
    }

    /// Asserts only that `spec` is accepted by `expr_from_value`; the result is discarded.
    #[track_caller]
    fn assert_parses(spec: serde_json::Value) {
        let _ = parse(spec);
    }

    /// Fixes #644: a bare JSON string is treated as a column reference.
    #[test]
    fn test_bare_string_column_ref() {
        let parsed = parse(json!("age"));
        assert!(matches!(parsed, Expr::Column(_)));
    }

    #[test]
    fn test_col() {
        assert_parses(json!({"col": "age"}));
    }

    #[test]
    fn test_lit_i64() {
        assert_parses(json!({"lit": 30}));
    }

    #[test]
    fn test_gt() {
        assert_parses(json!({
            "op": "gt",
            "left": {"col": "age"},
            "right": {"lit": 30}
        }));
    }

    #[test]
    fn test_and() {
        assert_parses(json!({
            "op": "and",
            "left": {"op": "gt", "left": {"col": "a"}, "right": {"lit": 1}},
            "right": {"op": "lt", "left": {"col": "b"}, "right": {"lit": 10}}
        }));
    }

    #[test]
    fn test_upper() {
        assert_parses(json!({"fn": "upper", "args": [{"col": "name"}]}));
    }

    #[test]
    fn test_length() {
        assert_parses(json!({"fn": "length", "args": [{"col": "name"}]}));
    }

    #[test]
    fn test_substring() {
        assert_parses(json!({
            "fn": "substring",
            "args": [{"col": "s"}, {"lit": 1}, {"lit": 3}]
        }));
    }

    #[test]
    fn test_year() {
        assert_parses(json!({"fn": "year", "args": [{"col": "ts"}]}));
    }

    #[test]
    fn test_cast() {
        assert_parses(json!({
            "fn": "cast",
            "args": [{"col": "x"}, {"lit": "string"}]
        }));
    }

    #[test]
    fn test_isin_op() {
        assert_parses(json!({
            "op": "isin",
            "left": {"col": "id"},
            "right": {"lit": [1, 3]}
        }));
    }

    #[test]
    fn test_isin_fn() {
        assert_parses(json!({
            "fn": "isin",
            "args": [{"col": "id"}, {"lit": 1}, {"lit": 3}]
        }));
    }

    /// col.isin([]) returns false for all rows (issue #518).
    #[test]
    fn test_isin_op_empty() {
        let parsed = parse(json!({
            "op": "isin",
            "left": {"col": "id"},
            "right": {"lit": []}
        }));
        // Expected to be a literal (false) rather than an is_in over an empty series.
        assert!(matches!(parsed, Expr::Literal(_)));
    }

    /// col.isin() with no values or col.isin(null) -> false (issue #518).
    #[test]
    fn test_isin_fn_empty() {
        let specs = [
            json!({
                "fn": "isin",
                "args": [{"col": "id"}]
            }),
            json!({
                "fn": "isin",
                "args": [{"col": "id"}, {"lit": null}]
            }),
        ];
        for spec in specs {
            let parsed = parse(spec);
            assert!(matches!(parsed, Expr::Literal(_)));
        }
    }

    #[test]
    fn test_struct_named_struct_fn() {
        assert_parses(json!({"fn": "struct", "args": [{"col": "a"}, {"col": "b"}]}));
        assert_parses(json!({
            "fn": "named_struct",
            "args": [{"lit": "x"}, {"col": "a"}, {"lit": "y"}, {"col": "b"}]
        }));
    }

    #[test]
    fn test_get_item_fn() {
        // Integer literal as second argument.
        assert_parses(json!({"fn": "get_item", "args": [{"col": "arr"}, {"lit": 0}]}));
        // String literal as second argument.
        assert_parses(json!({"fn": "get_item", "args": [{"col": "m"}, {"lit": "key"}]}));
    }

    #[test]
    fn test_get_item_op() {
        assert_parses(json!({"op": "getItem", "left": {"col": "arr"}, "right": {"lit": 1}}));
    }

    #[test]
    fn test_startswith_op() {
        assert_parses(json!({
            "op": "startswith",
            "left": {"col": "name"},
            "right": {"lit": "A"}
        }));
    }

    #[test]
    fn test_is_null_op() {
        assert_parses(json!({"op": "is_null", "arg": {"col": "x"}}));
    }

    #[test]
    fn test_is_not_null_op() {
        assert_parses(json!({"op": "is_not_null", "arg": {"col": "x"}}));
    }

    #[test]
    fn test_regexp_extract_op() {
        assert_parses(json!({
            "op": "regexp_extract",
            "left": {"col": "s"},
            "pattern": {"lit": r"(\w+)"},
            "group": {"lit": 1}
        }));
    }

    /// Issue #582: fn form with bare string and number (Sparkless may send literals without {"lit": ...}).
    #[test]
    fn test_regexp_extract_fn_bare_literals() {
        assert_parses(json!({
            "fn": "regexp_extract",
            "args": [{"col": "s"}, r"(\w+)", 1]
        }));
    }

    #[test]
    fn test_regexp_replace_op() {
        // Keyed form: left / pattern / replacement.
        assert_parses(json!({
            "op": "regexp_replace",
            "left": {"col": "str"},
            "pattern": {"lit": r"\d"},
            "replacement": {"lit": "X"}
        }));
        // Positional form: args array.
        assert_parses(json!({
            "op": "regexp_replace",
            "args": [{"col": "str"}, {"lit": r"\d"}, {"lit": "X"}]
        }));
    }

    #[test]
    fn test_create_map_op() {
        assert_parses(json!({
            "op": "create_map",
            "args": [{"lit": "k"}, {"col": "a"}]
        }));
    }

    /// PySpark F.create_map() with no args: empty map {} per row (#512).
    #[test]
    fn test_create_map_fn_empty() {
        assert_parses(json!({"fn": "create_map", "args": []}));
    }

    #[test]
    fn test_type_window_row_number_order_by() {
        assert_parses(json!({
            "type": "window",
            "fn": "row_number",
            "window": {"order_by": ["val"]}
        }));
    }

    /// Sparkless format: fn + args + window; order_by as [{"col": "salary", "asc": true}] (issue #517).
    #[test]
    fn test_window_row_number_sparkless_format() {
        assert_parses(json!({
            "fn": "row_number",
            "args": [],
            "window": {
                "partition_by": ["dept"],
                "order_by": [{"col": "salary", "asc": true}]
            }
        }));
    }

    /// row_number() with empty partition_by and order_by (single partition; issue #520).
    #[test]
    fn test_row_number_window_empty() {
        assert_parses(json!({
            "type": "window",
            "fn": "row_number",
            "window": {}
        }));
        assert_parses(json!({
            "fn": "row_number",
            "args": [],
            "window": {"partition_by": [], "order_by": []}
        }));
    }

    /// Sparkless may send "function" instead of "fn" (issue #517).
    #[test]
    fn test_type_window_function_key() {
        assert_parses(json!({
            "type": "window",
            "function": "row_number",
            "window": {"partition_by": ["dept"]}
        }));
    }

    /// rank, dense_rank in plan execution (issue #521).
    #[test]
    fn test_window_rank_dense_rank() {
        assert_parses(json!({
            "fn": "rank",
            "args": [],
            "window": {"partition_by": ["dept"], "order_by": ["salary"]}
        }));
        assert_parses(json!({
            "type": "window",
            "fn": "dense_rank",
            "window": {"order_by": ["val"]}
        }));
    }

    /// (row_number().over(w) + 10): an add op whose left operand is a window expression.
    #[test]
    fn test_window_plus_literal_op() {
        assert_parses(json!({
            "op": "add",
            "left": {"type": "window", "fn": "row_number", "window": {"order_by": ["val"]}},
            "right": {"lit": 10}
        }));
    }

    #[test]
    fn test_when_two_arg() {
        assert_parses(json!({
            "fn": "when",
            "args": [
                {"op": "gt", "left": {"col": "a"}, "right": {"lit": 0}},
                {"lit": "positive"}
            ]
        }));
    }

    #[test]
    fn test_concat() {
        assert_parses(json!({
            "fn": "concat",
            "args": [{"col": "first"}, {"lit": " "}, {"col": "last"}]
        }));
    }

    /// Issue #583: op form of concat (F.concat(a, b)).
    #[test]
    fn test_concat_op() {
        assert_parses(json!({
            "op": "concat",
            "args": [{"col": "a"}, {"col": "b"}]
        }));
    }

    /// Issue #583: op form of contains (F.col("name").contains("lic")).
    #[test]
    fn test_contains_op() {
        assert_parses(json!({
            "op": "contains",
            "args": [{"col": "name"}, {"lit": "lic"}]
        }));
    }

    #[test]
    fn test_greatest() {
        assert_parses(json!({
            "fn": "greatest",
            "args": [{"col": "a"}, {"col": "b"}, {"lit": 0}]
        }));
    }

    #[test]
    fn test_array_size() {
        assert_parses(json!({"fn": "array_size", "args": [{"col": "arr"}]}));
    }

    #[test]
    fn test_element_at() {
        assert_parses(json!({"fn": "element_at", "args": [{"col": "arr"}, {"lit": 1}]}));
    }

    #[test]
    fn test_coalesce() {
        assert_parses(json!({
            "fn": "coalesce",
            "args": [{"col": "a"}, {"col": "b"}, {"lit": 0}]
        }));
    }

    /// #628: between with string column and numeric bounds parses and uses coercion.
    #[test]
    fn test_between_string_column_numeric_bounds() {
        // Only parse success is verified here; the intended expression is
        // (val >= 1) and (val <= 10) with coercion applied.
        assert_parses(json!({
            "op": "between",
            "left": {"col": "val"},
            "lower": {"lit": 1},
            "upper": {"lit": 10}
        }));
    }
}