Skip to main content

robin_sparkless_expr/
functions.rs

1use crate::column::Column;
2use polars::prelude::*;
3
4// -----------------------------------------------------------------------------
5// -----------------------------------------------------------------------------
6
/// Sort order specification for use in orderBy/sort. Holds expr + direction + null placement.
///
/// Values are produced by the helper constructors in this module ([`asc`], [`desc`],
/// and their `*_nulls_first` / `*_nulls_last` variants).
#[derive(Debug, Clone)]
pub struct SortOrder {
    /// Expression to sort by (crate-visible; read it through [`SortOrder::expr`]).
    pub(crate) expr: Expr,
    /// `true` for a descending sort, `false` for ascending.
    pub descending: bool,
    /// `true` to place nulls after non-null values, `false` to place them first.
    pub nulls_last: bool,
}

impl SortOrder {
    /// Borrow the expression this sort order applies to.
    pub fn expr(&self) -> &Expr {
        &self.expr
    }
}
20
21/// Ascending sort, nulls first (Spark default for ASC).
22pub fn asc(column: &Column) -> SortOrder {
23    SortOrder {
24        expr: column.expr().clone(),
25        descending: false,
26        nulls_last: false,
27    }
28}
29
30/// Ascending sort, nulls first.
31pub fn asc_nulls_first(column: &Column) -> SortOrder {
32    SortOrder {
33        expr: column.expr().clone(),
34        descending: false,
35        nulls_last: false,
36    }
37}
38
39/// Ascending sort, nulls last.
40pub fn asc_nulls_last(column: &Column) -> SortOrder {
41    SortOrder {
42        expr: column.expr().clone(),
43        descending: false,
44        nulls_last: true,
45    }
46}
47
48/// Descending sort, nulls last (Spark default for DESC).
49pub fn desc(column: &Column) -> SortOrder {
50    SortOrder {
51        expr: column.expr().clone(),
52        descending: true,
53        nulls_last: true,
54    }
55}
56
57/// Descending sort, nulls first.
58pub fn desc_nulls_first(column: &Column) -> SortOrder {
59    SortOrder {
60        expr: column.expr().clone(),
61        descending: true,
62        nulls_last: false,
63    }
64}
65
66/// Descending sort, nulls last.
67pub fn desc_nulls_last(column: &Column) -> SortOrder {
68    SortOrder {
69        expr: column.expr().clone(),
70        descending: true,
71        nulls_last: true,
72    }
73}
74
75// -----------------------------------------------------------------------------
76
77/// Parse PySpark-like type name to Polars DataType.
78/// Decimal(precision, scale) is mapped to Float64 for schema parity (Polars dtype-decimal not enabled).
79pub fn parse_type_name(name: &str) -> Result<DataType, String> {
80    let s = name.trim().to_lowercase();
81    if s.starts_with("decimal(") && s.contains(')') {
82        return Ok(DataType::Float64);
83    }
84    Ok(match s.as_str() {
85        "int" | "integer" => DataType::Int32,
86        "long" | "bigint" => DataType::Int64,
87        "float" => DataType::Float32,
88        "double" => DataType::Float64,
89        "string" | "str" => DataType::String,
90        "boolean" | "bool" => DataType::Boolean,
91        "date" => DataType::Date,
92        "timestamp" => DataType::Datetime(TimeUnit::Microseconds, None),
93        _ => return Err(format!("unknown type name: {name}")),
94    })
95}
96
97/// Get a column by name
98pub fn col(name: &str) -> Column {
99    Column::new(name.to_string())
100}
101
102/// Grouping set marker (PySpark grouping). Stub: returns 0 (no GROUPING SETS in robin-sparkless).
103pub fn grouping(column: &Column) -> Column {
104    let _ = column;
105    Column::from_expr(lit(0i32), Some("grouping".to_string()))
106}
107
108/// Grouping set id (PySpark grouping_id). Stub: returns 0.
109pub fn grouping_id(_columns: &[Column]) -> Column {
110    Column::from_expr(lit(0i64), Some("grouping_id".to_string()))
111}
112
113/// Create a literal column from a value
114pub fn lit_i32(value: i32) -> Column {
115    let expr: Expr = lit(value);
116    Column::from_expr(expr, None)
117}
118
119pub fn lit_i64(value: i64) -> Column {
120    let expr: Expr = lit(value);
121    Column::from_expr(expr, None)
122}
123
124pub fn lit_f64(value: f64) -> Column {
125    let expr: Expr = lit(value);
126    Column::from_expr(expr, None)
127}
128
129pub fn lit_bool(value: bool) -> Column {
130    let expr: Expr = lit(value);
131    Column::from_expr(expr, None)
132}
133
134pub fn lit_str(value: &str) -> Column {
135    let expr: Expr = lit(value);
136    Column::from_expr(expr, None)
137}
138
139/// Typed null literal column. Returns `Err` on unknown type name.
140/// See [`parse_type_name`] for supported type strings (e.g. `"boolean"`, `"string"`, `"bigint"`).
141pub fn lit_null(dtype: &str) -> Result<Column, String> {
142    Column::lit_null(dtype)
143}
144
145/// Count aggregation
146pub fn count(col: &Column) -> Column {
147    Column::from_expr(col.expr().clone().count(), Some("count".to_string()))
148}
149
150/// Sum aggregation
151pub fn sum(col: &Column) -> Column {
152    Column::from_expr(col.expr().clone().sum(), Some("sum".to_string()))
153}
154
155/// Average aggregation
156pub fn avg(col: &Column) -> Column {
157    Column::from_expr(col.expr().clone().mean(), Some("avg".to_string()))
158}
159
160/// Alias for avg (PySpark mean).
161pub fn mean(col: &Column) -> Column {
162    avg(col)
163}
164
165/// Maximum aggregation
166pub fn max(col: &Column) -> Column {
167    Column::from_expr(col.expr().clone().max(), Some("max".to_string()))
168}
169
170/// Minimum aggregation
171pub fn min(col: &Column) -> Column {
172    Column::from_expr(col.expr().clone().min(), Some("min".to_string()))
173}
174
175/// First value in group (PySpark first). Use in groupBy.agg(). ignorenulls: when true, first non-null; Polars 0.45 uses .first() only (ignorenulls reserved for API compatibility).
176pub fn first(col: &Column, ignorenulls: bool) -> Column {
177    let _ = ignorenulls;
178    Column::from_expr(col.expr().clone().first(), None)
179}
180
181/// Any value from the group (PySpark any_value). Use in groupBy.agg(). ignorenulls reserved for API compatibility.
182pub fn any_value(col: &Column, ignorenulls: bool) -> Column {
183    let _ = ignorenulls;
184    Column::from_expr(col.expr().clone().first(), None)
185}
186
187/// Count rows where condition is true (PySpark count_if). Use in groupBy.agg(); column should be boolean (true=1, false=0).
188pub fn count_if(col: &Column) -> Column {
189    use polars::prelude::DataType;
190    Column::from_expr(
191        col.expr().clone().cast(DataType::Int64).sum(),
192        Some("count_if".to_string()),
193    )
194}
195
196/// Sum aggregation; null on overflow (PySpark try_sum). Use in groupBy.agg(). Polars sum does not overflow; reserved for API.
197pub fn try_sum(col: &Column) -> Column {
198    Column::from_expr(col.expr().clone().sum(), Some("try_sum".to_string()))
199}
200
201/// Average aggregation; null on invalid (PySpark try_avg). Use in groupBy.agg(). Maps to mean; reserved for API.
202pub fn try_avg(col: &Column) -> Column {
203    Column::from_expr(col.expr().clone().mean(), Some("try_avg".to_string()))
204}
205
206/// Value of value_col in the row where ord_col is maximum (PySpark max_by). Use in groupBy.agg().
207pub fn max_by(value_col: &Column, ord_col: &Column) -> Column {
208    use polars::prelude::{SortOptions, as_struct};
209    let st = as_struct(vec![
210        ord_col.expr().clone().alias("_ord"),
211        value_col.expr().clone().alias("_val"),
212    ]);
213    let e = st
214        .sort(SortOptions::default().with_order_descending(true))
215        .first()
216        .struct_()
217        .field_by_name("_val");
218    Column::from_expr(e, None)
219}
220
221/// Value of value_col in the row where ord_col is minimum (PySpark min_by). Use in groupBy.agg().
222pub fn min_by(value_col: &Column, ord_col: &Column) -> Column {
223    use polars::prelude::{SortOptions, as_struct};
224    let st = as_struct(vec![
225        ord_col.expr().clone().alias("_ord"),
226        value_col.expr().clone().alias("_val"),
227    ]);
228    let e = st
229        .sort(SortOptions::default())
230        .first()
231        .struct_()
232        .field_by_name("_val");
233    Column::from_expr(e, None)
234}
235
236/// Collect column values into list per group (PySpark collect_list). Use in groupBy.agg().
237pub fn collect_list(col: &Column) -> Column {
238    Column::from_expr(
239        col.expr().clone().implode(),
240        Some("collect_list".to_string()),
241    )
242}
243
244/// Collect distinct column values into list per group (PySpark collect_set). Use in groupBy.agg().
245pub fn collect_set(col: &Column) -> Column {
246    Column::from_expr(
247        col.expr().clone().unique().implode(),
248        Some("collect_set".to_string()),
249    )
250}
251
252/// Boolean AND across group (PySpark bool_and). Use in groupBy.agg(); column should be boolean.
253pub fn bool_and(col: &Column) -> Column {
254    Column::from_expr(col.expr().clone().all(true), Some("bool_and".to_string()))
255}
256
257/// Alias for bool_and (PySpark every). Use in groupBy.agg().
258pub fn every(col: &Column) -> Column {
259    Column::from_expr(col.expr().clone().all(true), Some("every".to_string()))
260}
261
262/// Standard deviation (sample) aggregation (PySpark stddev / stddev_samp)
263pub fn stddev(col: &Column) -> Column {
264    Column::from_expr(col.expr().clone().std(1), Some("stddev".to_string()))
265}
266
267/// Variance (sample) aggregation (PySpark variance / var_samp)
268pub fn variance(col: &Column) -> Column {
269    Column::from_expr(col.expr().clone().var(1), Some("variance".to_string()))
270}
271
272/// Population standard deviation (ddof=0). PySpark stddev_pop.
273pub fn stddev_pop(col: &Column) -> Column {
274    Column::from_expr(col.expr().clone().std(0), Some("stddev_pop".to_string()))
275}
276
277/// Sample standard deviation (ddof=1). Alias for stddev. PySpark stddev_samp.
278pub fn stddev_samp(col: &Column) -> Column {
279    stddev(col)
280}
281
282/// Alias for stddev (PySpark std).
283pub fn std(col: &Column) -> Column {
284    stddev(col)
285}
286
287/// Population variance (ddof=0). PySpark var_pop.
288pub fn var_pop(col: &Column) -> Column {
289    Column::from_expr(col.expr().clone().var(0), Some("var_pop".to_string()))
290}
291
292/// Sample variance (ddof=1). Alias for variance. PySpark var_samp.
293pub fn var_samp(col: &Column) -> Column {
294    variance(col)
295}
296
297/// Median aggregation. PySpark median.
298pub fn median(col: &Column) -> Column {
299    use polars::prelude::QuantileMethod;
300    Column::from_expr(
301        col.expr()
302            .clone()
303            .quantile(lit(0.5), QuantileMethod::Linear),
304        Some("median".to_string()),
305    )
306}
307
308/// Approximate percentile (PySpark approx_percentile). Maps to quantile; percentage in 0.0..=1.0. accuracy reserved for API compatibility.
309pub fn approx_percentile(col: &Column, percentage: f64, _accuracy: Option<i32>) -> Column {
310    use polars::prelude::QuantileMethod;
311    Column::from_expr(
312        col.expr()
313            .clone()
314            .quantile(lit(percentage), QuantileMethod::Linear),
315        Some(format!("approx_percentile({percentage})")),
316    )
317}
318
319/// Approximate percentile (PySpark percentile_approx). Alias for approx_percentile.
320pub fn percentile_approx(col: &Column, percentage: f64, accuracy: Option<i32>) -> Column {
321    approx_percentile(col, percentage, accuracy)
322}
323
324/// Mode aggregation - most frequent value. PySpark mode.
325pub fn mode(col: &Column) -> Column {
326    col.clone().mode()
327}
328
329/// Count distinct aggregation (PySpark countDistinct)
330pub fn count_distinct(col: &Column) -> Column {
331    use polars::prelude::DataType;
332    Column::from_expr(
333        col.expr().clone().n_unique().cast(DataType::Int64),
334        Some("count_distinct".to_string()),
335    )
336}
337
338/// Approximate count distinct (PySpark approx_count_distinct). Use in groupBy.agg(). rsd reserved for API compatibility; Polars uses exact n_unique.
339pub fn approx_count_distinct(col: &Column, _rsd: Option<f64>) -> Column {
340    use polars::prelude::DataType;
341    Column::from_expr(
342        col.expr().clone().n_unique().cast(DataType::Int64),
343        Some("approx_count_distinct".to_string()),
344    )
345}
346
347/// Kurtosis aggregation (PySpark kurtosis). Fisher definition, bias=true. Use in groupBy.agg().
348pub fn kurtosis(col: &Column) -> Column {
349    Column::from_expr(
350        col.expr()
351            .clone()
352            .cast(DataType::Float64)
353            .kurtosis(true, true),
354        Some("kurtosis".to_string()),
355    )
356}
357
358/// Skewness aggregation (PySpark skewness). bias=true. Use in groupBy.agg().
359pub fn skewness(col: &Column) -> Column {
360    Column::from_expr(
361        col.expr().clone().cast(DataType::Float64).skew(true),
362        Some("skewness".to_string()),
363    )
364}
365
366/// Population covariance aggregation (PySpark covar_pop). Returns Expr for use in groupBy.agg().
367pub fn covar_pop_expr(col1: &str, col2: &str) -> Expr {
368    use polars::prelude::{col as pl_col, len};
369    let c1 = pl_col(col1).cast(DataType::Float64);
370    let c2 = pl_col(col2).cast(DataType::Float64);
371    let n = len().cast(DataType::Float64);
372    let sum_ab = (c1.clone() * c2.clone()).sum();
373    let sum_a = pl_col(col1).sum().cast(DataType::Float64);
374    let sum_b = pl_col(col2).sum().cast(DataType::Float64);
375    (sum_ab - sum_a * sum_b / n.clone()) / n
376}
377
378/// Population covariance aggregation (PySpark covar_pop). Module-level; use in groupBy.agg() with two columns.
379pub fn covar_pop(col1: &Column, col2: &Column) -> Column {
380    use polars::prelude::len;
381    let c1 = col1.expr().clone().cast(DataType::Float64);
382    let c2 = col2.expr().clone().cast(DataType::Float64);
383    let n = len().cast(DataType::Float64);
384    let sum_ab = (c1.clone() * c2.clone()).sum();
385    let sum_a = col1.expr().clone().sum().cast(DataType::Float64);
386    let sum_b = col2.expr().clone().sum().cast(DataType::Float64);
387    let e = (sum_ab - sum_a * sum_b / n.clone()) / n;
388    Column::from_expr(e, Some("covar_pop".to_string()))
389}
390
391/// Pearson correlation aggregation (PySpark corr). Module-level; use in groupBy.agg() with two columns.
392pub fn corr(col1: &Column, col2: &Column) -> Column {
393    use polars::prelude::{len, lit, when};
394    let c1 = col1.expr().clone().cast(DataType::Float64);
395    let c2 = col2.expr().clone().cast(DataType::Float64);
396    let n = len().cast(DataType::Float64);
397    let n1 = (len() - lit(1)).cast(DataType::Float64);
398    let sum_ab = (c1.clone() * c2.clone()).sum();
399    let sum_a = col1.expr().clone().sum().cast(DataType::Float64);
400    let sum_b = col2.expr().clone().sum().cast(DataType::Float64);
401    let sum_a2 = (c1.clone() * c1).sum();
402    let sum_b2 = (c2.clone() * c2).sum();
403    let cov_samp = (sum_ab - sum_a.clone() * sum_b.clone() / n.clone()) / n1.clone();
404    let var_a = (sum_a2 - sum_a.clone() * sum_a / n.clone()) / n1.clone();
405    let var_b = (sum_b2 - sum_b.clone() * sum_b / n.clone()) / n1.clone();
406    let std_a = var_a.sqrt();
407    let std_b = var_b.sqrt();
408    let e = when(len().gt(lit(1)))
409        .then(cov_samp / (std_a * std_b))
410        .otherwise(lit(f64::NAN));
411    Column::from_expr(e, Some("corr".to_string()))
412}
413
414/// Sample covariance aggregation (PySpark covar_samp). Returns Expr for use in groupBy.agg().
415pub fn covar_samp_expr(col1: &str, col2: &str) -> Expr {
416    use polars::prelude::{col as pl_col, len, lit, when};
417    let c1 = pl_col(col1).cast(DataType::Float64);
418    let c2 = pl_col(col2).cast(DataType::Float64);
419    let n = len().cast(DataType::Float64);
420    let sum_ab = (c1.clone() * c2.clone()).sum();
421    let sum_a = pl_col(col1).sum().cast(DataType::Float64);
422    let sum_b = pl_col(col2).sum().cast(DataType::Float64);
423    when(len().gt(lit(1)))
424        .then((sum_ab - sum_a * sum_b / n.clone()) / (len() - lit(1)).cast(DataType::Float64))
425        .otherwise(lit(f64::NAN))
426}
427
428/// Pearson correlation aggregation (PySpark corr). Returns Expr for use in groupBy.agg().
429pub fn corr_expr(col1: &str, col2: &str) -> Expr {
430    use polars::prelude::{col as pl_col, len, lit, when};
431    let c1 = pl_col(col1).cast(DataType::Float64);
432    let c2 = pl_col(col2).cast(DataType::Float64);
433    let n = len().cast(DataType::Float64);
434    let n1 = (len() - lit(1)).cast(DataType::Float64);
435    let sum_ab = (c1.clone() * c2.clone()).sum();
436    let sum_a = pl_col(col1).sum().cast(DataType::Float64);
437    let sum_b = pl_col(col2).sum().cast(DataType::Float64);
438    let sum_a2 = (c1.clone() * c1).sum();
439    let sum_b2 = (c2.clone() * c2).sum();
440    let cov_samp = (sum_ab - sum_a.clone() * sum_b.clone() / n.clone()) / n1.clone();
441    let var_a = (sum_a2 - sum_a.clone() * sum_a / n.clone()) / n1.clone();
442    let var_b = (sum_b2 - sum_b.clone() * sum_b / n.clone()) / n1.clone();
443    let std_a = var_a.sqrt();
444    let std_b = var_b.sqrt();
445    when(len().gt(lit(1)))
446        .then(cov_samp / (std_a * std_b))
447        .otherwise(lit(f64::NAN))
448}
449
450// --- Regression aggregates (PySpark regr_*). y = col1, x = col2; only pairs where both non-null. ---
451
452fn regr_cond_and_sums(y_col: &str, x_col: &str) -> (Expr, Expr, Expr, Expr, Expr, Expr) {
453    use polars::prelude::col as pl_col;
454    let y = pl_col(y_col).cast(DataType::Float64);
455    let x = pl_col(x_col).cast(DataType::Float64);
456    let cond = y.clone().is_not_null().and(x.clone().is_not_null());
457    let n = y
458        .clone()
459        .filter(cond.clone())
460        .count()
461        .cast(DataType::Float64);
462    let sum_x = x.clone().filter(cond.clone()).sum();
463    let sum_y = y.clone().filter(cond.clone()).sum();
464    let sum_xx = (x.clone() * x.clone()).filter(cond.clone()).sum();
465    let sum_yy = (y.clone() * y.clone()).filter(cond.clone()).sum();
466    let sum_xy = (x * y).filter(cond).sum();
467    (n, sum_x, sum_y, sum_xx, sum_yy, sum_xy)
468}
469
470/// Regression: count of (y, x) pairs where both non-null (PySpark regr_count).
471pub fn regr_count_expr(y_col: &str, x_col: &str) -> Expr {
472    let (n, ..) = regr_cond_and_sums(y_col, x_col);
473    n
474}
475
476/// Regression: average of x (PySpark regr_avgx).
477pub fn regr_avgx_expr(y_col: &str, x_col: &str) -> Expr {
478    use polars::prelude::{lit, when};
479    let (n, sum_x, ..) = regr_cond_and_sums(y_col, x_col);
480    when(n.clone().gt(lit(0.0)))
481        .then(sum_x / n)
482        .otherwise(lit(f64::NAN))
483}
484
485/// Regression: average of y (PySpark regr_avgy).
486pub fn regr_avgy_expr(y_col: &str, x_col: &str) -> Expr {
487    use polars::prelude::{lit, when};
488    let (n, _, sum_y, ..) = regr_cond_and_sums(y_col, x_col);
489    when(n.clone().gt(lit(0.0)))
490        .then(sum_y / n)
491        .otherwise(lit(f64::NAN))
492}
493
494/// Regression: sum((x - avg_x)^2) (PySpark regr_sxx).
495pub fn regr_sxx_expr(y_col: &str, x_col: &str) -> Expr {
496    use polars::prelude::{lit, when};
497    let (n, sum_x, _, sum_xx, ..) = regr_cond_and_sums(y_col, x_col);
498    when(n.clone().gt(lit(0.0)))
499        .then(sum_xx - sum_x.clone() * sum_x / n)
500        .otherwise(lit(f64::NAN))
501}
502
503/// Regression: sum((y - avg_y)^2) (PySpark regr_syy).
504pub fn regr_syy_expr(y_col: &str, x_col: &str) -> Expr {
505    use polars::prelude::{lit, when};
506    let (n, _, sum_y, _, sum_yy, _) = regr_cond_and_sums(y_col, x_col);
507    when(n.clone().gt(lit(0.0)))
508        .then(sum_yy - sum_y.clone() * sum_y / n)
509        .otherwise(lit(f64::NAN))
510}
511
512/// Regression: sum((x - avg_x)(y - avg_y)) (PySpark regr_sxy).
513pub fn regr_sxy_expr(y_col: &str, x_col: &str) -> Expr {
514    use polars::prelude::{lit, when};
515    let (n, sum_x, sum_y, _, _, sum_xy) = regr_cond_and_sums(y_col, x_col);
516    when(n.clone().gt(lit(0.0)))
517        .then(sum_xy - sum_x * sum_y / n)
518        .otherwise(lit(f64::NAN))
519}
520
521/// Regression slope: cov_samp(y,x)/var_samp(x) (PySpark regr_slope).
522pub fn regr_slope_expr(y_col: &str, x_col: &str) -> Expr {
523    use polars::prelude::{lit, when};
524    let (n, sum_x, sum_y, sum_xx, _sum_yy, sum_xy) = regr_cond_and_sums(y_col, x_col);
525    let regr_sxx = sum_xx.clone() - sum_x.clone() * sum_x.clone() / n.clone();
526    let regr_sxy = sum_xy - sum_x * sum_y / n.clone();
527    when(n.gt(lit(1.0)).and(regr_sxx.clone().gt(lit(0.0))))
528        .then(regr_sxy / regr_sxx)
529        .otherwise(lit(f64::NAN))
530}
531
532/// Regression intercept: avg_y - slope*avg_x (PySpark regr_intercept).
533pub fn regr_intercept_expr(y_col: &str, x_col: &str) -> Expr {
534    use polars::prelude::{lit, when};
535    let (n, sum_x, sum_y, sum_xx, _, sum_xy) = regr_cond_and_sums(y_col, x_col);
536    let regr_sxx = sum_xx - sum_x.clone() * sum_x.clone() / n.clone();
537    let regr_sxy = sum_xy.clone() - sum_x.clone() * sum_y.clone() / n.clone();
538    let slope = regr_sxy.clone() / regr_sxx.clone();
539    let avg_y = sum_y / n.clone();
540    let avg_x = sum_x / n.clone();
541    when(n.gt(lit(1.0)).and(regr_sxx.clone().gt(lit(0.0))))
542        .then(avg_y - slope * avg_x)
543        .otherwise(lit(f64::NAN))
544}
545
546/// Regression R-squared (PySpark regr_r2).
547pub fn regr_r2_expr(y_col: &str, x_col: &str) -> Expr {
548    use polars::prelude::{lit, when};
549    let (n, sum_x, sum_y, sum_xx, sum_yy, sum_xy) = regr_cond_and_sums(y_col, x_col);
550    let regr_sxx = sum_xx - sum_x.clone() * sum_x.clone() / n.clone();
551    let regr_syy = sum_yy - sum_y.clone() * sum_y.clone() / n.clone();
552    let regr_sxy = sum_xy - sum_x * sum_y / n;
553    when(
554        regr_sxx
555            .clone()
556            .gt(lit(0.0))
557            .and(regr_syy.clone().gt(lit(0.0))),
558    )
559    .then(regr_sxy.clone() * regr_sxy / (regr_sxx * regr_syy))
560    .otherwise(lit(f64::NAN))
561}
562
563/// PySpark-style conditional expression builder.
564///
565/// # Example
566/// ```
567/// use robin_sparkless_expr::functions::{col, lit_i64, lit_str, when};
568///
569/// // when(condition).then(value).otherwise(fallback)
570/// let expr = when(&col("age").gt(lit_i64(18).into_expr()))
571///     .then(&lit_str("adult"))
572///     .otherwise(&lit_str("minor"));
573/// ```
574pub fn when(condition: &Column) -> WhenBuilder {
575    WhenBuilder::new(condition.expr().clone())
576}
577
578/// Two-arg when(condition, value): returns value where condition is true, null otherwise (PySpark when(cond, val)).
579pub fn when_then_otherwise_null(condition: &Column, value: &Column) -> Column {
580    use polars::prelude::*;
581    let null_expr = lit(NULL);
582    let expr = polars::prelude::when(condition.expr().clone())
583        .then(value.expr().clone())
584        .otherwise(null_expr);
585    crate::column::Column::from_expr(expr, None)
586}
587
588/// Builder for when-then-otherwise expressions
589pub struct WhenBuilder {
590    condition: Expr,
591}
592
593impl WhenBuilder {
594    fn new(condition: Expr) -> Self {
595        WhenBuilder { condition }
596    }
597
598    /// Specify the value when condition is true
599    pub fn then(self, value: &Column) -> ThenBuilder {
600        use polars::prelude::*;
601        let when_then = when(self.condition).then(value.expr().clone());
602        ThenBuilder::new(when_then)
603    }
604
605    /// Specify the value when condition is false
606    /// Note: In PySpark, when(cond).otherwise(val) requires a .then() first.
607    /// For this implementation, we require .then() to be called explicitly.
608    /// This method will panic if used directly - use when(cond).then(val1).otherwise(val2) instead.
609    pub fn otherwise(self, _value: &Column) -> Column {
610        // This should not be called directly - when().otherwise() without .then() is not supported
611        // Users should use when(cond).then(val1).otherwise(val2)
612        panic!(
613            "when().otherwise() requires .then() to be called first. Use when(cond).then(val1).otherwise(val2)"
614        );
615    }
616}
617
/// Builder for chaining when-then clauses before finalizing with otherwise.
///
/// Wraps either a single Polars `Then` or a `ChainedThen` (see `WhenThenState`).
pub struct ThenBuilder {
    /// Current Polars when/then state (single clause or chained clauses).
    state: WhenThenState,
}
622
/// Internal state of a `ThenBuilder`: which Polars builder type is currently held.
enum WhenThenState {
    /// Exactly one when/then clause so far (Polars `Then`), boxed.
    Single(Box<polars::prelude::Then>),
    /// Two or more when/then clauses (Polars `ChainedThen`), boxed.
    Chained(Box<polars::prelude::ChainedThen>),
}
627
/// Builder for an additional when-then clause (returned by ThenBuilder::when).
///
/// Holds a Polars `ChainedWhen` that is waiting for its `then` value.
pub struct ChainedWhenBuilder {
    /// Pending Polars chained-when clause.
    inner: polars::prelude::ChainedWhen,
}
632
633impl ThenBuilder {
634    fn new(when_then: polars::prelude::Then) -> Self {
635        ThenBuilder {
636            state: WhenThenState::Single(Box::new(when_then)),
637        }
638    }
639
640    fn new_chained(chained: polars::prelude::ChainedThen) -> Self {
641        ThenBuilder {
642            state: WhenThenState::Chained(Box::new(chained)),
643        }
644    }
645
646    /// Chain an additional when-then clause (PySpark: when(a).then(x).when(b).then(y).otherwise(z)).
647    pub fn when(self, condition: &Column) -> ChainedWhenBuilder {
648        let chained_when = match self.state {
649            WhenThenState::Single(t) => t.when(condition.expr().clone()),
650            WhenThenState::Chained(ct) => ct.when(condition.expr().clone()),
651        };
652        ChainedWhenBuilder {
653            inner: chained_when,
654        }
655    }
656
657    /// Finalize the expression with the fallback value
658    pub fn otherwise(self, value: &Column) -> Column {
659        let expr = match self.state {
660            WhenThenState::Single(t) => t.otherwise(value.expr().clone()),
661            WhenThenState::Chained(ct) => ct.otherwise(value.expr().clone()),
662        };
663        crate::column::Column::from_expr(expr, None)
664    }
665}
666
667impl ChainedWhenBuilder {
668    /// Set the value for the current when clause.
669    pub fn then(self, value: &Column) -> ThenBuilder {
670        ThenBuilder::new_chained(self.inner.then(value.expr().clone()))
671    }
672}
673
674/// Convert string column to uppercase (PySpark upper)
675pub fn upper(column: &Column) -> Column {
676    column.clone().upper()
677}
678
679/// Convert string column to lowercase (PySpark lower)
680pub fn lower(column: &Column) -> Column {
681    column.clone().lower()
682}
683
684/// Substring with 1-based start (PySpark substring semantics)
685pub fn substring(column: &Column, start: i64, length: Option<i64>) -> Column {
686    column.clone().substr(start, length)
687}
688
689/// String length in characters (PySpark length)
690pub fn length(column: &Column) -> Column {
691    column.clone().length()
692}
693
694/// Trim leading and trailing whitespace (PySpark trim)
695pub fn trim(column: &Column) -> Column {
696    column.clone().trim()
697}
698
699/// Trim leading whitespace (PySpark ltrim)
700pub fn ltrim(column: &Column) -> Column {
701    column.clone().ltrim()
702}
703
704/// Trim trailing whitespace (PySpark rtrim)
705pub fn rtrim(column: &Column) -> Column {
706    column.clone().rtrim()
707}
708
709/// Trim leading and trailing chars (PySpark btrim). trim_str defaults to whitespace.
710pub fn btrim(column: &Column, trim_str: Option<&str>) -> Column {
711    column.clone().btrim(trim_str)
712}
713
714/// Find substring position 1-based, starting at pos (PySpark locate). 0 if not found.
715pub fn locate(substr: &str, column: &Column, pos: i64) -> Column {
716    column.clone().locate(substr, pos)
717}
718
719/// Base conversion (PySpark conv). num from from_base to to_base.
720pub fn conv(column: &Column, from_base: i32, to_base: i32) -> Column {
721    column.clone().conv(from_base, to_base)
722}
723
724/// Convert to hex string (PySpark hex).
725pub fn hex(column: &Column) -> Column {
726    column.clone().hex()
727}
728
729/// Convert hex string to binary/string (PySpark unhex).
730pub fn unhex(column: &Column) -> Column {
731    column.clone().unhex()
732}
733
734/// Encode string to binary (PySpark encode). Charset: UTF-8. Returns hex string.
735pub fn encode(column: &Column, charset: &str) -> Column {
736    column.clone().encode(charset)
737}
738
739/// Decode binary (hex string) to string (PySpark decode). Charset: UTF-8.
740pub fn decode(column: &Column, charset: &str) -> Column {
741    column.clone().decode(charset)
742}
743
744/// Convert to binary (PySpark to_binary). fmt: 'utf-8', 'hex'.
745pub fn to_binary(column: &Column, fmt: &str) -> Column {
746    column.clone().to_binary(fmt)
747}
748
749/// Try convert to binary; null on failure (PySpark try_to_binary).
750pub fn try_to_binary(column: &Column, fmt: &str) -> Column {
751    column.clone().try_to_binary(fmt)
752}
753
754/// AES encrypt (PySpark aes_encrypt). Key as string; AES-128-GCM.
755pub fn aes_encrypt(column: &Column, key: &str) -> Column {
756    column.clone().aes_encrypt(key)
757}
758
759/// AES decrypt (PySpark aes_decrypt). Input hex(nonce||ciphertext).
760pub fn aes_decrypt(column: &Column, key: &str) -> Column {
761    column.clone().aes_decrypt(key)
762}
763
764/// Try AES decrypt (PySpark try_aes_decrypt). Returns null on failure.
765pub fn try_aes_decrypt(column: &Column, key: &str) -> Column {
766    column.clone().try_aes_decrypt(key)
767}
768
769/// Convert integer to binary string (PySpark bin).
770pub fn bin(column: &Column) -> Column {
771    column.clone().bin()
772}
773
774/// Get bit at 0-based position (PySpark getbit).
775pub fn getbit(column: &Column, pos: i64) -> Column {
776    column.clone().getbit(pos)
777}
778
779/// Bitwise AND of two integer/boolean columns (PySpark bit_and).
780pub fn bit_and(left: &Column, right: &Column) -> Column {
781    left.clone().bit_and(right)
782}
783
784/// Bitwise OR of two integer/boolean columns (PySpark bit_or).
785pub fn bit_or(left: &Column, right: &Column) -> Column {
786    left.clone().bit_or(right)
787}
788
789/// Bitwise XOR of two integer/boolean columns (PySpark bit_xor).
790pub fn bit_xor(left: &Column, right: &Column) -> Column {
791    left.clone().bit_xor(right)
792}
793
794/// Count of set bits in the integer representation (PySpark bit_count).
795pub fn bit_count(column: &Column) -> Column {
796    column.clone().bit_count()
797}
798
799/// Bitwise NOT of an integer/boolean column (PySpark bitwise_not / bitwiseNOT).
800pub fn bitwise_not(column: &Column) -> Column {
801    column.clone().bitwise_not()
802}
803
804// --- Bitmap (PySpark 3.5+) ---
805
806/// Map integral value (0–32767) to bit position for bitmap aggregates (PySpark bitmap_bit_position).
807pub fn bitmap_bit_position(column: &Column) -> Column {
808    use polars::prelude::DataType;
809    let expr = column.expr().clone().cast(DataType::Int32);
810    Column::from_expr(expr, None)
811}
812
813/// Bucket number for distributed bitmap (PySpark bitmap_bucket_number). value / 32768.
814pub fn bitmap_bucket_number(column: &Column) -> Column {
815    use polars::prelude::DataType;
816    let expr = column.expr().clone().cast(DataType::Int64) / lit(32768i64);
817    Column::from_expr(expr, None)
818}
819
820/// Count set bits in a bitmap binary column (PySpark bitmap_count).
821pub fn bitmap_count(column: &Column) -> Column {
822    use polars::prelude::{DataType, Field};
823    let expr = column.expr().clone().map(
824        |s| crate::column::expect_col(crate::udfs::apply_bitmap_count(s)),
825        |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
826    );
827    Column::from_expr(expr, None)
828}
829
830/// Aggregate: bitwise OR of bit positions into one bitmap binary (PySpark bitmap_construct_agg).
831/// Use in group_by(...).agg([bitmap_construct_agg(col)]).
832pub fn bitmap_construct_agg(column: &Column) -> polars::prelude::Expr {
833    use polars::prelude::{DataType, Field};
834    column.expr().clone().implode().map(
835        |s| crate::column::expect_col(crate::udfs::apply_bitmap_construct_agg(s)),
836        |_schema, field| Ok(Field::new(field.name().clone(), DataType::Binary)),
837    )
838}
839
840/// Aggregate: bitwise OR of bitmap binary column (PySpark bitmap_or_agg).
841pub fn bitmap_or_agg(column: &Column) -> polars::prelude::Expr {
842    use polars::prelude::{DataType, Field};
843    column.expr().clone().implode().map(
844        |s| crate::column::expect_col(crate::udfs::apply_bitmap_or_agg(s)),
845        |_schema, field| Ok(Field::new(field.name().clone(), DataType::Binary)),
846    )
847}
848
849/// Alias for getbit (PySpark bit_get).
850pub fn bit_get(column: &Column, pos: i64) -> Column {
851    getbit(column, pos)
852}
853
854/// Assert that all boolean values are true; errors otherwise (PySpark assert_true).
855/// When err_msg is Some, it is used in the error message when assertion fails.
856pub fn assert_true(column: &Column, err_msg: Option<&str>) -> Column {
857    column.clone().assert_true(err_msg)
858}
859
860/// Raise an error when evaluated (PySpark raise_error). Always fails with the given message.
861pub fn raise_error(message: &str) -> Column {
862    let msg = message.to_string();
863    let expr = lit(0i64).map(
864        move |_col| -> PolarsResult<polars::prelude::Column> {
865            Err(PolarsError::ComputeError(msg.clone().into()))
866        },
867        |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
868    );
869    Column::from_expr(expr, Some("raise_error".to_string()))
870}
871
872/// Stub partition id - always 0 (PySpark spark_partition_id).
873pub fn spark_partition_id() -> Column {
874    Column::from_expr(lit(0i32), Some("spark_partition_id".to_string()))
875}
876
877/// Stub input file name - empty string (PySpark input_file_name).
878pub fn input_file_name() -> Column {
879    Column::from_expr(lit(""), Some("input_file_name".to_string()))
880}
881
882/// Stub monotonically_increasing_id - constant 0 (PySpark monotonically_increasing_id).
883/// Note: differs from PySpark which is unique per-row; see PYSPARK_DIFFERENCES.md.
884pub fn monotonically_increasing_id() -> Column {
885    Column::from_expr(lit(0i64), Some("monotonically_increasing_id".to_string()))
886}
887
888/// Current catalog name stub (PySpark current_catalog).
889pub fn current_catalog() -> Column {
890    Column::from_expr(lit("spark_catalog"), Some("current_catalog".to_string()))
891}
892
893/// Current database/schema name stub (PySpark current_database).
894pub fn current_database() -> Column {
895    Column::from_expr(lit("default"), Some("current_database".to_string()))
896}
897
898/// Current schema name stub (PySpark current_schema).
899pub fn current_schema() -> Column {
900    Column::from_expr(lit("default"), Some("current_schema".to_string()))
901}
902
903/// Current user stub (PySpark current_user).
904pub fn current_user() -> Column {
905    Column::from_expr(lit("unknown"), Some("current_user".to_string()))
906}
907
908/// User stub (PySpark user).
909pub fn user() -> Column {
910    Column::from_expr(lit("unknown"), Some("user".to_string()))
911}
912
913/// Random uniform [0, 1) per row, with optional seed (PySpark rand).
914/// When added via with_column, generates one distinct value per row (PySpark-like).
915pub fn rand(seed: Option<u64>) -> Column {
916    Column::from_rand(seed)
917}
918
919/// Random standard normal per row, with optional seed (PySpark randn).
920/// When added via with_column, generates one distinct value per row (PySpark-like).
921pub fn randn(seed: Option<u64>) -> Column {
922    Column::from_randn(seed)
923}
924
925/// Call a registered UDF by name. PySpark: F.call_udf(udfName, *cols).
926/// Requires a session (set by get_or_create). Raises if UDF not found.
927pub fn call_udf(name: &str, cols: &[Column]) -> Result<Column, PolarsError> {
928    use polars::prelude::Column as PlColumn;
929
930    let (registry, case_sensitive) =
931        crate::udf_context::get_thread_udf_context().ok_or_else(|| {
932            PolarsError::InvalidOperation(
933                "call_udf: no session. Use SparkSession.builder().get_or_create() first.".into(),
934            )
935        })?;
936
937    // Rust UDF: build lazy Expr
938    let udf = registry.get_rust_udf(name, case_sensitive).ok_or_else(|| {
939        PolarsError::InvalidOperation(format!("call_udf: UDF '{name}' not found").into())
940    })?;
941
942    let exprs: Vec<Expr> = cols.iter().map(|c| c.expr().clone()).collect();
943    let output_type = DataType::String; // PySpark default
944
945    let expr = if exprs.len() == 1 {
946        let udf = udf.clone();
947        exprs.into_iter().next().unwrap().map(
948            move |c| {
949                let s = c.take_materialized_series();
950                udf.apply(&[s]).map(|out| PlColumn::new("_".into(), out))
951            },
952            move |_schema, field| Ok(Field::new(field.name().clone(), output_type.clone())),
953        )
954    } else {
955        let udf = udf.clone();
956        let first = exprs[0].clone();
957        let rest: Vec<Expr> = exprs[1..].to_vec();
958        first.map_many(
959            move |columns| {
960                let series: Vec<Series> = columns
961                    .iter_mut()
962                    .map(|c| std::mem::take(c).take_materialized_series())
963                    .collect();
964                udf.apply(&series).map(|out| PlColumn::new("_".into(), out))
965            },
966            &rest,
967            move |_schema, fields| Ok(Field::new(fields[0].name().clone(), output_type.clone())),
968        )
969    };
970
971    Ok(Column::from_expr(expr, Some(format!("{name}()"))))
972}
973
974/// True if two arrays have any element in common (PySpark arrays_overlap).
975pub fn arrays_overlap(left: &Column, right: &Column) -> Column {
976    left.clone().arrays_overlap(right)
977}
978
979/// Zip arrays into array of structs (PySpark arrays_zip).
980pub fn arrays_zip(left: &Column, right: &Column) -> Column {
981    left.clone().arrays_zip(right)
982}
983
984/// Explode; null/empty yields one row with null (PySpark explode_outer).
985pub fn explode_outer(column: &Column) -> Column {
986    column.clone().explode_outer()
987}
988
989/// Posexplode with null preservation (PySpark posexplode_outer).
990pub fn posexplode_outer(column: &Column) -> (Column, Column) {
991    column.clone().posexplode_outer()
992}
993
994/// Collect to array (PySpark array_agg).
995pub fn array_agg(column: &Column) -> Column {
996    column.clone().array_agg()
997}
998
999/// Transform map keys by expr (PySpark transform_keys).
1000pub fn transform_keys(column: &Column, key_expr: Expr) -> Column {
1001    column.clone().transform_keys(key_expr)
1002}
1003
1004/// Transform map values by expr (PySpark transform_values).
1005pub fn transform_values(column: &Column, value_expr: Expr) -> Column {
1006    column.clone().transform_values(value_expr)
1007}
1008
1009/// Parse string to map (PySpark str_to_map). Default delims: "," and ":".
1010pub fn str_to_map(
1011    column: &Column,
1012    pair_delim: Option<&str>,
1013    key_value_delim: Option<&str>,
1014) -> Column {
1015    let pd = pair_delim.unwrap_or(",");
1016    let kvd = key_value_delim.unwrap_or(":");
1017    column.clone().str_to_map(pd, kvd)
1018}
1019
1020/// Extract first match of regex (PySpark regexp_extract). group_index 0 = full match.
1021pub fn regexp_extract(column: &Column, pattern: &str, group_index: usize) -> Column {
1022    column.clone().regexp_extract(pattern, group_index)
1023}
1024
1025/// Replace first match of regex (PySpark regexp_replace)
1026pub fn regexp_replace(column: &Column, pattern: &str, replacement: &str) -> Column {
1027    column.clone().regexp_replace(pattern, replacement)
1028}
1029
1030/// Split string by delimiter (PySpark split). Optional limit: at most that many parts (remainder in last).
1031pub fn split(column: &Column, delimiter: &str, limit: Option<i32>) -> Column {
1032    column.clone().split(delimiter, limit)
1033}
1034
1035/// Title case (PySpark initcap)
1036pub fn initcap(column: &Column) -> Column {
1037    column.clone().initcap()
1038}
1039
1040/// Extract all matches of regex (PySpark regexp_extract_all).
1041pub fn regexp_extract_all(column: &Column, pattern: &str) -> Column {
1042    column.clone().regexp_extract_all(pattern)
1043}
1044
1045/// Check if string matches regex (PySpark regexp_like / rlike).
1046pub fn regexp_like(column: &Column, pattern: &str) -> Column {
1047    column.clone().regexp_like(pattern)
1048}
1049
1050/// Count of non-overlapping regex matches (PySpark regexp_count).
1051pub fn regexp_count(column: &Column, pattern: &str) -> Column {
1052    column.clone().regexp_count(pattern)
1053}
1054
1055/// First substring matching regex (PySpark regexp_substr). Null if no match.
1056pub fn regexp_substr(column: &Column, pattern: &str) -> Column {
1057    column.clone().regexp_substr(pattern)
1058}
1059
1060/// Split by delimiter and return 1-based part (PySpark split_part).
1061pub fn split_part(column: &Column, delimiter: &str, part_num: i64) -> Column {
1062    column.clone().split_part(delimiter, part_num)
1063}
1064
1065/// 1-based position of first regex match (PySpark regexp_instr).
1066pub fn regexp_instr(column: &Column, pattern: &str, group_idx: Option<usize>) -> Column {
1067    column.clone().regexp_instr(pattern, group_idx)
1068}
1069
1070/// 1-based index of str in comma-delimited set (PySpark find_in_set). 0 if not found or str contains comma.
1071pub fn find_in_set(str_column: &Column, set_column: &Column) -> Column {
1072    str_column.clone().find_in_set(set_column)
1073}
1074
1075/// Printf-style format (PySpark format_string). Supports %s, %d, %i, %f, %g, %%.
1076pub fn format_string(format: &str, columns: &[&Column]) -> Column {
1077    use polars::prelude::*;
1078    if columns.is_empty() {
1079        panic!("format_string needs at least one column");
1080    }
1081    let format_owned = format.to_string();
1082    let args: Vec<Expr> = columns.iter().skip(1).map(|c| c.expr().clone()).collect();
1083    let expr = columns[0].expr().clone().map_many(
1084        move |cols| {
1085            crate::column::expect_col(crate::udfs::apply_format_string(cols, &format_owned))
1086        },
1087        &args,
1088        |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::String)),
1089    );
1090    crate::column::Column::from_expr(expr, None)
1091}
1092
1093/// Alias for format_string (PySpark printf).
1094pub fn printf(format: &str, columns: &[&Column]) -> Column {
1095    format_string(format, columns)
1096}
1097
1098/// Repeat string n times (PySpark repeat).
1099pub fn repeat(column: &Column, n: i32) -> Column {
1100    column.clone().repeat(n)
1101}
1102
1103/// Reverse string (PySpark reverse).
1104pub fn reverse(column: &Column) -> Column {
1105    column.clone().reverse()
1106}
1107
1108/// Find substring position 1-based; 0 if not found (PySpark instr).
1109pub fn instr(column: &Column, substr: &str) -> Column {
1110    column.clone().instr(substr)
1111}
1112
1113/// Position of substring in column (PySpark position). Same as instr; (substr, col) argument order.
1114pub fn position(substr: &str, column: &Column) -> Column {
1115    column.clone().instr(substr)
1116}
1117
1118/// ASCII value of first character (PySpark ascii). Returns Int32.
1119pub fn ascii(column: &Column) -> Column {
1120    column.clone().ascii()
1121}
1122
1123/// Format numeric as string with fixed decimal places (PySpark format_number).
1124pub fn format_number(column: &Column, decimals: u32) -> Column {
1125    column.clone().format_number(decimals)
1126}
1127
1128/// Replace substring at 1-based position (PySpark overlay). replace is literal.
1129pub fn overlay(column: &Column, replace: &str, pos: i64, length: i64) -> Column {
1130    column.clone().overlay(replace, pos, length)
1131}
1132
1133/// Int to single-character string (PySpark char). Valid codepoint only.
1134pub fn char(column: &Column) -> Column {
1135    column.clone().char()
1136}
1137
1138/// Alias for char (PySpark chr).
1139pub fn chr(column: &Column) -> Column {
1140    column.clone().chr()
1141}
1142
1143/// Base64 encode string bytes (PySpark base64).
1144pub fn base64(column: &Column) -> Column {
1145    column.clone().base64()
1146}
1147
1148/// Base64 decode to string (PySpark unbase64). Invalid decode → null.
1149pub fn unbase64(column: &Column) -> Column {
1150    column.clone().unbase64()
1151}
1152
1153/// SHA1 hash of string bytes, return hex string (PySpark sha1).
1154pub fn sha1(column: &Column) -> Column {
1155    column.clone().sha1()
1156}
1157
1158/// SHA2 hash; bit_length 256, 384, or 512 (PySpark sha2).
1159pub fn sha2(column: &Column, bit_length: i32) -> Column {
1160    column.clone().sha2(bit_length)
1161}
1162
1163/// MD5 hash of string bytes, return hex string (PySpark md5).
1164pub fn md5(column: &Column) -> Column {
1165    column.clone().md5()
1166}
1167
1168/// Left-pad string to length with pad char (PySpark lpad).
1169pub fn lpad(column: &Column, length: i32, pad: &str) -> Column {
1170    column.clone().lpad(length, pad)
1171}
1172
1173/// Right-pad string to length with pad char (PySpark rpad).
1174pub fn rpad(column: &Column, length: i32, pad: &str) -> Column {
1175    column.clone().rpad(length, pad)
1176}
1177
1178/// Character-by-character translation (PySpark translate).
1179pub fn translate(column: &Column, from_str: &str, to_str: &str) -> Column {
1180    column.clone().translate(from_str, to_str)
1181}
1182
1183/// Mask string: replace upper/lower/digit/other with given chars (PySpark mask).
1184pub fn mask(
1185    column: &Column,
1186    upper_char: Option<char>,
1187    lower_char: Option<char>,
1188    digit_char: Option<char>,
1189    other_char: Option<char>,
1190) -> Column {
1191    column
1192        .clone()
1193        .mask(upper_char, lower_char, digit_char, other_char)
1194}
1195
1196/// Substring before/after nth delimiter (PySpark substring_index).
1197pub fn substring_index(column: &Column, delimiter: &str, count: i64) -> Column {
1198    column.clone().substring_index(delimiter, count)
1199}
1200
1201/// Leftmost n characters (PySpark left).
1202pub fn left(column: &Column, n: i64) -> Column {
1203    column.clone().left(n)
1204}
1205
1206/// Rightmost n characters (PySpark right).
1207pub fn right(column: &Column, n: i64) -> Column {
1208    column.clone().right(n)
1209}
1210
1211/// Replace all occurrences of search with replacement (literal). PySpark replace.
1212pub fn replace(column: &Column, search: &str, replacement: &str) -> Column {
1213    column.clone().replace(search, replacement)
1214}
1215
1216/// True if string starts with prefix (PySpark startswith).
1217pub fn startswith(column: &Column, prefix: &str) -> Column {
1218    column.clone().startswith(prefix)
1219}
1220
1221/// True if string ends with suffix (PySpark endswith).
1222pub fn endswith(column: &Column, suffix: &str) -> Column {
1223    column.clone().endswith(suffix)
1224}
1225
1226/// True if string contains substring (literal). PySpark contains.
1227pub fn contains(column: &Column, substring: &str) -> Column {
1228    column.clone().contains(substring)
1229}
1230
1231/// SQL LIKE pattern (% any, _ one char). PySpark like.
1232/// When escape_char is Some(esc), esc + char treats that char as literal.
1233pub fn like(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
1234    column.clone().like(pattern, escape_char)
1235}
1236
1237/// Case-insensitive LIKE. PySpark ilike.
1238/// When escape_char is Some(esc), esc + char treats that char as literal.
1239pub fn ilike(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
1240    column.clone().ilike(pattern, escape_char)
1241}
1242
1243/// Alias for regexp_like. PySpark rlike / regexp.
1244pub fn rlike(column: &Column, pattern: &str) -> Column {
1245    column.clone().regexp_like(pattern)
1246}
1247
1248/// Alias for rlike (PySpark regexp).
1249pub fn regexp(column: &Column, pattern: &str) -> Column {
1250    rlike(column, pattern)
1251}
1252
1253/// Soundex code (PySpark soundex). Not implemented: requires element-wise UDF.
1254pub fn soundex(column: &Column) -> Column {
1255    column.clone().soundex()
1256}
1257
1258/// Levenshtein distance (PySpark levenshtein). Not implemented: requires element-wise UDF.
1259pub fn levenshtein(column: &Column, other: &Column) -> Column {
1260    column.clone().levenshtein(other)
1261}
1262
1263/// CRC32 of string bytes (PySpark crc32). Not implemented: requires element-wise UDF.
1264pub fn crc32(column: &Column) -> Column {
1265    column.clone().crc32()
1266}
1267
1268/// XXH64 hash (PySpark xxhash64). Not implemented: requires element-wise UDF.
1269pub fn xxhash64(column: &Column) -> Column {
1270    column.clone().xxhash64()
1271}
1272
1273/// Absolute value (PySpark abs)
1274pub fn abs(column: &Column) -> Column {
1275    column.clone().abs()
1276}
1277
1278/// Ceiling (PySpark ceil)
1279pub fn ceil(column: &Column) -> Column {
1280    column.clone().ceil()
1281}
1282
1283/// Floor (PySpark floor)
1284pub fn floor(column: &Column) -> Column {
1285    column.clone().floor()
1286}
1287
1288/// Round (PySpark round)
1289pub fn round(column: &Column, decimals: u32) -> Column {
1290    column.clone().round(decimals)
1291}
1292
1293/// Banker's rounding - round half to even (PySpark bround).
1294pub fn bround(column: &Column, scale: i32) -> Column {
1295    column.clone().bround(scale)
1296}
1297
1298/// Unary minus / negate (PySpark negate, negative).
1299pub fn negate(column: &Column) -> Column {
1300    column.clone().negate()
1301}
1302
1303/// Alias for negate. PySpark negative.
1304pub fn negative(column: &Column) -> Column {
1305    negate(column)
1306}
1307
1308/// Unary plus - no-op, returns column as-is (PySpark positive).
1309pub fn positive(column: &Column) -> Column {
1310    column.clone()
1311}
1312
1313/// Cotangent: 1/tan (PySpark cot).
1314pub fn cot(column: &Column) -> Column {
1315    column.clone().cot()
1316}
1317
1318/// Cosecant: 1/sin (PySpark csc).
1319pub fn csc(column: &Column) -> Column {
1320    column.clone().csc()
1321}
1322
1323/// Secant: 1/cos (PySpark sec).
1324pub fn sec(column: &Column) -> Column {
1325    column.clone().sec()
1326}
1327
1328/// Constant e = 2.718... (PySpark e).
1329pub fn e() -> Column {
1330    Column::from_expr(lit(std::f64::consts::E), Some("e".to_string()))
1331}
1332
1333/// Constant pi = 3.14159... (PySpark pi).
1334pub fn pi() -> Column {
1335    Column::from_expr(lit(std::f64::consts::PI), Some("pi".to_string()))
1336}
1337
1338/// Square root (PySpark sqrt)
1339pub fn sqrt(column: &Column) -> Column {
1340    column.clone().sqrt()
1341}
1342
1343/// Power (PySpark pow)
1344pub fn pow(column: &Column, exp: i64) -> Column {
1345    column.clone().pow(exp)
1346}
1347
1348/// Exponential (PySpark exp)
1349pub fn exp(column: &Column) -> Column {
1350    column.clone().exp()
1351}
1352
1353/// Natural logarithm (PySpark log with one arg)
1354pub fn log(column: &Column) -> Column {
1355    column.clone().log()
1356}
1357
1358/// Logarithm with given base (PySpark log(col, base)). base must be positive and not 1.
1359pub fn log_with_base(column: &Column, base: f64) -> Column {
1360    crate::column::Column::from_expr(column.expr().clone().log(lit(base)), None)
1361}
1362
1363/// Sine in radians (PySpark sin)
1364pub fn sin(column: &Column) -> Column {
1365    column.clone().sin()
1366}
1367
1368/// Cosine in radians (PySpark cos)
1369pub fn cos(column: &Column) -> Column {
1370    column.clone().cos()
1371}
1372
1373/// Tangent in radians (PySpark tan)
1374pub fn tan(column: &Column) -> Column {
1375    column.clone().tan()
1376}
1377
1378/// Arc sine (PySpark asin)
1379pub fn asin(column: &Column) -> Column {
1380    column.clone().asin()
1381}
1382
1383/// Arc cosine (PySpark acos)
1384pub fn acos(column: &Column) -> Column {
1385    column.clone().acos()
1386}
1387
1388/// Arc tangent (PySpark atan)
1389pub fn atan(column: &Column) -> Column {
1390    column.clone().atan()
1391}
1392
1393/// Two-argument arc tangent atan2(y, x) in radians (PySpark atan2)
1394pub fn atan2(y: &Column, x: &Column) -> Column {
1395    y.clone().atan2(x)
1396}
1397
1398/// Convert radians to degrees (PySpark degrees)
1399pub fn degrees(column: &Column) -> Column {
1400    column.clone().degrees()
1401}
1402
1403/// Convert degrees to radians (PySpark radians)
1404pub fn radians(column: &Column) -> Column {
1405    column.clone().radians()
1406}
1407
1408/// Sign of the number: -1, 0, or 1 (PySpark signum)
1409pub fn signum(column: &Column) -> Column {
1410    column.clone().signum()
1411}
1412
1413/// Alias for signum (PySpark sign).
1414pub fn sign(column: &Column) -> Column {
1415    signum(column)
1416}
1417
1418/// Cast column to the given type (PySpark cast). Fails on invalid conversion.
1419/// String-to-boolean uses custom parsing ("true"/"false"/"1"/"0") since Polars does not support Utf8->Boolean.
1420/// String-to-date accepts date and datetime strings (e.g. "2025-01-01 10:30:00" truncates to date) for Spark parity.
1421pub fn cast(column: &Column, type_name: &str) -> Result<Column, String> {
1422    let dtype = parse_type_name(type_name)?;
1423    if dtype == DataType::Boolean {
1424        let expr = column.expr().clone().map(
1425            |col| crate::column::expect_col(crate::udfs::apply_string_to_boolean(col, true)),
1426            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Boolean)),
1427        );
1428        return Ok(Column::from_expr(expr, None));
1429    }
1430    if dtype == DataType::Date {
1431        let expr = column.expr().clone().map(
1432            |col| crate::column::expect_col(crate::udfs::apply_string_to_date(col, true)),
1433            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
1434        );
1435        return Ok(Column::from_expr(expr, None));
1436    }
1437    if dtype == DataType::Int32 || dtype == DataType::Int64 {
1438        let target = dtype.clone();
1439        // cast: strict=true – invalid strings should error (PySpark parity).
1440        let expr = column.expr().clone().map(
1441            move |col| {
1442                crate::column::expect_col(crate::udfs::apply_string_to_int(
1443                    col,
1444                    true,
1445                    target.clone(),
1446                ))
1447            },
1448            move |_schema, field| Ok(Field::new(field.name().clone(), dtype.clone())),
1449        );
1450        return Ok(Column::from_expr(expr, None));
1451    }
1452    if dtype == DataType::Float64 {
1453        // String-to-double uses custom parsing for Spark-style to_number semantics.
1454        let expr = column.expr().clone().map(
1455            |col| crate::column::expect_col(crate::udfs::apply_string_to_double(col, true)),
1456            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1457        );
1458        return Ok(Column::from_expr(expr, None));
1459    }
1460    Ok(Column::from_expr(
1461        column.expr().clone().strict_cast(dtype),
1462        None,
1463    ))
1464}
1465
1466/// Cast column to the given type, returning null on invalid conversion (PySpark try_cast).
1467/// String-to-boolean uses custom parsing ("true"/"false"/"1"/"0") since Polars does not support Utf8->Boolean.
1468/// String-to-date accepts date and datetime strings; invalid strings become null.
1469pub fn try_cast(column: &Column, type_name: &str) -> Result<Column, String> {
1470    let dtype = parse_type_name(type_name)?;
1471    if dtype == DataType::Boolean {
1472        let expr = column.expr().clone().map(
1473            |col| crate::column::expect_col(crate::udfs::apply_string_to_boolean(col, false)),
1474            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Boolean)),
1475        );
1476        return Ok(Column::from_expr(expr, None));
1477    }
1478    if dtype == DataType::Date {
1479        let expr = column.expr().clone().map(
1480            |col| crate::column::expect_col(crate::udfs::apply_string_to_date(col, false)),
1481            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
1482        );
1483        return Ok(Column::from_expr(expr, None));
1484    }
1485    if dtype == DataType::Int32 || dtype == DataType::Int64 {
1486        let target = dtype.clone();
1487        let expr = column.expr().clone().map(
1488            move |col| {
1489                crate::column::expect_col(crate::udfs::apply_string_to_int(
1490                    col,
1491                    false,
1492                    target.clone(),
1493                ))
1494            },
1495            move |_schema, field| Ok(Field::new(field.name().clone(), dtype.clone())),
1496        );
1497        return Ok(Column::from_expr(expr, None));
1498    }
1499    if dtype == DataType::Float64 {
1500        let expr = column.expr().clone().map(
1501            |col| crate::column::expect_col(crate::udfs::apply_string_to_double(col, false)),
1502            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1503        );
1504        return Ok(Column::from_expr(expr, None));
1505    }
1506    Ok(Column::from_expr(column.expr().clone().cast(dtype), None))
1507}
1508
1509/// Cast to string, optionally with format for datetime (PySpark to_char, to_varchar).
1510/// When format is Some, uses date_format for datetime columns (PySpark format → chrono strftime); otherwise cast to string.
1511/// Returns Err if the cast to string fails (invalid type name or unsupported column type).
1512pub fn to_char(column: &Column, format: Option<&str>) -> Result<Column, String> {
1513    match format {
1514        Some(fmt) => Ok(column
1515            .clone()
1516            .date_format(&crate::udfs::pyspark_format_to_chrono(fmt))),
1517        None => cast(column, "string"),
1518    }
1519}
1520
1521/// Alias for to_char (PySpark to_varchar).
1522pub fn to_varchar(column: &Column, format: Option<&str>) -> Result<Column, String> {
1523    to_char(column, format)
1524}
1525
1526/// Cast to numeric (PySpark to_number). Uses Double. Format parameter reserved for future use.
1527/// Returns Err if the cast to double fails (invalid type name or unsupported column type).
1528pub fn to_number(column: &Column, _format: Option<&str>) -> Result<Column, String> {
1529    cast(column, "double")
1530}
1531
1532/// Cast to numeric, null on invalid (PySpark try_to_number). Format parameter reserved for future use.
1533/// Returns Err if the try_cast setup fails (invalid type name); column values that cannot be parsed become null.
1534pub fn try_to_number(column: &Column, _format: Option<&str>) -> Result<Column, String> {
1535    try_cast(column, "double")
1536}
1537
1538/// Cast to timestamp, or parse with format when provided (PySpark to_timestamp).
1539/// When format is None, parses string columns with default format "%Y-%m-%d %H:%M:%S" (PySpark parity #273).
1540pub fn to_timestamp(column: &Column, format: Option<&str>) -> Result<Column, String> {
1541    use polars::prelude::{DataType, Field, TimeUnit};
1542    let fmt_owned = format.map(|s| s.to_string());
1543    let expr = column.expr().clone().map(
1544        move |s| {
1545            crate::column::expect_col(crate::udfs::apply_to_timestamp_format(
1546                s,
1547                fmt_owned.as_deref(),
1548                true,
1549            ))
1550        },
1551        |_schema, field| {
1552            Ok(Field::new(
1553                field.name().clone(),
1554                DataType::Datetime(TimeUnit::Microseconds, None),
1555            ))
1556        },
1557    );
1558    Ok(crate::column::Column::from_expr(expr, None))
1559}
1560
1561/// Cast to timestamp, null on invalid, or parse with format when provided (PySpark try_to_timestamp).
1562/// When format is None, parses string columns with default format (null on invalid). #273
1563pub fn try_to_timestamp(column: &Column, format: Option<&str>) -> Result<Column, String> {
1564    use polars::prelude::*;
1565    let fmt_owned = format.map(|s| s.to_string());
1566    let expr = column.expr().clone().map(
1567        move |s| {
1568            crate::column::expect_col(crate::udfs::apply_to_timestamp_format(
1569                s,
1570                fmt_owned.as_deref(),
1571                false,
1572            ))
1573        },
1574        |_schema, field| {
1575            Ok(Field::new(
1576                field.name().clone(),
1577                DataType::Datetime(TimeUnit::Microseconds, None),
1578            ))
1579        },
1580    );
1581    Ok(crate::column::Column::from_expr(expr, None))
1582}
1583
1584/// Parse as timestamp in local timezone, return UTC (PySpark to_timestamp_ltz).
1585pub fn to_timestamp_ltz(column: &Column, format: Option<&str>) -> Result<Column, String> {
1586    use polars::prelude::{DataType, Field, TimeUnit};
1587    match format {
1588        None => crate::cast(column, "timestamp"),
1589        Some(fmt) => {
1590            let fmt_owned = fmt.to_string();
1591            let expr = column.expr().clone().map(
1592                move |s| {
1593                    crate::column::expect_col(crate::udfs::apply_to_timestamp_ltz_format(
1594                        s,
1595                        Some(&fmt_owned),
1596                        true,
1597                    ))
1598                },
1599                |_schema, field| {
1600                    Ok(Field::new(
1601                        field.name().clone(),
1602                        DataType::Datetime(TimeUnit::Microseconds, None),
1603                    ))
1604                },
1605            );
1606            Ok(crate::column::Column::from_expr(expr, None))
1607        }
1608    }
1609}
1610
1611/// Parse as timestamp without timezone (PySpark to_timestamp_ntz). Returns Datetime(_, None).
1612pub fn to_timestamp_ntz(column: &Column, format: Option<&str>) -> Result<Column, String> {
1613    use polars::prelude::{DataType, Field, TimeUnit};
1614    match format {
1615        None => crate::cast(column, "timestamp"),
1616        Some(fmt) => {
1617            let fmt_owned = fmt.to_string();
1618            let expr = column.expr().clone().map(
1619                move |s| {
1620                    crate::column::expect_col(crate::udfs::apply_to_timestamp_ntz_format(
1621                        s,
1622                        Some(&fmt_owned),
1623                        true,
1624                    ))
1625                },
1626                |_schema, field| {
1627                    Ok(Field::new(
1628                        field.name().clone(),
1629                        DataType::Datetime(TimeUnit::Microseconds, None),
1630                    ))
1631                },
1632            );
1633            Ok(crate::column::Column::from_expr(expr, None))
1634        }
1635    }
1636}
1637
1638/// Division that returns null on divide-by-zero (PySpark try_divide).
1639pub fn try_divide(left: &Column, right: &Column) -> Column {
1640    use polars::prelude::*;
1641    let zero_cond = right.expr().clone().cast(DataType::Float64).eq(lit(0.0f64));
1642    let null_expr = lit(NULL);
1643    let div_expr =
1644        left.expr().clone().cast(DataType::Float64) / right.expr().clone().cast(DataType::Float64);
1645    let expr = polars::prelude::when(zero_cond)
1646        .then(null_expr)
1647        .otherwise(div_expr);
1648    crate::column::Column::from_expr(expr, None)
1649}
1650
1651/// Add that returns null on overflow (PySpark try_add). Uses checked arithmetic.
1652pub fn try_add(left: &Column, right: &Column) -> Column {
1653    let args = [right.expr().clone()];
1654    let expr = left.expr().clone().map_many(
1655        |cols| crate::column::expect_col(crate::udfs::apply_try_add(cols)),
1656        &args,
1657        |_schema, fields| Ok(fields[0].clone()),
1658    );
1659    Column::from_expr(expr, None)
1660}
1661
1662/// Subtract that returns null on overflow (PySpark try_subtract).
1663pub fn try_subtract(left: &Column, right: &Column) -> Column {
1664    let args = [right.expr().clone()];
1665    let expr = left.expr().clone().map_many(
1666        |cols| crate::column::expect_col(crate::udfs::apply_try_subtract(cols)),
1667        &args,
1668        |_schema, fields| Ok(fields[0].clone()),
1669    );
1670    Column::from_expr(expr, None)
1671}
1672
1673/// Multiply that returns null on overflow (PySpark try_multiply).
1674pub fn try_multiply(left: &Column, right: &Column) -> Column {
1675    let args = [right.expr().clone()];
1676    let expr = left.expr().clone().map_many(
1677        |cols| crate::column::expect_col(crate::udfs::apply_try_multiply(cols)),
1678        &args,
1679        |_schema, fields| Ok(fields[0].clone()),
1680    );
1681    Column::from_expr(expr, None)
1682}
1683
1684/// Element at index, null if out of bounds (PySpark try_element_at). Same as element_at for lists.
1685pub fn try_element_at(column: &Column, index: i64) -> Column {
1686    column.clone().element_at(index)
1687}
1688
1689/// Assign value to histogram bucket (PySpark width_bucket). Returns 0 if v < min_val, num_bucket+1 if v >= max_val.
1690pub fn width_bucket(value: &Column, min_val: f64, max_val: f64, num_bucket: i64) -> Column {
1691    if num_bucket <= 0 {
1692        panic!(
1693            "width_bucket: num_bucket must be positive, got {}",
1694            num_bucket
1695        );
1696    }
1697    use polars::prelude::*;
1698    let v = value.expr().clone().cast(DataType::Float64);
1699    let min_expr = lit(min_val);
1700    let max_expr = lit(max_val);
1701    let nb = num_bucket as f64;
1702    let width = (max_val - min_val) / nb;
1703    let bucket_expr = (v.clone() - min_expr.clone()) / lit(width);
1704    let floor_bucket = bucket_expr.floor().cast(DataType::Int64) + lit(1i64);
1705    let bucket_clamped = floor_bucket.clip(lit(1i64), lit(num_bucket));
1706    let expr = polars::prelude::when(v.clone().lt(min_expr))
1707        .then(lit(0i64))
1708        .when(v.gt_eq(max_expr))
1709        .then(lit(num_bucket + 1))
1710        .otherwise(bucket_clamped);
1711    crate::column::Column::from_expr(expr, None)
1712}
1713
1714/// Return column at 1-based index (PySpark elt). elt(2, a, b, c) returns b.
1715pub fn elt(index: &Column, columns: &[&Column]) -> Column {
1716    use polars::prelude::*;
1717    if columns.is_empty() {
1718        panic!("elt requires at least one column");
1719    }
1720    let idx_expr = index.expr().clone();
1721    let null_expr = lit(NULL);
1722    let mut expr = null_expr;
1723    for (i, c) in columns.iter().enumerate().rev() {
1724        let n = (i + 1) as i64;
1725        expr = polars::prelude::when(idx_expr.clone().eq(lit(n)))
1726            .then(c.expr().clone())
1727            .otherwise(expr);
1728    }
1729    crate::column::Column::from_expr(expr, None)
1730}
1731
1732/// Bit length of string (bytes * 8) (PySpark bit_length).
1733pub fn bit_length(column: &Column) -> Column {
1734    column.clone().bit_length()
1735}
1736
1737/// Length of string in bytes (PySpark octet_length).
1738pub fn octet_length(column: &Column) -> Column {
1739    column.clone().octet_length()
1740}
1741
1742/// Length of string in characters (PySpark char_length). Alias of length().
1743pub fn char_length(column: &Column) -> Column {
1744    column.clone().char_length()
1745}
1746
1747/// Length of string in characters (PySpark character_length). Alias of length().
1748pub fn character_length(column: &Column) -> Column {
1749    column.clone().character_length()
1750}
1751
1752/// Data type of column as string (PySpark typeof). Constant per column from schema.
1753pub fn typeof_(column: &Column) -> Column {
1754    column.clone().typeof_()
1755}
1756
1757/// True where the float value is NaN (PySpark isnan).
1758pub fn isnan(column: &Column) -> Column {
1759    column.clone().is_nan()
1760}
1761
1762/// Greatest of the given columns per row (PySpark greatest). Uses element-wise UDF.
1763pub fn greatest(columns: &[&Column]) -> Result<Column, String> {
1764    if columns.is_empty() {
1765        return Err("greatest requires at least one column".to_string());
1766    }
1767    if columns.len() == 1 {
1768        return Ok((*columns[0]).clone());
1769    }
1770    let mut expr = columns[0].expr().clone();
1771    for c in columns.iter().skip(1) {
1772        let args = [c.expr().clone()];
1773        expr = expr.map_many(
1774            |cols| crate::column::expect_col(crate::udfs::apply_greatest2(cols)),
1775            &args,
1776            |_schema, fields| Ok(fields[0].clone()),
1777        );
1778    }
1779    Ok(Column::from_expr(expr, None))
1780}
1781
1782/// Least of the given columns per row (PySpark least). Uses element-wise UDF.
1783pub fn least(columns: &[&Column]) -> Result<Column, String> {
1784    if columns.is_empty() {
1785        return Err("least requires at least one column".to_string());
1786    }
1787    if columns.len() == 1 {
1788        return Ok((*columns[0]).clone());
1789    }
1790    let mut expr = columns[0].expr().clone();
1791    for c in columns.iter().skip(1) {
1792        let args = [c.expr().clone()];
1793        expr = expr.map_many(
1794            |cols| crate::column::expect_col(crate::udfs::apply_least2(cols)),
1795            &args,
1796            |_schema, fields| Ok(fields[0].clone()),
1797        );
1798    }
1799    Ok(Column::from_expr(expr, None))
1800}
1801
1802/// Extract year from datetime column (PySpark year)
1803pub fn year(column: &Column) -> Column {
1804    column.clone().year()
1805}
1806
1807/// Extract month from datetime column (PySpark month)
1808pub fn month(column: &Column) -> Column {
1809    column.clone().month()
1810}
1811
1812/// Extract day of month from datetime column (PySpark day)
1813pub fn day(column: &Column) -> Column {
1814    column.clone().day()
1815}
1816
1817/// Cast or parse to date (PySpark to_date). When format is None: cast date/datetime to date, parse string with default formats. When format is Some: parse string with given format.
1818pub fn to_date(column: &Column, format: Option<&str>) -> Result<Column, String> {
1819    let fmt = format.map(|s| s.to_string());
1820    let expr = column.expr().clone().map(
1821        move |col| {
1822            crate::column::expect_col(crate::udfs::apply_string_to_date_format(
1823                col,
1824                fmt.as_deref(),
1825                false,
1826            ))
1827        },
1828        |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
1829    );
1830    Ok(Column::from_expr(expr, None))
1831}
1832
1833/// Format date/datetime as string (PySpark date_format). Accepts PySpark/Java SimpleDateFormat style (e.g. "yyyy-MM") and converts to chrono strftime internally.
1834pub fn date_format(column: &Column, format: &str) -> Column {
1835    column
1836        .clone()
1837        .date_format(&crate::udfs::pyspark_format_to_chrono(format))
1838}
1839
1840/// Current date (evaluation time). PySpark current_date.
1841pub fn current_date() -> Column {
1842    use polars::prelude::*;
1843    let today = chrono::Utc::now().date_naive();
1844    let days = (today - robin_sparkless_core::date_utils::epoch_naive_date()).num_days() as i32;
1845    crate::column::Column::from_expr(
1846        Expr::Literal(LiteralValue::Scalar(Scalar::new_date(days))),
1847        None,
1848    )
1849}
1850
1851/// Current timestamp (evaluation time). PySpark current_timestamp.
1852pub fn current_timestamp() -> Column {
1853    use polars::prelude::*;
1854    let ts = chrono::Utc::now().timestamp_micros();
1855    crate::column::Column::from_expr(
1856        Expr::Literal(LiteralValue::Scalar(Scalar::new_datetime(
1857            ts,
1858            TimeUnit::Microseconds,
1859            None,
1860        ))),
1861        None,
1862    )
1863}
1864
1865/// Alias for current_date (PySpark curdate).
1866pub fn curdate() -> Column {
1867    current_date()
1868}
1869
1870/// Alias for current_timestamp (PySpark now).
1871pub fn now() -> Column {
1872    current_timestamp()
1873}
1874
1875/// Alias for current_timestamp (PySpark localtimestamp).
1876pub fn localtimestamp() -> Column {
1877    current_timestamp()
1878}
1879
1880/// Alias for datediff (PySpark date_diff). date_diff(end, start).
1881pub fn date_diff(end: &Column, start: &Column) -> Column {
1882    datediff(end, start)
1883}
1884
1885/// Alias for date_add (PySpark dateadd).
1886pub fn dateadd(column: &Column, n: i32) -> Column {
1887    date_add(column, n)
1888}
1889
1890/// Extract field from date/datetime (PySpark extract). field: year, month, day, hour, minute, second, quarter, week, dayofweek, dayofyear.
1891pub fn extract(column: &Column, field: &str) -> Column {
1892    column.clone().extract(field)
1893}
1894
1895/// Alias for extract (PySpark date_part).
1896pub fn date_part(column: &Column, field: &str) -> Column {
1897    extract(column, field)
1898}
1899
1900/// Alias for extract (PySpark datepart).
1901pub fn datepart(column: &Column, field: &str) -> Column {
1902    extract(column, field)
1903}
1904
1905/// Timestamp to microseconds since epoch (PySpark unix_micros).
1906pub fn unix_micros(column: &Column) -> Column {
1907    column.clone().unix_micros()
1908}
1909
1910/// Timestamp to milliseconds since epoch (PySpark unix_millis).
1911pub fn unix_millis(column: &Column) -> Column {
1912    column.clone().unix_millis()
1913}
1914
1915/// Timestamp to seconds since epoch (PySpark unix_seconds).
1916pub fn unix_seconds(column: &Column) -> Column {
1917    column.clone().unix_seconds()
1918}
1919
1920/// Weekday name "Mon","Tue",... (PySpark dayname).
1921pub fn dayname(column: &Column) -> Column {
1922    column.clone().dayname()
1923}
1924
1925/// Weekday 0=Mon, 6=Sun (PySpark weekday).
1926pub fn weekday(column: &Column) -> Column {
1927    column.clone().weekday()
1928}
1929
1930/// Extract hour from datetime column (PySpark hour).
1931pub fn hour(column: &Column) -> Column {
1932    column.clone().hour()
1933}
1934
1935/// Extract minute from datetime column (PySpark minute).
1936pub fn minute(column: &Column) -> Column {
1937    column.clone().minute()
1938}
1939
1940/// Extract second from datetime column (PySpark second).
1941pub fn second(column: &Column) -> Column {
1942    column.clone().second()
1943}
1944
1945/// Add n days to date column (PySpark date_add).
1946pub fn date_add(column: &Column, n: i32) -> Column {
1947    column.clone().date_add(n)
1948}
1949
1950/// Subtract n days from date column (PySpark date_sub).
1951pub fn date_sub(column: &Column, n: i32) -> Column {
1952    column.clone().date_sub(n)
1953}
1954
1955/// Number of days between two date columns (PySpark datediff).
1956pub fn datediff(end: &Column, start: &Column) -> Column {
1957    start.clone().datediff(end)
1958}
1959
1960/// Last day of month for date column (PySpark last_day).
1961pub fn last_day(column: &Column) -> Column {
1962    column.clone().last_day()
1963}
1964
1965/// Truncate date/datetime to unit (PySpark trunc).
1966pub fn trunc(column: &Column, format: &str) -> Column {
1967    column.clone().trunc(format)
1968}
1969
1970/// Alias for trunc (PySpark date_trunc).
1971pub fn date_trunc(format: &str, column: &Column) -> Column {
1972    trunc(column, format)
1973}
1974
1975/// Extract quarter (1-4) from date/datetime (PySpark quarter).
1976pub fn quarter(column: &Column) -> Column {
1977    column.clone().quarter()
1978}
1979
1980/// Extract ISO week of year (1-53) (PySpark weekofyear).
1981pub fn weekofyear(column: &Column) -> Column {
1982    column.clone().weekofyear()
1983}
1984
1985/// Extract day of week: 1=Sunday..7=Saturday (PySpark dayofweek).
1986pub fn dayofweek(column: &Column) -> Column {
1987    column.clone().dayofweek()
1988}
1989
1990/// Extract day of year (1-366) (PySpark dayofyear).
1991pub fn dayofyear(column: &Column) -> Column {
1992    column.clone().dayofyear()
1993}
1994
1995/// Add n months to date column (PySpark add_months).
1996pub fn add_months(column: &Column, n: i32) -> Column {
1997    column.clone().add_months(n)
1998}
1999
2000/// Months between end and start dates as fractional (PySpark months_between).
2001/// When round_off is true, rounds to 8 decimal places (PySpark default).
2002pub fn months_between(end: &Column, start: &Column, round_off: bool) -> Column {
2003    end.clone().months_between(start, round_off)
2004}
2005
2006/// Next date that is the given weekday (e.g. "Mon") (PySpark next_day).
2007pub fn next_day(column: &Column, day_of_week: &str) -> Column {
2008    column.clone().next_day(day_of_week)
2009}
2010
2011/// Current Unix timestamp in seconds (PySpark unix_timestamp with no args).
2012pub fn unix_timestamp_now() -> Column {
2013    use polars::prelude::*;
2014    let secs = chrono::Utc::now().timestamp();
2015    crate::column::Column::from_expr(lit(secs), None)
2016}
2017
2018/// Parse string timestamp to seconds since epoch (PySpark unix_timestamp). format defaults to yyyy-MM-dd HH:mm:ss.
2019pub fn unix_timestamp(column: &Column, format: Option<&str>) -> Column {
2020    column.clone().unix_timestamp(format)
2021}
2022
2023/// Alias for unix_timestamp.
2024pub fn to_unix_timestamp(column: &Column, format: Option<&str>) -> Column {
2025    unix_timestamp(column, format)
2026}
2027
2028/// Convert seconds since epoch to formatted string (PySpark from_unixtime).
2029pub fn from_unixtime(column: &Column, format: Option<&str>) -> Column {
2030    column.clone().from_unixtime(format)
2031}
2032
2033/// Build date from year, month, day columns (PySpark make_date).
2034pub fn make_date(year: &Column, month: &Column, day: &Column) -> Column {
2035    use polars::prelude::*;
2036    let args = [month.expr().clone(), day.expr().clone()];
2037    let expr = year.expr().clone().map_many(
2038        |cols| crate::column::expect_col(crate::udfs::apply_make_date(cols)),
2039        &args,
2040        |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Date)),
2041    );
2042    crate::column::Column::from_expr(expr, None)
2043}
2044
2045/// make_timestamp(year, month, day, hour, min, sec, timezone?) - six columns to timestamp (PySpark make_timestamp).
2046/// When timezone is Some(tz), components are interpreted as local time in that zone, then converted to UTC.
2047pub fn make_timestamp(
2048    year: &Column,
2049    month: &Column,
2050    day: &Column,
2051    hour: &Column,
2052    minute: &Column,
2053    sec: &Column,
2054    timezone: Option<&str>,
2055) -> Column {
2056    use polars::prelude::*;
2057    let tz_owned = timezone.map(|s| s.to_string());
2058    let args = [
2059        month.expr().clone(),
2060        day.expr().clone(),
2061        hour.expr().clone(),
2062        minute.expr().clone(),
2063        sec.expr().clone(),
2064    ];
2065    let expr = year.expr().clone().map_many(
2066        move |cols| {
2067            crate::column::expect_col(crate::udfs::apply_make_timestamp(cols, tz_owned.as_deref()))
2068        },
2069        &args,
2070        |_schema, fields| {
2071            Ok(Field::new(
2072                fields[0].name().clone(),
2073                DataType::Datetime(TimeUnit::Microseconds, None),
2074            ))
2075        },
2076    );
2077    crate::column::Column::from_expr(expr, None)
2078}
2079
2080/// Add amount of unit to timestamp (PySpark timestampadd).
2081pub fn timestampadd(unit: &str, amount: &Column, ts: &Column) -> Column {
2082    ts.clone().timestampadd(unit, amount)
2083}
2084
2085/// Difference between timestamps in unit (PySpark timestampdiff).
2086pub fn timestampdiff(unit: &str, start: &Column, end: &Column) -> Column {
2087    start.clone().timestampdiff(unit, end)
2088}
2089
2090/// Interval of n days (PySpark days). For use in date_add, timestampadd, etc.
2091pub fn days(n: i64) -> Column {
2092    make_interval(0, 0, 0, n, 0, 0, 0)
2093}
2094
2095/// Interval of n hours (PySpark hours).
2096pub fn hours(n: i64) -> Column {
2097    make_interval(0, 0, 0, 0, n, 0, 0)
2098}
2099
2100/// Interval of n minutes (PySpark minutes).
2101pub fn minutes(n: i64) -> Column {
2102    make_interval(0, 0, 0, 0, 0, n, 0)
2103}
2104
2105/// Interval of n months (PySpark months). Approximated as 30*n days.
2106pub fn months(n: i64) -> Column {
2107    make_interval(0, n, 0, 0, 0, 0, 0)
2108}
2109
2110/// Interval of n years (PySpark years). Approximated as 365*n days.
2111pub fn years(n: i64) -> Column {
2112    make_interval(n, 0, 0, 0, 0, 0, 0)
2113}
2114
2115/// Interpret timestamp as UTC, convert to tz (PySpark from_utc_timestamp).
2116pub fn from_utc_timestamp(column: &Column, tz: &str) -> Column {
2117    column.clone().from_utc_timestamp(tz)
2118}
2119
2120/// Interpret timestamp as in tz, convert to UTC (PySpark to_utc_timestamp).
2121pub fn to_utc_timestamp(column: &Column, tz: &str) -> Column {
2122    column.clone().to_utc_timestamp(tz)
2123}
2124
2125/// Convert timestamp between timezones (PySpark convert_timezone).
2126pub fn convert_timezone(source_tz: &str, target_tz: &str, column: &Column) -> Column {
2127    let source_tz = source_tz.to_string();
2128    let target_tz = target_tz.to_string();
2129    let expr = column.expr().clone().map(
2130        move |s| {
2131            crate::column::expect_col(crate::udfs::apply_convert_timezone(
2132                s, &source_tz, &target_tz,
2133            ))
2134        },
2135        |_schema, field| Ok(field.clone()),
2136    );
2137    crate::column::Column::from_expr(expr, None)
2138}
2139
2140/// Current session timezone (PySpark current_timezone). Default "UTC". Returns literal column.
2141pub fn current_timezone() -> Column {
2142    use polars::prelude::*;
2143    crate::column::Column::from_expr(lit("UTC"), None)
2144}
2145
2146/// Create interval duration (PySpark make_interval). Optional args; 0 for omitted.
2147pub fn make_interval(
2148    years: i64,
2149    months: i64,
2150    weeks: i64,
2151    days: i64,
2152    hours: i64,
2153    mins: i64,
2154    secs: i64,
2155) -> Column {
2156    use polars::prelude::*;
2157    // Approximate: 1 year = 365 days, 1 month = 30 days
2158    let total_days = years * 365 + months * 30 + weeks * 7 + days;
2159    let args = DurationArgs::new()
2160        .with_days(lit(total_days))
2161        .with_hours(lit(hours))
2162        .with_minutes(lit(mins))
2163        .with_seconds(lit(secs));
2164    let dur = duration(args);
2165    crate::column::Column::from_expr(dur, None)
2166}
2167
2168/// Day-time interval: days, hours, minutes, seconds (PySpark make_dt_interval). All optional; 0 for omitted.
2169pub fn make_dt_interval(days: i64, hours: i64, minutes: i64, seconds: i64) -> Column {
2170    use polars::prelude::*;
2171    let args = DurationArgs::new()
2172        .with_days(lit(days))
2173        .with_hours(lit(hours))
2174        .with_minutes(lit(minutes))
2175        .with_seconds(lit(seconds));
2176    let dur = duration(args);
2177    crate::column::Column::from_expr(dur, None)
2178}
2179
2180/// Year-month interval (PySpark make_ym_interval). Polars has no native YM type; return months as Int32 (years*12 + months).
2181pub fn make_ym_interval(years: i32, months: i32) -> Column {
2182    use polars::prelude::*;
2183    let total_months = years * 12 + months;
2184    crate::column::Column::from_expr(lit(total_months), None)
2185}
2186
2187/// Alias for make_timestamp (PySpark make_timestamp_ntz - no timezone).
2188pub fn make_timestamp_ntz(
2189    year: &Column,
2190    month: &Column,
2191    day: &Column,
2192    hour: &Column,
2193    minute: &Column,
2194    sec: &Column,
2195) -> Column {
2196    make_timestamp(year, month, day, hour, minute, sec, None)
2197}
2198
2199/// Convert seconds since epoch to timestamp (PySpark timestamp_seconds).
2200pub fn timestamp_seconds(column: &Column) -> Column {
2201    column.clone().timestamp_seconds()
2202}
2203
2204/// Convert milliseconds since epoch to timestamp (PySpark timestamp_millis).
2205pub fn timestamp_millis(column: &Column) -> Column {
2206    column.clone().timestamp_millis()
2207}
2208
2209/// Convert microseconds since epoch to timestamp (PySpark timestamp_micros).
2210pub fn timestamp_micros(column: &Column) -> Column {
2211    column.clone().timestamp_micros()
2212}
2213
2214/// Date to days since 1970-01-01 (PySpark unix_date).
2215pub fn unix_date(column: &Column) -> Column {
2216    column.clone().unix_date()
2217}
2218
2219/// Days since epoch to date (PySpark date_from_unix_date).
2220pub fn date_from_unix_date(column: &Column) -> Column {
2221    column.clone().date_from_unix_date()
2222}
2223
2224/// Positive modulus (PySpark pmod).
2225pub fn pmod(dividend: &Column, divisor: &Column) -> Column {
2226    dividend.clone().pmod(divisor)
2227}
2228
2229/// Factorial n! (PySpark factorial). n in 0..=20; null for negative or overflow.
2230pub fn factorial(column: &Column) -> Column {
2231    column.clone().factorial()
2232}
2233
2234/// Concatenate string columns without separator (PySpark concat)
2235pub fn concat(columns: &[&Column]) -> Column {
2236    use polars::prelude::*;
2237    if columns.is_empty() {
2238        panic!("concat requires at least one column");
2239    }
2240    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2241    crate::column::Column::from_expr(concat_str(&exprs, "", false), None)
2242}
2243
2244/// Concatenate string columns with separator (PySpark concat_ws)
2245pub fn concat_ws(separator: &str, columns: &[&Column]) -> Column {
2246    use polars::prelude::*;
2247    if columns.is_empty() {
2248        panic!("concat_ws requires at least one column");
2249    }
2250    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2251    crate::column::Column::from_expr(concat_str(&exprs, separator, false), None)
2252}
2253
2254/// Row number window function (1, 2, 3 by order within partition).
2255/// Use with `.over(partition_by)` after ranking by an order column.
2256///
2257/// # Example
2258/// ```
2259/// use robin_sparkless_expr::{col, Column};
2260/// let salary_col = col("salary");
2261/// let rn = salary_col.row_number(true).over(&["dept"]);
2262/// ```
2263pub fn row_number(column: &Column) -> Column {
2264    column.clone().row_number(false)
2265}
2266
2267/// Rank window function (ties same rank, gaps). Use with `.over(partition_by)`.
2268pub fn rank(column: &Column, descending: bool) -> Column {
2269    column.clone().rank(descending)
2270}
2271
2272/// Dense rank window function (no gaps). Use with `.over(partition_by)`.
2273pub fn dense_rank(column: &Column, descending: bool) -> Column {
2274    column.clone().dense_rank(descending)
2275}
2276
2277/// Lag: value from n rows before in partition. Use with `.over(partition_by)`.
2278pub fn lag(column: &Column, n: i64) -> Column {
2279    column.clone().lag(n)
2280}
2281
2282/// Lead: value from n rows after in partition. Use with `.over(partition_by)`.
2283pub fn lead(column: &Column, n: i64) -> Column {
2284    column.clone().lead(n)
2285}
2286
2287/// First value in partition (PySpark first_value). Use with `.over(partition_by)`.
2288pub fn first_value(column: &Column) -> Column {
2289    column.clone().first_value()
2290}
2291
2292/// Last value in partition (PySpark last_value). Use with `.over(partition_by)`.
2293pub fn last_value(column: &Column) -> Column {
2294    column.clone().last_value()
2295}
2296
2297/// Percent rank in partition: (rank - 1) / (count - 1). Window is applied.
2298pub fn percent_rank(column: &Column, partition_by: &[&str], descending: bool) -> Column {
2299    column.clone().percent_rank(partition_by, descending)
2300}
2301
2302/// Cumulative distribution in partition: row_number / count. Window is applied.
2303pub fn cume_dist(column: &Column, partition_by: &[&str], descending: bool) -> Column {
2304    column.clone().cume_dist(partition_by, descending)
2305}
2306
2307/// Ntile: bucket 1..n by rank within partition. Window is applied.
2308pub fn ntile(column: &Column, n: u32, partition_by: &[&str], descending: bool) -> Column {
2309    column.clone().ntile(n, partition_by, descending)
2310}
2311
2312/// Nth value in partition by order (1-based n). Window is applied; do not call .over() again.
2313pub fn nth_value(column: &Column, n: i64, partition_by: &[&str], descending: bool) -> Column {
2314    column.clone().nth_value(n, partition_by, descending)
2315}
2316
2317/// Coalesce - returns the first non-null value from multiple columns.
2318///
2319/// # Example
2320/// ```
2321/// use robin_sparkless_expr::functions::{col, lit_i64, coalesce};
2322///
2323/// // coalesce(col("a"), col("b"), lit(0))
2324/// let expr = coalesce(&[&col("a"), &col("b"), &lit_i64(0)]);
2325/// ```
2326pub fn coalesce(columns: &[&Column]) -> Column {
2327    use polars::prelude::*;
2328    if columns.is_empty() {
2329        panic!("coalesce requires at least one column");
2330    }
2331    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2332    let expr = coalesce(&exprs);
2333    crate::column::Column::from_expr(expr, None)
2334}
2335
2336/// Alias for coalesce(col, value). PySpark nvl / ifnull.
2337pub fn nvl(column: &Column, value: &Column) -> Column {
2338    coalesce(&[column, value])
2339}
2340
2341/// Alias for nvl. PySpark ifnull.
2342pub fn ifnull(column: &Column, value: &Column) -> Column {
2343    nvl(column, value)
2344}
2345
2346/// Return null if column equals value, else column. PySpark nullif.
2347pub fn nullif(column: &Column, value: &Column) -> Column {
2348    use polars::prelude::*;
2349    let cond = column.expr().clone().eq(value.expr().clone());
2350    let null_lit = lit(NULL);
2351    let expr = when(cond).then(null_lit).otherwise(column.expr().clone());
2352    crate::column::Column::from_expr(expr, None)
2353}
2354
2355/// Replace NaN with value. PySpark nanvl.
2356pub fn nanvl(column: &Column, value: &Column) -> Column {
2357    use polars::prelude::*;
2358    let cond = column.expr().clone().is_nan();
2359    let expr = when(cond)
2360        .then(value.expr().clone())
2361        .otherwise(column.expr().clone());
2362    crate::column::Column::from_expr(expr, None)
2363}
2364
2365/// Three-arg null replacement: if col1 is not null then col2 else col3. PySpark nvl2.
2366pub fn nvl2(col1: &Column, col2: &Column, col3: &Column) -> Column {
2367    use polars::prelude::*;
2368    let cond = col1.expr().clone().is_not_null();
2369    let expr = when(cond)
2370        .then(col2.expr().clone())
2371        .otherwise(col3.expr().clone());
2372    crate::column::Column::from_expr(expr, None)
2373}
2374
2375/// Alias for substring. PySpark substr.
2376pub fn substr(column: &Column, start: i64, length: Option<i64>) -> Column {
2377    substring(column, start, length)
2378}
2379
2380/// Alias for pow. PySpark power.
2381pub fn power(column: &Column, exp: i64) -> Column {
2382    pow(column, exp)
2383}
2384
2385/// Alias for log (natural log). PySpark ln.
2386pub fn ln(column: &Column) -> Column {
2387    log(column)
2388}
2389
2390/// Alias for ceil. PySpark ceiling.
2391pub fn ceiling(column: &Column) -> Column {
2392    ceil(column)
2393}
2394
2395/// Alias for lower. PySpark lcase.
2396pub fn lcase(column: &Column) -> Column {
2397    lower(column)
2398}
2399
2400/// Alias for upper. PySpark ucase.
2401pub fn ucase(column: &Column) -> Column {
2402    upper(column)
2403}
2404
2405/// Alias for day. PySpark dayofmonth.
2406pub fn dayofmonth(column: &Column) -> Column {
2407    day(column)
2408}
2409
2410/// Alias for degrees. PySpark toDegrees.
2411pub fn to_degrees(column: &Column) -> Column {
2412    degrees(column)
2413}
2414
2415/// Alias for radians. PySpark toRadians.
2416pub fn to_radians(column: &Column) -> Column {
2417    radians(column)
2418}
2419
2420/// Hyperbolic cosine (PySpark cosh).
2421pub fn cosh(column: &Column) -> Column {
2422    column.clone().cosh()
2423}
2424/// Hyperbolic sine (PySpark sinh).
2425pub fn sinh(column: &Column) -> Column {
2426    column.clone().sinh()
2427}
2428/// Hyperbolic tangent (PySpark tanh).
2429pub fn tanh(column: &Column) -> Column {
2430    column.clone().tanh()
2431}
2432/// Inverse hyperbolic cosine (PySpark acosh).
2433pub fn acosh(column: &Column) -> Column {
2434    column.clone().acosh()
2435}
2436/// Inverse hyperbolic sine (PySpark asinh).
2437pub fn asinh(column: &Column) -> Column {
2438    column.clone().asinh()
2439}
2440/// Inverse hyperbolic tangent (PySpark atanh).
2441pub fn atanh(column: &Column) -> Column {
2442    column.clone().atanh()
2443}
2444/// Cube root (PySpark cbrt).
2445pub fn cbrt(column: &Column) -> Column {
2446    column.clone().cbrt()
2447}
2448/// exp(x) - 1 (PySpark expm1).
2449pub fn expm1(column: &Column) -> Column {
2450    column.clone().expm1()
2451}
2452/// log(1 + x) (PySpark log1p).
2453pub fn log1p(column: &Column) -> Column {
2454    column.clone().log1p()
2455}
2456/// Base-10 log (PySpark log10).
2457pub fn log10(column: &Column) -> Column {
2458    column.clone().log10()
2459}
2460/// Base-2 log (PySpark log2).
2461pub fn log2(column: &Column) -> Column {
2462    column.clone().log2()
2463}
2464/// Round to nearest integer (PySpark rint).
2465pub fn rint(column: &Column) -> Column {
2466    column.clone().rint()
2467}
2468/// sqrt(x*x + y*y) (PySpark hypot).
2469pub fn hypot(x: &Column, y: &Column) -> Column {
2470    let xx = x.expr().clone() * x.expr().clone();
2471    let yy = y.expr().clone() * y.expr().clone();
2472    crate::column::Column::from_expr((xx + yy).sqrt(), None)
2473}
2474
2475/// True if column is null. PySpark isnull.
2476pub fn isnull(column: &Column) -> Column {
2477    column.clone().is_null()
2478}
2479
2480/// True if column is not null. PySpark isnotnull.
2481pub fn isnotnull(column: &Column) -> Column {
2482    column.clone().is_not_null()
2483}
2484
2485/// Create an array column from multiple columns (PySpark array).
2486/// With no arguments, returns a column of empty arrays (one per row); PySpark parity.
2487pub fn array(columns: &[&Column]) -> Result<crate::column::Column, PolarsError> {
2488    use polars::prelude::*;
2489    if columns.is_empty() {
2490        // PySpark F.array() with no args: one empty list per row (broadcast literal).
2491        // Use .first() so the single-row literal is treated as a scalar and broadcasts to frame height.
2492        let empty_inner = Series::new("".into(), Vec::<i64>::new());
2493        let list_series = ListChunked::from_iter([Some(empty_inner)])
2494            .with_name("array".into())
2495            .into_series();
2496        let expr = lit(list_series).first();
2497        return Ok(crate::column::Column::from_expr(expr, None));
2498    }
2499    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2500    let expr = concat_list(exprs)
2501        .map_err(|e| PolarsError::ComputeError(format!("array concat_list: {e}").into()))?;
2502    Ok(crate::column::Column::from_expr(expr, None))
2503}
2504
2505/// Number of elements in list (PySpark size / array_size). Returns Int32.
2506pub fn array_size(column: &Column) -> Column {
2507    column.clone().array_size()
2508}
2509
2510/// Alias for array_size (PySpark size).
2511pub fn size(column: &Column) -> Column {
2512    column.clone().array_size()
2513}
2514
2515/// Cardinality: number of elements in array (PySpark cardinality). Alias for size/array_size.
2516pub fn cardinality(column: &Column) -> Column {
2517    column.clone().cardinality()
2518}
2519
2520/// Check if list contains value (PySpark array_contains).
2521pub fn array_contains(column: &Column, value: &Column) -> Column {
2522    column.clone().array_contains(value.expr().clone())
2523}
2524
2525/// Join list of strings with separator (PySpark array_join).
2526pub fn array_join(column: &Column, separator: &str) -> Column {
2527    column.clone().array_join(separator)
2528}
2529
2530/// Maximum element in list (PySpark array_max).
2531pub fn array_max(column: &Column) -> Column {
2532    column.clone().array_max()
2533}
2534
2535/// Minimum element in list (PySpark array_min).
2536pub fn array_min(column: &Column) -> Column {
2537    column.clone().array_min()
2538}
2539
2540/// Get element at 1-based index (PySpark element_at).
2541pub fn element_at(column: &Column, index: i64) -> Column {
2542    column.clone().element_at(index)
2543}
2544
2545/// Sort list elements (PySpark array_sort).
2546pub fn array_sort(column: &Column) -> Column {
2547    column.clone().array_sort()
2548}
2549
2550/// Distinct elements in list (PySpark array_distinct).
2551pub fn array_distinct(column: &Column) -> Column {
2552    column.clone().array_distinct()
2553}
2554
2555/// Slice list from 1-based start with optional length (PySpark slice).
2556pub fn array_slice(column: &Column, start: i64, length: Option<i64>) -> Column {
2557    column.clone().array_slice(start, length)
2558}
2559
2560/// Generate array of numbers from start to stop (inclusive) with optional step (PySpark sequence).
2561/// step defaults to 1.
2562pub fn sequence(start: &Column, stop: &Column, step: Option<&Column>) -> Column {
2563    use polars::prelude::{DataType, Field, as_struct, lit};
2564    let step_expr = step
2565        .map(|c| c.expr().clone().alias("2"))
2566        .unwrap_or_else(|| lit(1i64).alias("2"));
2567    let struct_expr = as_struct(vec![
2568        start.expr().clone().alias("0"),
2569        stop.expr().clone().alias("1"),
2570        step_expr,
2571    ]);
2572    let out_dtype = DataType::List(Box::new(DataType::Int64));
2573    let expr = struct_expr.map(
2574        |s| crate::column::expect_col(crate::udfs::apply_sequence(s)),
2575        move |_schema, field| Ok(Field::new(field.name().clone(), out_dtype.clone())),
2576    );
2577    crate::column::Column::from_expr(expr, None)
2578}
2579
2580/// Random permutation of list elements (PySpark shuffle).
2581pub fn shuffle(column: &Column) -> Column {
2582    let expr = column.expr().clone().map(
2583        |s| crate::column::expect_col(crate::udfs::apply_shuffle(s)),
2584        |_schema, field| Ok(field.clone()),
2585    );
2586    crate::column::Column::from_expr(expr, None)
2587}
2588
2589/// Explode list of structs into rows; struct fields become columns after unnest (PySpark inline).
2590/// Returns the exploded struct column; use unnest to expand struct fields to columns.
2591pub fn inline(column: &Column) -> Column {
2592    column.clone().explode()
2593}
2594
2595/// Like inline but null/empty yields one row of nulls (PySpark inline_outer).
2596pub fn inline_outer(column: &Column) -> Column {
2597    column.clone().explode_outer()
2598}
2599
2600/// Explode list into one row per element (PySpark explode).
2601pub fn explode(column: &Column) -> Column {
2602    column.clone().explode()
2603}
2604
2605/// 1-based index of first occurrence of value in list, or 0 if not found (PySpark array_position).
2606/// Implemented via Polars list.eval with col("") as element.
2607pub fn array_position(column: &Column, value: &Column) -> Column {
2608    column.clone().array_position(value.expr().clone())
2609}
2610
2611/// Remove null elements from list (PySpark array_compact).
2612pub fn array_compact(column: &Column) -> Column {
2613    column.clone().array_compact()
2614}
2615
2616/// New list with all elements equal to value removed (PySpark array_remove).
2617/// Implemented via Polars list.eval + list.drop_nulls.
2618pub fn array_remove(column: &Column, value: &Column) -> Column {
2619    column.clone().array_remove(value.expr().clone())
2620}
2621
2622/// Repeat each element n times (PySpark array_repeat). Not implemented: would require list.eval with dynamic repeat.
2623pub fn array_repeat(column: &Column, n: i64) -> Column {
2624    column.clone().array_repeat(n)
2625}
2626
2627/// Flatten list of lists to one list (PySpark flatten). Not implemented.
2628pub fn array_flatten(column: &Column) -> Column {
2629    column.clone().array_flatten()
2630}
2631
2632/// True if any list element satisfies the predicate (PySpark exists).
2633pub fn array_exists(column: &Column, predicate: Expr) -> Column {
2634    column.clone().array_exists(predicate)
2635}
2636
2637/// True if all list elements satisfy the predicate (PySpark forall).
2638pub fn array_forall(column: &Column, predicate: Expr) -> Column {
2639    column.clone().array_forall(predicate)
2640}
2641
2642/// Filter list elements by predicate (PySpark filter).
2643pub fn array_filter(column: &Column, predicate: Expr) -> Column {
2644    column.clone().array_filter(predicate)
2645}
2646
2647/// Transform list elements by expression (PySpark transform).
2648pub fn array_transform(column: &Column, f: Expr) -> Column {
2649    column.clone().array_transform(f)
2650}
2651
2652/// Sum of list elements (PySpark aggregate sum).
2653pub fn array_sum(column: &Column) -> Column {
2654    column.clone().array_sum()
2655}
2656
2657/// Array fold/aggregate (PySpark aggregate). Simplified: zero + sum(list elements).
2658pub fn aggregate(column: &Column, zero: &Column) -> Column {
2659    column.clone().array_aggregate(zero)
2660}
2661
2662/// Mean of list elements (PySpark aggregate avg).
2663pub fn array_mean(column: &Column) -> Column {
2664    column.clone().array_mean()
2665}
2666
2667/// Explode list with position (PySpark posexplode). Returns (pos_column, value_column).
2668/// pos is 1-based; implemented via list.eval(cum_count()).explode() and explode().
2669pub fn posexplode(column: &Column) -> (Column, Column) {
2670    column.clone().posexplode()
2671}
2672
2673/// Build a map column from alternating key/value expressions (PySpark create_map).
2674/// Returns List(Struct{key, value}) using Polars as_struct and concat_list.
2675/// With no args (or empty slice), returns a column of empty maps per row (PySpark parity #275).
2676pub fn create_map(key_values: &[&Column]) -> Result<Column, PolarsError> {
2677    use polars::chunked_array::StructChunked;
2678    use polars::prelude::{IntoSeries, ListChunked, as_struct, concat_list, lit};
2679    if key_values.is_empty() {
2680        // PySpark F.create_map() with no args: one empty map {} per row (broadcast literal).
2681        let key_s = Series::new("key".into(), Vec::<String>::new());
2682        let value_s = Series::new("value".into(), Vec::<String>::new());
2683        let fields: [&Series; 2] = [&key_s, &value_s];
2684        let empty_struct = StructChunked::from_series(
2685            polars::prelude::PlSmallStr::EMPTY,
2686            0,
2687            fields.iter().copied(),
2688        )
2689        .map_err(|e| PolarsError::ComputeError(format!("create_map empty struct: {e}").into()))?
2690        .into_series();
2691        let list_series = ListChunked::from_iter([Some(empty_struct)])
2692            .with_name("create_map".into())
2693            .into_series();
2694        let expr = lit(list_series).first();
2695        return Ok(crate::column::Column::from_expr(expr, None));
2696    }
2697    let mut struct_exprs: Vec<Expr> = Vec::new();
2698    for i in (0..key_values.len()).step_by(2) {
2699        if i + 1 < key_values.len() {
2700            let k = key_values[i].expr().clone().alias("key");
2701            let v = key_values[i + 1].expr().clone().alias("value");
2702            struct_exprs.push(as_struct(vec![k, v]));
2703        }
2704    }
2705    let expr = concat_list(struct_exprs)
2706        .map_err(|e| PolarsError::ComputeError(format!("create_map concat_list: {e}").into()))?;
2707    Ok(crate::column::Column::from_expr(expr, None))
2708}
2709
2710/// Extract keys from a map column (PySpark map_keys). Map is List(Struct{key, value}).
2711pub fn map_keys(column: &Column) -> Column {
2712    column.clone().map_keys()
2713}
2714
2715/// Extract values from a map column (PySpark map_values).
2716pub fn map_values(column: &Column) -> Column {
2717    column.clone().map_values()
2718}
2719
2720/// Return map as list of structs {key, value} (PySpark map_entries).
2721pub fn map_entries(column: &Column) -> Column {
2722    column.clone().map_entries()
2723}
2724
2725/// Build map from two array columns keys and values (PySpark map_from_arrays). Implemented via UDF.
2726pub fn map_from_arrays(keys: &Column, values: &Column) -> Column {
2727    keys.clone().map_from_arrays(values)
2728}
2729
2730/// Merge two map columns (PySpark map_concat). Last value wins for duplicate keys.
2731pub fn map_concat(a: &Column, b: &Column) -> Column {
2732    a.clone().map_concat(b)
2733}
2734
2735/// Array of structs {key, value} to map (PySpark map_from_entries).
2736pub fn map_from_entries(column: &Column) -> Column {
2737    column.clone().map_from_entries()
2738}
2739
2740/// True if map contains key (PySpark map_contains_key).
2741pub fn map_contains_key(map_col: &Column, key: &Column) -> Column {
2742    map_col.clone().map_contains_key(key)
2743}
2744
2745/// Get value for key from map, or null (PySpark get).
2746pub fn get(map_col: &Column, key: &Column) -> Column {
2747    map_col.clone().get(key)
2748}
2749
2750/// Filter map entries by predicate (PySpark map_filter).
2751pub fn map_filter(map_col: &Column, predicate: Expr) -> Column {
2752    map_col.clone().map_filter(predicate)
2753}
2754
2755/// Merge two maps by key with merge function (PySpark map_zip_with).
2756pub fn map_zip_with(map1: &Column, map2: &Column, merge: Expr) -> Column {
2757    map1.clone().map_zip_with(map2, merge)
2758}
2759
2760/// Convenience: zip_with with coalesce(left, right) merge.
2761pub fn zip_with_coalesce(left: &Column, right: &Column) -> Column {
2762    use polars::prelude::col;
2763    let left_field = col("").struct_().field_by_name("left");
2764    let right_field = col("").struct_().field_by_name("right");
2765    let merge = crate::column::Column::from_expr(
2766        coalesce(&[
2767            &crate::column::Column::from_expr(left_field, None),
2768            &crate::column::Column::from_expr(right_field, None),
2769        ])
2770        .into_expr(),
2771        None,
2772    );
2773    left.clone().zip_with(right, merge.into_expr())
2774}
2775
2776/// Convenience: map_zip_with with coalesce(value1, value2) merge.
2777pub fn map_zip_with_coalesce(map1: &Column, map2: &Column) -> Column {
2778    use polars::prelude::col;
2779    let v1 = col("").struct_().field_by_name("value1");
2780    let v2 = col("").struct_().field_by_name("value2");
2781    let merge = coalesce(&[
2782        &crate::column::Column::from_expr(v1, None),
2783        &crate::column::Column::from_expr(v2, None),
2784    ])
2785    .into_expr();
2786    map1.clone().map_zip_with(map2, merge)
2787}
2788
2789/// Convenience: map_filter with value > threshold predicate.
2790pub fn map_filter_value_gt(map_col: &Column, threshold: f64) -> Column {
2791    use polars::prelude::{col, lit};
2792    let pred = col("").struct_().field_by_name("value").gt(lit(threshold));
2793    map_col.clone().map_filter(pred)
2794}
2795
2796/// Create struct from columns using column names as field names (PySpark struct).
2797pub fn struct_(columns: &[&Column]) -> Column {
2798    use polars::prelude::as_struct;
2799    if columns.is_empty() {
2800        panic!("struct requires at least one column");
2801    }
2802    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2803    crate::column::Column::from_expr(as_struct(exprs), None)
2804}
2805
2806/// Create struct with explicit field names (PySpark named_struct). Pairs of (name, column).
2807pub fn named_struct(pairs: &[(&str, &Column)]) -> Column {
2808    use polars::prelude::as_struct;
2809    if pairs.is_empty() {
2810        panic!("named_struct requires at least one (name, column) pair");
2811    }
2812    let exprs: Vec<Expr> = pairs
2813        .iter()
2814        .map(|(name, col)| col.expr().clone().alias(*name))
2815        .collect();
2816    crate::column::Column::from_expr(as_struct(exprs), None)
2817}
2818
2819/// Append element to end of list (PySpark array_append).
2820pub fn array_append(array: &Column, elem: &Column) -> Column {
2821    array.clone().array_append(elem)
2822}
2823
2824/// Prepend element to start of list (PySpark array_prepend).
2825pub fn array_prepend(array: &Column, elem: &Column) -> Column {
2826    array.clone().array_prepend(elem)
2827}
2828
2829/// Insert element at 1-based position (PySpark array_insert).
2830pub fn array_insert(array: &Column, pos: &Column, elem: &Column) -> Column {
2831    array.clone().array_insert(pos, elem)
2832}
2833
2834/// Elements in first array not in second (PySpark array_except).
2835pub fn array_except(a: &Column, b: &Column) -> Column {
2836    a.clone().array_except(b)
2837}
2838
2839/// Elements in both arrays (PySpark array_intersect).
2840pub fn array_intersect(a: &Column, b: &Column) -> Column {
2841    a.clone().array_intersect(b)
2842}
2843
2844/// Distinct elements from both arrays (PySpark array_union).
2845pub fn array_union(a: &Column, b: &Column) -> Column {
2846    a.clone().array_union(b)
2847}
2848
2849/// Zip two arrays element-wise with merge function (PySpark zip_with).
2850pub fn zip_with(left: &Column, right: &Column, merge: Expr) -> Column {
2851    left.clone().zip_with(right, merge)
2852}
2853
2854/// Extract JSON path from string column (PySpark get_json_object).
2855pub fn get_json_object(column: &Column, path: &str) -> Column {
2856    column.clone().get_json_object(path)
2857}
2858
2859/// Keys of JSON object (PySpark json_object_keys). Returns list of strings.
2860pub fn json_object_keys(column: &Column) -> Column {
2861    column.clone().json_object_keys()
2862}
2863
2864/// Extract keys from JSON as struct (PySpark json_tuple). keys: e.g. ["a", "b"].
2865pub fn json_tuple(column: &Column, keys: &[&str]) -> Column {
2866    column.clone().json_tuple(keys)
2867}
2868
2869/// Parse CSV string to struct (PySpark from_csv). Minimal implementation.
2870pub fn from_csv(column: &Column) -> Column {
2871    column.clone().from_csv()
2872}
2873
2874/// Format struct as CSV string (PySpark to_csv). Minimal implementation.
2875pub fn to_csv(column: &Column) -> Column {
2876    column.clone().to_csv()
2877}
2878
2879/// Schema of CSV string (PySpark schema_of_csv). Returns literal schema string; minimal stub.
2880pub fn schema_of_csv(_column: &Column) -> Column {
2881    Column::from_expr(
2882        lit("STRUCT<_c0: STRING, _c1: STRING>".to_string()),
2883        Some("schema_of_csv".to_string()),
2884    )
2885}
2886
2887/// Schema of JSON string (PySpark schema_of_json). Returns literal schema string; minimal stub.
2888pub fn schema_of_json(_column: &Column) -> Column {
2889    Column::from_expr(
2890        lit("STRUCT<>".to_string()),
2891        Some("schema_of_json".to_string()),
2892    )
2893}
2894
2895/// Parse string column as JSON into struct (PySpark from_json).
2896pub fn from_json(column: &Column, schema: Option<polars::datatypes::DataType>) -> Column {
2897    column.clone().from_json(schema)
2898}
2899
2900/// Serialize struct column to JSON string (PySpark to_json).
2901pub fn to_json(column: &Column) -> Column {
2902    column.clone().to_json()
2903}
2904
2905/// Check if column values are in the given list (PySpark isin). Uses Polars is_in.
2906pub fn isin(column: &Column, other: &Column) -> Column {
2907    column.clone().isin(other)
2908}
2909
2910/// Check if column values are in the given i64 slice (PySpark isin with literal list).
2911pub fn isin_i64(column: &Column, values: &[i64]) -> Column {
2912    let s = Series::from_iter(values.iter().cloned());
2913    Column::from_expr(column.expr().clone().is_in(lit(s), false), None)
2914}
2915
2916/// Check if column values are in the given string slice (PySpark isin with literal list).
2917pub fn isin_str(column: &Column, values: &[&str]) -> Column {
2918    let s: Series = Series::from_iter(values.iter().copied());
2919    Column::from_expr(column.expr().clone().is_in(lit(s), false), None)
2920}
2921
2922/// Percent-decode URL-encoded string (PySpark url_decode).
2923pub fn url_decode(column: &Column) -> Column {
2924    column.clone().url_decode()
2925}
2926
2927/// Percent-encode string for URL (PySpark url_encode).
2928pub fn url_encode(column: &Column) -> Column {
2929    column.clone().url_encode()
2930}
2931
2932/// Bitwise left shift (PySpark shiftLeft). col << n.
2933pub fn shift_left(column: &Column, n: i32) -> Column {
2934    column.clone().shift_left(n)
2935}
2936
2937/// Bitwise signed right shift (PySpark shiftRight). col >> n.
2938pub fn shift_right(column: &Column, n: i32) -> Column {
2939    column.clone().shift_right(n)
2940}
2941
2942/// Bitwise unsigned right shift (PySpark shiftRightUnsigned). Logical shift for Long.
2943pub fn shift_right_unsigned(column: &Column, n: i32) -> Column {
2944    column.clone().shift_right_unsigned(n)
2945}
2946
2947/// Session/library version string (PySpark version).
2948pub fn version() -> Column {
2949    Column::from_expr(
2950        lit(concat!("robin-sparkless-", env!("CARGO_PKG_VERSION"))),
2951        None,
2952    )
2953}
2954
2955/// Null-safe equality: true if both null or both equal (PySpark equal_null). Alias for eq_null_safe.
2956pub fn equal_null(left: &Column, right: &Column) -> Column {
2957    left.clone().eq_null_safe(right)
2958}
2959
2960/// Length of JSON array at path (PySpark json_array_length).
2961pub fn json_array_length(column: &Column, path: &str) -> Column {
2962    column.clone().json_array_length(path)
2963}
2964
2965/// Parse URL and extract part: PROTOCOL, HOST, PATH, etc. (PySpark parse_url).
2966/// When key is Some(k) and part is QUERY/QUERYSTRING, returns the value for that query parameter only.
2967pub fn parse_url(column: &Column, part: &str, key: Option<&str>) -> Column {
2968    column.clone().parse_url(part, key)
2969}
2970
2971/// Hash of column values (PySpark hash). Uses Murmur3 32-bit for parity with PySpark.
2972pub fn hash(columns: &[&Column]) -> Column {
2973    use polars::prelude::*;
2974    if columns.is_empty() {
2975        return crate::column::Column::from_expr(lit(0i64), None);
2976    }
2977    if columns.len() == 1 {
2978        return columns[0].clone().hash();
2979    }
2980    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2981    let struct_expr = polars::prelude::as_struct(exprs);
2982    let name = columns[0].name().to_string();
2983    let expr = struct_expr.map(
2984        |s| crate::column::expect_col(crate::udfs::apply_hash_struct(s)),
2985        |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
2986    );
2987    crate::column::Column::from_expr(expr, Some(name))
2988}
2989
2990/// Stack columns into struct (PySpark stack). Alias for struct_.
2991pub fn stack(columns: &[&Column]) -> Column {
2992    struct_(columns)
2993}
2994
#[cfg(test)]
mod tests {
    use super::*;
    use polars::prelude::{IntoLazy, df};

