1use polars::prelude::{DataType, Expr, Series, col, lit};
5use serde_json::Value;
6use std::error::Error;
7use std::fmt;
8
/// Error produced while translating a JSON plan expression.
///
/// Wraps a single human-readable message.
#[derive(Debug)]
pub struct PlanExprError(String);

impl fmt::Display for PlanExprError {
    /// Writes the wrapped message verbatim.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let PlanExprError(message) = self;
        f.write_str(message)
    }
}

impl Error for PlanExprError {}
20
21pub fn expr_from_value(v: &Value) -> Result<Expr, PlanExprError> {
26 if let Some(name) = v.as_str() {
28 return Ok(col(name));
29 }
30
31 let obj = v.as_object().ok_or_else(|| {
32 PlanExprError("expression must be a JSON object or column name string".to_string())
33 })?;
34
35 if let Some(name) = obj.get("col").and_then(Value::as_str) {
37 return Ok(col(name));
38 }
39
40 if let Some(lit_val) = obj.get("lit") {
42 return lit_from_value(lit_val);
43 }
44
45 if let Some(op) = obj.get("op").and_then(Value::as_str) {
47 match op {
48 "eq" | "ne" | "gt" | "ge" | "lt" | "le" => {
49 let left_v = obj
50 .get("left")
51 .ok_or_else(|| PlanExprError(format!("op '{op}' requires 'left'")))?;
52 let right_v = obj
53 .get("right")
54 .ok_or_else(|| PlanExprError(format!("op '{op}' requires 'right'")))?;
55 let l = expr_from_value(left_v)?;
56 let r = expr_from_value(right_v)?;
57
58 let infer_lit_type = |e: &Expr| -> Option<DataType> {
61 if let Expr::Literal(lv) = e {
62 let dt = lv.get_datatype();
63 if matches!(dt, DataType::Unknown(_)) {
64 None
65 } else {
66 Some(dt)
67 }
68 } else {
69 None
70 }
71 };
72
73 let l_ty = infer_lit_type(&l);
74 let r_ty = infer_lit_type(&r);
75
76 use crate::type_coercion::{CompareOp, coerce_for_pyspark_comparison};
79 let op_enum = match op {
80 "eq" => CompareOp::Eq,
81 "ne" => CompareOp::NotEq,
82 "gt" => CompareOp::Gt,
83 "ge" => CompareOp::GtEq,
84 "lt" => CompareOp::Lt,
85 "le" => CompareOp::LtEq,
86 _ => unreachable!(),
87 };
88 let (lt, rt) = match (&l_ty, &r_ty) {
89 (Some(lt), Some(rt)) => (lt.clone(), rt.clone()),
90 (Some(lt), None) => (lt.clone(), DataType::String),
91 (None, Some(rt)) => (DataType::String, rt.clone()),
92 (None, None) => {
93 return Ok(match op {
95 "eq" => l.eq(r),
96 "ne" => l.neq(r),
97 "gt" => l.gt(r),
98 "ge" => l.gt_eq(r),
99 "lt" => l.lt(r),
100 "le" => l.lt_eq(r),
101 _ => unreachable!(),
102 });
103 }
104 };
105 let expr =
106 match coerce_for_pyspark_comparison(l.clone(), r.clone(), <, &rt, &op_enum) {
107 Ok((lc, rc)) => match op {
108 "eq" => lc.eq(rc),
109 "ne" => lc.neq(rc),
110 "gt" => lc.gt(rc),
111 "ge" => lc.gt_eq(rc),
112 "lt" => lc.lt(rc),
113 "le" => lc.lt_eq(rc),
114 _ => unreachable!(),
115 },
116 Err(_) => match op {
117 "eq" => l.eq(r),
118 "ne" => l.neq(r),
119 "gt" => l.gt(r),
120 "ge" => l.gt_eq(r),
121 "lt" => l.lt(r),
122 "le" => l.lt_eq(r),
123 _ => unreachable!(),
124 },
125 };
126
127 return Ok(expr);
128 }
129 "eq_null_safe" => {
130 let left = obj.get("left").ok_or_else(|| {
131 PlanExprError("op 'eq_null_safe' requires 'left'".to_string())
132 })?;
133 let right = obj.get("right").ok_or_else(|| {
134 PlanExprError("op 'eq_null_safe' requires 'right'".to_string())
135 })?;
136 let l = expr_from_value(left)?;
137 let r = expr_from_value(right)?;
138 return eq_null_safe_expr(l, r);
139 }
140 "and" => {
141 let left = obj
142 .get("left")
143 .ok_or_else(|| PlanExprError("op 'and' requires 'left'".to_string()))?;
144 let right = obj
145 .get("right")
146 .ok_or_else(|| PlanExprError("op 'and' requires 'right'".to_string()))?;
147 return Ok(expr_from_value(left)?.and(expr_from_value(right)?));
148 }
149 "or" => {
150 let left = obj
151 .get("left")
152 .ok_or_else(|| PlanExprError("op 'or' requires 'left'".to_string()))?;
153 let right = obj
154 .get("right")
155 .ok_or_else(|| PlanExprError("op 'or' requires 'right'".to_string()))?;
156 return Ok(expr_from_value(left)?.or(expr_from_value(right)?));
157 }
158 "not" => {
159 let arg = obj
160 .get("arg")
161 .ok_or_else(|| PlanExprError("op 'not' requires 'arg'".to_string()))?;
162 return Ok(expr_from_value(arg)?.not());
163 }
164 "between" => {
165 let left_v = obj
166 .get("left")
167 .ok_or_else(|| PlanExprError("op 'between' requires 'left'".to_string()))?;
168 let lower_v = obj
169 .get("lower")
170 .ok_or_else(|| PlanExprError("op 'between' requires 'lower'".to_string()))?;
171 let upper_v = obj
172 .get("upper")
173 .ok_or_else(|| PlanExprError("op 'between' requires 'upper'".to_string()))?;
174 let left = expr_from_value(left_v)?;
175 let lower = expr_from_value(lower_v)?;
176 let upper = expr_from_value(upper_v)?;
177 let infer_lit_type = |e: &Expr| -> Option<DataType> {
179 if let Expr::Literal(lv) = e {
180 let dt = lv.get_datatype();
181 if matches!(dt, DataType::Unknown(_)) {
182 None
183 } else {
184 Some(dt)
185 }
186 } else {
187 None
188 }
189 };
190 let lower_ty = infer_lit_type(&lower);
191 let upper_ty = infer_lit_type(&upper);
192 use crate::type_coercion::{CompareOp, coerce_for_pyspark_comparison};
193 let lt = DataType::String;
195 let rt_lower = lower_ty.unwrap_or(DataType::String);
196 let rt_upper = upper_ty.unwrap_or(DataType::String);
197 let (left_c, lower_c) = match coerce_for_pyspark_comparison(
198 left.clone(),
199 lower.clone(),
200 <,
201 &rt_lower,
202 &CompareOp::GtEq,
203 ) {
204 Ok((a, b)) => (a, b),
205 Err(_) => (left.clone(), lower),
206 };
207 let upper_clone = upper.clone();
208 let (left_cc, upper_c) = match coerce_for_pyspark_comparison(
209 left_c.clone(),
210 upper,
211 <,
212 &rt_upper,
213 &CompareOp::LtEq,
214 ) {
215 Ok((a, b)) => (a, b),
216 Err(_) => (left_c.clone(), upper_clone),
217 };
218 return Ok(left_cc.clone().gt_eq(lower_c).and(left_cc.lt_eq(upper_c)));
219 }
220 "**" | "pow" => {
221 let left_v = obj
222 .get("left")
223 .ok_or_else(|| PlanExprError(format!("op '{op}' requires 'left'")))?;
224 let right_v = obj
225 .get("right")
226 .ok_or_else(|| PlanExprError(format!("op '{op}' requires 'right'")))?;
227 let l = expr_from_value(left_v)?;
228 let r = expr_from_value(right_v)?;
229 let left_col = expr_to_column(l);
230 let right_col = expr_to_column(r);
231 return Ok(left_col.pow_with(&right_col).into_expr());
232 }
233 "isin" => {
234 let left_v = obj
238 .get("left")
239 .ok_or_else(|| PlanExprError("op 'isin' requires 'left'".to_string()))?;
240 let left_expr = expr_from_value(left_v)?;
241 let values_opt =
242 if let Some(values_arr) = obj.get("values").and_then(Value::as_array) {
243 try_values_for_isin(values_arr)?
244 } else if let Some(right_v) = obj.get("right") {
245 try_values_from_plan_value(right_v)?
246 } else {
247 return Err(PlanExprError(
248 "op 'isin' requires 'right' or 'values'".to_string(),
249 ));
250 };
251 return Ok(match values_opt {
252 None => lit(false),
253 Some(values_expr) => left_expr.is_in(values_expr, false),
254 });
255 }
256 "getItem" => {
257 let left_v = obj
259 .get("left")
260 .ok_or_else(|| PlanExprError("op 'getItem' requires 'left'".to_string()))?;
261 let right_v = obj
262 .get("right")
263 .ok_or_else(|| PlanExprError("op 'getItem' requires 'right'".to_string()))?;
264 let col_expr = expr_from_value(left_v)?;
265 let idx = lit_as_i64(right_v)?;
266 let col_c = expr_to_column(col_expr);
267 return Ok(col_c.get_item(idx).into_expr());
268 }
269 "startswith" => {
270 let left_v = obj
272 .get("left")
273 .ok_or_else(|| PlanExprError("op 'startswith' requires 'left'".to_string()))?;
274 let right_v = obj
275 .get("right")
276 .ok_or_else(|| PlanExprError("op 'startswith' requires 'right'".to_string()))?;
277 let col_expr = expr_from_value(left_v)?;
278 let prefix = lit_as_string(right_v)?;
279 let col_c = expr_to_column(col_expr);
280 return Ok(crate::functions::startswith(&col_c, &prefix).into_expr());
281 }
282 "is_null" => {
283 let arg = obj
284 .get("arg")
285 .ok_or_else(|| PlanExprError("op 'is_null' requires 'arg'".to_string()))?;
286 return Ok(expr_from_value(arg)?.is_null());
287 }
288 "is_not_null" => {
289 let arg = obj
290 .get("arg")
291 .ok_or_else(|| PlanExprError("op 'is_not_null' requires 'arg'".to_string()))?;
292 return Ok(expr_from_value(arg)?.is_not_null());
293 }
294 "regexp_extract" => {
295 if let Some(args) = obj.get("args").and_then(Value::as_array) {
298 require_args_min("regexp_extract", args, 3)?;
299 let col_expr = expr_from_value(&args[0])?;
300 let pattern = lit_as_string(&args[1])?;
301 let group_idx = lit_as_usize(&args[2])?;
302 let col_c = expr_to_column(col_expr);
303 return Ok(
304 crate::functions::regexp_extract(&col_c, &pattern, group_idx).into_expr(),
305 );
306 }
307 let left_v = obj.get("left").ok_or_else(|| {
308 PlanExprError("op 'regexp_extract' requires 'left'".to_string())
309 })?;
310 let col_expr = expr_from_value(left_v)?;
311 let pattern_v =
312 obj.get("pattern")
313 .or_else(|| obj.get("right"))
314 .ok_or_else(|| {
315 PlanExprError(
316 "op 'regexp_extract' requires 'pattern' or 'right'".to_string(),
317 )
318 })?;
319 let pattern = lit_as_string(pattern_v)?;
320 let group_idx = obj
321 .get("group")
322 .and_then(|v| lit_as_usize(v).ok())
323 .unwrap_or(0);
324 let col_c = expr_to_column(col_expr);
325 return Ok(
326 crate::functions::regexp_extract(&col_c, &pattern, group_idx).into_expr(),
327 );
328 }
329 "regexp_replace" => {
330 if let Some(args) = obj.get("args").and_then(Value::as_array) {
333 require_args_min("regexp_replace", args, 3)?;
334 let col_expr = expr_from_value(&args[0])?;
335 let pattern = lit_as_string(&args[1])?;
336 let replacement = lit_as_string(&args[2])?;
337 let col_c = expr_to_column(col_expr);
338 return Ok(
339 crate::functions::regexp_replace(&col_c, &pattern, &replacement)
340 .into_expr(),
341 );
342 }
343 let left_v = obj.get("left").ok_or_else(|| {
344 PlanExprError("op 'regexp_replace' requires 'left'".to_string())
345 })?;
346 let col_expr = expr_from_value(left_v)?;
347 let pattern =
348 lit_as_string(obj.get("pattern").or_else(|| obj.get("right")).ok_or_else(
349 || {
350 PlanExprError(
351 "op 'regexp_replace' requires 'pattern' or 'right'".to_string(),
352 )
353 },
354 )?)?;
355 let replacement = lit_as_string(obj.get("replacement").ok_or_else(|| {
356 PlanExprError("op 'regexp_replace' requires 'replacement'".to_string())
357 })?)?;
358 let col_c = expr_to_column(col_expr);
359 return Ok(
360 crate::functions::regexp_replace(&col_c, &pattern, &replacement).into_expr(),
361 );
362 }
363 "create_map" | "createMap" => {
364 let args_arr = obj.get("args").and_then(Value::as_array).ok_or_else(|| {
366 PlanExprError("op 'create_map'/'createMap' requires 'args' array".to_string())
367 })?;
368 let exprs: Result<Vec<Expr>, _> = args_arr.iter().map(expr_from_value).collect();
369 let cols: Vec<crate::Column> = exprs?.into_iter().map(expr_to_column).collect();
370 let refs: Vec<&crate::Column> = cols.iter().collect();
371 return Ok(crate::functions::create_map(&refs)
372 .map_err(|e| PlanExprError(e.to_string()))?
373 .into_expr());
374 }
375 "add" | "+" => {
376 let left_v = obj
378 .get("left")
379 .ok_or_else(|| PlanExprError("op 'add' requires 'left'".to_string()))?;
380 let right_v = obj
381 .get("right")
382 .ok_or_else(|| PlanExprError("op 'add' requires 'right'".to_string()))?;
383 let l = expr_from_value(left_v)?;
384 let r = expr_from_value(right_v)?;
385 let a = expr_to_column(l);
386 let b = expr_to_column(r);
387 return Ok(a.add_pyspark(&b).into_expr());
388 }
389 "sub" | "minus" | "-" => {
390 let left_v = obj
392 .get("left")
393 .ok_or_else(|| PlanExprError("op 'sub' requires 'left'".to_string()))?;
394 let right_v = obj
395 .get("right")
396 .ok_or_else(|| PlanExprError("op 'sub' requires 'right'".to_string()))?;
397 let l = expr_from_value(left_v)?;
398 let r = expr_from_value(right_v)?;
399 let a = expr_to_column(l);
400 let b = expr_to_column(r);
401 return Ok(a.subtract(&b).into_expr());
402 }
403 "mul" | "*" => {
404 let left_v = obj
406 .get("left")
407 .ok_or_else(|| PlanExprError("op 'mul' requires 'left'".to_string()))?;
408 let right_v = obj
409 .get("right")
410 .ok_or_else(|| PlanExprError("op 'mul' requires 'right'".to_string()))?;
411 let l = expr_from_value(left_v)?;
412 let r = expr_from_value(right_v)?;
413 let a = expr_to_column(l);
414 let b = expr_to_column(r);
415 return Ok(a.multiply(&b).into_expr());
416 }
417 "udf" => {
418 let udf_name = obj
420 .get("udf")
421 .or_else(|| obj.get("name"))
422 .and_then(Value::as_str)
423 .ok_or_else(|| {
424 PlanExprError("op 'udf' requires 'udf' or 'name'".to_string())
425 })?;
426 let args = obj
427 .get("args")
428 .and_then(Value::as_array)
429 .ok_or_else(|| PlanExprError("op 'udf' requires 'args' array".to_string()))?;
430 let col = column_from_udf_call(udf_name, args)?;
431 if col.udf_call.is_some() {
432 return Err(PlanExprError(
433 "Python/Vectorized UDFs are only supported in withColumn/select, not in filter/plan expressions"
434 .into(),
435 ));
436 }
437 return Ok(col.expr().clone());
438 }
439 "translate" | "substring_index" | "substringIndex" | "levenshtein" | "soundex"
441 | "crc32" | "xxhash64" | "get_json_object" | "getJsonObject" | "json_tuple"
442 | "jsonTuple" | "regexp_extract_all" | "regexpExtractAll" | "date_trunc"
443 | "dateTrunc" | "to_date" | "toDate" | "format_string" | "formatString" | "log"
444 | "explode" | "explode_outer" | "explodeOuter" | "concat" | "contains" => {
445 let args = obj
446 .get("args")
447 .and_then(Value::as_array)
448 .ok_or_else(|| PlanExprError(format!("op '{op}' requires 'args' array")))?;
449 let fn_name = match op {
450 "substringIndex" => "substring_index",
451 "getJsonObject" => "get_json_object",
452 "jsonTuple" => "json_tuple",
453 "regexpExtractAll" => "regexp_extract_all",
454 "dateTrunc" => "date_trunc",
455 "toDate" => "to_date",
456 "formatString" => "format_string",
457 "explodeOuter" => "explode_outer",
458 other => other,
459 };
460 return expr_from_fn(fn_name, args);
461 }
462 _ => {
463 return Err(PlanExprError(format!("unsupported expression op: {op}")));
464 }
465 }
466 }
467
468 if let Some(udf_name) = obj.get("udf").and_then(Value::as_str) {
470 let args = obj
471 .get("args")
472 .and_then(Value::as_array)
473 .ok_or_else(|| PlanExprError("udf requires 'args' array".to_string()))?;
474 let col = column_from_udf_call(udf_name, args)?;
475 if col.udf_call.is_some() {
476 return Err(PlanExprError(
477 "Python/Vectorized UDFs are only supported in withColumn/select, not in filter/plan expressions"
478 .into(),
479 ));
480 }
481 return Ok(col.expr().clone());
482 }
483
484 let fn_name = obj
487 .get("fn")
488 .or_else(|| obj.get("function"))
489 .and_then(Value::as_str);
490 if let Some(fn_name) = fn_name {
491 if let Some(window_val) = obj.get("window") {
492 return expr_from_window_fn(
493 fn_name,
494 window_val,
495 obj.get("args").and_then(Value::as_array),
496 );
497 }
498 let args = obj
499 .get("args")
500 .and_then(Value::as_array)
501 .ok_or_else(|| PlanExprError(format!("fn '{fn_name}' requires 'args' array")))?;
502 return expr_from_fn(fn_name, args);
503 }
504
505 if let Some(typ) = obj.get("type").and_then(Value::as_str) {
507 if typ == "window" {
508 let fn_name = obj
509 .get("fn")
510 .or_else(|| obj.get("function"))
511 .and_then(Value::as_str)
512 .ok_or_else(|| {
513 PlanExprError("type window requires 'fn' or 'function'".to_string())
514 })?;
515 let window_val = obj
516 .get("window")
517 .ok_or_else(|| PlanExprError("type window requires 'window'".to_string()))?;
518 let args = obj.get("args").and_then(Value::as_array);
519 return expr_from_window_fn(fn_name, window_val, args);
520 }
521 }
522
523 Err(PlanExprError(
524 "expression must have 'col', 'lit', 'op', 'fn', or 'type'".to_string(),
525 ))
526}
527
528fn window_col_from_value(x: &Value) -> Option<String> {
530 if let Some(s) = x.as_str() {
531 return Some(s.to_string());
532 }
533 if let Some(obj) = x.as_object() {
534 if let Some(name) = obj.get("col").and_then(Value::as_str) {
535 return Some(name.to_string());
536 }
537 }
538 None
539}
540
541fn parse_window_spec(v: &Value) -> Result<(Vec<String>, Vec<String>), PlanExprError> {
545 let obj = v
546 .as_object()
547 .ok_or_else(|| PlanExprError("window spec must be object".to_string()))?;
548 let order_arr = obj.get("order_by").and_then(Value::as_array);
549 let part_arr = obj.get("partition_by").and_then(Value::as_array);
550 let order_cols: Vec<String> = order_arr
551 .map(|a| a.iter().filter_map(window_col_from_value).collect())
552 .unwrap_or_default();
553 let part_cols: Vec<String> = part_arr
554 .map(|a| a.iter().filter_map(window_col_from_value).collect())
555 .unwrap_or_default();
556 Ok((order_cols, part_cols))
557}
558
/// Picks the columns used for ordering inside a window: the explicit order
/// columns when any are given, otherwise the partition columns.
fn window_order_cols(order_cols: &[String], part_cols: &[String]) -> Vec<String> {
    let chosen = match order_cols {
        [] => part_cols,
        explicit => explicit,
    };
    chosen.to_vec()
}
567
568fn expr_from_row_number_window(v: &Value) -> Result<Expr, PlanExprError> {
570 let (order_cols, part_cols) = parse_window_spec(v)?;
571 let part_refs: Vec<&str> = part_cols.iter().map(|s| s.as_str()).collect();
572 let effective_order = window_order_cols(&order_cols, &part_cols);
573 let order_col = if effective_order.is_empty() {
574 crate::Column::from_expr(lit(1i32), None)
575 } else {
576 crate::Column::new(effective_order[0].clone())
577 };
578 let rn = order_col.row_number(false).over(&part_refs);
579 Ok(rn.into_expr())
580}
581
582fn expr_from_window_fn(
584 fn_name: &str,
585 window_val: &Value,
586 args: Option<&Vec<Value>>,
587) -> Result<Expr, PlanExprError> {
588 use crate::Column;
589 let (order_cols, part_cols) = parse_window_spec(window_val)?;
590 let part_refs: Vec<&str> = part_cols.iter().map(|s| s.as_str()).collect();
591 let effective_order = window_order_cols(&order_cols, &part_cols);
592 let empty: &[Value] = &[];
593 let args: &[Value] = args.map_or(empty, |v| v);
594 let order_col = if effective_order.is_empty() {
595 Column::from_expr(lit(1i32), None)
596 } else {
597 Column::new(effective_order[0].clone())
598 };
599
600 match fn_name {
601 "row_number" => expr_from_row_number_window(window_val),
602 "rank" => {
603 let c = order_col.rank(false).over(&part_refs);
604 Ok(c.into_expr())
605 }
606 "dense_rank" => {
607 let c = order_col.dense_rank(false).over(&part_refs);
608 Ok(c.into_expr())
609 }
610 "percent_rank" => {
611 let c = order_col.percent_rank(&part_refs, false);
612 Ok(c.into_expr())
613 }
614 "ntile" => {
615 let n = args
616 .first()
617 .and_then(|v| v.get("lit").and_then(Value::as_i64))
618 .or_else(|| args.first().and_then(Value::as_i64))
619 .ok_or_else(|| {
620 PlanExprError("ntile window requires n (number of buckets)".to_string())
621 })? as u32;
622 let c = order_col.ntile(n.max(1), &part_refs, false);
623 Ok(c.into_expr())
624 }
625 "lag" => {
626 let n = args
627 .get(1)
628 .and_then(|v| v.get("lit").and_then(Value::as_i64))
629 .or_else(|| args.get(1).and_then(Value::as_i64))
630 .unwrap_or(1);
631 let col_expr =
632 expr_to_column(expr_from_value(args.first().ok_or_else(|| {
633 PlanExprError("lag window requires column arg".to_string())
634 })?)?);
635 let c = col_expr.lag(n).over(&part_refs);
636 Ok(c.into_expr())
637 }
638 "lead" => {
639 let n = args
640 .get(1)
641 .and_then(|v| v.get("lit").and_then(Value::as_i64))
642 .or_else(|| args.get(1).and_then(Value::as_i64))
643 .unwrap_or(1);
644 let col_expr =
645 expr_to_column(expr_from_value(args.first().ok_or_else(|| {
646 PlanExprError("lead window requires column arg".to_string())
647 })?)?);
648 let c = col_expr.lead(n).over(&part_refs);
649 Ok(c.into_expr())
650 }
651 "sum" => {
652 let col_expr =
653 expr_to_column(expr_from_value(args.first().ok_or_else(|| {
654 PlanExprError("sum window requires column arg".to_string())
655 })?)?);
656 let sum_expr = col_expr.expr().clone().sum();
657 let partition_exprs: Vec<Expr> = part_refs.iter().map(|s| col(*s)).collect();
658 Ok(sum_expr.over(partition_exprs))
659 }
660 "avg" | "mean" => {
661 let col_expr =
662 expr_to_column(expr_from_value(args.first().ok_or_else(|| {
663 PlanExprError("avg window requires column arg".to_string())
664 })?)?);
665 let mean_expr = col_expr.expr().clone().mean();
666 let partition_exprs: Vec<Expr> = part_refs.iter().map(|s| col(*s)).collect();
667 Ok(mean_expr.over(partition_exprs))
668 }
669 "approx_count_distinct" => {
670 let col_expr = expr_to_column(expr_from_value(args.first().ok_or_else(|| {
671 PlanExprError("approx_count_distinct window requires column arg".to_string())
672 })?)?);
673 let n_unique_expr = col_expr.expr().clone().n_unique().cast(DataType::Int64);
674 let partition_exprs: Vec<Expr> = part_refs.iter().map(|s| col(*s)).collect();
675 Ok(n_unique_expr.over(partition_exprs))
676 }
677 _ => Err(PlanExprError(format!(
678 "unsupported window fn '{fn_name}' (supported: row_number, rank, dense_rank, percent_rank, ntile, lag, lead, sum, avg, approx_count_distinct)"
679 ))),
680 }
681}
682
683fn lit_from_value(v: &Value) -> Result<Expr, PlanExprError> {
684 use polars::prelude::{NULL, lit};
685 if v.is_null() {
686 return Ok(lit(NULL));
687 }
688 if let Some(n) = v.as_i64() {
689 return Ok(lit(n));
690 }
691 if let Some(n) = v.as_f64() {
692 return Ok(lit(n));
693 }
694 if let Some(b) = v.as_bool() {
695 return Ok(lit(b));
696 }
697 if let Some(s) = v.as_str() {
698 return Ok(lit(s));
699 }
700 Err(PlanExprError("unsupported literal type".to_string()))
701}
702
703fn lit_as_string(v: &Value) -> Result<String, PlanExprError> {
706 let lit_val = v
707 .get("lit")
708 .ok_or_else(|| PlanExprError("expected literal".to_string()))?;
709 if lit_val.is_null() {
710 return Err(PlanExprError("literal string cannot be null".to_string()));
711 }
712 if let Some(s) = lit_val.as_str() {
713 return Ok(s.to_string());
714 }
715 if let Some(n) = lit_val.as_i64() {
716 return Ok(n.to_string());
717 }
718 if let Some(n) = lit_val.as_f64() {
719 return Ok(n.to_string());
720 }
721 if let Some(b) = lit_val.as_bool() {
722 return Ok(b.to_string());
723 }
724 Err(PlanExprError(
725 "literal must be string, number, or bool".to_string(),
726 ))
727}
728
729fn lit_as_i64(v: &Value) -> Result<i64, PlanExprError> {
730 let lit_val = v
731 .get("lit")
732 .ok_or_else(|| PlanExprError("expected literal".to_string()))?;
733 lit_val
734 .as_i64()
735 .ok_or_else(|| PlanExprError("literal must be integer".to_string()))
736}
737
738fn lit_as_i32(v: &Value) -> Result<i32, PlanExprError> {
739 let n = lit_as_i64(v)?;
740 n.try_into()
741 .map_err(|_| PlanExprError("literal out of i32 range".to_string()))
742}
743
744fn lit_as_u32(v: &Value) -> Result<u32, PlanExprError> {
745 let lit_val = v
746 .get("lit")
747 .ok_or_else(|| PlanExprError("expected literal".to_string()))?;
748 if let Some(n) = lit_val.as_u64() {
749 return n
750 .try_into()
751 .map_err(|_| PlanExprError("literal out of u32 range".to_string()));
752 }
753 if let Some(n) = lit_val.as_i64() {
754 return (n as u64)
755 .try_into()
756 .map_err(|_| PlanExprError("literal out of u32 range".to_string()));
757 }
758 Err(PlanExprError("literal must be number".to_string()))
759}
760
761fn lit_as_f64(v: &Value) -> Result<f64, PlanExprError> {
762 let lit_val = v
763 .get("lit")
764 .ok_or_else(|| PlanExprError("expected literal".to_string()))?;
765 if let Some(n) = lit_val.as_f64() {
766 return Ok(n);
767 }
768 if let Some(n) = lit_val.as_i64() {
769 return Ok(n as f64);
770 }
771 Err(PlanExprError("literal must be number".to_string()))
772}
773
774#[allow(dead_code)]
775fn lit_as_bool(v: &Value) -> Result<bool, PlanExprError> {
776 let lit_val = v
777 .get("lit")
778 .ok_or_else(|| PlanExprError("expected literal".to_string()))?;
779 lit_val
780 .as_bool()
781 .ok_or_else(|| PlanExprError("literal must be boolean".to_string()))
782}
783
784fn lit_as_usize(v: &Value) -> Result<usize, PlanExprError> {
785 let n = lit_as_i64(v)?;
786 if n < 0 {
787 return Err(PlanExprError(
788 "literal must be non-negative for usize".to_string(),
789 ));
790 }
791 n.try_into()
792 .map_err(|_| PlanExprError("literal out of usize range".to_string()))
793}
794
795fn try_values_for_isin(arr: &[Value]) -> Result<Option<Expr>, PlanExprError> {
801 if arr.is_empty() {
802 return Ok(None);
803 }
804 let mut str_vals: Vec<String> = Vec::new();
805 let mut int_vals: Vec<i64> = Vec::new();
806 let mut float_vals: Vec<f64> = Vec::new();
807 let mut has_string = false;
808 let mut has_float = false;
809 for v in arr {
810 let lit_val = if let Some(obj) = v.as_object() {
811 obj.get("lit").unwrap_or(v)
812 } else {
813 v
814 };
815 if lit_val.is_null() {
816 continue;
817 }
818 if let Some(s) = lit_val.as_str() {
819 str_vals.push(s.to_string());
820 has_string = true;
821 } else if let Some(n) = lit_val.as_i64() {
822 int_vals.push(n);
823 str_vals.push(n.to_string());
824 } else if let Some(n) = lit_val.as_f64() {
825 float_vals.push(n);
826 str_vals.push(n.to_string());
827 has_float = true;
828 }
829 }
830 if str_vals.is_empty() {
831 return Ok(None);
832 }
833 let s: Series = if has_string {
834 Series::from_iter(str_vals.iter().map(|x| x.as_str()))
835 } else if !has_float && int_vals.len() == str_vals.len() {
836 Series::from_iter(int_vals)
837 } else if float_vals.len() == str_vals.len() {
838 Series::from_iter(float_vals)
839 } else {
840 Series::from_iter(str_vals.iter().map(|x| x.as_str()))
841 };
842 Ok(Some(lit(s)))
843}
844
845fn try_values_from_plan_value(v: &Value) -> Result<Option<Expr>, PlanExprError> {
847 if let Some(lit_val) = v.get("lit") {
848 if let Some(arr) = lit_val.as_array() {
849 return try_values_for_isin(arr);
850 }
851 #[allow(clippy::cloned_ref_to_slice_refs)]
853 return try_values_for_isin(&[v.clone()]);
854 }
855 if let Some(arr) = v.as_array() {
856 return try_values_for_isin(arr);
857 }
858 Err(PlanExprError(
859 "isin right/values must be array or {lit: [...]}".to_string(),
860 ))
861}
862
863fn arg_lit_opt_str(args: &[Value], i: usize) -> Result<Option<String>, PlanExprError> {
865 let v = match args.get(i) {
866 Some(x) => x,
867 None => return Ok(None),
868 };
869 if v.is_null() {
870 return Ok(None);
871 }
872 if let Some(obj) = v.as_object() {
873 if obj.get("lit").is_some() {
874 return Ok(Some(lit_as_string(v)?));
875 }
876 }
877 Ok(None)
878}
879
880fn arg_expr(args: &[Value], i: usize) -> Result<Expr, PlanExprError> {
881 let v = args
882 .get(i)
883 .ok_or_else(|| PlanExprError(format!("fn requires argument at index {i}")))?;
884 expr_from_value(v)
885}
886
887fn arg_lit_str(args: &[Value], i: usize) -> Result<String, PlanExprError> {
889 let v = args
890 .get(i)
891 .ok_or_else(|| PlanExprError(format!("fn requires string literal at index {i}")))?;
892 if let Some(s) = v.as_str() {
893 return Ok(s.to_string());
894 }
895 lit_as_string(v)
896}
897
898fn arg_lit_i64(args: &[Value], i: usize) -> Result<i64, PlanExprError> {
899 let v = args
900 .get(i)
901 .ok_or_else(|| PlanExprError(format!("fn requires integer literal at index {i}")))?;
902 lit_as_i64(v)
903}
904
905fn arg_lit_i32(args: &[Value], i: usize) -> Result<i32, PlanExprError> {
906 let v = args
907 .get(i)
908 .ok_or_else(|| PlanExprError(format!("fn requires integer literal at index {i}")))?;
909 lit_as_i32(v)
910}
911
912fn arg_lit_u32(args: &[Value], i: usize) -> Result<u32, PlanExprError> {
913 let v = args
914 .get(i)
915 .ok_or_else(|| PlanExprError(format!("fn requires non-negative integer at index {i}")))?;
916 lit_as_u32(v)
917}
918
919fn arg_lit_f64(args: &[Value], i: usize) -> Result<f64, PlanExprError> {
920 let v = args
921 .get(i)
922 .ok_or_else(|| PlanExprError(format!("fn requires number literal at index {i}")))?;
923 lit_as_f64(v)
924}
925
926fn arg_lit_usize(args: &[Value], i: usize) -> Result<usize, PlanExprError> {
928 let v = args
929 .get(i)
930 .ok_or_else(|| PlanExprError(format!("fn requires non-negative integer at index {i}")))?;
931 if let Some(n) = v.as_i64() {
932 if n < 0 {
933 return Err(PlanExprError(
934 "literal must be non-negative for usize".to_string(),
935 ));
936 }
937 return n
938 .try_into()
939 .map_err(|_| PlanExprError("literal out of usize range".to_string()));
940 }
941 if let Some(n) = v.as_u64() {
942 return n
943 .try_into()
944 .map_err(|_| PlanExprError("literal out of usize range".to_string()));
945 }
946 lit_as_usize(v)
947}
948
949fn opt_lit_i64(args: &[Value], i: usize) -> Option<i64> {
951 let v = args.get(i)?;
952 v.get("lit").and_then(Value::as_i64)
953}
954
955#[allow(dead_code)]
957fn opt_lit_u64(args: &[Value], i: usize) -> Option<u64> {
958 let v = args.get(i)?;
959 if let Some(n) = v.get("lit").and_then(Value::as_i64) {
960 if n >= 0 {
961 return Some(n as u64);
962 }
963 return Some((-n) as u64); }
965 v.get("lit").and_then(Value::as_u64)
966}
967
968fn expr_to_column(expr: Expr) -> crate::Column {
969 crate::Column::from_expr(expr, None)
970}
971
972fn eq_null_safe_expr(left: Expr, right: Expr) -> Result<Expr, PlanExprError> {
975 use polars::prelude::*;
976 let (left_c, right_c) = crate::type_coercion::coerce_for_pyspark_eq_null_safe(left, right)
977 .map_err(|e| PlanExprError(e.to_string()))?;
978 let left_null = left_c.clone().is_null();
979 let right_null = right_c.clone().is_null();
980 let both_null = left_null.clone().and(right_null.clone());
981 let both_non_null = left_null.not().and(right_null.not());
982 let eq_result = left_c.eq(right_c);
983 Ok(when(both_null)
984 .then(lit(true))
985 .when(both_non_null)
986 .then(eq_result)
987 .otherwise(lit(false)))
988}
989
/// Finds the byte index of the `)` that closes the `(` located at `start`.
///
/// Returns `None` when the byte at `start` is not `(` (including when `start`
/// is out of range) or when the parenthesis is never closed.
fn matching_paren(s: &str, start: usize) -> Option<usize> {
    let bytes = s.as_bytes();
    if bytes.get(start).copied() != Some(b'(') {
        return None;
    }
    // Number of currently open parentheses, counting the one at `start`.
    let mut open: u32 = 1;
    let mut idx = start + 1;
    while let Some(&byte) = bytes.get(idx) {
        if byte == b'(' {
            open += 1;
        } else if byte == b')' {
            open -= 1;
            if open == 0 {
                return Some(idx);
            }
        }
        idx += 1;
    }
    None
}
1010
1011fn concat_part_to_expr(part: &str) -> Expr {
1013 let part = part.trim();
1014 if part.is_empty() {
1015 return lit("");
1016 }
1017 if (part.starts_with('"') && part.ends_with('"'))
1018 || (part.starts_with('\'') && part.ends_with('\''))
1019 {
1020 let inner = part[1..part.len() - 1].trim_matches(['\'', '"']);
1021 return lit(inner);
1022 }
1023 col(part)
1024}
1025
1026pub fn try_parse_concat_expr_from_string(s: &str) -> Option<Expr> {
1030 use polars::prelude::concat_str;
1031 let s = s.trim();
1032 if s.starts_with("concat(") {
1034 let close = matching_paren(s, 6)?; if close != s.len() - 1 {
1036 return None;
1037 }
1038 let inner = s[7..close].trim();
1039 let parts: Vec<&str> = inner.split(',').map(str::trim).collect();
1040 if parts.is_empty() {
1041 return None;
1042 }
1043 let exprs: Vec<Expr> = parts.iter().map(|p| concat_part_to_expr(p)).collect();
1044 return Some(concat_str(&exprs, "", false));
1045 }
1046 if s.starts_with("concat_ws(") {
1048 let close = matching_paren(s, 10)?; if close != s.len() - 1 {
1050 return None;
1051 }
1052 let inner = s[10..close].trim();
1053 let parts: Vec<&str> = inner.split(',').map(str::trim).collect();
1054 if parts.len() < 2 {
1055 return None;
1056 }
1057 let sep = parts[0].trim_matches(['\'', '"']);
1058 let exprs: Vec<Expr> = parts
1059 .iter()
1060 .skip(1)
1061 .map(|p| concat_part_to_expr(p))
1062 .collect();
1063 if exprs.is_empty() {
1064 return None;
1065 }
1066 return Some(concat_str(&exprs, sep, false));
1067 }
1068 None
1069}
1070
1071pub fn column_from_udf_call(
1074 udf_name: &str,
1075 args: &[Value],
1076) -> Result<crate::Column, PlanExprError> {
1077 use crate::Column;
1078 let cols: Vec<Column> = args
1079 .iter()
1080 .map(|v| expr_from_value(v).map(expr_to_column))
1081 .collect::<Result<Vec<_>, _>>()?;
1082 crate::functions::call_udf(udf_name, &cols).map_err(|e| PlanExprError(e.to_string()))
1083}
1084
1085pub fn try_column_from_udf_value(v: &Value) -> Option<Result<crate::Column, PlanExprError>> {
1088 let obj = v.as_object()?;
1089 let (udf_name, args) = if let Some(name) = obj.get("udf").and_then(Value::as_str) {
1090 let args = obj.get("args")?.as_array()?;
1091 (name.to_string(), args)
1092 } else if obj.get("fn").and_then(Value::as_str) == Some("call_udf") {
1093 let args = obj.get("args")?.as_array()?;
1094 if args.is_empty() {
1095 return Some(Err(PlanExprError(
1096 "call_udf requires at least name and one arg".into(),
1097 )));
1098 }
1099 let name = match lit_as_string(&args[0]) {
1100 Ok(n) => n,
1101 Err(e) => return Some(Err(e)),
1102 };
1103 let rest: &[Value] = &args[1..];
1104 return Some(column_from_udf_call(&name, rest));
1105 } else {
1106 return None;
1107 };
1108 Some(column_from_udf_call(&udf_name, args))
1109}
1110
1111fn expr_from_fn(name: &str, args: &[Value]) -> Result<Expr, PlanExprError> {
1112 use crate::Column;
1113 #[allow(unused_imports)]
1114 use crate::functions::{
1115 add_months, array_agg, array_append, array_compact, array_contains, array_distinct,
1116 array_except, array_insert, array_intersect, array_join, array_prepend, array_remove,
1117 array_slice, array_sort, array_sum, array_union, arrays_overlap, arrays_zip, ascii,
1118 assert_true, atan2, base64, bin, bit_and, bit_count, bit_get, bit_length, bit_or, bit_xor,
1119 bitwise_not, bround, btrim, cast, cbrt, ceiling, char as rs_char, chr, coalesce, concat,
1120 concat_ws, contains, conv, cos, cosh, cot, crc32, csc, curdate, current_catalog,
1121 current_database, current_date, current_schema, current_timestamp, current_timezone,
1122 current_user, date_add, date_diff, date_format, date_from_unix_date, date_part, date_sub,
1123 date_trunc, dateadd, datediff, datepart, day, dayname, dayofmonth, dayofweek, dayofyear,
1124 days, decode, degrees, e, element_at, elt, encode, endswith, equal_null, exp,
1125 explode_outer, extract, factorial, find_in_set, floor, format_number, format_string,
1126 from_unixtime, from_utc_timestamp, get, get_json_object, getbit, greatest, hash, hex, hour,
1127 hypot, ilike, initcap, input_file_name, instr, isnan, json_tuple, last_day, lcase, least,
1128 left, length, like, lit_str, ln, localtimestamp, locate, log, log1p, log2, log10, lower,
1129 lpad, make_date, make_interval, make_timestamp, make_timestamp_ntz, mask, md5, minute,
1130 monotonically_increasing_id, month, months_between, nanvl, negate, negative, next_day, now,
1131 nullif, nvl, nvl2, octet_length, overlay, parse_url, pi, pmod, positive, pow, power,
1132 quarter, radians, raise_error, rand, randn, regexp, regexp_count, regexp_extract,
1133 regexp_extract_all, regexp_instr, regexp_like, regexp_replace, regexp_substr, repeat,
1134 replace, reverse, right, rint, rlike, round, rpad, sec, second, sha1, sha2, shift_left,
1135 shift_right, signum, sin, sinh, size, soundex, spark_partition_id, split, split_part, sqrt,
1136 startswith, str_to_map, struct_, substr, substring, substring_index, tan, tanh,
1137 timestamp_micros, timestamp_millis, timestamp_seconds, timestampadd, timestampdiff,
1138 to_binary, to_char, to_date, to_degrees, to_radians, to_timestamp, to_unix_timestamp,
1139 to_utc_timestamp, to_varchar, translate, trim, trunc, try_add, try_cast, try_divide,
1140 try_element_at, try_multiply, try_subtract, try_to_binary, try_to_number, try_to_timestamp,
1141 typeof_, ucase, unbase64, unhex, unix_date, unix_micros, unix_millis, unix_seconds,
1142 unix_timestamp, unix_timestamp_now, upper, url_decode, url_encode, user, version, weekday,
1143 weekofyear, when_then_otherwise_null, width_bucket, xxhash64, year,
1144 };
1145
1146 match name {
1147 "call_udf" => {
1148 if args.is_empty() {
1149 return Err(PlanExprError(
1150 "call_udf requires at least name and one arg".into(),
1151 ));
1152 }
1153 let udf_name = lit_as_string(&args[0])?;
1154 let col = column_from_udf_call(&udf_name, &args[1..])?;
1155 if col.udf_call.is_some() {
1156 return Err(PlanExprError(
1157 "Python/Vectorized UDFs are only supported in withColumn/select, not in filter/plan expressions"
1158 .into(),
1159 ));
1160 }
1161 Ok(col.expr().clone())
1162 }
1163 "upper" => {
1164 require_args(name, args, 1)?;
1165 let c = expr_to_column(arg_expr(args, 0)?);
1166 Ok(upper(&c).into_expr())
1167 }
1168 "lower" => {
1169 require_args(name, args, 1)?;
1170 let c = expr_to_column(arg_expr(args, 0)?);
1171 Ok(lower(&c).into_expr())
1172 }
1173 "coalesce" => {
1174 if args.is_empty() {
1175 return Err(PlanExprError(format!(
1176 "fn '{name}' requires at least one argument"
1177 )));
1178 }
1179 let exprs: Result<Vec<Expr>, _> = args.iter().map(expr_from_value).collect();
1180 let exprs = exprs?;
1181 Ok(polars::prelude::coalesce(&exprs))
1182 }
1183 "when" => {
1184 if args.len() != 2 {
1185 return Err(PlanExprError(format!(
1186 "fn '{name}' two-arg form requires [condition, then_expr]"
1187 )));
1188 }
1189 let cond = expr_to_column(arg_expr(args, 0)?);
1190 let then_val = expr_to_column(arg_expr(args, 1)?);
1191 Ok(when_then_otherwise_null(&cond, &then_val).into_expr())
1192 }
1193 "length" | "char_length" | "character_length" => {
1195 require_args(name, args, 1)?;
1196 Ok(length(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1197 }
1198 "trim" => {
1199 require_args(name, args, 1)?;
1200 Ok(trim(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1201 }
1202 "ltrim" => {
1203 require_args(name, args, 1)?;
1204 Ok(crate::functions::ltrim(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1205 }
1206 "rtrim" => {
1207 require_args(name, args, 1)?;
1208 Ok(crate::functions::rtrim(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1209 }
1210 "btrim" => {
1211 require_args_min(name, args, 1)?;
1212 let c = expr_to_column(arg_expr(args, 0)?);
1213 let trim_str: Option<String> = arg_lit_opt_str(args, 1)?;
1214 Ok(btrim(&c, trim_str.as_deref()).into_expr())
1215 }
1216 "substring" | "substr" => {
1217 require_args_min(name, args, 2)?;
1218 let c = expr_to_column(arg_expr(args, 0)?);
1219 let start = arg_lit_i64(args, 1)?;
1220 let len = opt_lit_i64(args, 2);
1221 Ok(substring(&c, start, len).into_expr())
1222 }
1223 "concat" => {
1224 if args.is_empty() {
1225 return Err(PlanExprError(format!(
1226 "fn '{name}' requires at least one argument"
1227 )));
1228 }
1229 let exprs: Result<Vec<Expr>, _> = args.iter().map(expr_from_value).collect();
1230 let cols: Vec<Column> = exprs?.into_iter().map(expr_to_column).collect();
1231 let refs: Vec<&Column> = cols.iter().collect();
1232 Ok(concat(&refs).into_expr())
1233 }
1234 "concat_ws" => {
1235 require_args_min(name, args, 2)?;
1236 let sep = arg_lit_str(args, 0)?;
1237 let exprs: Result<Vec<Expr>, _> = args.iter().skip(1).map(expr_from_value).collect();
1238 let cols: Vec<Column> = exprs?.into_iter().map(expr_to_column).collect();
1239 let refs: Vec<&Column> = cols.iter().collect();
1240 Ok(concat_ws(&sep, &refs).into_expr())
1241 }
1242 "initcap" => {
1243 require_args(name, args, 1)?;
1244 Ok(initcap(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1245 }
1246 "repeat" => {
1247 require_args(name, args, 2)?;
1248 let c = expr_to_column(arg_expr(args, 0)?);
1249 let n = arg_lit_i32(args, 1)?;
1250 Ok(repeat(&c, n).into_expr())
1251 }
1252 "reverse" => {
1253 require_args(name, args, 1)?;
1254 Ok(reverse(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1255 }
1256 "instr" => {
1257 require_args(name, args, 2)?;
1258 let c = expr_to_column(arg_expr(args, 0)?);
1259 let substr = arg_lit_str(args, 1)?;
1260 Ok(instr(&c, &substr).into_expr())
1261 }
1262 "position" => {
1263 require_args_min(name, args, 2)?;
1264 let substr = arg_lit_str(args, 0)?;
1265 let c = expr_to_column(arg_expr(args, 1)?);
1266 let pos = opt_lit_i64(args, 2).unwrap_or(1);
1267 Ok(locate(&substr, &c, pos).into_expr())
1268 }
1269 "locate" => {
1270 require_args_min(name, args, 2)?;
1271 let substr = arg_lit_str(args, 0)?;
1272 let c = expr_to_column(arg_expr(args, 1)?);
1273 let pos = opt_lit_i64(args, 2).unwrap_or(1);
1274 Ok(locate(&substr, &c, pos).into_expr())
1275 }
1276 "ascii" => {
1277 require_args(name, args, 1)?;
1278 Ok(ascii(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1279 }
1280 "format_number" => {
1281 require_args(name, args, 2)?;
1282 let c = expr_to_column(arg_expr(args, 0)?);
1283 let decimals = arg_lit_u32(args, 1)?;
1284 Ok(format_number(&c, decimals).into_expr())
1285 }
1286 "overlay" => {
1287 require_args_min(name, args, 4)?;
1288 let c = expr_to_column(arg_expr(args, 0)?);
1289 let replace_str = arg_lit_str(args, 1)?;
1290 let pos = arg_lit_i64(args, 2)?;
1291 let len = arg_lit_i64(args, 3)?;
1292 Ok(overlay(&c, &replace_str, pos, len).into_expr())
1293 }
1294 "char" => {
1295 require_args(name, args, 1)?;
1296 Ok(rs_char(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1297 }
1298 "chr" => {
1299 require_args(name, args, 1)?;
1300 Ok(chr(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1301 }
1302 "base64" => {
1303 require_args(name, args, 1)?;
1304 Ok(base64(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1305 }
1306 "unbase64" => {
1307 require_args(name, args, 1)?;
1308 Ok(unbase64(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1309 }
1310 "sha1" => {
1311 require_args(name, args, 1)?;
1312 Ok(sha1(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1313 }
1314 "sha2" => {
1315 require_args(name, args, 2)?;
1316 let c = expr_to_column(arg_expr(args, 0)?);
1317 let bits = arg_lit_i32(args, 1)?;
1318 Ok(sha2(&c, bits).into_expr())
1319 }
1320 "md5" => {
1321 require_args(name, args, 1)?;
1322 Ok(md5(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1323 }
1324 "lpad" => {
1325 require_args(name, args, 3)?;
1326 let c = expr_to_column(arg_expr(args, 0)?);
1327 let len = arg_lit_i32(args, 1)?;
1328 let pad = arg_lit_str(args, 2)?;
1329 Ok(lpad(&c, len, &pad).into_expr())
1330 }
1331 "rpad" => {
1332 require_args(name, args, 3)?;
1333 let c = expr_to_column(arg_expr(args, 0)?);
1334 let len = arg_lit_i32(args, 1)?;
1335 let pad = arg_lit_str(args, 2)?;
1336 Ok(rpad(&c, len, &pad).into_expr())
1337 }
1338 "translate" => {
1339 require_args(name, args, 3)?;
1340 let c = expr_to_column(arg_expr(args, 0)?);
1341 let from_str = arg_lit_str(args, 1)?;
1342 let to_str = arg_lit_str(args, 2)?;
1343 Ok(translate(&c, &from_str, &to_str).into_expr())
1344 }
1345 "substring_index" => {
1346 require_args(name, args, 3)?;
1347 let c = expr_to_column(arg_expr(args, 0)?);
1348 let delim = arg_lit_str(args, 1)?;
1349 let count = arg_lit_i64(args, 2)?;
1350 Ok(substring_index(&c, &delim, count).into_expr())
1351 }
1352 "left" => {
1353 require_args(name, args, 2)?;
1354 let c = expr_to_column(arg_expr(args, 0)?);
1355 let n = arg_lit_i64(args, 1)?;
1356 Ok(left(&c, n).into_expr())
1357 }
1358 "right" => {
1359 require_args(name, args, 2)?;
1360 let c = expr_to_column(arg_expr(args, 0)?);
1361 let n = arg_lit_i64(args, 1)?;
1362 Ok(right(&c, n).into_expr())
1363 }
1364 "replace" => {
1365 require_args(name, args, 3)?;
1366 let c = expr_to_column(arg_expr(args, 0)?);
1367 let search = arg_lit_str(args, 1)?;
1368 let replacement = arg_lit_str(args, 2)?;
1369 Ok(replace(&c, &search, &replacement).into_expr())
1370 }
1371 "startswith" => {
1372 require_args(name, args, 2)?;
1373 let c = expr_to_column(arg_expr(args, 0)?);
1374 let prefix = arg_lit_str(args, 1)?;
1375 Ok(startswith(&c, &prefix).into_expr())
1376 }
1377 "endswith" => {
1378 require_args(name, args, 2)?;
1379 let c = expr_to_column(arg_expr(args, 0)?);
1380 let suffix = arg_lit_str(args, 1)?;
1381 Ok(endswith(&c, &suffix).into_expr())
1382 }
1383 "contains" => {
1384 require_args(name, args, 2)?;
1385 let c = expr_to_column(arg_expr(args, 0)?);
1386 let substring = arg_lit_str(args, 1)?;
1387 Ok(contains(&c, &substring).into_expr())
1388 }
1389 "like" => {
1390 require_args_min(name, args, 2)?;
1391 let c = expr_to_column(arg_expr(args, 0)?);
1392 let pattern = arg_lit_str(args, 1)?;
1393 let escape = arg_lit_opt_str(args, 2)?.and_then(|s| s.chars().next());
1394 Ok(like(&c, &pattern, escape).into_expr())
1395 }
1396 "ilike" => {
1397 require_args_min(name, args, 2)?;
1398 let c = expr_to_column(arg_expr(args, 0)?);
1399 let pattern = arg_lit_str(args, 1)?;
1400 let escape = arg_lit_opt_str(args, 2)?.and_then(|s| s.chars().next());
1401 Ok(ilike(&c, &pattern, escape).into_expr())
1402 }
1403 "rlike" | "regexp" => {
1404 require_args(name, args, 2)?;
1405 let c = expr_to_column(arg_expr(args, 0)?);
1406 let pattern = arg_lit_str(args, 1)?;
1407 Ok(rlike(&c, &pattern).into_expr())
1408 }
1409 "soundex" => {
1410 require_args(name, args, 1)?;
1411 Ok(soundex(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1412 }
1413 "levenshtein" => {
1414 require_args(name, args, 2)?;
1415 let a = expr_to_column(arg_expr(args, 0)?);
1416 let b = expr_to_column(arg_expr(args, 1)?);
1417 Ok(crate::functions::levenshtein(&a, &b).into_expr())
1418 }
1419 "crc32" => {
1420 require_args(name, args, 1)?;
1421 Ok(crc32(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1422 }
1423 "xxhash64" => {
1424 require_args(name, args, 1)?;
1425 Ok(xxhash64(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1426 }
1427 "regexp_extract" => {
1428 require_args(name, args, 3)?;
1430 let c = expr_to_column(arg_expr(args, 0)?);
1431 let pattern = arg_lit_str(args, 1).map_err(|_| {
1432 PlanExprError(
1433 "regexp_extract in plan requires literal pattern at arg 2 (column refs not supported)".into(),
1434 )
1435 })?;
1436 let group_index = arg_lit_usize(args, 2).map_err(|_| {
1437 PlanExprError(
1438 "regexp_extract in plan requires literal group index at arg 3 (column refs not supported)".into(),
1439 )
1440 })?;
1441 Ok(regexp_extract(&c, &pattern, group_index).into_expr())
1442 }
1443 "regexp_replace" => {
1444 require_args(name, args, 3)?;
1445 let c = expr_to_column(arg_expr(args, 0)?);
1446 let pattern = arg_lit_str(args, 1)?;
1447 let replacement = arg_lit_str(args, 2)?;
1448 Ok(regexp_replace(&c, &pattern, &replacement).into_expr())
1449 }
1450 "regexp_extract_all" => {
1451 require_args(name, args, 2)?;
1452 let c = expr_to_column(arg_expr(args, 0)?);
1453 let pattern = arg_lit_str(args, 1)?;
1454 Ok(regexp_extract_all(&c, &pattern).into_expr())
1455 }
1456 "regexp_like" => {
1457 require_args(name, args, 2)?;
1458 let c = expr_to_column(arg_expr(args, 0)?);
1459 let pattern = arg_lit_str(args, 1)?;
1460 Ok(regexp_like(&c, &pattern).into_expr())
1461 }
1462 "regexp_count" => {
1463 require_args(name, args, 2)?;
1464 let c = expr_to_column(arg_expr(args, 0)?);
1465 let pattern = arg_lit_str(args, 1)?;
1466 Ok(regexp_count(&c, &pattern).into_expr())
1467 }
1468 "regexp_substr" => {
1469 require_args(name, args, 2)?;
1470 let c = expr_to_column(arg_expr(args, 0)?);
1471 let pattern = arg_lit_str(args, 1)?;
1472 Ok(regexp_substr(&c, &pattern).into_expr())
1473 }
1474 "regexp_instr" => {
1475 require_args_min(name, args, 2)?;
1476 let c = expr_to_column(arg_expr(args, 0)?);
1477 let pattern = arg_lit_str(args, 1)?;
1478 let group_idx = args.get(2).and_then(|v| lit_as_usize(v).ok());
1479 Ok(regexp_instr(&c, &pattern, group_idx).into_expr())
1480 }
1481 "split" => {
1482 require_args_min(name, args, 2)?;
1483 if args.len() > 3 {
1484 return Err(PlanExprError("split takes at most 3 arguments".to_string()));
1485 }
1486 let c = expr_to_column(arg_expr(args, 0)?);
1487 let delimiter = arg_lit_str(args, 1)?;
1488 let limit = args
1489 .get(2)
1490 .and_then(|v| lit_as_i64(v).ok())
1491 .map(|n| n as i32);
1492 Ok(split(&c, &delimiter, limit).into_expr())
1493 }
1494 "split_part" => {
1495 require_args(name, args, 3)?;
1496 let c = expr_to_column(arg_expr(args, 0)?);
1497 let delimiter = arg_lit_str(args, 1)?;
1498 let part_num = arg_lit_i64(args, 2)?;
1499 Ok(split_part(&c, &delimiter, part_num).into_expr())
1500 }
1501 "find_in_set" => {
1502 require_args(name, args, 2)?;
1503 let str_col = expr_to_column(arg_expr(args, 0)?);
1504 let set_col = expr_to_column(arg_expr(args, 1)?);
1505 Ok(find_in_set(&str_col, &set_col).into_expr())
1506 }
1507 "format_string" | "printf" => {
1508 require_args_min(name, args, 2)?;
1509 let format_str = arg_lit_str(args, 0)?;
1510 let exprs: Result<Vec<Expr>, _> = args.iter().skip(1).map(expr_from_value).collect();
1511 let cols: Vec<Column> = exprs?.into_iter().map(expr_to_column).collect();
1512 let refs: Vec<&Column> = cols.iter().collect();
1513 Ok(format_string(&format_str, &refs).into_expr())
1514 }
1515 "lcase" => {
1516 require_args(name, args, 1)?;
1517 Ok(lcase(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1518 }
1519 "ucase" => {
1520 require_args(name, args, 1)?;
1521 Ok(ucase(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1522 }
1523 "mask" => {
1524 require_args_min(name, args, 1)?;
1525 let c = expr_to_column(arg_expr(args, 0)?);
1526 let u = args
1527 .get(1)
1528 .and_then(|v| lit_as_string(v).ok())
1529 .and_then(|s| s.chars().next());
1530 let l = args
1531 .get(2)
1532 .and_then(|v| lit_as_string(v).ok())
1533 .and_then(|s| s.chars().next());
1534 let d = args
1535 .get(3)
1536 .and_then(|v| lit_as_string(v).ok())
1537 .and_then(|s| s.chars().next());
1538 let o = args
1539 .get(4)
1540 .and_then(|v| lit_as_string(v).ok())
1541 .and_then(|s| s.chars().next());
1542 Ok(mask(&c, u, l, d, o).into_expr())
1543 }
1544 "str_to_map" => {
1545 require_args_min(name, args, 1)?;
1546 let c = expr_to_column(arg_expr(args, 0)?);
1547 let pair_delim: Option<String> = arg_lit_opt_str(args, 1)?;
1548 let key_value_delim: Option<String> = arg_lit_opt_str(args, 2)?;
1549 Ok(str_to_map(&c, pair_delim.as_deref(), key_value_delim.as_deref()).into_expr())
1550 }
1551 "get_json_object" => {
1552 require_args(name, args, 2)?;
1553 let c = expr_to_column(arg_expr(args, 0)?);
1554 let path = arg_lit_str(args, 1)?;
1555 Ok(get_json_object(&c, &path).into_expr())
1556 }
1557 "json_tuple" => {
1558 require_args_min(name, args, 2)?;
1559 let c = expr_to_column(arg_expr(args, 0)?);
1560 let keys: Vec<String> = args[1..]
1561 .iter()
1562 .map(lit_as_string)
1563 .collect::<Result<Vec<_>, _>>()?;
1564 let key_refs: Vec<&str> = keys.iter().map(String::as_str).collect();
1565 Ok(json_tuple(&c, &key_refs).into_expr())
1566 }
1567 "isin" => {
1568 require_args_min(name, args, 1)?;
1571 let col_expr = arg_expr(args, 0)?;
1572 let values_opt = try_values_for_isin(&args[1..])?;
1573 Ok(match values_opt {
1574 None => lit(false),
1575 Some(values_expr) => col_expr.is_in(values_expr, false),
1576 })
1577 }
1578 _ => expr_from_fn_rest(name, args),
1579 }
1580}
1581
1582fn expr_from_fn_rest(name: &str, args: &[Value]) -> Result<Expr, PlanExprError> {
1583 use crate::Column;
1584 #[allow(unused_imports)]
1585 use crate::functions::{
1586 abs, acos, add_months, array, array_agg, array_append, array_compact, array_contains,
1587 array_distinct, array_except, array_insert, array_intersect, array_join, array_max,
1588 array_min, array_prepend, array_remove, array_size, array_slice, array_sort, array_sum,
1589 array_union, arrays_overlap, arrays_zip, asin, atan, atan2, bround, cast, cbrt, ceiling,
1590 cos, cosh, cot, create_map, csc, curdate, current_catalog, current_database, current_date,
1591 current_schema, current_timestamp, current_timezone, current_user, date_add, date_diff,
1592 date_format, date_from_unix_date, date_part, date_sub, date_trunc, dateadd, datediff,
1593 datepart, day, dayname, dayofmonth, dayofweek, dayofyear, days, decode, degrees, e,
1594 element_at, encode, equal_null, exp, explode, explode_outer, expm1, extract, factorial,
1595 floor, from_unixtime, from_utc_timestamp, get, get_json_object, greatest, grouping,
1596 grouping_id, hash, hour, hours, hypot, input_file_name, last_day, least, localtimestamp,
1597 log, log1p, log2, log10, make_date, make_interval, make_timestamp, make_timestamp_ntz,
1598 map_keys, map_values, minute, minutes, monotonically_increasing_id, month, months,
1599 months_between, negate, next_day, now, nullif, nvl, nvl2, parse_url, pi, pmod, positive,
1600 pow, quarter, radians, rint, round, sec, second, shift_left, shift_right, signum, sin,
1601 sinh, size, spark_partition_id, sqrt, tan, tanh, timestamp_micros, timestamp_millis,
1602 timestamp_seconds, timestampadd, timestampdiff, to_binary, to_char, to_date, to_degrees,
1603 to_number, to_radians, to_timestamp, to_unix_timestamp, to_utc_timestamp, to_varchar,
1604 trunc, try_add, try_cast, try_divide, try_element_at, try_multiply, try_subtract,
1605 try_to_number, try_to_timestamp, typeof_, unix_date, unix_micros, unix_millis,
1606 unix_seconds, unix_timestamp, unix_timestamp_now, user, weekday, weekofyear, width_bucket,
1607 year, years,
1608 };
1609
1610 match name {
1612 "abs" => {
1613 require_args(name, args, 1)?;
1614 Ok(abs(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1615 }
1616 "ceil" | "ceiling" => {
1617 require_args(name, args, 1)?;
1618 Ok(ceiling(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1619 }
1620 "floor" => {
1621 require_args(name, args, 1)?;
1622 Ok(floor(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1623 }
1624 "round" => {
1625 require_args_min(name, args, 1)?;
1626 let c = expr_to_column(arg_expr(args, 0)?);
1627 let decimals = opt_lit_i64(args, 1).map(|n| n as u32).unwrap_or(0);
1628 Ok(round(&c, decimals).into_expr())
1629 }
1630 "bround" => {
1631 require_args_min(name, args, 1)?;
1632 let c = expr_to_column(arg_expr(args, 0)?);
1633 let scale = opt_lit_i64(args, 1).unwrap_or(0) as i32;
1634 Ok(bround(&c, scale).into_expr())
1635 }
1636 "negate" | "negative" => {
1637 require_args(name, args, 1)?;
1638 Ok(negate(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1639 }
1640 "positive" => {
1641 require_args(name, args, 1)?;
1642 Ok(positive(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1643 }
1644 "sqrt" => {
1645 require_args(name, args, 1)?;
1646 Ok(sqrt(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1647 }
1648 "pow" | "power" => {
1649 require_args(name, args, 2)?;
1650 let c = expr_to_column(arg_expr(args, 0)?);
1651 let exp_val = arg_lit_i64(args, 1)?;
1652 Ok(pow(&c, exp_val).into_expr())
1653 }
1654 "exp" => {
1655 require_args(name, args, 1)?;
1656 Ok(exp(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1657 }
1658 "log" | "ln" => {
1659 if args.len() == 1 {
1660 Ok(log(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1661 } else if args.len() == 2 {
1662 let col_expr = expr_to_column(arg_expr(args, 0)?);
1663 let base = match &args[1] {
1664 Value::Number(n) => n
1665 .as_f64()
1666 .ok_or_else(|| PlanExprError("log base must be a number".to_string()))?,
1667 _ => return Err(PlanExprError("log base must be a number".to_string())),
1668 };
1669 Ok(crate::functions::log_with_base(&col_expr, base).into_expr())
1670 } else {
1671 Err(PlanExprError(format!(
1672 "fn '{name}' requires 1 or 2 arguments"
1673 )))
1674 }
1675 }
1676 "sin" => {
1677 require_args(name, args, 1)?;
1678 Ok(sin(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1679 }
1680 "cos" => {
1681 require_args(name, args, 1)?;
1682 Ok(cos(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1683 }
1684 "tan" => {
1685 require_args(name, args, 1)?;
1686 Ok(tan(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1687 }
1688 "asin" => {
1689 require_args(name, args, 1)?;
1690 Ok(crate::functions::asin(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1691 }
1692 "acos" => {
1693 require_args(name, args, 1)?;
1694 Ok(crate::functions::acos(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1695 }
1696 "atan" => {
1697 require_args(name, args, 1)?;
1698 Ok(crate::functions::atan(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1699 }
1700 "atan2" => {
1701 require_args(name, args, 2)?;
1702 let y = expr_to_column(arg_expr(args, 0)?);
1703 let x = expr_to_column(arg_expr(args, 1)?);
1704 Ok(atan2(&y, &x).into_expr())
1705 }
1706 "degrees" => {
1707 require_args(name, args, 1)?;
1708 Ok(degrees(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1709 }
1710 "radians" => {
1711 require_args(name, args, 1)?;
1712 Ok(radians(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1713 }
1714 "signum" | "sign" => {
1715 require_args(name, args, 1)?;
1716 Ok(signum(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1717 }
1718 "cot" => {
1719 require_args(name, args, 1)?;
1720 Ok(cot(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1721 }
1722 "csc" => {
1723 require_args(name, args, 1)?;
1724 Ok(csc(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1725 }
1726 "sec" => {
1727 require_args(name, args, 1)?;
1728 Ok(sec(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1729 }
1730 "e" => {
1731 if !args.is_empty() {
1732 return Err(PlanExprError("fn 'e' takes no arguments".to_string()));
1733 }
1734 Ok(e().into_expr())
1735 }
1736 "pi" => {
1737 if !args.is_empty() {
1738 return Err(PlanExprError("fn 'pi' takes no arguments".to_string()));
1739 }
1740 Ok(pi().into_expr())
1741 }
1742 "pmod" => {
1743 require_args(name, args, 2)?;
1744 let a = expr_to_column(arg_expr(args, 0)?);
1745 let b = expr_to_column(arg_expr(args, 1)?);
1746 Ok(pmod(&a, &b).into_expr())
1747 }
1748 "factorial" => {
1749 require_args(name, args, 1)?;
1750 Ok(factorial(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1751 }
1752 "hypot" => {
1753 require_args(name, args, 2)?;
1754 let x = expr_to_column(arg_expr(args, 0)?);
1755 let y = expr_to_column(arg_expr(args, 1)?);
1756 Ok(hypot(&x, &y).into_expr())
1757 }
1758 "cosh" => {
1759 require_args(name, args, 1)?;
1760 Ok(cosh(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1761 }
1762 "sinh" => {
1763 require_args(name, args, 1)?;
1764 Ok(sinh(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1765 }
1766 "tanh" => {
1767 require_args(name, args, 1)?;
1768 Ok(tanh(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1769 }
1770 "cbrt" => {
1771 require_args(name, args, 1)?;
1772 Ok(cbrt(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1773 }
1774 "expm1" => {
1775 require_args(name, args, 1)?;
1776 Ok(crate::functions::expm1(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1777 }
1778 "log1p" => {
1779 require_args(name, args, 1)?;
1780 Ok(log1p(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1781 }
1782 "log10" => {
1783 require_args(name, args, 1)?;
1784 Ok(log10(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1785 }
1786 "log2" => {
1787 require_args(name, args, 1)?;
1788 Ok(log2(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1789 }
1790 "rint" => {
1791 require_args(name, args, 1)?;
1792 Ok(crate::functions::rint(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1793 }
1794 "to_degrees" => {
1795 require_args(name, args, 1)?;
1796 Ok(to_degrees(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1797 }
1798 "to_radians" => {
1799 require_args(name, args, 1)?;
1800 Ok(to_radians(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1801 }
1802 "cast" => {
1804 require_args(name, args, 2)?;
1805 let c = expr_to_column(arg_expr(args, 0)?);
1806 let type_name = arg_lit_str(args, 1)?;
1807 Ok(cast(&c, &type_name).map_err(PlanExprError)?.into_expr())
1808 }
1809 "try_cast" => {
1810 require_args(name, args, 2)?;
1811 let c = expr_to_column(arg_expr(args, 0)?);
1812 let type_name = arg_lit_str(args, 1)?;
1813 Ok(try_cast(&c, &type_name).map_err(PlanExprError)?.into_expr())
1814 }
1815 "nvl" | "ifnull" => {
1816 require_args(name, args, 2)?;
1817 let a = expr_to_column(arg_expr(args, 0)?);
1818 let b = expr_to_column(arg_expr(args, 1)?);
1819 Ok(nvl(&a, &b).into_expr())
1820 }
1821 "nullif" => {
1822 require_args(name, args, 2)?;
1823 let a = expr_to_column(arg_expr(args, 0)?);
1824 let b = expr_to_column(arg_expr(args, 1)?);
1825 Ok(nullif(&a, &b).into_expr())
1826 }
1827 "greatest" => {
1828 require_args_min(name, args, 1)?;
1829 let exprs: Result<Vec<Expr>, _> = args.iter().map(expr_from_value).collect();
1830 let cols: Vec<Column> = exprs?.into_iter().map(expr_to_column).collect();
1831 let refs: Vec<&Column> = cols.iter().collect();
1832 Ok(greatest(&refs).map_err(PlanExprError)?.into_expr())
1833 }
1834 "least" => {
1835 require_args_min(name, args, 1)?;
1836 let exprs: Result<Vec<Expr>, _> = args.iter().map(expr_from_value).collect();
1837 let cols: Vec<Column> = exprs?.into_iter().map(expr_to_column).collect();
1838 let refs: Vec<&Column> = cols.iter().collect();
1839 Ok(least(&refs).map_err(PlanExprError)?.into_expr())
1840 }
1841 "typeof" => {
1842 require_args(name, args, 1)?;
1843 Ok(typeof_(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1844 }
1845 "try_divide" => {
1846 require_args(name, args, 2)?;
1847 let a = expr_to_column(arg_expr(args, 0)?);
1848 let b = expr_to_column(arg_expr(args, 1)?);
1849 Ok(try_divide(&a, &b).into_expr())
1850 }
1851 "add" | "+" => {
1853 require_args(name, args, 2)?;
1854 let a = expr_to_column(arg_expr(args, 0)?);
1855 let b = expr_to_column(arg_expr(args, 1)?);
1856 Ok(a.add_pyspark(&b).into_expr())
1857 }
1858 "subtract" | "-" => {
1859 require_args(name, args, 2)?;
1860 let a = expr_to_column(arg_expr(args, 0)?);
1861 let b = expr_to_column(arg_expr(args, 1)?);
1862 Ok(a.subtract_pyspark(&b).into_expr())
1863 }
1864 "multiply" | "*" => {
1865 require_args(name, args, 2)?;
1866 let a = expr_to_column(arg_expr(args, 0)?);
1867 let b = expr_to_column(arg_expr(args, 1)?);
1868 Ok(a.multiply_pyspark(&b).into_expr())
1869 }
1870 "divide" | "/" => {
1871 require_args(name, args, 2)?;
1872 let a = expr_to_column(arg_expr(args, 0)?);
1873 let b = expr_to_column(arg_expr(args, 1)?);
1874 Ok(a.divide_pyspark(&b).into_expr())
1875 }
1876 "mod" | "remainder" | "%" => {
1877 require_args(name, args, 2)?;
1878 let a = expr_to_column(arg_expr(args, 0)?);
1879 let b = expr_to_column(arg_expr(args, 1)?);
1880 Ok(a.mod_pyspark(&b).into_expr())
1881 }
1882 "try_add" => {
1883 require_args(name, args, 2)?;
1884 let a = expr_to_column(arg_expr(args, 0)?);
1885 let b = expr_to_column(arg_expr(args, 1)?);
1886 Ok(try_add(&a, &b).into_expr())
1887 }
1888 "try_subtract" => {
1889 require_args(name, args, 2)?;
1890 let a = expr_to_column(arg_expr(args, 0)?);
1891 let b = expr_to_column(arg_expr(args, 1)?);
1892 Ok(try_subtract(&a, &b).into_expr())
1893 }
1894 "try_multiply" => {
1895 require_args(name, args, 2)?;
1896 let a = expr_to_column(arg_expr(args, 0)?);
1897 let b = expr_to_column(arg_expr(args, 1)?);
1898 Ok(try_multiply(&a, &b).into_expr())
1899 }
1900 "width_bucket" => {
1901 require_args(name, args, 4)?;
1902 let val = expr_to_column(arg_expr(args, 0)?);
1903 let min_val = arg_lit_f64(args, 1)?;
1904 let max_val = arg_lit_f64(args, 2)?;
1905 let num_bucket = arg_lit_i64(args, 3)?;
1906 if num_bucket <= 0 {
1907 return Err(PlanExprError(
1908 "width_bucket: num_bucket must be positive".into(),
1909 ));
1910 }
1911 Ok(width_bucket(&val, min_val, max_val, num_bucket).into_expr())
1912 }
1913 "equal_null" => {
1914 require_args(name, args, 2)?;
1915 let a = expr_to_column(arg_expr(args, 0)?);
1916 let b = expr_to_column(arg_expr(args, 1)?);
1917 Ok(equal_null(&a, &b).into_expr())
1918 }
1919 "year" => {
1921 require_args(name, args, 1)?;
1922 Ok(year(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1923 }
1924 "month" => {
1925 require_args(name, args, 1)?;
1926 Ok(month(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1927 }
1928 "day" | "dayofmonth" => {
1929 require_args(name, args, 1)?;
1930 Ok(day(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1931 }
1932 "hour" => {
1933 require_args(name, args, 1)?;
1934 Ok(hour(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1935 }
1936 "minute" => {
1937 require_args(name, args, 1)?;
1938 Ok(minute(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1939 }
1940 "second" => {
1941 require_args(name, args, 1)?;
1942 Ok(second(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1943 }
1944 "quarter" => {
1945 require_args(name, args, 1)?;
1946 Ok(quarter(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1947 }
1948 "weekofyear" => {
1949 require_args(name, args, 1)?;
1950 Ok(weekofyear(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1951 }
1952 "dayofweek" => {
1953 require_args(name, args, 1)?;
1954 Ok(dayofweek(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1955 }
1956 "dayofyear" => {
1957 require_args(name, args, 1)?;
1958 Ok(dayofyear(&expr_to_column(arg_expr(args, 0)?)).into_expr())
1959 }
1960 "to_date" => {
1961 require_args_min(name, args, 1)?;
1962 if args.len() > 2 {
1963 return Err(PlanExprError(format!(
1964 "fn '{name}' takes at most 2 argument(s)"
1965 )));
1966 }
1967 let col = expr_to_column(arg_expr(args, 0)?);
1968 let format_str = if args.len() == 2 {
1969 Some(arg_lit_str(args, 1)?)
1970 } else {
1971 None
1972 };
1973 to_date(&col, format_str.as_deref())
1974 .map_err(PlanExprError)
1975 .map(|c| c.into_expr())
1976 }
1977 "date_format" => {
1978 require_args(name, args, 2)?;
1979 let c = expr_to_column(arg_expr(args, 0)?);
1980 let format = arg_lit_str(args, 1)?;
1981 Ok(date_format(&c, &format).into_expr())
1982 }
1983 "date_add" => {
1984 require_args(name, args, 2)?;
1985 let c = expr_to_column(arg_expr(args, 0)?);
1986 let n = arg_lit_i32(args, 1)?;
1987 Ok(date_add(&c, n).into_expr())
1988 }
1989 "date_sub" => {
1990 require_args(name, args, 2)?;
1991 let c = expr_to_column(arg_expr(args, 0)?);
1992 let n = arg_lit_i32(args, 1)?;
1993 Ok(date_sub(&c, n).into_expr())
1994 }
1995 "datediff" | "date_diff" => {
1996 require_args(name, args, 2)?;
1997 let end = expr_to_column(arg_expr(args, 0)?);
1998 let start = expr_to_column(arg_expr(args, 1)?);
1999 Ok(datediff(&end, &start).into_expr())
2000 }
2001 "last_day" => {
2002 require_args(name, args, 1)?;
2003 Ok(last_day(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2004 }
2005 "trunc" => {
2006 require_args(name, args, 2)?;
2007 let c = expr_to_column(arg_expr(args, 0)?);
2008 let format = arg_lit_str(args, 1)?;
2009 Ok(trunc(&c, &format).into_expr())
2010 }
2011 "date_trunc" => {
2012 require_args(name, args, 2)?;
2013 let format = arg_lit_str(args, 0)?;
2014 let c = expr_to_column(arg_expr(args, 1)?);
2015 Ok(date_trunc(&format, &c).into_expr())
2016 }
2017 "add_months" => {
2018 require_args(name, args, 2)?;
2019 let c = expr_to_column(arg_expr(args, 0)?);
2020 let n = arg_lit_i32(args, 1)?;
2021 Ok(add_months(&c, n).into_expr())
2022 }
2023 "months_between" => {
2024 require_args_min(name, args, 2)?;
2025 let end = expr_to_column(arg_expr(args, 0)?);
2026 let start = expr_to_column(arg_expr(args, 1)?);
2027 let round_off = args
2028 .get(2)
2029 .and_then(|v| v.get("lit").and_then(Value::as_bool))
2030 .unwrap_or(true);
2031 Ok(months_between(&end, &start, round_off).into_expr())
2032 }
2033 "next_day" => {
2034 require_args(name, args, 2)?;
2035 let c = expr_to_column(arg_expr(args, 0)?);
2036 let day_of_week = arg_lit_str(args, 1)?;
2037 Ok(next_day(&c, &day_of_week).into_expr())
2038 }
2039 "unix_timestamp" => {
2040 if args.is_empty() {
2041 Ok(unix_timestamp_now().into_expr())
2042 } else {
2043 require_args_min(name, args, 1)?;
2044 let c = expr_to_column(arg_expr(args, 0)?);
2045 let format: Option<String> = arg_lit_opt_str(args, 1)?;
2046 Ok(unix_timestamp(&c, format.as_deref()).into_expr())
2047 }
2048 }
2049 "from_unixtime" => {
2050 require_args_min(name, args, 1)?;
2051 let c = expr_to_column(arg_expr(args, 0)?);
2052 let format: Option<String> = arg_lit_opt_str(args, 1)?;
2053 Ok(from_unixtime(&c, format.as_deref()).into_expr())
2054 }
2055 "to_unix_timestamp" => {
2056 require_args_min(name, args, 1)?;
2057 let c = expr_to_column(arg_expr(args, 0)?);
2058 let format: Option<String> = arg_lit_opt_str(args, 1)?;
2059 Ok(to_unix_timestamp(&c, format.as_deref()).into_expr())
2060 }
2061 "make_date" => {
2062 require_args(name, args, 3)?;
2063 let y = expr_to_column(arg_expr(args, 0)?);
2064 let m = expr_to_column(arg_expr(args, 1)?);
2065 let d = expr_to_column(arg_expr(args, 2)?);
2066 Ok(make_date(&y, &m, &d).into_expr())
2067 }
2068 "make_timestamp" => {
2069 require_args_min(name, args, 6)?;
2070 let y = expr_to_column(arg_expr(args, 0)?);
2071 let mo = expr_to_column(arg_expr(args, 1)?);
2072 let d = expr_to_column(arg_expr(args, 2)?);
2073 let h = expr_to_column(arg_expr(args, 3)?);
2074 let mi = expr_to_column(arg_expr(args, 4)?);
2075 let s = expr_to_column(arg_expr(args, 5)?);
2076 let tz: Option<String> = arg_lit_opt_str(args, 6)?;
2077 Ok(make_timestamp(&y, &mo, &d, &h, &mi, &s, tz.as_deref()).into_expr())
2078 }
2079 "make_timestamp_ntz" => {
2080 require_args(name, args, 6)?;
2081 let y = expr_to_column(arg_expr(args, 0)?);
2082 let mo = expr_to_column(arg_expr(args, 1)?);
2083 let d = expr_to_column(arg_expr(args, 2)?);
2084 let h = expr_to_column(arg_expr(args, 3)?);
2085 let mi = expr_to_column(arg_expr(args, 4)?);
2086 let s = expr_to_column(arg_expr(args, 5)?);
2087 Ok(make_timestamp_ntz(&y, &mo, &d, &h, &mi, &s).into_expr())
2088 }
2089 "timestampadd" => {
2090 require_args(name, args, 3)?;
2091 let unit = arg_lit_str(args, 0)?;
2092 let amount = expr_to_column(arg_expr(args, 1)?);
2093 let ts = expr_to_column(arg_expr(args, 2)?);
2094 Ok(timestampadd(&unit, &amount, &ts).into_expr())
2095 }
2096 "timestampdiff" => {
2097 require_args(name, args, 3)?;
2098 let unit = arg_lit_str(args, 0)?;
2099 let start = expr_to_column(arg_expr(args, 1)?);
2100 let end = expr_to_column(arg_expr(args, 2)?);
2101 Ok(timestampdiff(&unit, &start, &end).into_expr())
2102 }
2103 "days" => {
2104 require_args(name, args, 1)?;
2105 let n = arg_lit_i64(args, 0)?;
2106 Ok(days(n).into_expr())
2107 }
2108 "hours" => {
2109 require_args(name, args, 1)?;
2110 let n = arg_lit_i64(args, 0)?;
2111 Ok(hours(n).into_expr())
2112 }
2113 "minutes" => {
2114 require_args(name, args, 1)?;
2115 let n = arg_lit_i64(args, 0)?;
2116 Ok(minutes(n).into_expr())
2117 }
2118 "months" => {
2119 require_args(name, args, 1)?;
2120 let n = arg_lit_i64(args, 0)?;
2121 Ok(months(n).into_expr())
2122 }
2123 "years" => {
2124 require_args(name, args, 1)?;
2125 let n = arg_lit_i64(args, 0)?;
2126 Ok(years(n).into_expr())
2127 }
2128 "from_utc_timestamp" => {
2129 require_args(name, args, 2)?;
2130 let c = expr_to_column(arg_expr(args, 0)?);
2131 let tz = arg_lit_str(args, 1)?;
2132 Ok(from_utc_timestamp(&c, &tz).into_expr())
2133 }
2134 "to_utc_timestamp" => {
2135 require_args(name, args, 2)?;
2136 let c = expr_to_column(arg_expr(args, 0)?);
2137 let tz = arg_lit_str(args, 1)?;
2138 Ok(to_utc_timestamp(&c, &tz).into_expr())
2139 }
2140 "convert_timezone" => {
2141 require_args(name, args, 3)?;
2142 let source_tz = arg_lit_str(args, 0)?;
2143 let target_tz = arg_lit_str(args, 1)?;
2144 let c = expr_to_column(arg_expr(args, 2)?);
2145 Ok(crate::functions::convert_timezone(&source_tz, &target_tz, &c).into_expr())
2146 }
2147 "current_date" | "curdate" => {
2148 if !args.is_empty() {
2149 return Err(PlanExprError(format!("fn '{name}' takes no arguments")));
2150 }
2151 Ok(current_date().into_expr())
2152 }
2153 "current_timestamp" | "now" => {
2154 if !args.is_empty() {
2155 return Err(PlanExprError(format!("fn '{name}' takes no arguments")));
2156 }
2157 Ok(current_timestamp().into_expr())
2158 }
2159 "localtimestamp" => {
2160 if !args.is_empty() {
2161 return Err(PlanExprError(
2162 "fn 'localtimestamp' takes no arguments".to_string(),
2163 ));
2164 }
2165 Ok(localtimestamp().into_expr())
2166 }
2167 "extract" | "date_part" | "datepart" => {
2168 require_args(name, args, 2)?;
2169 let c = expr_to_column(arg_expr(args, 0)?);
2170 let field = arg_lit_str(args, 1)?;
2171 Ok(extract(&c, &field).into_expr())
2172 }
2173 "dateadd" => {
2174 require_args(name, args, 2)?;
2175 let c = expr_to_column(arg_expr(args, 0)?);
2176 let n = arg_lit_i32(args, 1)?;
2177 Ok(dateadd(&c, n).into_expr())
2178 }
2179 "unix_micros" | "unix_millis" | "unix_seconds" => {
2180 require_args(name, args, 1)?;
2181 let c = expr_to_column(arg_expr(args, 0)?);
2182 let out = match name {
2183 "unix_micros" => unix_micros(&c),
2184 "unix_millis" => unix_millis(&c),
2185 _ => unix_seconds(&c),
2186 };
2187 Ok(out.into_expr())
2188 }
2189 "dayname" => {
2190 require_args(name, args, 1)?;
2191 Ok(dayname(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2192 }
2193 "weekday" => {
2194 require_args(name, args, 1)?;
2195 Ok(weekday(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2196 }
2197 "timestamp_seconds" | "timestamp_millis" | "timestamp_micros" => {
2198 require_args(name, args, 1)?;
2199 let c = expr_to_column(arg_expr(args, 0)?);
2200 let out = match name {
2201 "timestamp_seconds" => timestamp_seconds(&c),
2202 "timestamp_millis" => timestamp_millis(&c),
2203 _ => timestamp_micros(&c),
2204 };
2205 Ok(out.into_expr())
2206 }
2207 "unix_date" => {
2208 require_args(name, args, 1)?;
2209 Ok(unix_date(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2210 }
2211 "date_from_unix_date" => {
2212 require_args(name, args, 1)?;
2213 Ok(date_from_unix_date(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2214 }
2215 "to_char" | "to_varchar" => {
2216 require_args_min(name, args, 1)?;
2217 let c = expr_to_column(arg_expr(args, 0)?);
2218 let format: Option<String> = arg_lit_opt_str(args, 1)?;
2219 Ok(to_char(&c, format.as_deref())
2220 .map_err(PlanExprError)?
2221 .into_expr())
2222 }
2223 "to_timestamp" => {
2224 require_args_min(name, args, 1)?;
2225 let c = expr_to_column(arg_expr(args, 0)?);
2226 let format: Option<String> = arg_lit_opt_str(args, 1)?;
2227 Ok(to_timestamp(&c, format.as_deref())
2228 .map_err(PlanExprError)?
2229 .into_expr())
2230 }
2231 "try_to_timestamp" => {
2232 require_args_min(name, args, 1)?;
2233 let c = expr_to_column(arg_expr(args, 0)?);
2234 let format: Option<String> = arg_lit_opt_str(args, 1)?;
2235 Ok(try_to_timestamp(&c, format.as_deref())
2236 .map_err(PlanExprError)?
2237 .into_expr())
2238 }
2239 "to_number" | "try_to_number" => {
2240 require_args_min(name, args, 1)?;
2241 let c = expr_to_column(arg_expr(args, 0)?);
2242 let format: Option<String> = arg_lit_opt_str(args, 1)?;
2243 let out = if name == "to_number" {
2244 to_number(&c, format.as_deref()).map_err(PlanExprError)?
2245 } else {
2246 try_to_number(&c, format.as_deref()).map_err(PlanExprError)?
2247 };
2248 Ok(out.into_expr())
2249 }
2250 "current_timezone" => {
2251 if !args.is_empty() {
2252 return Err(PlanExprError(
2253 "fn 'current_timezone' takes no arguments".to_string(),
2254 ));
2255 }
2256 Ok(current_timezone().into_expr())
2257 }
2258 "spark_partition_id"
2260 | "input_file_name"
2261 | "monotonically_increasing_id"
2262 | "current_catalog"
2263 | "current_database"
2264 | "current_schema"
2265 | "current_user"
2266 | "user" => {
2267 if !args.is_empty() {
2268 return Err(PlanExprError(format!("fn '{name}' takes no arguments")));
2269 }
2270 let out = match name {
2271 "spark_partition_id" => spark_partition_id(),
2272 "input_file_name" => input_file_name(),
2273 "monotonically_increasing_id" => monotonically_increasing_id(),
2274 "current_catalog" => current_catalog(),
2275 "current_database" => current_database(),
2276 "current_schema" => current_schema(),
2277 "current_user" => current_user(),
2278 "user" => user(),
2279 _ => current_catalog(), };
2281 Ok(out.into_expr())
2282 }
2283 "hash" => {
2284 require_args_min(name, args, 1)?;
2285 let exprs: Result<Vec<Expr>, _> = args.iter().map(expr_from_value).collect();
2286 let cols: Vec<Column> = exprs?.into_iter().map(expr_to_column).collect();
2287 let refs: Vec<&Column> = cols.iter().collect();
2288 Ok(crate::functions::hash(&refs).into_expr())
2289 }
2290 "shift_left" => {
2291 require_args(name, args, 2)?;
2292 let c = expr_to_column(arg_expr(args, 0)?);
2293 let n = arg_lit_i32(args, 1)?;
2294 Ok(shift_left(&c, n).into_expr())
2295 }
2296 "shift_right" => {
2297 require_args(name, args, 2)?;
2298 let c = expr_to_column(arg_expr(args, 0)?);
2299 let n = arg_lit_i32(args, 1)?;
2300 Ok(shift_right(&c, n).into_expr())
2301 }
2302 "version" => {
2303 if !args.is_empty() {
2304 return Err(PlanExprError("fn 'version' takes no arguments".to_string()));
2305 }
2306 Ok(crate::functions::version().into_expr())
2307 }
2308 "array" => {
2310 let exprs: Result<Vec<Expr>, _> = args.iter().map(expr_from_value).collect();
2311 let cols: Vec<Column> = exprs?.into_iter().map(expr_to_column).collect();
2312 let refs: Vec<&Column> = cols.iter().collect();
2313 Ok(array(&refs)
2314 .map_err(|e| PlanExprError(e.to_string()))?
2315 .into_expr())
2316 }
2317 "array_max" => {
2318 require_args(name, args, 1)?;
2319 Ok(crate::functions::array_max(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2320 }
2321 "array_min" => {
2322 require_args(name, args, 1)?;
2323 Ok(crate::functions::array_min(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2324 }
2325 "array_size" | "size" | "cardinality" => {
2326 require_args(name, args, 1)?;
2327 let c = expr_to_column(arg_expr(args, 0)?);
2328 Ok(array_size(&c).into_expr())
2329 }
2330 "element_at" => {
2331 require_args(name, args, 2)?;
2332 let c = expr_to_column(arg_expr(args, 0)?);
2333 let idx = arg_lit_i64(args, 1)?;
2334 Ok(element_at(&c, idx).into_expr())
2335 }
2336 "try_element_at" => {
2337 require_args(name, args, 2)?;
2338 let c = expr_to_column(arg_expr(args, 0)?);
2339 let idx = arg_lit_i64(args, 1)?;
2340 Ok(try_element_at(&c, idx).into_expr())
2341 }
2342 "array_contains" => {
2343 require_args(name, args, 2)?;
2344 let arr = expr_to_column(arg_expr(args, 0)?);
2345 let val = expr_to_column(arg_expr(args, 1)?);
2346 Ok(array_contains(&arr, &val).into_expr())
2347 }
2348 "array_join" => {
2349 require_args(name, args, 2)?;
2350 let c = expr_to_column(arg_expr(args, 0)?);
2351 let sep = arg_lit_str(args, 1)?;
2352 Ok(array_join(&c, &sep).into_expr())
2353 }
2354 "array_sort" => {
2355 require_args(name, args, 1)?;
2356 Ok(array_sort(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2357 }
2358 "array_distinct" => {
2359 require_args(name, args, 1)?;
2360 Ok(array_distinct(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2361 }
2362 "array_slice" => {
2363 require_args_min(name, args, 2)?;
2364 let c = expr_to_column(arg_expr(args, 0)?);
2365 let start = arg_lit_i64(args, 1)?;
2366 let length = opt_lit_i64(args, 2);
2367 Ok(array_slice(&c, start, length).into_expr())
2368 }
2369 "array_compact" => {
2370 require_args(name, args, 1)?;
2371 Ok(array_compact(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2372 }
2373 "array_remove" => {
2374 require_args(name, args, 2)?;
2375 let arr = expr_to_column(arg_expr(args, 0)?);
2376 let val = expr_to_column(arg_expr(args, 1)?);
2377 Ok(array_remove(&arr, &val).into_expr())
2378 }
2379 "explode" => {
2380 require_args(name, args, 1)?;
2381 Ok(explode(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2382 }
2383 "explode_outer" => {
2384 require_args(name, args, 1)?;
2385 Ok(explode_outer(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2386 }
2387 "inline" => {
2388 require_args(name, args, 1)?;
2389 Ok(crate::functions::inline(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2390 }
2391 "inline_outer" => {
2392 require_args(name, args, 1)?;
2393 Ok(crate::functions::inline_outer(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2394 }
2395 "sequence" => {
2396 require_args_min(name, args, 2)?;
2397 let start = expr_to_column(arg_expr(args, 0)?);
2398 let stop = expr_to_column(arg_expr(args, 1)?);
2399 let step = if args.len() > 2 {
2400 Some(expr_to_column(arg_expr(args, 2)?))
2401 } else {
2402 None
2403 };
2404 Ok(crate::functions::sequence(&start, &stop, step.as_ref()).into_expr())
2405 }
2406 "shuffle" => {
2407 require_args(name, args, 1)?;
2408 Ok(crate::functions::shuffle(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2409 }
2410 "array_position" => {
2411 require_args(name, args, 2)?;
2412 let arr = expr_to_column(arg_expr(args, 0)?);
2413 let val = expr_to_column(arg_expr(args, 1)?);
2414 Ok(crate::functions::array_position(&arr, &val).into_expr())
2415 }
2416 "array_append" => {
2417 require_args(name, args, 2)?;
2418 let arr = expr_to_column(arg_expr(args, 0)?);
2419 let elem = expr_to_column(arg_expr(args, 1)?);
2420 Ok(array_append(&arr, &elem).into_expr())
2421 }
2422 "array_prepend" => {
2423 require_args(name, args, 2)?;
2424 let arr = expr_to_column(arg_expr(args, 0)?);
2425 let elem = expr_to_column(arg_expr(args, 1)?);
2426 Ok(array_prepend(&arr, &elem).into_expr())
2427 }
2428 "array_insert" => {
2429 require_args(name, args, 3)?;
2430 let arr = expr_to_column(arg_expr(args, 0)?);
2431 let pos = expr_to_column(arg_expr(args, 1)?);
2432 let elem = expr_to_column(arg_expr(args, 2)?);
2433 Ok(array_insert(&arr, &pos, &elem).into_expr())
2434 }
2435 "array_except" => {
2436 require_args(name, args, 2)?;
2437 let a = expr_to_column(arg_expr(args, 0)?);
2438 let b = expr_to_column(arg_expr(args, 1)?);
2439 Ok(array_except(&a, &b).into_expr())
2440 }
2441 "array_intersect" => {
2442 require_args(name, args, 2)?;
2443 let a = expr_to_column(arg_expr(args, 0)?);
2444 let b = expr_to_column(arg_expr(args, 1)?);
2445 Ok(array_intersect(&a, &b).into_expr())
2446 }
2447 "array_union" => {
2448 require_args(name, args, 2)?;
2449 let a = expr_to_column(arg_expr(args, 0)?);
2450 let b = expr_to_column(arg_expr(args, 1)?);
2451 Ok(array_union(&a, &b).into_expr())
2452 }
2453 "arrays_overlap" => {
2454 require_args(name, args, 2)?;
2455 let a = expr_to_column(arg_expr(args, 0)?);
2456 let b = expr_to_column(arg_expr(args, 1)?);
2457 Ok(arrays_overlap(&a, &b).into_expr())
2458 }
2459 "arrays_zip" => {
2460 require_args(name, args, 2)?;
2461 let a = expr_to_column(arg_expr(args, 0)?);
2462 let b = expr_to_column(arg_expr(args, 1)?);
2463 Ok(arrays_zip(&a, &b).into_expr())
2464 }
2465 "array_agg" => {
2466 require_args(name, args, 1)?;
2467 Ok(array_agg(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2468 }
2469 "array_sum" => {
2470 require_args(name, args, 1)?;
2471 Ok(array_sum(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2472 }
2473 "create_map" | "createMap" => {
2475 let exprs: Result<Vec<Expr>, _> = args.iter().map(expr_from_value).collect();
2477 let cols: Vec<Column> = exprs?.into_iter().map(expr_to_column).collect();
2478 let refs: Vec<&Column> = cols.iter().collect();
2479 Ok(create_map(&refs)
2480 .map_err(|e| PlanExprError(e.to_string()))?
2481 .into_expr())
2482 }
2483 "map_keys" => {
2484 require_args(name, args, 1)?;
2485 Ok(map_keys(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2486 }
2487 "map_values" => {
2488 require_args(name, args, 1)?;
2489 Ok(map_values(&expr_to_column(arg_expr(args, 0)?)).into_expr())
2490 }
2491 "get" => {
2492 require_args(name, args, 2)?;
2493 let map_col = expr_to_column(arg_expr(args, 0)?);
2494 let key = expr_to_column(arg_expr(args, 1)?);
2495 Ok(get(&map_col, &key).into_expr())
2496 }
2497 "get_field" | "getField" => {
2498 require_args(name, args, 2)?;
2500 let struct_col = expr_to_column(arg_expr(args, 0)?);
2501 let field_name = lit_as_string(&args[1])?;
2502 Ok(struct_col.get_field(&field_name).into_expr())
2503 }
2504 "get_item" => {
2505 require_args(name, args, 2)?;
2507 let col_c = expr_to_column(arg_expr(args, 0)?);
2508 let second = &args[1];
2509 if let Some(idx) = second.get("lit").and_then(|v| v.as_i64()) {
2510 Ok(col_c.get_item(idx).into_expr())
2511 } else {
2512 let key = expr_to_column(arg_expr(args, 1)?);
2513 Ok(get(&col_c, &key).into_expr())
2514 }
2515 }
2516 "struct" => {
2517 require_args_min(name, args, 1)?;
2519 let cols: Vec<crate::Column> = (0..args.len())
2520 .map(|i| arg_expr(args, i).map(expr_to_column))
2521 .collect::<Result<Vec<_>, _>>()?;
2522 let refs: Vec<&crate::Column> = cols.iter().collect();
2523 Ok(crate::functions::struct_(&refs).into_expr())
2524 }
2525 "named_struct" => {
2526 require_args_min(name, args, 2)?;
2528 if !args.len().is_multiple_of(2) {
2529 return Err(PlanExprError(
2530 "named_struct requires even number of args (name, value pairs)".into(),
2531 ));
2532 }
2533 let mut names: Vec<String> = Vec::new();
2534 let mut cols: Vec<crate::Column> = Vec::new();
2535 for i in (0..args.len()).step_by(2) {
2536 names.push(lit_as_string(&args[i])?);
2537 cols.push(expr_to_column(arg_expr(args, i + 1)?));
2538 }
2539 let refs: Vec<(&str, &crate::Column)> =
2540 names.iter().map(|s| s.as_str()).zip(cols.iter()).collect();
2541 Ok(crate::functions::named_struct(&refs).into_expr())
2542 }
2543 "with_field" | "withField" => {
2544 require_args(name, args, 3)?;
2546 let struct_col = expr_to_column(arg_expr(args, 0)?);
2547 let field_name = lit_as_string(&args[1])?;
2548 let value_col = expr_to_column(arg_expr(args, 2)?);
2549 let out = struct_col
2550 .try_with_field(&field_name, &value_col)
2551 .map_err(|e| PlanExprError(format!("with_field: {e}")))?;
2552 Ok(out.into_expr())
2553 }
2554 "nvl2" => {
2555 require_args(name, args, 3)?;
2556 let col1 = expr_to_column(arg_expr(args, 0)?);
2557 let col2 = expr_to_column(arg_expr(args, 1)?);
2558 let col3 = expr_to_column(arg_expr(args, 2)?);
2559 Ok(nvl2(&col1, &col2, &col3).into_expr())
2560 }
2561 _ => Err(PlanExprError(format!("unsupported function: {name}"))),
2562 }
2563}
2564
2565fn require_args(name: &str, args: &[Value], n: usize) -> Result<(), PlanExprError> {
2566 if args.len() != n {
2567 return Err(PlanExprError(format!(
2568 "fn '{name}' requires exactly {n} argument(s)"
2569 )));
2570 }
2571 Ok(())
2572}
2573
2574fn require_args_min(name: &str, args: &[Value], n: usize) -> Result<(), PlanExprError> {
2575 if args.len() < n {
2576 return Err(PlanExprError(format!(
2577 "fn '{name}' requires at least {n} argument(s)"
2578 )));
2579 }
2580 Ok(())
2581}
2582
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Runs `plan` through [`expr_from_value`] and unwraps the result.
    ///
    /// `#[track_caller]` makes a rejected plan panic at the calling test's
    /// line rather than inside this helper.
    #[track_caller]
    fn parse_ok(plan: Value) -> Expr {
        expr_from_value(&plan).unwrap()
    }

    // ---- column references and literals ----

    /// A bare JSON string is treated as a column reference.
    #[test]
    fn test_bare_string_column_ref() {
        let parsed = parse_ok(json!("age"));
        assert!(matches!(parsed, Expr::Column(_)));
    }

    #[test]
    fn test_col() {
        let _parsed = parse_ok(json!({"col": "age"}));
    }

    #[test]
    fn test_lit_i64() {
        let _ = parse_ok(json!({"lit": 30}));
    }

    // ---- comparison and boolean operators ----

    #[test]
    fn test_gt() {
        let _ = parse_ok(json!({
            "op": "gt",
            "left": {"col": "age"},
            "right": {"lit": 30}
        }));
    }

    /// `and` whose operands are themselves comparison ops.
    #[test]
    fn test_and() {
        let _ = parse_ok(json!({
            "op": "and",
            "left": {"op": "gt", "left": {"col": "a"}, "right": {"lit": 1}},
            "right": {"op": "lt", "left": {"col": "b"}, "right": {"lit": 10}}
        }));
    }

    // ---- scalar functions ----

    #[test]
    fn test_upper() {
        let _ = parse_ok(json!({"fn": "upper", "args": [{"col": "name"}]}));
    }

    #[test]
    fn test_length() {
        let _ = parse_ok(json!({"fn": "length", "args": [{"col": "name"}]}));
    }

    #[test]
    fn test_substring() {
        let _ = parse_ok(json!({
            "fn": "substring",
            "args": [{"col": "s"}, {"lit": 1}, {"lit": 3}]
        }));
    }

    #[test]
    fn test_year() {
        let _ = parse_ok(json!({"fn": "year", "args": [{"col": "ts"}]}));
    }

    #[test]
    fn test_cast() {
        let _ = parse_ok(json!({
            "fn": "cast",
            "args": [{"col": "x"}, {"lit": "string"}]
        }));
    }

    // ---- isin ----

    /// `isin` in op form: the candidate values arrive as one literal array.
    #[test]
    fn test_isin_op() {
        let _ = parse_ok(json!({
            "op": "isin",
            "left": {"col": "id"},
            "right": {"lit": [1, 3]}
        }));
    }

    /// `isin` in fn form: the candidate values follow the column as separate args.
    #[test]
    fn test_isin_fn() {
        let _ = parse_ok(json!({
            "fn": "isin",
            "args": [{"col": "id"}, {"lit": 1}, {"lit": 3}]
        }));
    }

    /// An empty value list in op form must come back as a literal expression.
    #[test]
    fn test_isin_op_empty() {
        let parsed = parse_ok(json!({
            "op": "isin",
            "left": {"col": "id"},
            "right": {"lit": []}
        }));
        assert!(matches!(parsed, Expr::Literal(_)));
    }

    /// In fn form, both "no values at all" and "a single null value" must come
    /// back as a literal expression.
    #[test]
    fn test_isin_fn_empty() {
        let no_values = parse_ok(json!({
            "fn": "isin",
            "args": [{"col": "id"}]
        }));
        assert!(matches!(no_values, Expr::Literal(_)));

        let null_only = parse_ok(json!({
            "fn": "isin",
            "args": [{"col": "id"}, {"lit": null}]
        }));
        assert!(matches!(null_only, Expr::Literal(_)));
    }

    // ---- structs, maps and item access ----

    #[test]
    fn test_struct_named_struct_fn() {
        let _ = parse_ok(json!({"fn": "struct", "args": [{"col": "a"}, {"col": "b"}]}));

        // named_struct takes alternating (name literal, value) arguments.
        let _ = parse_ok(json!({
            "fn": "named_struct",
            "args": [{"lit": "x"}, {"col": "a"}, {"lit": "y"}, {"col": "b"}]
        }));
    }

    /// `get_item` accepts both an integer literal and a string literal as its
    /// second argument.
    #[test]
    fn test_get_item_fn() {
        let _ = parse_ok(json!({"fn": "get_item", "args": [{"col": "arr"}, {"lit": 0}]}));
        let _ = parse_ok(json!({"fn": "get_item", "args": [{"col": "m"}, {"lit": "key"}]}));
    }

    #[test]
    fn test_get_item_op() {
        let _ = parse_ok(json!({"op": "getItem", "left": {"col": "arr"}, "right": {"lit": 1}}));
    }

    // ---- string / null predicates ----

    #[test]
    fn test_startswith_op() {
        let _ = parse_ok(json!({
            "op": "startswith",
            "left": {"col": "name"},
            "right": {"lit": "A"}
        }));
    }

    #[test]
    fn test_is_null_op() {
        let _ = parse_ok(json!({"op": "is_null", "arg": {"col": "x"}}));
    }

    #[test]
    fn test_is_not_null_op() {
        let _ = parse_ok(json!({"op": "is_not_null", "arg": {"col": "x"}}));
    }

    // ---- regular expressions ----

    #[test]
    fn test_regexp_extract_op() {
        let _ = parse_ok(json!({
            "op": "regexp_extract",
            "left": {"col": "s"},
            "pattern": {"lit": r"(\w+)"},
            "group": {"lit": 1}
        }));
    }

    /// Pattern and group index are given as bare JSON values rather than
    /// `{"lit": ...}` wrappers.
    #[test]
    fn test_regexp_extract_fn_bare_literals() {
        let _ = parse_ok(json!({
            "fn": "regexp_extract",
            "args": [{"col": "s"}, r"(\w+)", 1]
        }));
    }

    /// `regexp_replace` as an op, once with named keys and once with an
    /// `args` array.
    #[test]
    fn test_regexp_replace_op() {
        let _ = parse_ok(json!({
            "op": "regexp_replace",
            "left": {"col": "str"},
            "pattern": {"lit": r"\d"},
            "replacement": {"lit": "X"}
        }));
        let _ = parse_ok(json!({
            "op": "regexp_replace",
            "args": [{"col": "str"}, {"lit": r"\d"}, {"lit": "X"}]
        }));
    }

    // ---- create_map ----

    #[test]
    fn test_create_map_op() {
        let _ = parse_ok(json!({
            "op": "create_map",
            "args": [{"lit": "k"}, {"col": "a"}]
        }));
    }

    /// `create_map` with an empty argument list must still parse.
    #[test]
    fn test_create_map_fn_empty() {
        let _ = parse_ok(json!({"fn": "create_map", "args": []}));
    }

    // ---- window functions ----

    #[test]
    fn test_type_window_row_number_order_by() {
        let _ = parse_ok(json!({
            "type": "window",
            "fn": "row_number",
            "window": {"order_by": ["val"]}
        }));
    }

    /// Window expressed as `fn` + `args` + `window` with no `"type"` key, and
    /// with `order_by` entries given as objects.
    #[test]
    fn test_window_row_number_sparkless_format() {
        let _ = parse_ok(json!({
            "fn": "row_number",
            "args": [],
            "window": {
                "partition_by": ["dept"],
                "order_by": [{"col": "salary", "asc": true}]
            }
        }));
    }

    /// An empty window spec must parse, whether it is `{}` or has empty
    /// `partition_by` / `order_by` lists.
    #[test]
    fn test_row_number_window_empty() {
        let _ = parse_ok(json!({
            "type": "window",
            "fn": "row_number",
            "window": {}
        }));
        let _ = parse_ok(json!({
            "fn": "row_number",
            "args": [],
            "window": {"partition_by": [], "order_by": []}
        }));
    }

    /// The window function name may be supplied under `"function"` instead of `"fn"`.
    #[test]
    fn test_type_window_function_key() {
        let _ = parse_ok(json!({
            "type": "window",
            "function": "row_number",
            "window": {"partition_by": ["dept"]}
        }));
    }

    #[test]
    fn test_window_rank_dense_rank() {
        let _ = parse_ok(json!({
            "fn": "rank",
            "args": [],
            "window": {"partition_by": ["dept"], "order_by": ["salary"]}
        }));
        let _ = parse_ok(json!({
            "type": "window",
            "fn": "dense_rank",
            "window": {"order_by": ["val"]}
        }));
    }

    /// A window expression used as the left operand of an arithmetic op.
    #[test]
    fn test_window_plus_literal_op() {
        let _ = parse_ok(json!({
            "op": "add",
            "left": {"type": "window", "fn": "row_number", "window": {"order_by": ["val"]}},
            "right": {"lit": 10}
        }));
    }

    // ---- conditionals, concatenation and variadic functions ----

    /// `when` with only a condition and a value (no otherwise branch).
    #[test]
    fn test_when_two_arg() {
        let _ = parse_ok(json!({
            "fn": "when",
            "args": [
                {"op": "gt", "left": {"col": "a"}, "right": {"lit": 0}},
                {"lit": "positive"}
            ]
        }));
    }

    #[test]
    fn test_concat() {
        let _ = parse_ok(json!({
            "fn": "concat",
            "args": [{"col": "first"}, {"lit": " "}, {"col": "last"}]
        }));
    }

    /// `concat` spelled as an op carrying an `args` array.
    #[test]
    fn test_concat_op() {
        let _ = parse_ok(json!({
            "op": "concat",
            "args": [{"col": "a"}, {"col": "b"}]
        }));
    }

    /// `contains` spelled as an op carrying an `args` array.
    #[test]
    fn test_contains_op() {
        let _ = parse_ok(json!({
            "op": "contains",
            "args": [{"col": "name"}, {"lit": "lic"}]
        }));
    }

    #[test]
    fn test_greatest() {
        let _ = parse_ok(json!({
            "fn": "greatest",
            "args": [{"col": "a"}, {"col": "b"}, {"lit": 0}]
        }));
    }

    #[test]
    fn test_array_size() {
        let _ = parse_ok(json!({"fn": "array_size", "args": [{"col": "arr"}]}));
    }

    #[test]
    fn test_element_at() {
        let _ = parse_ok(json!({"fn": "element_at", "args": [{"col": "arr"}, {"lit": 1}]}));
    }

    #[test]
    fn test_coalesce() {
        let _ = parse_ok(json!({
            "fn": "coalesce",
            "args": [{"col": "a"}, {"col": "b"}, {"lit": 0}]
        }));
    }

    /// `between` with integer literal bounds. Only successful parsing is
    /// checked here; the column's actual type is not visible to this test.
    #[test]
    fn test_between_string_column_numeric_bounds() {
        let _ = parse_ok(json!({
            "op": "between",
            "left": {"col": "val"},
            "lower": {"lit": 1},
            "upper": {"lit": 10}
        }));
    }
}