Skip to main content

robin_sparkless/
functions.rs

1use crate::column::Column;
2use crate::dataframe::DataFrame;
3use polars::prelude::*;
4
5// -----------------------------------------------------------------------------
6// -----------------------------------------------------------------------------
7
8/// Sort order specification for use in orderBy/sort. Holds expr + direction + null placement.
9#[derive(Debug, Clone)]
10pub struct SortOrder {
11    pub(crate) expr: Expr,
12    pub(crate) descending: bool,
13    pub(crate) nulls_last: bool,
14}
15
16impl SortOrder {
17    pub fn expr(&self) -> &Expr {
18        &self.expr
19    }
20}
21
22/// Ascending sort, nulls first (Spark default for ASC).
23pub fn asc(column: &Column) -> SortOrder {
24    SortOrder {
25        expr: column.expr().clone(),
26        descending: false,
27        nulls_last: false,
28    }
29}
30
31/// Ascending sort, nulls first.
32pub fn asc_nulls_first(column: &Column) -> SortOrder {
33    SortOrder {
34        expr: column.expr().clone(),
35        descending: false,
36        nulls_last: false,
37    }
38}
39
40/// Ascending sort, nulls last.
41pub fn asc_nulls_last(column: &Column) -> SortOrder {
42    SortOrder {
43        expr: column.expr().clone(),
44        descending: false,
45        nulls_last: true,
46    }
47}
48
49/// Descending sort, nulls last (Spark default for DESC).
50pub fn desc(column: &Column) -> SortOrder {
51    SortOrder {
52        expr: column.expr().clone(),
53        descending: true,
54        nulls_last: true,
55    }
56}
57
58/// Descending sort, nulls first.
59pub fn desc_nulls_first(column: &Column) -> SortOrder {
60    SortOrder {
61        expr: column.expr().clone(),
62        descending: true,
63        nulls_last: false,
64    }
65}
66
67/// Descending sort, nulls last.
68pub fn desc_nulls_last(column: &Column) -> SortOrder {
69    SortOrder {
70        expr: column.expr().clone(),
71        descending: true,
72        nulls_last: true,
73    }
74}
75
76// -----------------------------------------------------------------------------
77
78/// Parse PySpark-like type name to Polars DataType.
79/// Decimal(precision, scale) is mapped to Float64 for schema parity (Polars dtype-decimal not enabled).
80pub fn parse_type_name(name: &str) -> Result<DataType, String> {
81    let s = name.trim().to_lowercase();
82    if s.starts_with("decimal(") && s.contains(')') {
83        return Ok(DataType::Float64);
84    }
85    Ok(match s.as_str() {
86        "int" | "integer" => DataType::Int32,
87        "long" | "bigint" => DataType::Int64,
88        "float" => DataType::Float32,
89        "double" => DataType::Float64,
90        "string" | "str" => DataType::String,
91        "boolean" | "bool" => DataType::Boolean,
92        "date" => DataType::Date,
93        "timestamp" => DataType::Datetime(TimeUnit::Microseconds, None),
94        _ => return Err(format!("unknown type name: {name}")),
95    })
96}
97
98/// Get a column by name
99pub fn col(name: &str) -> Column {
100    Column::new(name.to_string())
101}
102
103/// Grouping set marker (PySpark grouping). Stub: returns 0 (no GROUPING SETS in robin-sparkless).
104pub fn grouping(column: &Column) -> Column {
105    let _ = column;
106    Column::from_expr(lit(0i32), Some("grouping".to_string()))
107}
108
109/// Grouping set id (PySpark grouping_id). Stub: returns 0.
110pub fn grouping_id(_columns: &[Column]) -> Column {
111    Column::from_expr(lit(0i64), Some("grouping_id".to_string()))
112}
113
114/// Create a literal column from a value
115pub fn lit_i32(value: i32) -> Column {
116    let expr: Expr = lit(value);
117    Column::from_expr(expr, None)
118}
119
120pub fn lit_i64(value: i64) -> Column {
121    let expr: Expr = lit(value);
122    Column::from_expr(expr, None)
123}
124
125pub fn lit_f64(value: f64) -> Column {
126    let expr: Expr = lit(value);
127    Column::from_expr(expr, None)
128}
129
130pub fn lit_bool(value: bool) -> Column {
131    let expr: Expr = lit(value);
132    Column::from_expr(expr, None)
133}
134
135pub fn lit_str(value: &str) -> Column {
136    let expr: Expr = lit(value);
137    Column::from_expr(expr, None)
138}
139
140/// Typed null literal column. Returns `Err` on unknown type name.
141/// See [`parse_type_name`] for supported type strings (e.g. `"boolean"`, `"string"`, `"bigint"`).
142pub fn lit_null(dtype: &str) -> Result<Column, String> {
143    Column::lit_null(dtype)
144}
145
146/// Count aggregation
147pub fn count(col: &Column) -> Column {
148    Column::from_expr(col.expr().clone().count(), Some("count".to_string()))
149}
150
151/// Sum aggregation
152pub fn sum(col: &Column) -> Column {
153    Column::from_expr(col.expr().clone().sum(), Some("sum".to_string()))
154}
155
156/// Average aggregation
157pub fn avg(col: &Column) -> Column {
158    Column::from_expr(col.expr().clone().mean(), Some("avg".to_string()))
159}
160
161/// Alias for avg (PySpark mean).
162pub fn mean(col: &Column) -> Column {
163    avg(col)
164}
165
166/// Maximum aggregation
167pub fn max(col: &Column) -> Column {
168    Column::from_expr(col.expr().clone().max(), Some("max".to_string()))
169}
170
171/// Minimum aggregation
172pub fn min(col: &Column) -> Column {
173    Column::from_expr(col.expr().clone().min(), Some("min".to_string()))
174}
175
176/// First value in group (PySpark first). Use in groupBy.agg(). ignorenulls: when true, first non-null; Polars 0.45 uses .first() only (ignorenulls reserved for API compatibility).
177pub fn first(col: &Column, ignorenulls: bool) -> Column {
178    let _ = ignorenulls;
179    Column::from_expr(col.expr().clone().first(), None)
180}
181
182/// Any value from the group (PySpark any_value). Use in groupBy.agg(). ignorenulls reserved for API compatibility.
183pub fn any_value(col: &Column, ignorenulls: bool) -> Column {
184    let _ = ignorenulls;
185    Column::from_expr(col.expr().clone().first(), None)
186}
187
188/// Count rows where condition is true (PySpark count_if). Use in groupBy.agg(); column should be boolean (true=1, false=0).
189pub fn count_if(col: &Column) -> Column {
190    use polars::prelude::DataType;
191    Column::from_expr(
192        col.expr().clone().cast(DataType::Int64).sum(),
193        Some("count_if".to_string()),
194    )
195}
196
197/// Sum aggregation; null on overflow (PySpark try_sum). Use in groupBy.agg(). Polars sum does not overflow; reserved for API.
198pub fn try_sum(col: &Column) -> Column {
199    Column::from_expr(col.expr().clone().sum(), Some("try_sum".to_string()))
200}
201
202/// Average aggregation; null on invalid (PySpark try_avg). Use in groupBy.agg(). Maps to mean; reserved for API.
203pub fn try_avg(col: &Column) -> Column {
204    Column::from_expr(col.expr().clone().mean(), Some("try_avg".to_string()))
205}
206
207/// Value of value_col in the row where ord_col is maximum (PySpark max_by). Use in groupBy.agg().
208pub fn max_by(value_col: &Column, ord_col: &Column) -> Column {
209    use polars::prelude::{SortOptions, as_struct};
210    let st = as_struct(vec![
211        ord_col.expr().clone().alias("_ord"),
212        value_col.expr().clone().alias("_val"),
213    ]);
214    let e = st
215        .sort(SortOptions::default().with_order_descending(true))
216        .first()
217        .struct_()
218        .field_by_name("_val");
219    Column::from_expr(e, None)
220}
221
222/// Value of value_col in the row where ord_col is minimum (PySpark min_by). Use in groupBy.agg().
223pub fn min_by(value_col: &Column, ord_col: &Column) -> Column {
224    use polars::prelude::{SortOptions, as_struct};
225    let st = as_struct(vec![
226        ord_col.expr().clone().alias("_ord"),
227        value_col.expr().clone().alias("_val"),
228    ]);
229    let e = st
230        .sort(SortOptions::default())
231        .first()
232        .struct_()
233        .field_by_name("_val");
234    Column::from_expr(e, None)
235}
236
237/// Collect column values into list per group (PySpark collect_list). Use in groupBy.agg().
238pub fn collect_list(col: &Column) -> Column {
239    Column::from_expr(
240        col.expr().clone().implode(),
241        Some("collect_list".to_string()),
242    )
243}
244
245/// Collect distinct column values into list per group (PySpark collect_set). Use in groupBy.agg().
246pub fn collect_set(col: &Column) -> Column {
247    Column::from_expr(
248        col.expr().clone().unique().implode(),
249        Some("collect_set".to_string()),
250    )
251}
252
253/// Boolean AND across group (PySpark bool_and). Use in groupBy.agg(); column should be boolean.
254pub fn bool_and(col: &Column) -> Column {
255    Column::from_expr(col.expr().clone().all(true), Some("bool_and".to_string()))
256}
257
258/// Alias for bool_and (PySpark every). Use in groupBy.agg().
259pub fn every(col: &Column) -> Column {
260    Column::from_expr(col.expr().clone().all(true), Some("every".to_string()))
261}
262
263/// Standard deviation (sample) aggregation (PySpark stddev / stddev_samp)
264pub fn stddev(col: &Column) -> Column {
265    Column::from_expr(col.expr().clone().std(1), Some("stddev".to_string()))
266}
267
268/// Variance (sample) aggregation (PySpark variance / var_samp)
269pub fn variance(col: &Column) -> Column {
270    Column::from_expr(col.expr().clone().var(1), Some("variance".to_string()))
271}
272
273/// Population standard deviation (ddof=0). PySpark stddev_pop.
274pub fn stddev_pop(col: &Column) -> Column {
275    Column::from_expr(col.expr().clone().std(0), Some("stddev_pop".to_string()))
276}
277
278/// Sample standard deviation (ddof=1). Alias for stddev. PySpark stddev_samp.
279pub fn stddev_samp(col: &Column) -> Column {
280    stddev(col)
281}
282
283/// Alias for stddev (PySpark std).
284pub fn std(col: &Column) -> Column {
285    stddev(col)
286}
287
288/// Population variance (ddof=0). PySpark var_pop.
289pub fn var_pop(col: &Column) -> Column {
290    Column::from_expr(col.expr().clone().var(0), Some("var_pop".to_string()))
291}
292
293/// Sample variance (ddof=1). Alias for variance. PySpark var_samp.
294pub fn var_samp(col: &Column) -> Column {
295    variance(col)
296}
297
298/// Median aggregation. PySpark median.
299pub fn median(col: &Column) -> Column {
300    use polars::prelude::QuantileMethod;
301    Column::from_expr(
302        col.expr()
303            .clone()
304            .quantile(lit(0.5), QuantileMethod::Linear),
305        Some("median".to_string()),
306    )
307}
308
309/// Approximate percentile (PySpark approx_percentile). Maps to quantile; percentage in 0.0..=1.0. accuracy reserved for API compatibility.
310pub fn approx_percentile(col: &Column, percentage: f64, _accuracy: Option<i32>) -> Column {
311    use polars::prelude::QuantileMethod;
312    Column::from_expr(
313        col.expr()
314            .clone()
315            .quantile(lit(percentage), QuantileMethod::Linear),
316        Some(format!("approx_percentile({percentage})")),
317    )
318}
319
320/// Approximate percentile (PySpark percentile_approx). Alias for approx_percentile.
321pub fn percentile_approx(col: &Column, percentage: f64, accuracy: Option<i32>) -> Column {
322    approx_percentile(col, percentage, accuracy)
323}
324
325/// Mode aggregation - most frequent value. PySpark mode.
326pub fn mode(col: &Column) -> Column {
327    col.clone().mode()
328}
329
330/// Count distinct aggregation (PySpark countDistinct)
331pub fn count_distinct(col: &Column) -> Column {
332    use polars::prelude::DataType;
333    Column::from_expr(
334        col.expr().clone().n_unique().cast(DataType::Int64),
335        Some("count_distinct".to_string()),
336    )
337}
338
339/// Approximate count distinct (PySpark approx_count_distinct). Use in groupBy.agg(). rsd reserved for API compatibility; Polars uses exact n_unique.
340pub fn approx_count_distinct(col: &Column, _rsd: Option<f64>) -> Column {
341    use polars::prelude::DataType;
342    Column::from_expr(
343        col.expr().clone().n_unique().cast(DataType::Int64),
344        Some("approx_count_distinct".to_string()),
345    )
346}
347
348/// Kurtosis aggregation (PySpark kurtosis). Fisher definition, bias=true. Use in groupBy.agg().
349pub fn kurtosis(col: &Column) -> Column {
350    Column::from_expr(
351        col.expr()
352            .clone()
353            .cast(DataType::Float64)
354            .kurtosis(true, true),
355        Some("kurtosis".to_string()),
356    )
357}
358
359/// Skewness aggregation (PySpark skewness). bias=true. Use in groupBy.agg().
360pub fn skewness(col: &Column) -> Column {
361    Column::from_expr(
362        col.expr().clone().cast(DataType::Float64).skew(true),
363        Some("skewness".to_string()),
364    )
365}
366
367/// Population covariance aggregation (PySpark covar_pop). Returns Expr for use in groupBy.agg().
368pub fn covar_pop_expr(col1: &str, col2: &str) -> Expr {
369    use polars::prelude::{col as pl_col, len};
370    let c1 = pl_col(col1).cast(DataType::Float64);
371    let c2 = pl_col(col2).cast(DataType::Float64);
372    let n = len().cast(DataType::Float64);
373    let sum_ab = (c1.clone() * c2.clone()).sum();
374    let sum_a = pl_col(col1).sum().cast(DataType::Float64);
375    let sum_b = pl_col(col2).sum().cast(DataType::Float64);
376    (sum_ab - sum_a * sum_b / n.clone()) / n
377}
378
379/// Population covariance aggregation (PySpark covar_pop). Module-level; use in groupBy.agg() with two columns.
380pub fn covar_pop(col1: &Column, col2: &Column) -> Column {
381    use polars::prelude::len;
382    let c1 = col1.expr().clone().cast(DataType::Float64);
383    let c2 = col2.expr().clone().cast(DataType::Float64);
384    let n = len().cast(DataType::Float64);
385    let sum_ab = (c1.clone() * c2.clone()).sum();
386    let sum_a = col1.expr().clone().sum().cast(DataType::Float64);
387    let sum_b = col2.expr().clone().sum().cast(DataType::Float64);
388    let e = (sum_ab - sum_a * sum_b / n.clone()) / n;
389    Column::from_expr(e, Some("covar_pop".to_string()))
390}
391
392/// Pearson correlation aggregation (PySpark corr). Module-level; use in groupBy.agg() with two columns.
393pub fn corr(col1: &Column, col2: &Column) -> Column {
394    use polars::prelude::{len, lit, when};
395    let c1 = col1.expr().clone().cast(DataType::Float64);
396    let c2 = col2.expr().clone().cast(DataType::Float64);
397    let n = len().cast(DataType::Float64);
398    let n1 = (len() - lit(1)).cast(DataType::Float64);
399    let sum_ab = (c1.clone() * c2.clone()).sum();
400    let sum_a = col1.expr().clone().sum().cast(DataType::Float64);
401    let sum_b = col2.expr().clone().sum().cast(DataType::Float64);
402    let sum_a2 = (c1.clone() * c1).sum();
403    let sum_b2 = (c2.clone() * c2).sum();
404    let cov_samp = (sum_ab - sum_a.clone() * sum_b.clone() / n.clone()) / n1.clone();
405    let var_a = (sum_a2 - sum_a.clone() * sum_a / n.clone()) / n1.clone();
406    let var_b = (sum_b2 - sum_b.clone() * sum_b / n.clone()) / n1.clone();
407    let std_a = var_a.sqrt();
408    let std_b = var_b.sqrt();
409    let e = when(len().gt(lit(1)))
410        .then(cov_samp / (std_a * std_b))
411        .otherwise(lit(f64::NAN));
412    Column::from_expr(e, Some("corr".to_string()))
413}
414
415/// Sample covariance aggregation (PySpark covar_samp). Returns Expr for use in groupBy.agg().
416pub fn covar_samp_expr(col1: &str, col2: &str) -> Expr {
417    use polars::prelude::{col as pl_col, len, lit, when};
418    let c1 = pl_col(col1).cast(DataType::Float64);
419    let c2 = pl_col(col2).cast(DataType::Float64);
420    let n = len().cast(DataType::Float64);
421    let sum_ab = (c1.clone() * c2.clone()).sum();
422    let sum_a = pl_col(col1).sum().cast(DataType::Float64);
423    let sum_b = pl_col(col2).sum().cast(DataType::Float64);
424    when(len().gt(lit(1)))
425        .then((sum_ab - sum_a * sum_b / n.clone()) / (len() - lit(1)).cast(DataType::Float64))
426        .otherwise(lit(f64::NAN))
427}
428
429/// Pearson correlation aggregation (PySpark corr). Returns Expr for use in groupBy.agg().
430pub fn corr_expr(col1: &str, col2: &str) -> Expr {
431    use polars::prelude::{col as pl_col, len, lit, when};
432    let c1 = pl_col(col1).cast(DataType::Float64);
433    let c2 = pl_col(col2).cast(DataType::Float64);
434    let n = len().cast(DataType::Float64);
435    let n1 = (len() - lit(1)).cast(DataType::Float64);
436    let sum_ab = (c1.clone() * c2.clone()).sum();
437    let sum_a = pl_col(col1).sum().cast(DataType::Float64);
438    let sum_b = pl_col(col2).sum().cast(DataType::Float64);
439    let sum_a2 = (c1.clone() * c1).sum();
440    let sum_b2 = (c2.clone() * c2).sum();
441    let cov_samp = (sum_ab - sum_a.clone() * sum_b.clone() / n.clone()) / n1.clone();
442    let var_a = (sum_a2 - sum_a.clone() * sum_a / n.clone()) / n1.clone();
443    let var_b = (sum_b2 - sum_b.clone() * sum_b / n.clone()) / n1.clone();
444    let std_a = var_a.sqrt();
445    let std_b = var_b.sqrt();
446    when(len().gt(lit(1)))
447        .then(cov_samp / (std_a * std_b))
448        .otherwise(lit(f64::NAN))
449}
450
451// --- Regression aggregates (PySpark regr_*). y = col1, x = col2; only pairs where both non-null. ---
452
453fn regr_cond_and_sums(y_col: &str, x_col: &str) -> (Expr, Expr, Expr, Expr, Expr, Expr) {
454    use polars::prelude::col as pl_col;
455    let y = pl_col(y_col).cast(DataType::Float64);
456    let x = pl_col(x_col).cast(DataType::Float64);
457    let cond = y.clone().is_not_null().and(x.clone().is_not_null());
458    let n = y
459        .clone()
460        .filter(cond.clone())
461        .count()
462        .cast(DataType::Float64);
463    let sum_x = x.clone().filter(cond.clone()).sum();
464    let sum_y = y.clone().filter(cond.clone()).sum();
465    let sum_xx = (x.clone() * x.clone()).filter(cond.clone()).sum();
466    let sum_yy = (y.clone() * y.clone()).filter(cond.clone()).sum();
467    let sum_xy = (x * y).filter(cond).sum();
468    (n, sum_x, sum_y, sum_xx, sum_yy, sum_xy)
469}
470
471/// Regression: count of (y, x) pairs where both non-null (PySpark regr_count).
472pub fn regr_count_expr(y_col: &str, x_col: &str) -> Expr {
473    let (n, ..) = regr_cond_and_sums(y_col, x_col);
474    n
475}
476
477/// Regression: average of x (PySpark regr_avgx).
478pub fn regr_avgx_expr(y_col: &str, x_col: &str) -> Expr {
479    use polars::prelude::{lit, when};
480    let (n, sum_x, ..) = regr_cond_and_sums(y_col, x_col);
481    when(n.clone().gt(lit(0.0)))
482        .then(sum_x / n)
483        .otherwise(lit(f64::NAN))
484}
485
486/// Regression: average of y (PySpark regr_avgy).
487pub fn regr_avgy_expr(y_col: &str, x_col: &str) -> Expr {
488    use polars::prelude::{lit, when};
489    let (n, _, sum_y, ..) = regr_cond_and_sums(y_col, x_col);
490    when(n.clone().gt(lit(0.0)))
491        .then(sum_y / n)
492        .otherwise(lit(f64::NAN))
493}
494
495/// Regression: sum((x - avg_x)^2) (PySpark regr_sxx).
496pub fn regr_sxx_expr(y_col: &str, x_col: &str) -> Expr {
497    use polars::prelude::{lit, when};
498    let (n, sum_x, _, sum_xx, ..) = regr_cond_and_sums(y_col, x_col);
499    when(n.clone().gt(lit(0.0)))
500        .then(sum_xx - sum_x.clone() * sum_x / n)
501        .otherwise(lit(f64::NAN))
502}
503
504/// Regression: sum((y - avg_y)^2) (PySpark regr_syy).
505pub fn regr_syy_expr(y_col: &str, x_col: &str) -> Expr {
506    use polars::prelude::{lit, when};
507    let (n, _, sum_y, _, sum_yy, _) = regr_cond_and_sums(y_col, x_col);
508    when(n.clone().gt(lit(0.0)))
509        .then(sum_yy - sum_y.clone() * sum_y / n)
510        .otherwise(lit(f64::NAN))
511}
512
513/// Regression: sum((x - avg_x)(y - avg_y)) (PySpark regr_sxy).
514pub fn regr_sxy_expr(y_col: &str, x_col: &str) -> Expr {
515    use polars::prelude::{lit, when};
516    let (n, sum_x, sum_y, _, _, sum_xy) = regr_cond_and_sums(y_col, x_col);
517    when(n.clone().gt(lit(0.0)))
518        .then(sum_xy - sum_x * sum_y / n)
519        .otherwise(lit(f64::NAN))
520}
521
522/// Regression slope: cov_samp(y,x)/var_samp(x) (PySpark regr_slope).
523pub fn regr_slope_expr(y_col: &str, x_col: &str) -> Expr {
524    use polars::prelude::{lit, when};
525    let (n, sum_x, sum_y, sum_xx, _sum_yy, sum_xy) = regr_cond_and_sums(y_col, x_col);
526    let regr_sxx = sum_xx.clone() - sum_x.clone() * sum_x.clone() / n.clone();
527    let regr_sxy = sum_xy - sum_x * sum_y / n.clone();
528    when(n.gt(lit(1.0)).and(regr_sxx.clone().gt(lit(0.0))))
529        .then(regr_sxy / regr_sxx)
530        .otherwise(lit(f64::NAN))
531}
532
533/// Regression intercept: avg_y - slope*avg_x (PySpark regr_intercept).
534pub fn regr_intercept_expr(y_col: &str, x_col: &str) -> Expr {
535    use polars::prelude::{lit, when};
536    let (n, sum_x, sum_y, sum_xx, _, sum_xy) = regr_cond_and_sums(y_col, x_col);
537    let regr_sxx = sum_xx - sum_x.clone() * sum_x.clone() / n.clone();
538    let regr_sxy = sum_xy.clone() - sum_x.clone() * sum_y.clone() / n.clone();
539    let slope = regr_sxy.clone() / regr_sxx.clone();
540    let avg_y = sum_y / n.clone();
541    let avg_x = sum_x / n.clone();
542    when(n.gt(lit(1.0)).and(regr_sxx.clone().gt(lit(0.0))))
543        .then(avg_y - slope * avg_x)
544        .otherwise(lit(f64::NAN))
545}
546
547/// Regression R-squared (PySpark regr_r2).
548pub fn regr_r2_expr(y_col: &str, x_col: &str) -> Expr {
549    use polars::prelude::{lit, when};
550    let (n, sum_x, sum_y, sum_xx, sum_yy, sum_xy) = regr_cond_and_sums(y_col, x_col);
551    let regr_sxx = sum_xx - sum_x.clone() * sum_x.clone() / n.clone();
552    let regr_syy = sum_yy - sum_y.clone() * sum_y.clone() / n.clone();
553    let regr_sxy = sum_xy - sum_x * sum_y / n;
554    when(
555        regr_sxx
556            .clone()
557            .gt(lit(0.0))
558            .and(regr_syy.clone().gt(lit(0.0))),
559    )
560    .then(regr_sxy.clone() * regr_sxy / (regr_sxx * regr_syy))
561    .otherwise(lit(f64::NAN))
562}
563
564/// PySpark-style conditional expression builder.
565///
566/// # Example
567/// ```
568/// use robin_sparkless::{col, lit_i64, lit_str, when};
569///
570/// // when(condition).then(value).otherwise(fallback)
571/// let expr = when(&col("age").gt(lit_i64(18).into_expr()))
572///     .then(&lit_str("adult"))
573///     .otherwise(&lit_str("minor"));
574/// ```
575pub fn when(condition: &Column) -> WhenBuilder {
576    WhenBuilder::new(condition.expr().clone())
577}
578
579/// Two-arg when(condition, value): returns value where condition is true, null otherwise (PySpark when(cond, val)).
580pub fn when_then_otherwise_null(condition: &Column, value: &Column) -> Column {
581    use polars::prelude::*;
582    let null_expr = lit(NULL);
583    let expr = polars::prelude::when(condition.expr().clone())
584        .then(value.expr().clone())
585        .otherwise(null_expr);
586    crate::column::Column::from_expr(expr, None)
587}
588
589/// Builder for when-then-otherwise expressions
590pub struct WhenBuilder {
591    condition: Expr,
592}
593
594impl WhenBuilder {
595    fn new(condition: Expr) -> Self {
596        WhenBuilder { condition }
597    }
598
599    /// Specify the value when condition is true
600    pub fn then(self, value: &Column) -> ThenBuilder {
601        use polars::prelude::*;
602        let when_then = when(self.condition).then(value.expr().clone());
603        ThenBuilder::new(when_then)
604    }
605
606    /// Specify the value when condition is false
607    /// Note: In PySpark, when(cond).otherwise(val) requires a .then() first.
608    /// For this implementation, we require .then() to be called explicitly.
609    /// This method will panic if used directly - use when(cond).then(val1).otherwise(val2) instead.
610    pub fn otherwise(self, _value: &Column) -> Column {
611        // This should not be called directly - when().otherwise() without .then() is not supported
612        // Users should use when(cond).then(val1).otherwise(val2)
613        panic!(
614            "when().otherwise() requires .then() to be called first. Use when(cond).then(val1).otherwise(val2)"
615        );
616    }
617}
618
619/// Builder for chaining when-then clauses before finalizing with otherwise
620pub struct ThenBuilder {
621    state: WhenThenState,
622}
623
624enum WhenThenState {
625    Single(Box<polars::prelude::Then>),
626    Chained(Box<polars::prelude::ChainedThen>),
627}
628
629/// Builder for an additional when-then clause (returned by ThenBuilder::when).
630pub struct ChainedWhenBuilder {
631    inner: polars::prelude::ChainedWhen,
632}
633
634impl ThenBuilder {
635    fn new(when_then: polars::prelude::Then) -> Self {
636        ThenBuilder {
637            state: WhenThenState::Single(Box::new(when_then)),
638        }
639    }
640
641    fn new_chained(chained: polars::prelude::ChainedThen) -> Self {
642        ThenBuilder {
643            state: WhenThenState::Chained(Box::new(chained)),
644        }
645    }
646
647    /// Chain an additional when-then clause (PySpark: when(a).then(x).when(b).then(y).otherwise(z)).
648    pub fn when(self, condition: &Column) -> ChainedWhenBuilder {
649        let chained_when = match self.state {
650            WhenThenState::Single(t) => t.when(condition.expr().clone()),
651            WhenThenState::Chained(ct) => ct.when(condition.expr().clone()),
652        };
653        ChainedWhenBuilder {
654            inner: chained_when,
655        }
656    }
657
658    /// Finalize the expression with the fallback value
659    pub fn otherwise(self, value: &Column) -> Column {
660        let expr = match self.state {
661            WhenThenState::Single(t) => t.otherwise(value.expr().clone()),
662            WhenThenState::Chained(ct) => ct.otherwise(value.expr().clone()),
663        };
664        crate::column::Column::from_expr(expr, None)
665    }
666}
667
668impl ChainedWhenBuilder {
669    /// Set the value for the current when clause.
670    pub fn then(self, value: &Column) -> ThenBuilder {
671        ThenBuilder::new_chained(self.inner.then(value.expr().clone()))
672    }
673}
674
675/// Convert string column to uppercase (PySpark upper)
676pub fn upper(column: &Column) -> Column {
677    column.clone().upper()
678}
679
680/// Convert string column to lowercase (PySpark lower)
681pub fn lower(column: &Column) -> Column {
682    column.clone().lower()
683}
684
685/// Substring with 1-based start (PySpark substring semantics)
686pub fn substring(column: &Column, start: i64, length: Option<i64>) -> Column {
687    column.clone().substr(start, length)
688}
689
690/// String length in characters (PySpark length)
691pub fn length(column: &Column) -> Column {
692    column.clone().length()
693}
694
695/// Trim leading and trailing whitespace (PySpark trim)
696pub fn trim(column: &Column) -> Column {
697    column.clone().trim()
698}
699
700/// Trim leading whitespace (PySpark ltrim)
701pub fn ltrim(column: &Column) -> Column {
702    column.clone().ltrim()
703}
704
705/// Trim trailing whitespace (PySpark rtrim)
706pub fn rtrim(column: &Column) -> Column {
707    column.clone().rtrim()
708}
709
710/// Trim leading and trailing chars (PySpark btrim). trim_str defaults to whitespace.
711pub fn btrim(column: &Column, trim_str: Option<&str>) -> Column {
712    column.clone().btrim(trim_str)
713}
714
715/// Find substring position 1-based, starting at pos (PySpark locate). 0 if not found.
716pub fn locate(substr: &str, column: &Column, pos: i64) -> Column {
717    column.clone().locate(substr, pos)
718}
719
720/// Base conversion (PySpark conv). num from from_base to to_base.
721pub fn conv(column: &Column, from_base: i32, to_base: i32) -> Column {
722    column.clone().conv(from_base, to_base)
723}
724
725/// Convert to hex string (PySpark hex).
726pub fn hex(column: &Column) -> Column {
727    column.clone().hex()
728}
729
730/// Convert hex string to binary/string (PySpark unhex).
731pub fn unhex(column: &Column) -> Column {
732    column.clone().unhex()
733}
734
735/// Encode string to binary (PySpark encode). Charset: UTF-8. Returns hex string.
736pub fn encode(column: &Column, charset: &str) -> Column {
737    column.clone().encode(charset)
738}
739
740/// Decode binary (hex string) to string (PySpark decode). Charset: UTF-8.
741pub fn decode(column: &Column, charset: &str) -> Column {
742    column.clone().decode(charset)
743}
744
745/// Convert to binary (PySpark to_binary). fmt: 'utf-8', 'hex'.
746pub fn to_binary(column: &Column, fmt: &str) -> Column {
747    column.clone().to_binary(fmt)
748}
749
750/// Try convert to binary; null on failure (PySpark try_to_binary).
751pub fn try_to_binary(column: &Column, fmt: &str) -> Column {
752    column.clone().try_to_binary(fmt)
753}
754
755/// AES encrypt (PySpark aes_encrypt). Key as string; AES-128-GCM.
756pub fn aes_encrypt(column: &Column, key: &str) -> Column {
757    column.clone().aes_encrypt(key)
758}
759
760/// AES decrypt (PySpark aes_decrypt). Input hex(nonce||ciphertext).
761pub fn aes_decrypt(column: &Column, key: &str) -> Column {
762    column.clone().aes_decrypt(key)
763}
764
765/// Try AES decrypt (PySpark try_aes_decrypt). Returns null on failure.
766pub fn try_aes_decrypt(column: &Column, key: &str) -> Column {
767    column.clone().try_aes_decrypt(key)
768}
769
770/// Convert integer to binary string (PySpark bin).
771pub fn bin(column: &Column) -> Column {
772    column.clone().bin()
773}
774
775/// Get bit at 0-based position (PySpark getbit).
776pub fn getbit(column: &Column, pos: i64) -> Column {
777    column.clone().getbit(pos)
778}
779
780/// Bitwise AND of two integer/boolean columns (PySpark bit_and).
781pub fn bit_and(left: &Column, right: &Column) -> Column {
782    left.clone().bit_and(right)
783}
784
785/// Bitwise OR of two integer/boolean columns (PySpark bit_or).
786pub fn bit_or(left: &Column, right: &Column) -> Column {
787    left.clone().bit_or(right)
788}
789
790/// Bitwise XOR of two integer/boolean columns (PySpark bit_xor).
791pub fn bit_xor(left: &Column, right: &Column) -> Column {
792    left.clone().bit_xor(right)
793}
794
795/// Count of set bits in the integer representation (PySpark bit_count).
796pub fn bit_count(column: &Column) -> Column {
797    column.clone().bit_count()
798}
799
800/// Bitwise NOT of an integer/boolean column (PySpark bitwise_not / bitwiseNOT).
801pub fn bitwise_not(column: &Column) -> Column {
802    column.clone().bitwise_not()
803}
804
805// --- Bitmap (PySpark 3.5+) ---
806
807/// Map integral value (0–32767) to bit position for bitmap aggregates (PySpark bitmap_bit_position).
808pub fn bitmap_bit_position(column: &Column) -> Column {
809    use polars::prelude::DataType;
810    let expr = column.expr().clone().cast(DataType::Int32);
811    Column::from_expr(expr, None)
812}
813
814/// Bucket number for distributed bitmap (PySpark bitmap_bucket_number). value / 32768.
815pub fn bitmap_bucket_number(column: &Column) -> Column {
816    use polars::prelude::DataType;
817    let expr = column.expr().clone().cast(DataType::Int64) / lit(32768i64);
818    Column::from_expr(expr, None)
819}
820
821/// Count set bits in a bitmap binary column (PySpark bitmap_count).
822pub fn bitmap_count(column: &Column) -> Column {
823    use polars::prelude::{DataType, Field};
824    let expr = column.expr().clone().map(
825        |s| crate::column::expect_col(crate::udfs::apply_bitmap_count(s)),
826        |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
827    );
828    Column::from_expr(expr, None)
829}
830
831/// Aggregate: bitwise OR of bit positions into one bitmap binary (PySpark bitmap_construct_agg).
832/// Use in group_by(...).agg([bitmap_construct_agg(col)]).
833pub fn bitmap_construct_agg(column: &Column) -> polars::prelude::Expr {
834    use polars::prelude::{DataType, Field};
835    column.expr().clone().implode().map(
836        |s| crate::column::expect_col(crate::udfs::apply_bitmap_construct_agg(s)),
837        |_schema, field| Ok(Field::new(field.name().clone(), DataType::Binary)),
838    )
839}
840
841/// Aggregate: bitwise OR of bitmap binary column (PySpark bitmap_or_agg).
842pub fn bitmap_or_agg(column: &Column) -> polars::prelude::Expr {
843    use polars::prelude::{DataType, Field};
844    column.expr().clone().implode().map(
845        |s| crate::column::expect_col(crate::udfs::apply_bitmap_or_agg(s)),
846        |_schema, field| Ok(Field::new(field.name().clone(), DataType::Binary)),
847    )
848}
849
850/// Alias for getbit (PySpark bit_get).
851pub fn bit_get(column: &Column, pos: i64) -> Column {
852    getbit(column, pos)
853}
854
855/// Assert that all boolean values are true; errors otherwise (PySpark assert_true).
856/// When err_msg is Some, it is used in the error message when assertion fails.
857pub fn assert_true(column: &Column, err_msg: Option<&str>) -> Column {
858    column.clone().assert_true(err_msg)
859}
860
861/// Raise an error when evaluated (PySpark raise_error). Always fails with the given message.
862pub fn raise_error(message: &str) -> Column {
863    let msg = message.to_string();
864    let expr = lit(0i64).map(
865        move |_col| -> PolarsResult<polars::prelude::Column> {
866            Err(PolarsError::ComputeError(msg.clone().into()))
867        },
868        |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
869    );
870    Column::from_expr(expr, Some("raise_error".to_string()))
871}
872
873/// Broadcast hint - no-op that returns the same DataFrame (PySpark broadcast).
874pub fn broadcast(df: &DataFrame) -> DataFrame {
875    df.clone()
876}
877
878/// Stub partition id - always 0 (PySpark spark_partition_id).
879pub fn spark_partition_id() -> Column {
880    Column::from_expr(lit(0i32), Some("spark_partition_id".to_string()))
881}
882
883/// Stub input file name - empty string (PySpark input_file_name).
884pub fn input_file_name() -> Column {
885    Column::from_expr(lit(""), Some("input_file_name".to_string()))
886}
887
888/// Stub monotonically_increasing_id - constant 0 (PySpark monotonically_increasing_id).
889/// Note: differs from PySpark which is unique per-row; see PYSPARK_DIFFERENCES.md.
890pub fn monotonically_increasing_id() -> Column {
891    Column::from_expr(lit(0i64), Some("monotonically_increasing_id".to_string()))
892}
893
894/// Current catalog name stub (PySpark current_catalog).
895pub fn current_catalog() -> Column {
896    Column::from_expr(lit("spark_catalog"), Some("current_catalog".to_string()))
897}
898
899/// Current database/schema name stub (PySpark current_database).
900pub fn current_database() -> Column {
901    Column::from_expr(lit("default"), Some("current_database".to_string()))
902}
903
904/// Current schema name stub (PySpark current_schema).
905pub fn current_schema() -> Column {
906    Column::from_expr(lit("default"), Some("current_schema".to_string()))
907}
908
909/// Current user stub (PySpark current_user).
910pub fn current_user() -> Column {
911    Column::from_expr(lit("unknown"), Some("current_user".to_string()))
912}
913
914/// User stub (PySpark user).
915pub fn user() -> Column {
916    Column::from_expr(lit("unknown"), Some("user".to_string()))
917}
918
919/// Random uniform [0, 1) per row, with optional seed (PySpark rand).
920/// When added via with_column, generates one distinct value per row (PySpark-like).
921pub fn rand(seed: Option<u64>) -> Column {
922    Column::from_rand(seed)
923}
924
925/// Random standard normal per row, with optional seed (PySpark randn).
926/// When added via with_column, generates one distinct value per row (PySpark-like).
927pub fn randn(seed: Option<u64>) -> Column {
928    Column::from_randn(seed)
929}
930
931/// Call a registered UDF by name. PySpark: F.call_udf(udfName, *cols).
932/// Requires a session (set by get_or_create). Raises if UDF not found.
933pub fn call_udf(name: &str, cols: &[Column]) -> Result<Column, PolarsError> {
934    use polars::prelude::Column as PlColumn;
935
936    let session = crate::session::get_thread_udf_session().ok_or_else(|| {
937        PolarsError::InvalidOperation(
938            "call_udf: no session. Use SparkSession.builder().get_or_create() first.".into(),
939        )
940    })?;
941    let case_sensitive = session.is_case_sensitive();
942
943    // Rust UDF: build lazy Expr
944    let udf = session
945        .udf_registry
946        .get_rust_udf(name, case_sensitive)
947        .ok_or_else(|| {
948            PolarsError::InvalidOperation(format!("call_udf: UDF '{name}' not found").into())
949        })?;
950
951    let exprs: Vec<Expr> = cols.iter().map(|c| c.expr().clone()).collect();
952    let output_type = DataType::String; // PySpark default
953
954    let expr = if exprs.len() == 1 {
955        let udf = udf.clone();
956        exprs.into_iter().next().unwrap().map(
957            move |c| {
958                let s = c.take_materialized_series();
959                udf.apply(&[s]).map(|out| PlColumn::new("_".into(), out))
960            },
961            move |_schema, field| Ok(Field::new(field.name().clone(), output_type.clone())),
962        )
963    } else {
964        let udf = udf.clone();
965        let first = exprs[0].clone();
966        let rest: Vec<Expr> = exprs[1..].to_vec();
967        first.map_many(
968            move |columns| {
969                let series: Vec<Series> = columns
970                    .iter_mut()
971                    .map(|c| std::mem::take(c).take_materialized_series())
972                    .collect();
973                udf.apply(&series).map(|out| PlColumn::new("_".into(), out))
974            },
975            &rest,
976            move |_schema, fields| Ok(Field::new(fields[0].name().clone(), output_type.clone())),
977        )
978    };
979
980    Ok(Column::from_expr(expr, Some(format!("{name}()"))))
981}
982
983/// True if two arrays have any element in common (PySpark arrays_overlap).
984pub fn arrays_overlap(left: &Column, right: &Column) -> Column {
985    left.clone().arrays_overlap(right)
986}
987
988/// Zip arrays into array of structs (PySpark arrays_zip).
989pub fn arrays_zip(left: &Column, right: &Column) -> Column {
990    left.clone().arrays_zip(right)
991}
992
993/// Explode; null/empty yields one row with null (PySpark explode_outer).
994pub fn explode_outer(column: &Column) -> Column {
995    column.clone().explode_outer()
996}
997
998/// Posexplode with null preservation (PySpark posexplode_outer).
999pub fn posexplode_outer(column: &Column) -> (Column, Column) {
1000    column.clone().posexplode_outer()
1001}
1002
1003/// Collect to array (PySpark array_agg).
1004pub fn array_agg(column: &Column) -> Column {
1005    column.clone().array_agg()
1006}
1007
1008/// Transform map keys by expr (PySpark transform_keys).
1009pub fn transform_keys(column: &Column, key_expr: Expr) -> Column {
1010    column.clone().transform_keys(key_expr)
1011}
1012
1013/// Transform map values by expr (PySpark transform_values).
1014pub fn transform_values(column: &Column, value_expr: Expr) -> Column {
1015    column.clone().transform_values(value_expr)
1016}
1017
1018/// Parse string to map (PySpark str_to_map). Default delims: "," and ":".
1019pub fn str_to_map(
1020    column: &Column,
1021    pair_delim: Option<&str>,
1022    key_value_delim: Option<&str>,
1023) -> Column {
1024    let pd = pair_delim.unwrap_or(",");
1025    let kvd = key_value_delim.unwrap_or(":");
1026    column.clone().str_to_map(pd, kvd)
1027}
1028
1029/// Extract first match of regex (PySpark regexp_extract). group_index 0 = full match.
1030pub fn regexp_extract(column: &Column, pattern: &str, group_index: usize) -> Column {
1031    column.clone().regexp_extract(pattern, group_index)
1032}
1033
1034/// Replace first match of regex (PySpark regexp_replace)
1035pub fn regexp_replace(column: &Column, pattern: &str, replacement: &str) -> Column {
1036    column.clone().regexp_replace(pattern, replacement)
1037}
1038
1039/// Split string by delimiter (PySpark split). Optional limit: at most that many parts (remainder in last).
1040pub fn split(column: &Column, delimiter: &str, limit: Option<i32>) -> Column {
1041    column.clone().split(delimiter, limit)
1042}
1043
1044/// Title case (PySpark initcap)
1045pub fn initcap(column: &Column) -> Column {
1046    column.clone().initcap()
1047}
1048
1049/// Extract all matches of regex (PySpark regexp_extract_all).
1050pub fn regexp_extract_all(column: &Column, pattern: &str) -> Column {
1051    column.clone().regexp_extract_all(pattern)
1052}
1053
1054/// Check if string matches regex (PySpark regexp_like / rlike).
1055pub fn regexp_like(column: &Column, pattern: &str) -> Column {
1056    column.clone().regexp_like(pattern)
1057}
1058
1059/// Count of non-overlapping regex matches (PySpark regexp_count).
1060pub fn regexp_count(column: &Column, pattern: &str) -> Column {
1061    column.clone().regexp_count(pattern)
1062}
1063
1064/// First substring matching regex (PySpark regexp_substr). Null if no match.
1065pub fn regexp_substr(column: &Column, pattern: &str) -> Column {
1066    column.clone().regexp_substr(pattern)
1067}
1068
1069/// Split by delimiter and return 1-based part (PySpark split_part).
1070pub fn split_part(column: &Column, delimiter: &str, part_num: i64) -> Column {
1071    column.clone().split_part(delimiter, part_num)
1072}
1073
1074/// 1-based position of first regex match (PySpark regexp_instr).
1075pub fn regexp_instr(column: &Column, pattern: &str, group_idx: Option<usize>) -> Column {
1076    column.clone().regexp_instr(pattern, group_idx)
1077}
1078
1079/// 1-based index of str in comma-delimited set (PySpark find_in_set). 0 if not found or str contains comma.
1080pub fn find_in_set(str_column: &Column, set_column: &Column) -> Column {
1081    str_column.clone().find_in_set(set_column)
1082}
1083
1084/// Printf-style format (PySpark format_string). Supports %s, %d, %i, %f, %g, %%.
1085pub fn format_string(format: &str, columns: &[&Column]) -> Column {
1086    use polars::prelude::*;
1087    if columns.is_empty() {
1088        panic!("format_string needs at least one column");
1089    }
1090    let format_owned = format.to_string();
1091    let args: Vec<Expr> = columns.iter().skip(1).map(|c| c.expr().clone()).collect();
1092    let expr = columns[0].expr().clone().map_many(
1093        move |cols| {
1094            crate::column::expect_col(crate::udfs::apply_format_string(cols, &format_owned))
1095        },
1096        &args,
1097        |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::String)),
1098    );
1099    crate::column::Column::from_expr(expr, None)
1100}
1101
1102/// Alias for format_string (PySpark printf).
1103pub fn printf(format: &str, columns: &[&Column]) -> Column {
1104    format_string(format, columns)
1105}
1106
1107/// Repeat string n times (PySpark repeat).
1108pub fn repeat(column: &Column, n: i32) -> Column {
1109    column.clone().repeat(n)
1110}
1111
1112/// Reverse string (PySpark reverse).
1113pub fn reverse(column: &Column) -> Column {
1114    column.clone().reverse()
1115}
1116
1117/// Find substring position 1-based; 0 if not found (PySpark instr).
1118pub fn instr(column: &Column, substr: &str) -> Column {
1119    column.clone().instr(substr)
1120}
1121
1122/// Position of substring in column (PySpark position). Same as instr; (substr, col) argument order.
1123pub fn position(substr: &str, column: &Column) -> Column {
1124    column.clone().instr(substr)
1125}
1126
1127/// ASCII value of first character (PySpark ascii). Returns Int32.
1128pub fn ascii(column: &Column) -> Column {
1129    column.clone().ascii()
1130}
1131
1132/// Format numeric as string with fixed decimal places (PySpark format_number).
1133pub fn format_number(column: &Column, decimals: u32) -> Column {
1134    column.clone().format_number(decimals)
1135}
1136
1137/// Replace substring at 1-based position (PySpark overlay). replace is literal.
1138pub fn overlay(column: &Column, replace: &str, pos: i64, length: i64) -> Column {
1139    column.clone().overlay(replace, pos, length)
1140}
1141
1142/// Int to single-character string (PySpark char). Valid codepoint only.
1143pub fn char(column: &Column) -> Column {
1144    column.clone().char()
1145}
1146
1147/// Alias for char (PySpark chr).
1148pub fn chr(column: &Column) -> Column {
1149    column.clone().chr()
1150}
1151
1152/// Base64 encode string bytes (PySpark base64).
1153pub fn base64(column: &Column) -> Column {
1154    column.clone().base64()
1155}
1156
1157/// Base64 decode to string (PySpark unbase64). Invalid decode → null.
1158pub fn unbase64(column: &Column) -> Column {
1159    column.clone().unbase64()
1160}
1161
1162/// SHA1 hash of string bytes, return hex string (PySpark sha1).
1163pub fn sha1(column: &Column) -> Column {
1164    column.clone().sha1()
1165}
1166
1167/// SHA2 hash; bit_length 256, 384, or 512 (PySpark sha2).
1168pub fn sha2(column: &Column, bit_length: i32) -> Column {
1169    column.clone().sha2(bit_length)
1170}
1171
1172/// MD5 hash of string bytes, return hex string (PySpark md5).
1173pub fn md5(column: &Column) -> Column {
1174    column.clone().md5()
1175}
1176
1177/// Left-pad string to length with pad char (PySpark lpad).
1178pub fn lpad(column: &Column, length: i32, pad: &str) -> Column {
1179    column.clone().lpad(length, pad)
1180}
1181
1182/// Right-pad string to length with pad char (PySpark rpad).
1183pub fn rpad(column: &Column, length: i32, pad: &str) -> Column {
1184    column.clone().rpad(length, pad)
1185}
1186
1187/// Character-by-character translation (PySpark translate).
1188pub fn translate(column: &Column, from_str: &str, to_str: &str) -> Column {
1189    column.clone().translate(from_str, to_str)
1190}
1191
1192/// Mask string: replace upper/lower/digit/other with given chars (PySpark mask).
1193pub fn mask(
1194    column: &Column,
1195    upper_char: Option<char>,
1196    lower_char: Option<char>,
1197    digit_char: Option<char>,
1198    other_char: Option<char>,
1199) -> Column {
1200    column
1201        .clone()
1202        .mask(upper_char, lower_char, digit_char, other_char)
1203}
1204
1205/// Substring before/after nth delimiter (PySpark substring_index).
1206pub fn substring_index(column: &Column, delimiter: &str, count: i64) -> Column {
1207    column.clone().substring_index(delimiter, count)
1208}
1209
1210/// Leftmost n characters (PySpark left).
1211pub fn left(column: &Column, n: i64) -> Column {
1212    column.clone().left(n)
1213}
1214
1215/// Rightmost n characters (PySpark right).
1216pub fn right(column: &Column, n: i64) -> Column {
1217    column.clone().right(n)
1218}
1219
1220/// Replace all occurrences of search with replacement (literal). PySpark replace.
1221pub fn replace(column: &Column, search: &str, replacement: &str) -> Column {
1222    column.clone().replace(search, replacement)
1223}
1224
1225/// True if string starts with prefix (PySpark startswith).
1226pub fn startswith(column: &Column, prefix: &str) -> Column {
1227    column.clone().startswith(prefix)
1228}
1229
1230/// True if string ends with suffix (PySpark endswith).
1231pub fn endswith(column: &Column, suffix: &str) -> Column {
1232    column.clone().endswith(suffix)
1233}
1234
1235/// True if string contains substring (literal). PySpark contains.
1236pub fn contains(column: &Column, substring: &str) -> Column {
1237    column.clone().contains(substring)
1238}
1239
1240/// SQL LIKE pattern (% any, _ one char). PySpark like.
1241/// When escape_char is Some(esc), esc + char treats that char as literal.
1242pub fn like(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
1243    column.clone().like(pattern, escape_char)
1244}
1245
1246/// Case-insensitive LIKE. PySpark ilike.
1247/// When escape_char is Some(esc), esc + char treats that char as literal.
1248pub fn ilike(column: &Column, pattern: &str, escape_char: Option<char>) -> Column {
1249    column.clone().ilike(pattern, escape_char)
1250}
1251
1252/// Alias for regexp_like. PySpark rlike / regexp.
1253pub fn rlike(column: &Column, pattern: &str) -> Column {
1254    column.clone().regexp_like(pattern)
1255}
1256
1257/// Alias for rlike (PySpark regexp).
1258pub fn regexp(column: &Column, pattern: &str) -> Column {
1259    rlike(column, pattern)
1260}
1261
1262/// Soundex code (PySpark soundex). Not implemented: requires element-wise UDF.
1263pub fn soundex(column: &Column) -> Column {
1264    column.clone().soundex()
1265}
1266
1267/// Levenshtein distance (PySpark levenshtein). Not implemented: requires element-wise UDF.
1268pub fn levenshtein(column: &Column, other: &Column) -> Column {
1269    column.clone().levenshtein(other)
1270}
1271
1272/// CRC32 of string bytes (PySpark crc32). Not implemented: requires element-wise UDF.
1273pub fn crc32(column: &Column) -> Column {
1274    column.clone().crc32()
1275}
1276
1277/// XXH64 hash (PySpark xxhash64). Not implemented: requires element-wise UDF.
1278pub fn xxhash64(column: &Column) -> Column {
1279    column.clone().xxhash64()
1280}
1281
1282/// Absolute value (PySpark abs)
1283pub fn abs(column: &Column) -> Column {
1284    column.clone().abs()
1285}
1286
1287/// Ceiling (PySpark ceil)
1288pub fn ceil(column: &Column) -> Column {
1289    column.clone().ceil()
1290}
1291
1292/// Floor (PySpark floor)
1293pub fn floor(column: &Column) -> Column {
1294    column.clone().floor()
1295}
1296
1297/// Round (PySpark round)
1298pub fn round(column: &Column, decimals: u32) -> Column {
1299    column.clone().round(decimals)
1300}
1301
1302/// Banker's rounding - round half to even (PySpark bround).
1303pub fn bround(column: &Column, scale: i32) -> Column {
1304    column.clone().bround(scale)
1305}
1306
1307/// Unary minus / negate (PySpark negate, negative).
1308pub fn negate(column: &Column) -> Column {
1309    column.clone().negate()
1310}
1311
1312/// Alias for negate. PySpark negative.
1313pub fn negative(column: &Column) -> Column {
1314    negate(column)
1315}
1316
1317/// Unary plus - no-op, returns column as-is (PySpark positive).
1318pub fn positive(column: &Column) -> Column {
1319    column.clone()
1320}
1321
1322/// Cotangent: 1/tan (PySpark cot).
1323pub fn cot(column: &Column) -> Column {
1324    column.clone().cot()
1325}
1326
1327/// Cosecant: 1/sin (PySpark csc).
1328pub fn csc(column: &Column) -> Column {
1329    column.clone().csc()
1330}
1331
1332/// Secant: 1/cos (PySpark sec).
1333pub fn sec(column: &Column) -> Column {
1334    column.clone().sec()
1335}
1336
1337/// Constant e = 2.718... (PySpark e).
1338pub fn e() -> Column {
1339    Column::from_expr(lit(std::f64::consts::E), Some("e".to_string()))
1340}
1341
1342/// Constant pi = 3.14159... (PySpark pi).
1343pub fn pi() -> Column {
1344    Column::from_expr(lit(std::f64::consts::PI), Some("pi".to_string()))
1345}
1346
1347/// Square root (PySpark sqrt)
1348pub fn sqrt(column: &Column) -> Column {
1349    column.clone().sqrt()
1350}
1351
1352/// Power (PySpark pow)
1353pub fn pow(column: &Column, exp: i64) -> Column {
1354    column.clone().pow(exp)
1355}
1356
1357/// Exponential (PySpark exp)
1358pub fn exp(column: &Column) -> Column {
1359    column.clone().exp()
1360}
1361
1362/// Natural logarithm (PySpark log with one arg)
1363pub fn log(column: &Column) -> Column {
1364    column.clone().log()
1365}
1366
1367/// Logarithm with given base (PySpark log(col, base)). base must be positive and not 1.
1368pub fn log_with_base(column: &Column, base: f64) -> Column {
1369    crate::column::Column::from_expr(column.expr().clone().log(lit(base)), None)
1370}
1371
1372/// Sine in radians (PySpark sin)
1373pub fn sin(column: &Column) -> Column {
1374    column.clone().sin()
1375}
1376
1377/// Cosine in radians (PySpark cos)
1378pub fn cos(column: &Column) -> Column {
1379    column.clone().cos()
1380}
1381
1382/// Tangent in radians (PySpark tan)
1383pub fn tan(column: &Column) -> Column {
1384    column.clone().tan()
1385}
1386
1387/// Arc sine (PySpark asin)
1388pub fn asin(column: &Column) -> Column {
1389    column.clone().asin()
1390}
1391
1392/// Arc cosine (PySpark acos)
1393pub fn acos(column: &Column) -> Column {
1394    column.clone().acos()
1395}
1396
1397/// Arc tangent (PySpark atan)
1398pub fn atan(column: &Column) -> Column {
1399    column.clone().atan()
1400}
1401
1402/// Two-argument arc tangent atan2(y, x) in radians (PySpark atan2)
1403pub fn atan2(y: &Column, x: &Column) -> Column {
1404    y.clone().atan2(x)
1405}
1406
1407/// Convert radians to degrees (PySpark degrees)
1408pub fn degrees(column: &Column) -> Column {
1409    column.clone().degrees()
1410}
1411
1412/// Convert degrees to radians (PySpark radians)
1413pub fn radians(column: &Column) -> Column {
1414    column.clone().radians()
1415}
1416
1417/// Sign of the number: -1, 0, or 1 (PySpark signum)
1418pub fn signum(column: &Column) -> Column {
1419    column.clone().signum()
1420}
1421
1422/// Alias for signum (PySpark sign).
1423pub fn sign(column: &Column) -> Column {
1424    signum(column)
1425}
1426
1427/// Cast column to the given type (PySpark cast). Fails on invalid conversion.
1428/// String-to-boolean uses custom parsing ("true"/"false"/"1"/"0") since Polars does not support Utf8->Boolean.
1429/// String-to-date accepts date and datetime strings (e.g. "2025-01-01 10:30:00" truncates to date) for Spark parity.
1430pub fn cast(column: &Column, type_name: &str) -> Result<Column, String> {
1431    let dtype = parse_type_name(type_name)?;
1432    if dtype == DataType::Boolean {
1433        let expr = column.expr().clone().map(
1434            |col| crate::column::expect_col(crate::udfs::apply_string_to_boolean(col, true)),
1435            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Boolean)),
1436        );
1437        return Ok(Column::from_expr(expr, None));
1438    }
1439    if dtype == DataType::Date {
1440        let expr = column.expr().clone().map(
1441            |col| crate::column::expect_col(crate::udfs::apply_string_to_date(col, true)),
1442            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
1443        );
1444        return Ok(Column::from_expr(expr, None));
1445    }
1446    if dtype == DataType::Int32 || dtype == DataType::Int64 {
1447        let target = dtype.clone();
1448        // cast: strict=true – invalid strings should error (PySpark parity).
1449        let expr = column.expr().clone().map(
1450            move |col| {
1451                crate::column::expect_col(crate::udfs::apply_string_to_int(
1452                    col,
1453                    true,
1454                    target.clone(),
1455                ))
1456            },
1457            move |_schema, field| Ok(Field::new(field.name().clone(), dtype.clone())),
1458        );
1459        return Ok(Column::from_expr(expr, None));
1460    }
1461    if dtype == DataType::Float64 {
1462        // String-to-double uses custom parsing for Spark-style to_number semantics.
1463        let expr = column.expr().clone().map(
1464            |col| crate::column::expect_col(crate::udfs::apply_string_to_double(col, true)),
1465            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1466        );
1467        return Ok(Column::from_expr(expr, None));
1468    }
1469    Ok(Column::from_expr(
1470        column.expr().clone().strict_cast(dtype),
1471        None,
1472    ))
1473}
1474
1475/// Cast column to the given type, returning null on invalid conversion (PySpark try_cast).
1476/// String-to-boolean uses custom parsing ("true"/"false"/"1"/"0") since Polars does not support Utf8->Boolean.
1477/// String-to-date accepts date and datetime strings; invalid strings become null.
1478pub fn try_cast(column: &Column, type_name: &str) -> Result<Column, String> {
1479    let dtype = parse_type_name(type_name)?;
1480    if dtype == DataType::Boolean {
1481        let expr = column.expr().clone().map(
1482            |col| crate::column::expect_col(crate::udfs::apply_string_to_boolean(col, false)),
1483            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Boolean)),
1484        );
1485        return Ok(Column::from_expr(expr, None));
1486    }
1487    if dtype == DataType::Date {
1488        let expr = column.expr().clone().map(
1489            |col| crate::column::expect_col(crate::udfs::apply_string_to_date(col, false)),
1490            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
1491        );
1492        return Ok(Column::from_expr(expr, None));
1493    }
1494    if dtype == DataType::Int32 || dtype == DataType::Int64 {
1495        let target = dtype.clone();
1496        let expr = column.expr().clone().map(
1497            move |col| {
1498                crate::column::expect_col(crate::udfs::apply_string_to_int(
1499                    col,
1500                    false,
1501                    target.clone(),
1502                ))
1503            },
1504            move |_schema, field| Ok(Field::new(field.name().clone(), dtype.clone())),
1505        );
1506        return Ok(Column::from_expr(expr, None));
1507    }
1508    if dtype == DataType::Float64 {
1509        let expr = column.expr().clone().map(
1510            |col| crate::column::expect_col(crate::udfs::apply_string_to_double(col, false)),
1511            |_schema, field| Ok(Field::new(field.name().clone(), DataType::Float64)),
1512        );
1513        return Ok(Column::from_expr(expr, None));
1514    }
1515    Ok(Column::from_expr(column.expr().clone().cast(dtype), None))
1516}
1517
1518/// Cast to string, optionally with format for datetime (PySpark to_char, to_varchar).
1519/// When format is Some, uses date_format for datetime columns (PySpark format → chrono strftime); otherwise cast to string.
1520/// Returns Err if the cast to string fails (invalid type name or unsupported column type).
1521pub fn to_char(column: &Column, format: Option<&str>) -> Result<Column, String> {
1522    match format {
1523        Some(fmt) => Ok(column
1524            .clone()
1525            .date_format(&crate::udfs::pyspark_format_to_chrono(fmt))),
1526        None => cast(column, "string"),
1527    }
1528}
1529
1530/// Alias for to_char (PySpark to_varchar).
1531pub fn to_varchar(column: &Column, format: Option<&str>) -> Result<Column, String> {
1532    to_char(column, format)
1533}
1534
1535/// Cast to numeric (PySpark to_number). Uses Double. Format parameter reserved for future use.
1536/// Returns Err if the cast to double fails (invalid type name or unsupported column type).
1537pub fn to_number(column: &Column, _format: Option<&str>) -> Result<Column, String> {
1538    cast(column, "double")
1539}
1540
1541/// Cast to numeric, null on invalid (PySpark try_to_number). Format parameter reserved for future use.
1542/// Returns Err if the try_cast setup fails (invalid type name); column values that cannot be parsed become null.
1543pub fn try_to_number(column: &Column, _format: Option<&str>) -> Result<Column, String> {
1544    try_cast(column, "double")
1545}
1546
1547/// Cast to timestamp, or parse with format when provided (PySpark to_timestamp).
1548/// When format is None, parses string columns with default format "%Y-%m-%d %H:%M:%S" (PySpark parity #273).
1549pub fn to_timestamp(column: &Column, format: Option<&str>) -> Result<Column, String> {
1550    use polars::prelude::{DataType, Field, TimeUnit};
1551    let fmt_owned = format.map(|s| s.to_string());
1552    let expr = column.expr().clone().map(
1553        move |s| {
1554            crate::column::expect_col(crate::udfs::apply_to_timestamp_format(
1555                s,
1556                fmt_owned.as_deref(),
1557                true,
1558            ))
1559        },
1560        |_schema, field| {
1561            Ok(Field::new(
1562                field.name().clone(),
1563                DataType::Datetime(TimeUnit::Microseconds, None),
1564            ))
1565        },
1566    );
1567    Ok(crate::column::Column::from_expr(expr, None))
1568}
1569
1570/// Cast to timestamp, null on invalid, or parse with format when provided (PySpark try_to_timestamp).
1571/// When format is None, parses string columns with default format (null on invalid). #273
1572pub fn try_to_timestamp(column: &Column, format: Option<&str>) -> Result<Column, String> {
1573    use polars::prelude::*;
1574    let fmt_owned = format.map(|s| s.to_string());
1575    let expr = column.expr().clone().map(
1576        move |s| {
1577            crate::column::expect_col(crate::udfs::apply_to_timestamp_format(
1578                s,
1579                fmt_owned.as_deref(),
1580                false,
1581            ))
1582        },
1583        |_schema, field| {
1584            Ok(Field::new(
1585                field.name().clone(),
1586                DataType::Datetime(TimeUnit::Microseconds, None),
1587            ))
1588        },
1589    );
1590    Ok(crate::column::Column::from_expr(expr, None))
1591}
1592
1593/// Parse as timestamp in local timezone, return UTC (PySpark to_timestamp_ltz).
1594pub fn to_timestamp_ltz(column: &Column, format: Option<&str>) -> Result<Column, String> {
1595    use polars::prelude::{DataType, Field, TimeUnit};
1596    match format {
1597        None => crate::cast(column, "timestamp"),
1598        Some(fmt) => {
1599            let fmt_owned = fmt.to_string();
1600            let expr = column.expr().clone().map(
1601                move |s| {
1602                    crate::column::expect_col(crate::udfs::apply_to_timestamp_ltz_format(
1603                        s,
1604                        Some(&fmt_owned),
1605                        true,
1606                    ))
1607                },
1608                |_schema, field| {
1609                    Ok(Field::new(
1610                        field.name().clone(),
1611                        DataType::Datetime(TimeUnit::Microseconds, None),
1612                    ))
1613                },
1614            );
1615            Ok(crate::column::Column::from_expr(expr, None))
1616        }
1617    }
1618}
1619
1620/// Parse as timestamp without timezone (PySpark to_timestamp_ntz). Returns Datetime(_, None).
1621pub fn to_timestamp_ntz(column: &Column, format: Option<&str>) -> Result<Column, String> {
1622    use polars::prelude::{DataType, Field, TimeUnit};
1623    match format {
1624        None => crate::cast(column, "timestamp"),
1625        Some(fmt) => {
1626            let fmt_owned = fmt.to_string();
1627            let expr = column.expr().clone().map(
1628                move |s| {
1629                    crate::column::expect_col(crate::udfs::apply_to_timestamp_ntz_format(
1630                        s,
1631                        Some(&fmt_owned),
1632                        true,
1633                    ))
1634                },
1635                |_schema, field| {
1636                    Ok(Field::new(
1637                        field.name().clone(),
1638                        DataType::Datetime(TimeUnit::Microseconds, None),
1639                    ))
1640                },
1641            );
1642            Ok(crate::column::Column::from_expr(expr, None))
1643        }
1644    }
1645}
1646
1647/// Division that returns null on divide-by-zero (PySpark try_divide).
1648pub fn try_divide(left: &Column, right: &Column) -> Column {
1649    use polars::prelude::*;
1650    let zero_cond = right.expr().clone().cast(DataType::Float64).eq(lit(0.0f64));
1651    let null_expr = lit(NULL);
1652    let div_expr =
1653        left.expr().clone().cast(DataType::Float64) / right.expr().clone().cast(DataType::Float64);
1654    let expr = polars::prelude::when(zero_cond)
1655        .then(null_expr)
1656        .otherwise(div_expr);
1657    crate::column::Column::from_expr(expr, None)
1658}
1659
1660/// Add that returns null on overflow (PySpark try_add). Uses checked arithmetic.
1661pub fn try_add(left: &Column, right: &Column) -> Column {
1662    let args = [right.expr().clone()];
1663    let expr = left.expr().clone().map_many(
1664        |cols| crate::column::expect_col(crate::udfs::apply_try_add(cols)),
1665        &args,
1666        |_schema, fields| Ok(fields[0].clone()),
1667    );
1668    Column::from_expr(expr, None)
1669}
1670
1671/// Subtract that returns null on overflow (PySpark try_subtract).
1672pub fn try_subtract(left: &Column, right: &Column) -> Column {
1673    let args = [right.expr().clone()];
1674    let expr = left.expr().clone().map_many(
1675        |cols| crate::column::expect_col(crate::udfs::apply_try_subtract(cols)),
1676        &args,
1677        |_schema, fields| Ok(fields[0].clone()),
1678    );
1679    Column::from_expr(expr, None)
1680}
1681
1682/// Multiply that returns null on overflow (PySpark try_multiply).
1683pub fn try_multiply(left: &Column, right: &Column) -> Column {
1684    let args = [right.expr().clone()];
1685    let expr = left.expr().clone().map_many(
1686        |cols| crate::column::expect_col(crate::udfs::apply_try_multiply(cols)),
1687        &args,
1688        |_schema, fields| Ok(fields[0].clone()),
1689    );
1690    Column::from_expr(expr, None)
1691}
1692
1693/// Element at index, null if out of bounds (PySpark try_element_at). Same as element_at for lists.
1694pub fn try_element_at(column: &Column, index: i64) -> Column {
1695    column.clone().element_at(index)
1696}
1697
1698/// Assign value to histogram bucket (PySpark width_bucket). Returns 0 if v < min_val, num_bucket+1 if v >= max_val.
1699pub fn width_bucket(value: &Column, min_val: f64, max_val: f64, num_bucket: i64) -> Column {
1700    if num_bucket <= 0 {
1701        panic!(
1702            "width_bucket: num_bucket must be positive, got {}",
1703            num_bucket
1704        );
1705    }
1706    use polars::prelude::*;
1707    let v = value.expr().clone().cast(DataType::Float64);
1708    let min_expr = lit(min_val);
1709    let max_expr = lit(max_val);
1710    let nb = num_bucket as f64;
1711    let width = (max_val - min_val) / nb;
1712    let bucket_expr = (v.clone() - min_expr.clone()) / lit(width);
1713    let floor_bucket = bucket_expr.floor().cast(DataType::Int64) + lit(1i64);
1714    let bucket_clamped = floor_bucket.clip(lit(1i64), lit(num_bucket));
1715    let expr = polars::prelude::when(v.clone().lt(min_expr))
1716        .then(lit(0i64))
1717        .when(v.gt_eq(max_expr))
1718        .then(lit(num_bucket + 1))
1719        .otherwise(bucket_clamped);
1720    crate::column::Column::from_expr(expr, None)
1721}
1722
1723/// Return column at 1-based index (PySpark elt). elt(2, a, b, c) returns b.
1724pub fn elt(index: &Column, columns: &[&Column]) -> Column {
1725    use polars::prelude::*;
1726    if columns.is_empty() {
1727        panic!("elt requires at least one column");
1728    }
1729    let idx_expr = index.expr().clone();
1730    let null_expr = lit(NULL);
1731    let mut expr = null_expr;
1732    for (i, c) in columns.iter().enumerate().rev() {
1733        let n = (i + 1) as i64;
1734        expr = polars::prelude::when(idx_expr.clone().eq(lit(n)))
1735            .then(c.expr().clone())
1736            .otherwise(expr);
1737    }
1738    crate::column::Column::from_expr(expr, None)
1739}
1740
1741/// Bit length of string (bytes * 8) (PySpark bit_length).
1742pub fn bit_length(column: &Column) -> Column {
1743    column.clone().bit_length()
1744}
1745
1746/// Length of string in bytes (PySpark octet_length).
1747pub fn octet_length(column: &Column) -> Column {
1748    column.clone().octet_length()
1749}
1750
1751/// Length of string in characters (PySpark char_length). Alias of length().
1752pub fn char_length(column: &Column) -> Column {
1753    column.clone().char_length()
1754}
1755
1756/// Length of string in characters (PySpark character_length). Alias of length().
1757pub fn character_length(column: &Column) -> Column {
1758    column.clone().character_length()
1759}
1760
1761/// Data type of column as string (PySpark typeof). Constant per column from schema.
1762pub fn typeof_(column: &Column) -> Column {
1763    column.clone().typeof_()
1764}
1765
1766/// True where the float value is NaN (PySpark isnan).
1767pub fn isnan(column: &Column) -> Column {
1768    column.clone().is_nan()
1769}
1770
1771/// Greatest of the given columns per row (PySpark greatest). Uses element-wise UDF.
1772pub fn greatest(columns: &[&Column]) -> Result<Column, String> {
1773    if columns.is_empty() {
1774        return Err("greatest requires at least one column".to_string());
1775    }
1776    if columns.len() == 1 {
1777        return Ok((*columns[0]).clone());
1778    }
1779    let mut expr = columns[0].expr().clone();
1780    for c in columns.iter().skip(1) {
1781        let args = [c.expr().clone()];
1782        expr = expr.map_many(
1783            |cols| crate::column::expect_col(crate::udfs::apply_greatest2(cols)),
1784            &args,
1785            |_schema, fields| Ok(fields[0].clone()),
1786        );
1787    }
1788    Ok(Column::from_expr(expr, None))
1789}
1790
1791/// Least of the given columns per row (PySpark least). Uses element-wise UDF.
1792pub fn least(columns: &[&Column]) -> Result<Column, String> {
1793    if columns.is_empty() {
1794        return Err("least requires at least one column".to_string());
1795    }
1796    if columns.len() == 1 {
1797        return Ok((*columns[0]).clone());
1798    }
1799    let mut expr = columns[0].expr().clone();
1800    for c in columns.iter().skip(1) {
1801        let args = [c.expr().clone()];
1802        expr = expr.map_many(
1803            |cols| crate::column::expect_col(crate::udfs::apply_least2(cols)),
1804            &args,
1805            |_schema, fields| Ok(fields[0].clone()),
1806        );
1807    }
1808    Ok(Column::from_expr(expr, None))
1809}
1810
1811/// Extract year from datetime column (PySpark year)
1812pub fn year(column: &Column) -> Column {
1813    column.clone().year()
1814}
1815
1816/// Extract month from datetime column (PySpark month)
1817pub fn month(column: &Column) -> Column {
1818    column.clone().month()
1819}
1820
1821/// Extract day of month from datetime column (PySpark day)
1822pub fn day(column: &Column) -> Column {
1823    column.clone().day()
1824}
1825
1826/// Cast or parse to date (PySpark to_date). When format is None: cast date/datetime to date, parse string with default formats. When format is Some: parse string with given format.
1827pub fn to_date(column: &Column, format: Option<&str>) -> Result<Column, String> {
1828    let fmt = format.map(|s| s.to_string());
1829    let expr = column.expr().clone().map(
1830        move |col| {
1831            crate::column::expect_col(crate::udfs::apply_string_to_date_format(
1832                col,
1833                fmt.as_deref(),
1834                false,
1835            ))
1836        },
1837        |_schema, field| Ok(Field::new(field.name().clone(), DataType::Date)),
1838    );
1839    Ok(Column::from_expr(expr, None))
1840}
1841
1842/// Format date/datetime as string (PySpark date_format). Accepts PySpark/Java SimpleDateFormat style (e.g. "yyyy-MM") and converts to chrono strftime internally.
1843pub fn date_format(column: &Column, format: &str) -> Column {
1844    column
1845        .clone()
1846        .date_format(&crate::udfs::pyspark_format_to_chrono(format))
1847}
1848
1849/// Current date (evaluation time). PySpark current_date.
1850pub fn current_date() -> Column {
1851    use polars::prelude::*;
1852    let today = chrono::Utc::now().date_naive();
1853    let days = (today - crate::date_utils::epoch_naive_date()).num_days() as i32;
1854    crate::column::Column::from_expr(
1855        Expr::Literal(LiteralValue::Scalar(Scalar::new_date(days))),
1856        None,
1857    )
1858}
1859
1860/// Current timestamp (evaluation time). PySpark current_timestamp.
1861pub fn current_timestamp() -> Column {
1862    use polars::prelude::*;
1863    let ts = chrono::Utc::now().timestamp_micros();
1864    crate::column::Column::from_expr(
1865        Expr::Literal(LiteralValue::Scalar(Scalar::new_datetime(
1866            ts,
1867            TimeUnit::Microseconds,
1868            None,
1869        ))),
1870        None,
1871    )
1872}
1873
1874/// Alias for current_date (PySpark curdate).
1875pub fn curdate() -> Column {
1876    current_date()
1877}
1878
1879/// Alias for current_timestamp (PySpark now).
1880pub fn now() -> Column {
1881    current_timestamp()
1882}
1883
1884/// Alias for current_timestamp (PySpark localtimestamp).
1885pub fn localtimestamp() -> Column {
1886    current_timestamp()
1887}
1888
1889/// Alias for datediff (PySpark date_diff). date_diff(end, start).
1890pub fn date_diff(end: &Column, start: &Column) -> Column {
1891    datediff(end, start)
1892}
1893
1894/// Alias for date_add (PySpark dateadd).
1895pub fn dateadd(column: &Column, n: i32) -> Column {
1896    date_add(column, n)
1897}
1898
1899/// Extract field from date/datetime (PySpark extract). field: year, month, day, hour, minute, second, quarter, week, dayofweek, dayofyear.
1900pub fn extract(column: &Column, field: &str) -> Column {
1901    column.clone().extract(field)
1902}
1903
1904/// Alias for extract (PySpark date_part).
1905pub fn date_part(column: &Column, field: &str) -> Column {
1906    extract(column, field)
1907}
1908
1909/// Alias for extract (PySpark datepart).
1910pub fn datepart(column: &Column, field: &str) -> Column {
1911    extract(column, field)
1912}
1913
1914/// Timestamp to microseconds since epoch (PySpark unix_micros).
1915pub fn unix_micros(column: &Column) -> Column {
1916    column.clone().unix_micros()
1917}
1918
1919/// Timestamp to milliseconds since epoch (PySpark unix_millis).
1920pub fn unix_millis(column: &Column) -> Column {
1921    column.clone().unix_millis()
1922}
1923
1924/// Timestamp to seconds since epoch (PySpark unix_seconds).
1925pub fn unix_seconds(column: &Column) -> Column {
1926    column.clone().unix_seconds()
1927}
1928
1929/// Weekday name "Mon","Tue",... (PySpark dayname).
1930pub fn dayname(column: &Column) -> Column {
1931    column.clone().dayname()
1932}
1933
1934/// Weekday 0=Mon, 6=Sun (PySpark weekday).
1935pub fn weekday(column: &Column) -> Column {
1936    column.clone().weekday()
1937}
1938
1939/// Extract hour from datetime column (PySpark hour).
1940pub fn hour(column: &Column) -> Column {
1941    column.clone().hour()
1942}
1943
1944/// Extract minute from datetime column (PySpark minute).
1945pub fn minute(column: &Column) -> Column {
1946    column.clone().minute()
1947}
1948
1949/// Extract second from datetime column (PySpark second).
1950pub fn second(column: &Column) -> Column {
1951    column.clone().second()
1952}
1953
1954/// Add n days to date column (PySpark date_add).
1955pub fn date_add(column: &Column, n: i32) -> Column {
1956    column.clone().date_add(n)
1957}
1958
1959/// Subtract n days from date column (PySpark date_sub).
1960pub fn date_sub(column: &Column, n: i32) -> Column {
1961    column.clone().date_sub(n)
1962}
1963
1964/// Number of days between two date columns (PySpark datediff).
1965pub fn datediff(end: &Column, start: &Column) -> Column {
1966    start.clone().datediff(end)
1967}
1968
1969/// Last day of month for date column (PySpark last_day).
1970pub fn last_day(column: &Column) -> Column {
1971    column.clone().last_day()
1972}
1973
1974/// Truncate date/datetime to unit (PySpark trunc).
1975pub fn trunc(column: &Column, format: &str) -> Column {
1976    column.clone().trunc(format)
1977}
1978
1979/// Alias for trunc (PySpark date_trunc).
1980pub fn date_trunc(format: &str, column: &Column) -> Column {
1981    trunc(column, format)
1982}
1983
1984/// Extract quarter (1-4) from date/datetime (PySpark quarter).
1985pub fn quarter(column: &Column) -> Column {
1986    column.clone().quarter()
1987}
1988
1989/// Extract ISO week of year (1-53) (PySpark weekofyear).
1990pub fn weekofyear(column: &Column) -> Column {
1991    column.clone().weekofyear()
1992}
1993
1994/// Extract day of week: 1=Sunday..7=Saturday (PySpark dayofweek).
1995pub fn dayofweek(column: &Column) -> Column {
1996    column.clone().dayofweek()
1997}
1998
1999/// Extract day of year (1-366) (PySpark dayofyear).
2000pub fn dayofyear(column: &Column) -> Column {
2001    column.clone().dayofyear()
2002}
2003
2004/// Add n months to date column (PySpark add_months).
2005pub fn add_months(column: &Column, n: i32) -> Column {
2006    column.clone().add_months(n)
2007}
2008
2009/// Months between end and start dates as fractional (PySpark months_between).
2010/// When round_off is true, rounds to 8 decimal places (PySpark default).
2011pub fn months_between(end: &Column, start: &Column, round_off: bool) -> Column {
2012    end.clone().months_between(start, round_off)
2013}
2014
2015/// Next date that is the given weekday (e.g. "Mon") (PySpark next_day).
2016pub fn next_day(column: &Column, day_of_week: &str) -> Column {
2017    column.clone().next_day(day_of_week)
2018}
2019
2020/// Current Unix timestamp in seconds (PySpark unix_timestamp with no args).
2021pub fn unix_timestamp_now() -> Column {
2022    use polars::prelude::*;
2023    let secs = chrono::Utc::now().timestamp();
2024    crate::column::Column::from_expr(lit(secs), None)
2025}
2026
2027/// Parse string timestamp to seconds since epoch (PySpark unix_timestamp). format defaults to yyyy-MM-dd HH:mm:ss.
2028pub fn unix_timestamp(column: &Column, format: Option<&str>) -> Column {
2029    column.clone().unix_timestamp(format)
2030}
2031
2032/// Alias for unix_timestamp.
2033pub fn to_unix_timestamp(column: &Column, format: Option<&str>) -> Column {
2034    unix_timestamp(column, format)
2035}
2036
2037/// Convert seconds since epoch to formatted string (PySpark from_unixtime).
2038pub fn from_unixtime(column: &Column, format: Option<&str>) -> Column {
2039    column.clone().from_unixtime(format)
2040}
2041
2042/// Build date from year, month, day columns (PySpark make_date).
2043pub fn make_date(year: &Column, month: &Column, day: &Column) -> Column {
2044    use polars::prelude::*;
2045    let args = [month.expr().clone(), day.expr().clone()];
2046    let expr = year.expr().clone().map_many(
2047        |cols| crate::column::expect_col(crate::udfs::apply_make_date(cols)),
2048        &args,
2049        |_schema, fields| Ok(Field::new(fields[0].name().clone(), DataType::Date)),
2050    );
2051    crate::column::Column::from_expr(expr, None)
2052}
2053
2054/// make_timestamp(year, month, day, hour, min, sec, timezone?) - six columns to timestamp (PySpark make_timestamp).
2055/// When timezone is Some(tz), components are interpreted as local time in that zone, then converted to UTC.
2056pub fn make_timestamp(
2057    year: &Column,
2058    month: &Column,
2059    day: &Column,
2060    hour: &Column,
2061    minute: &Column,
2062    sec: &Column,
2063    timezone: Option<&str>,
2064) -> Column {
2065    use polars::prelude::*;
2066    let tz_owned = timezone.map(|s| s.to_string());
2067    let args = [
2068        month.expr().clone(),
2069        day.expr().clone(),
2070        hour.expr().clone(),
2071        minute.expr().clone(),
2072        sec.expr().clone(),
2073    ];
2074    let expr = year.expr().clone().map_many(
2075        move |cols| {
2076            crate::column::expect_col(crate::udfs::apply_make_timestamp(cols, tz_owned.as_deref()))
2077        },
2078        &args,
2079        |_schema, fields| {
2080            Ok(Field::new(
2081                fields[0].name().clone(),
2082                DataType::Datetime(TimeUnit::Microseconds, None),
2083            ))
2084        },
2085    );
2086    crate::column::Column::from_expr(expr, None)
2087}
2088
2089/// Add amount of unit to timestamp (PySpark timestampadd).
2090pub fn timestampadd(unit: &str, amount: &Column, ts: &Column) -> Column {
2091    ts.clone().timestampadd(unit, amount)
2092}
2093
2094/// Difference between timestamps in unit (PySpark timestampdiff).
2095pub fn timestampdiff(unit: &str, start: &Column, end: &Column) -> Column {
2096    start.clone().timestampdiff(unit, end)
2097}
2098
2099/// Interval of n days (PySpark days). For use in date_add, timestampadd, etc.
2100pub fn days(n: i64) -> Column {
2101    make_interval(0, 0, 0, n, 0, 0, 0)
2102}
2103
2104/// Interval of n hours (PySpark hours).
2105pub fn hours(n: i64) -> Column {
2106    make_interval(0, 0, 0, 0, n, 0, 0)
2107}
2108
2109/// Interval of n minutes (PySpark minutes).
2110pub fn minutes(n: i64) -> Column {
2111    make_interval(0, 0, 0, 0, 0, n, 0)
2112}
2113
2114/// Interval of n months (PySpark months). Approximated as 30*n days.
2115pub fn months(n: i64) -> Column {
2116    make_interval(0, n, 0, 0, 0, 0, 0)
2117}
2118
2119/// Interval of n years (PySpark years). Approximated as 365*n days.
2120pub fn years(n: i64) -> Column {
2121    make_interval(n, 0, 0, 0, 0, 0, 0)
2122}
2123
2124/// Interpret timestamp as UTC, convert to tz (PySpark from_utc_timestamp).
2125pub fn from_utc_timestamp(column: &Column, tz: &str) -> Column {
2126    column.clone().from_utc_timestamp(tz)
2127}
2128
2129/// Interpret timestamp as in tz, convert to UTC (PySpark to_utc_timestamp).
2130pub fn to_utc_timestamp(column: &Column, tz: &str) -> Column {
2131    column.clone().to_utc_timestamp(tz)
2132}
2133
2134/// Convert timestamp between timezones (PySpark convert_timezone).
2135pub fn convert_timezone(source_tz: &str, target_tz: &str, column: &Column) -> Column {
2136    let source_tz = source_tz.to_string();
2137    let target_tz = target_tz.to_string();
2138    let expr = column.expr().clone().map(
2139        move |s| {
2140            crate::column::expect_col(crate::udfs::apply_convert_timezone(
2141                s, &source_tz, &target_tz,
2142            ))
2143        },
2144        |_schema, field| Ok(field.clone()),
2145    );
2146    crate::column::Column::from_expr(expr, None)
2147}
2148
2149/// Current session timezone (PySpark current_timezone). Default "UTC". Returns literal column.
2150pub fn current_timezone() -> Column {
2151    use polars::prelude::*;
2152    crate::column::Column::from_expr(lit("UTC"), None)
2153}
2154
2155/// Create interval duration (PySpark make_interval). Optional args; 0 for omitted.
2156pub fn make_interval(
2157    years: i64,
2158    months: i64,
2159    weeks: i64,
2160    days: i64,
2161    hours: i64,
2162    mins: i64,
2163    secs: i64,
2164) -> Column {
2165    use polars::prelude::*;
2166    // Approximate: 1 year = 365 days, 1 month = 30 days
2167    let total_days = years * 365 + months * 30 + weeks * 7 + days;
2168    let args = DurationArgs::new()
2169        .with_days(lit(total_days))
2170        .with_hours(lit(hours))
2171        .with_minutes(lit(mins))
2172        .with_seconds(lit(secs));
2173    let dur = duration(args);
2174    crate::column::Column::from_expr(dur, None)
2175}
2176
2177/// Day-time interval: days, hours, minutes, seconds (PySpark make_dt_interval). All optional; 0 for omitted.
2178pub fn make_dt_interval(days: i64, hours: i64, minutes: i64, seconds: i64) -> Column {
2179    use polars::prelude::*;
2180    let args = DurationArgs::new()
2181        .with_days(lit(days))
2182        .with_hours(lit(hours))
2183        .with_minutes(lit(minutes))
2184        .with_seconds(lit(seconds));
2185    let dur = duration(args);
2186    crate::column::Column::from_expr(dur, None)
2187}
2188
2189/// Year-month interval (PySpark make_ym_interval). Polars has no native YM type; return months as Int32 (years*12 + months).
2190pub fn make_ym_interval(years: i32, months: i32) -> Column {
2191    use polars::prelude::*;
2192    let total_months = years * 12 + months;
2193    crate::column::Column::from_expr(lit(total_months), None)
2194}
2195
2196/// Alias for make_timestamp (PySpark make_timestamp_ntz - no timezone).
2197pub fn make_timestamp_ntz(
2198    year: &Column,
2199    month: &Column,
2200    day: &Column,
2201    hour: &Column,
2202    minute: &Column,
2203    sec: &Column,
2204) -> Column {
2205    make_timestamp(year, month, day, hour, minute, sec, None)
2206}
2207
2208/// Convert seconds since epoch to timestamp (PySpark timestamp_seconds).
2209pub fn timestamp_seconds(column: &Column) -> Column {
2210    column.clone().timestamp_seconds()
2211}
2212
2213/// Convert milliseconds since epoch to timestamp (PySpark timestamp_millis).
2214pub fn timestamp_millis(column: &Column) -> Column {
2215    column.clone().timestamp_millis()
2216}
2217
2218/// Convert microseconds since epoch to timestamp (PySpark timestamp_micros).
2219pub fn timestamp_micros(column: &Column) -> Column {
2220    column.clone().timestamp_micros()
2221}
2222
2223/// Date to days since 1970-01-01 (PySpark unix_date).
2224pub fn unix_date(column: &Column) -> Column {
2225    column.clone().unix_date()
2226}
2227
2228/// Days since epoch to date (PySpark date_from_unix_date).
2229pub fn date_from_unix_date(column: &Column) -> Column {
2230    column.clone().date_from_unix_date()
2231}
2232
2233/// Positive modulus (PySpark pmod).
2234pub fn pmod(dividend: &Column, divisor: &Column) -> Column {
2235    dividend.clone().pmod(divisor)
2236}
2237
2238/// Factorial n! (PySpark factorial). n in 0..=20; null for negative or overflow.
2239pub fn factorial(column: &Column) -> Column {
2240    column.clone().factorial()
2241}
2242
2243/// Concatenate string columns without separator (PySpark concat)
2244pub fn concat(columns: &[&Column]) -> Column {
2245    use polars::prelude::*;
2246    if columns.is_empty() {
2247        panic!("concat requires at least one column");
2248    }
2249    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2250    crate::column::Column::from_expr(concat_str(&exprs, "", false), None)
2251}
2252
2253/// Concatenate string columns with separator (PySpark concat_ws)
2254pub fn concat_ws(separator: &str, columns: &[&Column]) -> Column {
2255    use polars::prelude::*;
2256    if columns.is_empty() {
2257        panic!("concat_ws requires at least one column");
2258    }
2259    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2260    crate::column::Column::from_expr(concat_str(&exprs, separator, false), None)
2261}
2262
2263/// Row number window function (1, 2, 3 by order within partition).
2264/// Use with `.over(partition_by)` after ranking by an order column.
2265///
2266/// # Example
2267/// ```
2268/// use robin_sparkless::{col, Column};
2269/// let salary_col = col("salary");
2270/// let rn = salary_col.row_number(true).over(&["dept"]);
2271/// ```
2272pub fn row_number(column: &Column) -> Column {
2273    column.clone().row_number(false)
2274}
2275
2276/// Rank window function (ties same rank, gaps). Use with `.over(partition_by)`.
2277pub fn rank(column: &Column, descending: bool) -> Column {
2278    column.clone().rank(descending)
2279}
2280
2281/// Dense rank window function (no gaps). Use with `.over(partition_by)`.
2282pub fn dense_rank(column: &Column, descending: bool) -> Column {
2283    column.clone().dense_rank(descending)
2284}
2285
2286/// Lag: value from n rows before in partition. Use with `.over(partition_by)`.
2287pub fn lag(column: &Column, n: i64) -> Column {
2288    column.clone().lag(n)
2289}
2290
2291/// Lead: value from n rows after in partition. Use with `.over(partition_by)`.
2292pub fn lead(column: &Column, n: i64) -> Column {
2293    column.clone().lead(n)
2294}
2295
2296/// First value in partition (PySpark first_value). Use with `.over(partition_by)`.
2297pub fn first_value(column: &Column) -> Column {
2298    column.clone().first_value()
2299}
2300
2301/// Last value in partition (PySpark last_value). Use with `.over(partition_by)`.
2302pub fn last_value(column: &Column) -> Column {
2303    column.clone().last_value()
2304}
2305
2306/// Percent rank in partition: (rank - 1) / (count - 1). Window is applied.
2307pub fn percent_rank(column: &Column, partition_by: &[&str], descending: bool) -> Column {
2308    column.clone().percent_rank(partition_by, descending)
2309}
2310
2311/// Cumulative distribution in partition: row_number / count. Window is applied.
2312pub fn cume_dist(column: &Column, partition_by: &[&str], descending: bool) -> Column {
2313    column.clone().cume_dist(partition_by, descending)
2314}
2315
2316/// Ntile: bucket 1..n by rank within partition. Window is applied.
2317pub fn ntile(column: &Column, n: u32, partition_by: &[&str], descending: bool) -> Column {
2318    column.clone().ntile(n, partition_by, descending)
2319}
2320
2321/// Nth value in partition by order (1-based n). Window is applied; do not call .over() again.
2322pub fn nth_value(column: &Column, n: i64, partition_by: &[&str], descending: bool) -> Column {
2323    column.clone().nth_value(n, partition_by, descending)
2324}
2325
2326/// Coalesce - returns the first non-null value from multiple columns.
2327///
2328/// # Example
2329/// ```
2330/// use robin_sparkless::{col, lit_i64, coalesce};
2331///
2332/// // coalesce(col("a"), col("b"), lit(0))
2333/// let expr = coalesce(&[&col("a"), &col("b"), &lit_i64(0)]);
2334/// ```
2335pub fn coalesce(columns: &[&Column]) -> Column {
2336    use polars::prelude::*;
2337    if columns.is_empty() {
2338        panic!("coalesce requires at least one column");
2339    }
2340    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2341    let expr = coalesce(&exprs);
2342    crate::column::Column::from_expr(expr, None)
2343}
2344
2345/// Alias for coalesce(col, value). PySpark nvl / ifnull.
2346pub fn nvl(column: &Column, value: &Column) -> Column {
2347    coalesce(&[column, value])
2348}
2349
2350/// Alias for nvl. PySpark ifnull.
2351pub fn ifnull(column: &Column, value: &Column) -> Column {
2352    nvl(column, value)
2353}
2354
2355/// Return null if column equals value, else column. PySpark nullif.
2356pub fn nullif(column: &Column, value: &Column) -> Column {
2357    use polars::prelude::*;
2358    let cond = column.expr().clone().eq(value.expr().clone());
2359    let null_lit = lit(NULL);
2360    let expr = when(cond).then(null_lit).otherwise(column.expr().clone());
2361    crate::column::Column::from_expr(expr, None)
2362}
2363
2364/// Replace NaN with value. PySpark nanvl.
2365pub fn nanvl(column: &Column, value: &Column) -> Column {
2366    use polars::prelude::*;
2367    let cond = column.expr().clone().is_nan();
2368    let expr = when(cond)
2369        .then(value.expr().clone())
2370        .otherwise(column.expr().clone());
2371    crate::column::Column::from_expr(expr, None)
2372}
2373
2374/// Three-arg null replacement: if col1 is not null then col2 else col3. PySpark nvl2.
2375pub fn nvl2(col1: &Column, col2: &Column, col3: &Column) -> Column {
2376    use polars::prelude::*;
2377    let cond = col1.expr().clone().is_not_null();
2378    let expr = when(cond)
2379        .then(col2.expr().clone())
2380        .otherwise(col3.expr().clone());
2381    crate::column::Column::from_expr(expr, None)
2382}
2383
2384/// Alias for substring. PySpark substr.
2385pub fn substr(column: &Column, start: i64, length: Option<i64>) -> Column {
2386    substring(column, start, length)
2387}
2388
2389/// Alias for pow. PySpark power.
2390pub fn power(column: &Column, exp: i64) -> Column {
2391    pow(column, exp)
2392}
2393
2394/// Alias for log (natural log). PySpark ln.
2395pub fn ln(column: &Column) -> Column {
2396    log(column)
2397}
2398
2399/// Alias for ceil. PySpark ceiling.
2400pub fn ceiling(column: &Column) -> Column {
2401    ceil(column)
2402}
2403
2404/// Alias for lower. PySpark lcase.
2405pub fn lcase(column: &Column) -> Column {
2406    lower(column)
2407}
2408
2409/// Alias for upper. PySpark ucase.
2410pub fn ucase(column: &Column) -> Column {
2411    upper(column)
2412}
2413
2414/// Alias for day. PySpark dayofmonth.
2415pub fn dayofmonth(column: &Column) -> Column {
2416    day(column)
2417}
2418
2419/// Alias for degrees. PySpark toDegrees.
2420pub fn to_degrees(column: &Column) -> Column {
2421    degrees(column)
2422}
2423
2424/// Alias for radians. PySpark toRadians.
2425pub fn to_radians(column: &Column) -> Column {
2426    radians(column)
2427}
2428
2429/// Hyperbolic cosine (PySpark cosh).
2430pub fn cosh(column: &Column) -> Column {
2431    column.clone().cosh()
2432}
2433/// Hyperbolic sine (PySpark sinh).
2434pub fn sinh(column: &Column) -> Column {
2435    column.clone().sinh()
2436}
2437/// Hyperbolic tangent (PySpark tanh).
2438pub fn tanh(column: &Column) -> Column {
2439    column.clone().tanh()
2440}
2441/// Inverse hyperbolic cosine (PySpark acosh).
2442pub fn acosh(column: &Column) -> Column {
2443    column.clone().acosh()
2444}
2445/// Inverse hyperbolic sine (PySpark asinh).
2446pub fn asinh(column: &Column) -> Column {
2447    column.clone().asinh()
2448}
2449/// Inverse hyperbolic tangent (PySpark atanh).
2450pub fn atanh(column: &Column) -> Column {
2451    column.clone().atanh()
2452}
2453/// Cube root (PySpark cbrt).
2454pub fn cbrt(column: &Column) -> Column {
2455    column.clone().cbrt()
2456}
2457/// exp(x) - 1 (PySpark expm1).
2458pub fn expm1(column: &Column) -> Column {
2459    column.clone().expm1()
2460}
2461/// log(1 + x) (PySpark log1p).
2462pub fn log1p(column: &Column) -> Column {
2463    column.clone().log1p()
2464}
2465/// Base-10 log (PySpark log10).
2466pub fn log10(column: &Column) -> Column {
2467    column.clone().log10()
2468}
2469/// Base-2 log (PySpark log2).
2470pub fn log2(column: &Column) -> Column {
2471    column.clone().log2()
2472}
2473/// Round to nearest integer (PySpark rint).
2474pub fn rint(column: &Column) -> Column {
2475    column.clone().rint()
2476}
2477/// sqrt(x*x + y*y) (PySpark hypot).
2478pub fn hypot(x: &Column, y: &Column) -> Column {
2479    let xx = x.expr().clone() * x.expr().clone();
2480    let yy = y.expr().clone() * y.expr().clone();
2481    crate::column::Column::from_expr((xx + yy).sqrt(), None)
2482}
2483
2484/// True if column is null. PySpark isnull.
2485pub fn isnull(column: &Column) -> Column {
2486    column.clone().is_null()
2487}
2488
2489/// True if column is not null. PySpark isnotnull.
2490pub fn isnotnull(column: &Column) -> Column {
2491    column.clone().is_not_null()
2492}
2493
2494/// Create an array column from multiple columns (PySpark array).
2495/// With no arguments, returns a column of empty arrays (one per row); PySpark parity.
2496pub fn array(columns: &[&Column]) -> Result<crate::column::Column, PolarsError> {
2497    use polars::prelude::*;
2498    if columns.is_empty() {
2499        // PySpark F.array() with no args: one empty list per row (broadcast literal).
2500        // Use .first() so the single-row literal is treated as a scalar and broadcasts to frame height.
2501        let empty_inner = Series::new("".into(), Vec::<i64>::new());
2502        let list_series = ListChunked::from_iter([Some(empty_inner)])
2503            .with_name("array".into())
2504            .into_series();
2505        let expr = lit(list_series).first();
2506        return Ok(crate::column::Column::from_expr(expr, None));
2507    }
2508    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2509    let expr = concat_list(exprs)
2510        .map_err(|e| PolarsError::ComputeError(format!("array concat_list: {e}").into()))?;
2511    Ok(crate::column::Column::from_expr(expr, None))
2512}
2513
2514/// Number of elements in list (PySpark size / array_size). Returns Int32.
2515pub fn array_size(column: &Column) -> Column {
2516    column.clone().array_size()
2517}
2518
2519/// Alias for array_size (PySpark size).
2520pub fn size(column: &Column) -> Column {
2521    column.clone().array_size()
2522}
2523
2524/// Cardinality: number of elements in array (PySpark cardinality). Alias for size/array_size.
2525pub fn cardinality(column: &Column) -> Column {
2526    column.clone().cardinality()
2527}
2528
2529/// Check if list contains value (PySpark array_contains).
2530pub fn array_contains(column: &Column, value: &Column) -> Column {
2531    column.clone().array_contains(value.expr().clone())
2532}
2533
2534/// Join list of strings with separator (PySpark array_join).
2535pub fn array_join(column: &Column, separator: &str) -> Column {
2536    column.clone().array_join(separator)
2537}
2538
2539/// Maximum element in list (PySpark array_max).
2540pub fn array_max(column: &Column) -> Column {
2541    column.clone().array_max()
2542}
2543
2544/// Minimum element in list (PySpark array_min).
2545pub fn array_min(column: &Column) -> Column {
2546    column.clone().array_min()
2547}
2548
2549/// Get element at 1-based index (PySpark element_at).
2550pub fn element_at(column: &Column, index: i64) -> Column {
2551    column.clone().element_at(index)
2552}
2553
2554/// Sort list elements (PySpark array_sort).
2555pub fn array_sort(column: &Column) -> Column {
2556    column.clone().array_sort()
2557}
2558
2559/// Distinct elements in list (PySpark array_distinct).
2560pub fn array_distinct(column: &Column) -> Column {
2561    column.clone().array_distinct()
2562}
2563
2564/// Slice list from 1-based start with optional length (PySpark slice).
2565pub fn array_slice(column: &Column, start: i64, length: Option<i64>) -> Column {
2566    column.clone().array_slice(start, length)
2567}
2568
2569/// Generate array of numbers from start to stop (inclusive) with optional step (PySpark sequence).
2570/// step defaults to 1.
2571pub fn sequence(start: &Column, stop: &Column, step: Option<&Column>) -> Column {
2572    use polars::prelude::{DataType, Field, as_struct, lit};
2573    let step_expr = step
2574        .map(|c| c.expr().clone().alias("2"))
2575        .unwrap_or_else(|| lit(1i64).alias("2"));
2576    let struct_expr = as_struct(vec![
2577        start.expr().clone().alias("0"),
2578        stop.expr().clone().alias("1"),
2579        step_expr,
2580    ]);
2581    let out_dtype = DataType::List(Box::new(DataType::Int64));
2582    let expr = struct_expr.map(
2583        |s| crate::column::expect_col(crate::udfs::apply_sequence(s)),
2584        move |_schema, field| Ok(Field::new(field.name().clone(), out_dtype.clone())),
2585    );
2586    crate::column::Column::from_expr(expr, None)
2587}
2588
2589/// Random permutation of list elements (PySpark shuffle).
2590pub fn shuffle(column: &Column) -> Column {
2591    let expr = column.expr().clone().map(
2592        |s| crate::column::expect_col(crate::udfs::apply_shuffle(s)),
2593        |_schema, field| Ok(field.clone()),
2594    );
2595    crate::column::Column::from_expr(expr, None)
2596}
2597
2598/// Explode list of structs into rows; struct fields become columns after unnest (PySpark inline).
2599/// Returns the exploded struct column; use unnest to expand struct fields to columns.
2600pub fn inline(column: &Column) -> Column {
2601    column.clone().explode()
2602}
2603
2604/// Like inline but null/empty yields one row of nulls (PySpark inline_outer).
2605pub fn inline_outer(column: &Column) -> Column {
2606    column.clone().explode_outer()
2607}
2608
2609/// Explode list into one row per element (PySpark explode).
2610pub fn explode(column: &Column) -> Column {
2611    column.clone().explode()
2612}
2613
2614/// 1-based index of first occurrence of value in list, or 0 if not found (PySpark array_position).
2615/// Implemented via Polars list.eval with col("") as element.
2616pub fn array_position(column: &Column, value: &Column) -> Column {
2617    column.clone().array_position(value.expr().clone())
2618}
2619
2620/// Remove null elements from list (PySpark array_compact).
2621pub fn array_compact(column: &Column) -> Column {
2622    column.clone().array_compact()
2623}
2624
2625/// New list with all elements equal to value removed (PySpark array_remove).
2626/// Implemented via Polars list.eval + list.drop_nulls.
2627pub fn array_remove(column: &Column, value: &Column) -> Column {
2628    column.clone().array_remove(value.expr().clone())
2629}
2630
2631/// Repeat each element n times (PySpark array_repeat). Not implemented: would require list.eval with dynamic repeat.
2632pub fn array_repeat(column: &Column, n: i64) -> Column {
2633    column.clone().array_repeat(n)
2634}
2635
2636/// Flatten list of lists to one list (PySpark flatten). Not implemented.
2637pub fn array_flatten(column: &Column) -> Column {
2638    column.clone().array_flatten()
2639}
2640
2641/// True if any list element satisfies the predicate (PySpark exists).
2642pub fn array_exists(column: &Column, predicate: Expr) -> Column {
2643    column.clone().array_exists(predicate)
2644}
2645
2646/// True if all list elements satisfy the predicate (PySpark forall).
2647pub fn array_forall(column: &Column, predicate: Expr) -> Column {
2648    column.clone().array_forall(predicate)
2649}
2650
2651/// Filter list elements by predicate (PySpark filter).
2652pub fn array_filter(column: &Column, predicate: Expr) -> Column {
2653    column.clone().array_filter(predicate)
2654}
2655
2656/// Transform list elements by expression (PySpark transform).
2657pub fn array_transform(column: &Column, f: Expr) -> Column {
2658    column.clone().array_transform(f)
2659}
2660
2661/// Sum of list elements (PySpark aggregate sum).
2662pub fn array_sum(column: &Column) -> Column {
2663    column.clone().array_sum()
2664}
2665
2666/// Array fold/aggregate (PySpark aggregate). Simplified: zero + sum(list elements).
2667pub fn aggregate(column: &Column, zero: &Column) -> Column {
2668    column.clone().array_aggregate(zero)
2669}
2670
2671/// Mean of list elements (PySpark aggregate avg).
2672pub fn array_mean(column: &Column) -> Column {
2673    column.clone().array_mean()
2674}
2675
2676/// Explode list with position (PySpark posexplode). Returns (pos_column, value_column).
2677/// pos is 1-based; implemented via list.eval(cum_count()).explode() and explode().
2678pub fn posexplode(column: &Column) -> (Column, Column) {
2679    column.clone().posexplode()
2680}
2681
2682/// Build a map column from alternating key/value expressions (PySpark create_map).
2683/// Returns List(Struct{key, value}) using Polars as_struct and concat_list.
2684/// With no args (or empty slice), returns a column of empty maps per row (PySpark parity #275).
2685pub fn create_map(key_values: &[&Column]) -> Result<Column, PolarsError> {
2686    use polars::chunked_array::StructChunked;
2687    use polars::prelude::{IntoSeries, ListChunked, as_struct, concat_list, lit};
2688    if key_values.is_empty() {
2689        // PySpark F.create_map() with no args: one empty map {} per row (broadcast literal).
2690        let key_s = Series::new("key".into(), Vec::<String>::new());
2691        let value_s = Series::new("value".into(), Vec::<String>::new());
2692        let fields: [&Series; 2] = [&key_s, &value_s];
2693        let empty_struct = StructChunked::from_series(
2694            polars::prelude::PlSmallStr::EMPTY,
2695            0,
2696            fields.iter().copied(),
2697        )
2698        .map_err(|e| PolarsError::ComputeError(format!("create_map empty struct: {e}").into()))?
2699        .into_series();
2700        let list_series = ListChunked::from_iter([Some(empty_struct)])
2701            .with_name("create_map".into())
2702            .into_series();
2703        let expr = lit(list_series).first();
2704        return Ok(crate::column::Column::from_expr(expr, None));
2705    }
2706    let mut struct_exprs: Vec<Expr> = Vec::new();
2707    for i in (0..key_values.len()).step_by(2) {
2708        if i + 1 < key_values.len() {
2709            let k = key_values[i].expr().clone().alias("key");
2710            let v = key_values[i + 1].expr().clone().alias("value");
2711            struct_exprs.push(as_struct(vec![k, v]));
2712        }
2713    }
2714    let expr = concat_list(struct_exprs)
2715        .map_err(|e| PolarsError::ComputeError(format!("create_map concat_list: {e}").into()))?;
2716    Ok(crate::column::Column::from_expr(expr, None))
2717}
2718
2719/// Extract keys from a map column (PySpark map_keys). Map is List(Struct{key, value}).
2720pub fn map_keys(column: &Column) -> Column {
2721    column.clone().map_keys()
2722}
2723
2724/// Extract values from a map column (PySpark map_values).
2725pub fn map_values(column: &Column) -> Column {
2726    column.clone().map_values()
2727}
2728
2729/// Return map as list of structs {key, value} (PySpark map_entries).
2730pub fn map_entries(column: &Column) -> Column {
2731    column.clone().map_entries()
2732}
2733
2734/// Build map from two array columns keys and values (PySpark map_from_arrays). Implemented via UDF.
2735pub fn map_from_arrays(keys: &Column, values: &Column) -> Column {
2736    keys.clone().map_from_arrays(values)
2737}
2738
2739/// Merge two map columns (PySpark map_concat). Last value wins for duplicate keys.
2740pub fn map_concat(a: &Column, b: &Column) -> Column {
2741    a.clone().map_concat(b)
2742}
2743
2744/// Array of structs {key, value} to map (PySpark map_from_entries).
2745pub fn map_from_entries(column: &Column) -> Column {
2746    column.clone().map_from_entries()
2747}
2748
2749/// True if map contains key (PySpark map_contains_key).
2750pub fn map_contains_key(map_col: &Column, key: &Column) -> Column {
2751    map_col.clone().map_contains_key(key)
2752}
2753
2754/// Get value for key from map, or null (PySpark get).
2755pub fn get(map_col: &Column, key: &Column) -> Column {
2756    map_col.clone().get(key)
2757}
2758
2759/// Filter map entries by predicate (PySpark map_filter).
2760pub fn map_filter(map_col: &Column, predicate: Expr) -> Column {
2761    map_col.clone().map_filter(predicate)
2762}
2763
2764/// Merge two maps by key with merge function (PySpark map_zip_with).
2765pub fn map_zip_with(map1: &Column, map2: &Column, merge: Expr) -> Column {
2766    map1.clone().map_zip_with(map2, merge)
2767}
2768
2769/// Convenience: zip_with with coalesce(left, right) merge.
2770pub fn zip_with_coalesce(left: &Column, right: &Column) -> Column {
2771    use polars::prelude::col;
2772    let left_field = col("").struct_().field_by_name("left");
2773    let right_field = col("").struct_().field_by_name("right");
2774    let merge = crate::column::Column::from_expr(
2775        coalesce(&[
2776            &crate::column::Column::from_expr(left_field, None),
2777            &crate::column::Column::from_expr(right_field, None),
2778        ])
2779        .into_expr(),
2780        None,
2781    );
2782    left.clone().zip_with(right, merge.into_expr())
2783}
2784
2785/// Convenience: map_zip_with with coalesce(value1, value2) merge.
2786pub fn map_zip_with_coalesce(map1: &Column, map2: &Column) -> Column {
2787    use polars::prelude::col;
2788    let v1 = col("").struct_().field_by_name("value1");
2789    let v2 = col("").struct_().field_by_name("value2");
2790    let merge = coalesce(&[
2791        &crate::column::Column::from_expr(v1, None),
2792        &crate::column::Column::from_expr(v2, None),
2793    ])
2794    .into_expr();
2795    map1.clone().map_zip_with(map2, merge)
2796}
2797
2798/// Convenience: map_filter with value > threshold predicate.
2799pub fn map_filter_value_gt(map_col: &Column, threshold: f64) -> Column {
2800    use polars::prelude::{col, lit};
2801    let pred = col("").struct_().field_by_name("value").gt(lit(threshold));
2802    map_col.clone().map_filter(pred)
2803}
2804
2805/// Create struct from columns using column names as field names (PySpark struct).
2806pub fn struct_(columns: &[&Column]) -> Column {
2807    use polars::prelude::as_struct;
2808    if columns.is_empty() {
2809        panic!("struct requires at least one column");
2810    }
2811    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2812    crate::column::Column::from_expr(as_struct(exprs), None)
2813}
2814
2815/// Create struct with explicit field names (PySpark named_struct). Pairs of (name, column).
2816pub fn named_struct(pairs: &[(&str, &Column)]) -> Column {
2817    use polars::prelude::as_struct;
2818    if pairs.is_empty() {
2819        panic!("named_struct requires at least one (name, column) pair");
2820    }
2821    let exprs: Vec<Expr> = pairs
2822        .iter()
2823        .map(|(name, col)| col.expr().clone().alias(*name))
2824        .collect();
2825    crate::column::Column::from_expr(as_struct(exprs), None)
2826}
2827
2828/// Append element to end of list (PySpark array_append).
2829pub fn array_append(array: &Column, elem: &Column) -> Column {
2830    array.clone().array_append(elem)
2831}
2832
2833/// Prepend element to start of list (PySpark array_prepend).
2834pub fn array_prepend(array: &Column, elem: &Column) -> Column {
2835    array.clone().array_prepend(elem)
2836}
2837
2838/// Insert element at 1-based position (PySpark array_insert).
2839pub fn array_insert(array: &Column, pos: &Column, elem: &Column) -> Column {
2840    array.clone().array_insert(pos, elem)
2841}
2842
2843/// Elements in first array not in second (PySpark array_except).
2844pub fn array_except(a: &Column, b: &Column) -> Column {
2845    a.clone().array_except(b)
2846}
2847
2848/// Elements in both arrays (PySpark array_intersect).
2849pub fn array_intersect(a: &Column, b: &Column) -> Column {
2850    a.clone().array_intersect(b)
2851}
2852
2853/// Distinct elements from both arrays (PySpark array_union).
2854pub fn array_union(a: &Column, b: &Column) -> Column {
2855    a.clone().array_union(b)
2856}
2857
2858/// Zip two arrays element-wise with merge function (PySpark zip_with).
2859pub fn zip_with(left: &Column, right: &Column, merge: Expr) -> Column {
2860    left.clone().zip_with(right, merge)
2861}
2862
2863/// Extract JSON path from string column (PySpark get_json_object).
2864pub fn get_json_object(column: &Column, path: &str) -> Column {
2865    column.clone().get_json_object(path)
2866}
2867
2868/// Keys of JSON object (PySpark json_object_keys). Returns list of strings.
2869pub fn json_object_keys(column: &Column) -> Column {
2870    column.clone().json_object_keys()
2871}
2872
2873/// Extract keys from JSON as struct (PySpark json_tuple). keys: e.g. ["a", "b"].
2874pub fn json_tuple(column: &Column, keys: &[&str]) -> Column {
2875    column.clone().json_tuple(keys)
2876}
2877
2878/// Parse CSV string to struct (PySpark from_csv). Minimal implementation.
2879pub fn from_csv(column: &Column) -> Column {
2880    column.clone().from_csv()
2881}
2882
2883/// Format struct as CSV string (PySpark to_csv). Minimal implementation.
2884pub fn to_csv(column: &Column) -> Column {
2885    column.clone().to_csv()
2886}
2887
2888/// Schema of CSV string (PySpark schema_of_csv). Returns literal schema string; minimal stub.
2889pub fn schema_of_csv(_column: &Column) -> Column {
2890    Column::from_expr(
2891        lit("STRUCT<_c0: STRING, _c1: STRING>".to_string()),
2892        Some("schema_of_csv".to_string()),
2893    )
2894}
2895
2896/// Schema of JSON string (PySpark schema_of_json). Returns literal schema string; minimal stub.
2897pub fn schema_of_json(_column: &Column) -> Column {
2898    Column::from_expr(
2899        lit("STRUCT<>".to_string()),
2900        Some("schema_of_json".to_string()),
2901    )
2902}
2903
2904/// Parse string column as JSON into struct (PySpark from_json).
2905pub fn from_json(column: &Column, schema: Option<polars::datatypes::DataType>) -> Column {
2906    column.clone().from_json(schema)
2907}
2908
2909/// Serialize struct column to JSON string (PySpark to_json).
2910pub fn to_json(column: &Column) -> Column {
2911    column.clone().to_json()
2912}
2913
2914/// Check if column values are in the given list (PySpark isin). Uses Polars is_in.
2915pub fn isin(column: &Column, other: &Column) -> Column {
2916    column.clone().isin(other)
2917}
2918
2919/// Check if column values are in the given i64 slice (PySpark isin with literal list).
2920pub fn isin_i64(column: &Column, values: &[i64]) -> Column {
2921    let s = Series::from_iter(values.iter().cloned());
2922    Column::from_expr(column.expr().clone().is_in(lit(s), false), None)
2923}
2924
2925/// Check if column values are in the given string slice (PySpark isin with literal list).
2926pub fn isin_str(column: &Column, values: &[&str]) -> Column {
2927    let s: Series = Series::from_iter(values.iter().copied());
2928    Column::from_expr(column.expr().clone().is_in(lit(s), false), None)
2929}
2930
2931/// Percent-decode URL-encoded string (PySpark url_decode).
2932pub fn url_decode(column: &Column) -> Column {
2933    column.clone().url_decode()
2934}
2935
2936/// Percent-encode string for URL (PySpark url_encode).
2937pub fn url_encode(column: &Column) -> Column {
2938    column.clone().url_encode()
2939}
2940
2941/// Bitwise left shift (PySpark shiftLeft). col << n.
2942pub fn shift_left(column: &Column, n: i32) -> Column {
2943    column.clone().shift_left(n)
2944}
2945
2946/// Bitwise signed right shift (PySpark shiftRight). col >> n.
2947pub fn shift_right(column: &Column, n: i32) -> Column {
2948    column.clone().shift_right(n)
2949}
2950
2951/// Bitwise unsigned right shift (PySpark shiftRightUnsigned). Logical shift for Long.
2952pub fn shift_right_unsigned(column: &Column, n: i32) -> Column {
2953    column.clone().shift_right_unsigned(n)
2954}
2955
2956/// Session/library version string (PySpark version).
2957pub fn version() -> Column {
2958    Column::from_expr(
2959        lit(concat!("robin-sparkless-", env!("CARGO_PKG_VERSION"))),
2960        None,
2961    )
2962}
2963
2964/// Null-safe equality: true if both null or both equal (PySpark equal_null). Alias for eq_null_safe.
2965pub fn equal_null(left: &Column, right: &Column) -> Column {
2966    left.clone().eq_null_safe(right)
2967}
2968
2969/// Length of JSON array at path (PySpark json_array_length).
2970pub fn json_array_length(column: &Column, path: &str) -> Column {
2971    column.clone().json_array_length(path)
2972}
2973
2974/// Parse URL and extract part: PROTOCOL, HOST, PATH, etc. (PySpark parse_url).
2975/// When key is Some(k) and part is QUERY/QUERYSTRING, returns the value for that query parameter only.
2976pub fn parse_url(column: &Column, part: &str, key: Option<&str>) -> Column {
2977    column.clone().parse_url(part, key)
2978}
2979
2980/// Hash of column values (PySpark hash). Uses Murmur3 32-bit for parity with PySpark.
2981pub fn hash(columns: &[&Column]) -> Column {
2982    use polars::prelude::*;
2983    if columns.is_empty() {
2984        return crate::column::Column::from_expr(lit(0i64), None);
2985    }
2986    if columns.len() == 1 {
2987        return columns[0].clone().hash();
2988    }
2989    let exprs: Vec<Expr> = columns.iter().map(|c| c.expr().clone()).collect();
2990    let struct_expr = polars::prelude::as_struct(exprs);
2991    let name = columns[0].name().to_string();
2992    let expr = struct_expr.map(
2993        |s| crate::column::expect_col(crate::udfs::apply_hash_struct(s)),
2994        |_schema, field| Ok(Field::new(field.name().clone(), DataType::Int64)),
2995    );
2996    crate::column::Column::from_expr(expr, Some(name))
2997}
2998
2999/// Stack columns into struct (PySpark stack). Alias for struct_.
3000pub fn stack(columns: &[&Column]) -> Column {
3001    struct_(columns)
3002}
3003
3004#[cfg(test)]
3005mod tests {
3006    use super::*;
3007    use polars::prelude::{IntoLazy, df};
3008
3009    #[test]
3010    fn test_col_creates_column() {
3011        let column = col("test");
3012        assert_eq!(column.name(), "test");
3013    }
3014
3015    #[test]
3016    fn test_lit_i32() {
3017        let column = lit_i32(42);
3018        // The column should have a default name since it's a literal
3019        assert_eq!(column.name(), "<expr>");
3020    }
3021
3022    #[test]
3023    fn test_lit_i64() {
3024        let column = lit_i64(123456789012345i64);
3025        assert_eq!(column.name(), "<expr>");
3026    }
3027
3028    #[test]
3029    fn test_lit_f64() {
3030        let column = lit_f64(std::f64::consts::PI);
3031        assert_eq!(column.name(), "<expr>");
3032    }
3033
3034    #[test]
3035    fn test_lit_bool() {
3036        let column = lit_bool(true);
3037        assert_eq!(column.name(), "<expr>");
3038    }
3039
3040    #[test]
3041    fn test_lit_str() {
3042        let column = lit_str("hello");
3043        assert_eq!(column.name(), "<expr>");
3044    }
3045
3046    #[test]
3047    fn test_create_map_empty() {
3048        // PySpark F.create_map() with no args: column of empty maps (#275).
3049        let empty_col = create_map(&[]).unwrap();
3050        let df = df!("id" => &[1i64, 2i64]).unwrap();
3051        let out = df
3052            .lazy()
3053            .with_columns([empty_col.into_expr().alias("m")])
3054            .collect()
3055            .unwrap();
3056        assert_eq!(out.height(), 2);
3057        let m = out.column("m").unwrap();
3058        assert_eq!(m.len(), 2);
3059        let list = m.list().unwrap();
3060        for i in 0..2 {
3061            let row = list.get(i).unwrap();
3062            assert_eq!(row.len(), 0);
3063        }
3064    }
3065
3066    #[test]
3067    fn test_count_aggregation() {
3068        let column = col("value");
3069        let result = count(&column);
3070        assert_eq!(result.name(), "count");
3071    }
3072
3073    #[test]
3074    fn test_sum_aggregation() {
3075        let column = col("value");
3076        let result = sum(&column);
3077        assert_eq!(result.name(), "sum");
3078    }
3079
3080    #[test]
3081    fn test_avg_aggregation() {
3082        let column = col("value");
3083        let result = avg(&column);
3084        assert_eq!(result.name(), "avg");
3085    }
3086
3087    #[test]
3088    fn test_max_aggregation() {
3089        let column = col("value");
3090        let result = max(&column);
3091        assert_eq!(result.name(), "max");
3092    }
3093
3094    #[test]
3095    fn test_min_aggregation() {
3096        let column = col("value");
3097        let result = min(&column);
3098        assert_eq!(result.name(), "min");
3099    }
3100
3101    #[test]
3102    fn test_when_then_otherwise() {
3103        // Create a simple DataFrame
3104        let df = df!(
3105            "age" => &[15, 25, 35]
3106        )
3107        .unwrap();
3108
3109        // Build a when-then-otherwise expression
3110        let age_col = col("age");
3111        let condition = age_col.gt(polars::prelude::lit(18));
3112        let result = when(&condition)
3113            .then(&lit_str("adult"))
3114            .otherwise(&lit_str("minor"));
3115
3116        // Apply the expression
3117        let result_df = df
3118            .lazy()
3119            .with_column(result.into_expr().alias("status"))
3120            .collect()
3121            .unwrap();
3122
3123        // Verify the result
3124        let status_col = result_df.column("status").unwrap();
3125        let values: Vec<Option<&str>> = status_col.str().unwrap().into_iter().collect();
3126
3127        assert_eq!(values[0], Some("minor")); // age 15 < 18
3128        assert_eq!(values[1], Some("adult")); // age 25 > 18
3129        assert_eq!(values[2], Some("adult")); // age 35 > 18
3130    }
3131
3132    #[test]
3133    fn test_coalesce_returns_first_non_null() {
3134        // Create a DataFrame with some nulls
3135        let df = df!(
3136            "a" => &[Some(1), None, None],
3137            "b" => &[None, Some(2), None],
3138            "c" => &[None, None, Some(3)]
3139        )
3140        .unwrap();
3141
3142        let col_a = col("a");
3143        let col_b = col("b");
3144        let col_c = col("c");
3145        let result = coalesce(&[&col_a, &col_b, &col_c]);
3146
3147        // Apply the expression
3148        let result_df = df
3149            .lazy()
3150            .with_column(result.into_expr().alias("coalesced"))
3151            .collect()
3152            .unwrap();
3153
3154        // Verify the result
3155        let coalesced_col = result_df.column("coalesced").unwrap();
3156        let values: Vec<Option<i32>> = coalesced_col.i32().unwrap().into_iter().collect();
3157
3158        assert_eq!(values[0], Some(1)); // First non-null is 'a'
3159        assert_eq!(values[1], Some(2)); // First non-null is 'b'
3160        assert_eq!(values[2], Some(3)); // First non-null is 'c'
3161    }
3162
3163    #[test]
3164    fn test_coalesce_with_literal_fallback() {
3165        // Create a DataFrame with all nulls in one row
3166        let df = df!(
3167            "a" => &[Some(1), None],
3168            "b" => &[None::<i32>, None::<i32>]
3169        )
3170        .unwrap();
3171
3172        let col_a = col("a");
3173        let col_b = col("b");
3174        let fallback = lit_i32(0);
3175        let result = coalesce(&[&col_a, &col_b, &fallback]);
3176
3177        // Apply the expression
3178        let result_df = df
3179            .lazy()
3180            .with_column(result.into_expr().alias("coalesced"))
3181            .collect()
3182            .unwrap();
3183
3184        // Verify the result
3185        let coalesced_col = result_df.column("coalesced").unwrap();
3186        let values: Vec<Option<i32>> = coalesced_col.i32().unwrap().into_iter().collect();
3187
3188        assert_eq!(values[0], Some(1)); // First non-null is 'a'
3189        assert_eq!(values[1], Some(0)); // All nulls, use fallback
3190    }
3191
3192    #[test]
3193    #[should_panic(expected = "coalesce requires at least one column")]
3194    fn test_coalesce_empty_panics() {
3195        let columns: [&Column; 0] = [];
3196        let _ = coalesce(&columns);
3197    }
3198
3199    #[test]
3200    fn test_cast_double_string_column_strict_ok() {
3201        // All values parse as doubles, so strict cast should succeed.
3202        let df = df!(
3203            "s" => &["123", " 45.5 ", "0"]
3204        )
3205        .unwrap();
3206
3207        let s_col = col("s");
3208        let cast_col = cast(&s_col, "double").unwrap();
3209
3210        let out = df
3211            .lazy()
3212            .with_column(cast_col.into_expr().alias("v"))
3213            .collect()
3214            .unwrap();
3215
3216        let v = out.column("v").unwrap();
3217        let vals: Vec<Option<f64>> = v.f64().unwrap().into_iter().collect();
3218        assert_eq!(vals, vec![Some(123.0), Some(45.5), Some(0.0)]);
3219    }
3220
3221    #[test]
3222    fn test_try_cast_double_string_column_invalid_to_null() {
3223        // Invalid numeric strings should become null under try_cast / try_to_number.
3224        let df = df!(
3225            "s" => &["123", " 45.5 ", "abc", ""]
3226        )
3227        .unwrap();
3228
3229        let s_col = col("s");
3230        let try_cast_col = try_cast(&s_col, "double").unwrap();
3231
3232        let out = df
3233            .lazy()
3234            .with_column(try_cast_col.into_expr().alias("v"))
3235            .collect()
3236            .unwrap();
3237
3238        let v = out.column("v").unwrap();
3239        let vals: Vec<Option<f64>> = v.f64().unwrap().into_iter().collect();
3240        assert_eq!(vals, vec![Some(123.0), Some(45.5), None, None]);
3241    }
3242
3243    #[test]
3244    fn test_to_number_and_try_to_number_numerics_and_strings() {
3245        // Mixed numeric types should be cast to double; invalid strings become null only for try_to_number.
3246        let df = df!(
3247            "i" => &[1i32, 2, 3],
3248            "f" => &[1.5f64, 2.5, 3.5],
3249            "s" => &["10", "20.5", "xyz"]
3250        )
3251        .unwrap();
3252
3253        let i_col = col("i");
3254        let f_col = col("f");
3255        let s_col = col("s");
3256
3257        let to_number_i = to_number(&i_col, None).unwrap();
3258        let to_number_f = to_number(&f_col, None).unwrap();
3259        let try_to_number_s = try_to_number(&s_col, None).unwrap();
3260
3261        let out = df
3262            .lazy()
3263            .with_columns([
3264                to_number_i.into_expr().alias("i_num"),
3265                to_number_f.into_expr().alias("f_num"),
3266                try_to_number_s.into_expr().alias("s_num"),
3267            ])
3268            .collect()
3269            .unwrap();
3270
3271        let i_num = out.column("i_num").unwrap();
3272        let f_num = out.column("f_num").unwrap();
3273        let s_num = out.column("s_num").unwrap();
3274
3275        let i_vals: Vec<Option<f64>> = i_num.f64().unwrap().into_iter().collect();
3276        let f_vals: Vec<Option<f64>> = f_num.f64().unwrap().into_iter().collect();
3277        let s_vals: Vec<Option<f64>> = s_num.f64().unwrap().into_iter().collect();
3278
3279        assert_eq!(i_vals, vec![Some(1.0), Some(2.0), Some(3.0)]);
3280        assert_eq!(f_vals, vec![Some(1.5), Some(2.5), Some(3.5)]);
3281        assert_eq!(s_vals, vec![Some(10.0), Some(20.5), None]);
3282    }
3283}