    // ---- column construction and literals ----

    #[test]
    fn test_col_creates_column() {
        // A column built from a name reports that name back.
        assert_eq!(col("test").name(), "test");
    }

    #[test]
    fn test_lit_i32() {
        // Literals carry the default "<expr>" name.
        assert_eq!(lit_i32(42).name(), "<expr>");
    }

    #[test]
    fn test_lit_i64() {
        assert_eq!(lit_i64(123456789012345i64).name(), "<expr>");
    }

    #[test]
    fn test_lit_f64() {
        assert_eq!(lit_f64(std::f64::consts::PI).name(), "<expr>");
    }

    #[test]
    fn test_lit_bool() {
        assert_eq!(lit_bool(true).name(), "<expr>");
    }

    #[test]
    fn test_lit_str() {
        assert_eq!(lit_str("hello").name(), "<expr>");
    }

    // ---- maps ----

    #[test]
    fn test_create_map_empty() {
        // PySpark F.create_map() with no args: every row gets an empty map (#275).
        let empty_maps = create_map(&[]).unwrap();
        let frame = df!("id" => &[1i64, 2i64]).unwrap();
        let out = frame
            .lazy()
            .with_columns([empty_maps.into_expr().alias("m")])
            .collect()
            .unwrap();
        assert_eq!(out.height(), 2);

        let map_column = out.column("m").unwrap();
        assert_eq!(map_column.len(), 2);

        // Each of the two rows must hold a zero-length list.
        let lists = map_column.list().unwrap();
        for row_idx in 0..2 {
            let entries = lists.get(row_idx).unwrap();
            assert_eq!(entries.len(), 0);
        }
    }

    // ---- aggregation naming ----

    #[test]
    fn test_count_aggregation() {
        let counted = count(&col("value"));
        assert_eq!(counted.name(), "count");
    }

    #[test]
    fn test_sum_aggregation() {
        let summed = sum(&col("value"));
        assert_eq!(summed.name(), "sum");
    }

    #[test]
    fn test_avg_aggregation() {
        let averaged = avg(&col("value"));
        assert_eq!(averaged.name(), "avg");
    }

    #[test]
    fn test_max_aggregation() {
        let largest = max(&col("value"));
        assert_eq!(largest.name(), "max");
    }

    #[test]
    fn test_min_aggregation() {
        let smallest = min(&col("value"));
        assert_eq!(smallest.name(), "min");
    }

    // ---- conditionals and coalesce ----

    #[test]
    fn test_when_then_otherwise() {
        let people = df!(
            "age" => &[15, 25, 35]
        )
        .unwrap();

        // status = "adult" when age > 18, otherwise "minor".
        let is_adult = col("age").gt(polars::prelude::lit(18));
        let status = when(&is_adult)
            .then(&lit_str("adult"))
            .otherwise(&lit_str("minor"));

        let labelled = people
            .lazy()
            .with_column(status.into_expr().alias("status"))
            .collect()
            .unwrap();

        let status_col = labelled.column("status").unwrap();
        let values: Vec<Option<&str>> = status_col.str().unwrap().into_iter().collect();

        // Ages 15, 25, 35 map to minor, adult, adult.
        assert_eq!(values, vec![Some("minor"), Some("adult"), Some("adult")]);
    }

    #[test]
    fn test_coalesce_returns_first_non_null() {
        // Exactly one of a/b/c is non-null in each row.
        let sparse = df!(
            "a" => &[Some(1), None, None],
            "b" => &[None, Some(2), None],
            "c" => &[None, None, Some(3)]
        )
        .unwrap();

        let (col_a, col_b, col_c) = (col("a"), col("b"), col("c"));
        let merged = coalesce(&[&col_a, &col_b, &col_c]);

        let out = sparse
            .lazy()
            .with_column(merged.into_expr().alias("coalesced"))
            .collect()
            .unwrap();

        let coalesced_col = out.column("coalesced").unwrap();
        let values: Vec<Option<i32>> = coalesced_col.i32().unwrap().into_iter().collect();

        // Row 0 takes 'a', row 1 takes 'b', row 2 takes 'c'.
        assert_eq!(values, vec![Some(1), Some(2), Some(3)]);
    }

    #[test]
    fn test_coalesce_with_literal_fallback() {
        // Second row is null in every real column, so the literal must be used.
        let sparse = df!(
            "a" => &[Some(1), None],
            "b" => &[None::<i32>, None::<i32>]
        )
        .unwrap();

        let (col_a, col_b) = (col("a"), col("b"));
        let zero = lit_i32(0);
        let merged = coalesce(&[&col_a, &col_b, &zero]);

        let out = sparse
            .lazy()
            .with_column(merged.into_expr().alias("coalesced"))
            .collect()
            .unwrap();

        let coalesced_col = out.column("coalesced").unwrap();
        let values: Vec<Option<i32>> = coalesced_col.i32().unwrap().into_iter().collect();

        // Row 0 takes 'a'; row 1 falls through to the literal fallback.
        assert_eq!(values, vec![Some(1), Some(0)]);
    }

    #[test]
    #[should_panic(expected = "coalesce requires at least one column")]
    fn test_coalesce_empty_panics() {
        let no_columns: [&Column; 0] = [];
        let _ = coalesce(&no_columns);
    }

    // ---- casting and number parsing ----

    #[test]
    fn test_cast_double_string_column_strict_ok() {
        // Every string parses as a double, so the strict cast must succeed.
        let strings = df!(
            "s" => &["123", " 45.5 ", "0"]
        )
        .unwrap();

        let as_double = cast(&col("s"), "double").unwrap();

        let out = strings
            .lazy()
            .with_column(as_double.into_expr().alias("v"))
            .collect()
            .unwrap();

        let parsed: Vec<Option<f64>> = out
            .column("v")
            .unwrap()
            .f64()
            .unwrap()
            .into_iter()
            .collect();
        assert_eq!(parsed, vec![Some(123.0), Some(45.5), Some(0.0)]);
    }

    #[test]
    fn test_try_cast_double_string_column_invalid_to_null() {
        // Under try_cast, strings that are not numbers turn into null.
        let strings = df!(
            "s" => &["123", " 45.5 ", "abc", ""]
        )
        .unwrap();

        let as_double = try_cast(&col("s"), "double").unwrap();

        let out = strings
            .lazy()
            .with_column(as_double.into_expr().alias("v"))
            .collect()
            .unwrap();

        let parsed: Vec<Option<f64>> = out
            .column("v")
            .unwrap()
            .f64()
            .unwrap()
            .into_iter()
            .collect();
        assert_eq!(parsed, vec![Some(123.0), Some(45.5), None, None]);
    }

    #[test]
    fn test_to_number_and_try_to_number_numerics_and_strings() {
        // Numeric inputs become doubles; with try_to_number an unparsable string becomes null.
        let mixed = df!(
            "i" => &[1i32, 2, 3],
            "f" => &[1.5f64, 2.5, 3.5],
            "s" => &["10", "20.5", "xyz"]
        )
        .unwrap();

        let ints_as_number = to_number(&col("i"), None).unwrap();
        let floats_as_number = to_number(&col("f"), None).unwrap();
        let strings_as_number = try_to_number(&col("s"), None).unwrap();

        let out = mixed
            .lazy()
            .with_columns([
                ints_as_number.into_expr().alias("i_num"),
                floats_as_number.into_expr().alias("f_num"),
                strings_as_number.into_expr().alias("s_num"),
            ])
            .collect()
            .unwrap();

        let i_vals: Vec<Option<f64>> = out
            .column("i_num")
            .unwrap()
            .f64()
            .unwrap()
            .into_iter()
            .collect();
        let f_vals: Vec<Option<f64>> = out
            .column("f_num")
            .unwrap()
            .f64()
            .unwrap()
            .into_iter()
            .collect();
        let s_vals: Vec<Option<f64>> = out
            .column("s_num")
            .unwrap()
            .f64()
            .unwrap()
            .into_iter()
            .collect();

        assert_eq!(i_vals, vec![Some(1.0), Some(2.0), Some(3.0)]);
        assert_eq!(f_vals, vec![Some(1.5), Some(2.5), Some(3.5)]);
        assert_eq!(s_vals, vec![Some(10.0), Some(20.5), None]);
    }
}